Превращение событий PostgreSQL в события Laravel

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Краткая аннотация

В прошлый раз я описал, как можно поставить задачу (Job) в очередь Laravel из хранимой процедуры или триггера PostgreSQL.

В этой статье я расскажу, как можно преобразовать события, возникающие в PostgreSQL, в события Laravel.

Рабочий пример выложен на GitHub.

Вместо введения

Зачастую требуется знать, что происходит с данными в базе и оперативно реагировать на это.

Самый простой способ получить информацию об изменениях — регулярный опрос базы данных, с целью проверить, существуют ли изменения, на которые следует реагировать. И зачастую разработчики именно так и поступают. Это не самый эффективный метод, поскольку он может привести к излишней нагрузке на сервер и требует изменения существующих или создания новых таблиц.

Но PostgreSQL может оповещать клиентов через механизм под названием «LISTEN/NOTIFY». Документация находится здесь и здесь.

У этого механизма есть недостатки:

  • Требует долгосрочного открытого соединения с базой данных, что может быть непрактично.

  • Система уведомлений не гарантирует доставку или соблюдения порядка сообщений, не следует использовать эту технологию в качестве полнофункциональной очереди сообщений. «LISTEN/NOTIFY» следует использовать только для лёгкого взаимодействия между процессами.

  • Размер сообщения ограничен размером строки (8192 байта в PostgreSQL 13).

Для более глубокого понимания этого механизма можно обратиться к этому материалу.

Я же просто покажу, как создать команду Artisan в чем-то похожую на команду queue:work, которая будет делать своё дело и лучше в связке с Supervisor.

«Щупаем» технологию

Для быстрого ознакомления я создам docker-контейнер:

docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5433:5432 --name pgsql postgres

Обратите внимание, что я использую порт 5433, потому, что у меня на родном порту крутится «стационарная» СУБД.

Теперь запускаем три терминала, подав во всех команду:

docker exec -it pgsql psql -U test

В первом и втором терминале подаём команду:

LISTEN my_event;

Эта команда «подписывает» клиента на уведомления от сервера PostgreSQL через канал с именем my_event. Если какой-либо другой клиент выполняет команду NOTIFY my_event, то все клиенты, которые выполнили команду LISTEN my_event, получат уведомление.

В третьем терминале подаём команду:

NOTIFY my_event, 'Hello, PostgreSQL!';

Затем в первом и втором терминале ещё раз подаём команду:

LISTEN my_event;

Здесь у меня есть некоторое недопонимание. Я считал, что первый и второй терминалы должны получать оповещение автоматически, но они почему-то требуют повторного вызова «LISTEN my_event».

Можете поиграться, и убедиться, что без подписки получить оповещение невозможно, также, как и прочитать его два раза.

Работает!
Работает!

Два первых терминала можно закрыть. Третий можно оставить для опытов.

Создаём приложение Laravel

composer create-project laravel/laravel listen_notify
cd listen_notify
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache

Настройки соединения с базой данных в .env

DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5433
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test

Удалите все существующие миграции. Они нам не понадобятся.

Теперь создадим заготовки для команды artisan, event и listener, слушающий этот event

php atrisan make:command ListenNotifyCommand
php artisan make:event PostgresNotificationReceived
php artisan make:listener LogPostgresNotification --event=PostgresNotificationReceived

Настроим EventServiceProvider, добавим в массив $listen следующее значение:

    protected $listen = [
        Registered::class => [
            SendEmailVerificationNotification::class,
        ],
        // Добавленное значение
        PostgresNotificationReceived::class => [
            LogPostgresNotification::class,
        ]
    ];

Каркас приложения готов

Создаём слушателя событий Laravel

Не буду придумывать сложной логики. Просто запишу полученные данные в лог-файл

Содержимое файла app/Listeners/LogPostgresNotification.php
<?php

namespace App\Listeners;

use App\Events\PostgresNotificationReceived;
use Illuminate\Support\Facades\Log;

class LogPostgresNotification
{

    public function handle(PostgresNotificationReceived $event): void
    {
        Log::info('Received Postgres notification: ', $event->notification);
    }
}

Создаём событие Laravel

Тоже не буду мудрствовать, просто передам payload через конструктор в event

Содержимое файла app/Events/PostgresNotificationReceived.php
<?php

namespace App\Events;

use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class PostgresNotificationReceived
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public function __construct(public array $notification)
    {

    }

}

Пишем логику команды listen:notify

Пойдём от простого к сложному. Для начала просто получим сообщение от PostgreSQL.

Для этого дорабатываем метод handle, не забывая доработать поля $signature и $description

Содержимое файла app/Console/Commands/ListenNotifyCommand.php
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature = 'listen:notify';

    protected $description = 'Listen to PostgreSQL notify events';

    public function handle(): void
    {
        $pdo = DB::connection()->getPdo();

        // Listen to the 'my_channel' notifications
        $pdo->exec("LISTEN my_event");
        $this->info('Starting');
        // Forever loop
        while (true) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }
        }
    }
}

Проверяем, работает ли наша команда. Запускаем в терминале команду: php artisan listen:notify и в третьем терминале, (мы его не закрывали) вновь подаём NOTIFY my_event, 'Hello, PostgreSQL!';

Отлично! Работает!
Отлично! Работает!

Начало положено. Приложение на Laravel получило событие из PostgreSQL.

При изменении кода команды, не забывайте перезапускать процесс.

Добавляем обработку сигналов

Немного о сигналах, гуру могут пропустить

Сигналы являются частью стандартов POSIX и служат для асинхронного уведомления процесса о каком-либо событии в операционных системах Unix и похожих на них, например, Linux. Приложения в этих системах могут обрабатывать поступающие сигналы, например, на остановку процесса (SIGTERM, SIGINT), перезагрузку (SIGHUP) и т.д.

Windows не поддерживает POSIX сигналы. Он использует собственные механизмы для управления процессами и потоками, включая функции Windows API для отправки и обработки сигналов управления, таких как Ctrl+C.

Однако в Windows есть некоторые среды, такие как Windows Subsystem for Linux (WSL), которые предоставляют совместимость со стандартами POSIX и поддерживают POSIX-сигналы.

В контексте PHP и командной строки нам интересны следующие сигналы:

  • SIGINT (сигнал прерывания). Этот сигнал обычно посылается при нажатии Ctrl+C в терминале. Он сообщает процессу о необходимости остановиться.

  • SIGTERM (закончить выполнение). Это стандартный сигнал для остановки процесса в Unix. Программы могут перехватить этот сигнал и выполнить любую необходимую работу перед завершением. Если программа не перехватывает SIGTERM, она завершится немедленно.

  • SIGKILL (убить процесс немедленно). Этот сигнал нельзя перехватить или игнорировать. Когда процесс получает SIGKILL, он немедленно останавливается.

Добавляем два поля ($hasPcntl, $running) типа bool в класс ListenNotifyCommand и инициализируем их. Пишем метод — обработчик сигналов

    protected bool $hasPcntl = false;
    protected bool $running = true;

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

Для обработки сигналов потребуется расширение pcntl. Данное расширение недоступно для Windows, тем не менее, вполне возможно написать кроссплатформенное решение.

Дорабатываем метод handle

    public function handle(): int
    {
        // Проверка, что модуль pcntl подключён
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            // Если модуль pcntl подключён, назначаем обработчики сигналов
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
            $this->info('iter');
            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }

            if ($this->hasPcntl) {
                // Если модуль pcntl подключён, вызываем обработчики сигналов
                pcntl_signal_dispatch();
            }
        }
        // Возвращаем 0, как код завершения
        return 0;
    }
Файл app/Console/Commands/ListenNotifyCommand.php
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature = 'listen:notify';

    protected $description = 'Listen to PostgreSQL notify events';

    protected bool $hasPcntl = false;
    protected bool $running = true;

    public function handle(): int
    {
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
            $this->info('iter');
            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }

            if ($this->hasPcntl) {
                pcntl_signal_dispatch();
            }
        }
        return 0;
    }

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

}

Можете запустить команду, а также вызвать в соседней консоли NOTIFY, всё должно работать. Если команда запущена в Linux и модуль pcntl подключён, то при нажатии Ctrl+C будет выведено сообщение: Received stop signal, shutting down... Это значит, что скрипт корректно обрабатывает сигналы и останавливается, вместо того, чтобы быть принудительно остановленным.

Настраиваем Supervisor для наблюдения за скриптом

Supervisor — это удобный инструмент для управления фоновыми процессами в операционных системах, похожих на Unix. Он отслеживает работу скрипта, автоматически перезапускает его при сбое и даёт возможность управлять его состоянием, такими как запуск, остановка и перезагрузка.

Supervisor также совместим с сигналами Unix, что дает возможность настраивать поведение процесса в зависимости от разных сигналов. Mы настроили скрипт на обработку сигналов SIGINT и SIGTERM, чтобы правильно завершить его, что хорошо согласуется с Supervisor.

Когда Supervisor отправляет сигнал SIGTERM процессу, он ожидает, что процесс завершит свою работу и передаст управление обратно системе.

Если процесс успешно обработал SIGTERM и корректно завершил свою работу, обычно возвращается код завершения 0.

Если процесс не вернул управление за разумное время (по умолчанию 10 секунд), Supervisor посылает SIGKILL. Это время можно изменить в настройках, опция stopwaitsecs.

Примерный файл конфигурации Supervisor

[program:postrgres_laravel]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/laravel/artisan listen:notify
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/postrgres_laravel.log

Подробнее про Supervisor в документации Laravel.

Сериализуем полезную нагрузку

Аргумент для NOTIFY — всегда строка. Т.е. если мы хотим передать что-либо cложное, нам нужно это сложное — сериализовать. PostgreSQL умеет работать с JSON, давайте используем это умение.

Создадим хранимую функцию, которая принимает на вход json, и отправляет его в канал my_event.

Файл миграции 2024_03_05_125805_create_send_notify_function.php
<?php

use Illuminate\Database\Migrations\Migration;

return new class extends Migration {
    public function up(): void
    {
        DB::unprepared('
            CREATE OR REPLACE FUNCTION send_notify(data json) RETURNS VOID AS $$
            BEGIN
                PERFORM pg_notify(\'my_event\', data::text);
            END;
            $$ LANGUAGE plpgsql;
        ');
    }

    public function down(): void
    {
        DB::unprepared('DROP FUNCTION IF EXISTS send_notify(json);');
    }
};

Выполняем миграцию

php artisan migrate

У меня всё прошло успешно. Теперь нужно чуть-чуть доработать handle

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
            }

Здесь я выделяю $payload и вывожу её в терминал.

Давайте проверим, всё ли у нас работает. Запустите команду php artisan listen:notify. На этот раз в терминале psql подадим следующую конструкцию:

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!'));

Смотрим в терминал, работает. Давайте передадим что-либо менее тривиальное:

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)))
И опять работает!
И опять работает!

Главное, не увлекаться, и помнить об ограничении на 8192 байта.

Сводим воедино

Теперь осталось совсем немного. В нашей команде отправить событие, чтобы его могли слушать все слушатели, которые на него подписаны. Для этого добавляем всего одну строчку:

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
                // Новая строка
                Event::dispatch(new PostgresNotificationReceived($payload));
            }
Полный листинг файла app/Console/Commands/ListenNotifyCommand.php
<?php

namespace App\Console\Commands;

use App\Events\PostgresNotificationReceived;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature = 'listen:notify';

    protected $description = 'Listen to PostgreSQL notify events';

    protected bool $hasPcntl = false;
    protected bool $running = true;


    public function handle(): int
    {
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
                Event::dispatch(new PostgresNotificationReceived($payload));
            }

            if ($this->hasPcntl) {
                pcntl_signal_dispatch();
            }
        }

        return 0;

    }

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

}

Запускаем команду: php artisan listen:notify и подаём в соседнем терминале команду

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)));

По нашей задумке, слушатель события пишет полезную нагрузку в лог. Смотрим в лог.

В логе полезная нагрузка
В логе полезная нагрузка

Запускаю виртуальную машину и проверяю, как скрипт отрабатывает сигналы в Linux

Видно, что сигнал отработал и приложение нормально завершилось
Видно, что сигнал отработал и приложение нормально завершилось

Видно, что в ответ на Ctrl+C скрипт выводит сообщение: Received stop signal, shutting down...

Что можно улучшить?

Скорее всего, обработку полученного сообщения, следует тотчас же закинуть, как задачу, в асинхронную очередь Laravel, чтобы обработка сообщения не тормозила «бесконечный» цикл, что может привести к пропуску сообщений или аварийному завершению скрипта процессом Supervisor.

Дальнейшее принятие решений о диспетчеризации событий выполнять в этой задаче.

Заключение

Вы можете использовать функцию send_notify в других хранимых функциях или триггерах PostgreSQL, передавая из триггеров значения переменных TG_TABLE_NAME и TG_OP для принятия решения, что и как обработать. В статье показана всего лишь основа для получения событий из PostgreSQL. Как применять их на практике, зависит только от воображения разработчика.

Рабочее приложение можно найти на GitHub

Источник: https://habr.com/ru/articles/798203/


Интересные статьи

Интересные статьи

В этой статье я поделюсь скриптом для создания бэкапов БД PostgreSQL за определенный период (например: 1, 2, 3 дня, 1 неделя, 1 месяц, 6 месяцев, каждый год).Объясню как запустить скрипт с помощью рас...
Django — гибкий фреймворк для быстрого создания приложений на Python. По умолчанию в качестве базы данных он использует SQLite. Это хорошо работает при небольших нагрузках, однако традиционная система...
В интернете не так много информации о том, как можно разместить приложение .NET на облачном сервисе Heroku, в том числе немного сказано о приложениях с БД. Как без мороки и элегантно разместить ваш .N...
Привет, Хабр! Меня зовут Алексей, я тимлид команды инфраструктурных разработчиков и инженеров PostgreSQL. У нас небольшая команда — всего 6 человек, но при этом довольно большая инфраструктура, на кот...
Продолжаю тему моего коллеги о Keycloak. Кому не нужна вода, а просто пример кода, прыгайте сразу сюда.Keycloak довольно часто используется в качестве решения для управле...