Symfony Messenger: объединение сообщений в пакеты

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Иногда требуется сделать так, чтобы сообщения в Symfony Messenger отправлялись потребителю пакетами, а не поодиночке. Недавно нам потребовалось отправлять через Messenger обновленные строки текста из наших программ поставщику переводческих услуг.

Но из-за жесткого ограничения на интенсивность передачи данных со стороны переводческой фирмы мы не можем отправлять сообщения по одному. Следовательно, необходимо реализовать следующий алгоритм отправки: сперва сохранять все полученные сообщения, предназначенные данному потребителю, а затем отправлять все сообщения, если время ожидания новых сообщений превысило десять секунд или если сохранено более 100 сообщений.

Покажем, как мы это сделали:

// Symfony Messenger Message:
class TranslationUpdate
{
    public function __construct(
        public string $locale,
        public string $key,
        public string $value,
    ) {
    }
}
class TranslationUpdateHandler implements MessageHandlerInterface
{
    private const BUFFER_TIMER = 10; // in seconds
    private const BUFFER_LIMIT = 100;
    private array $buffer = [];

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
        pcntl_async_signals(true);
        pcntl_signal(SIGALRM, \Closure::fromCallable([$this, 'batchBuffer']));
    }

    public function __invoke(TranslationUpdate $message): void
    {
        $this->buffer[] = $message;

        if (\count($this->buffer) >= self::BUFFER_LIMIT) {
            $this->batchBuffer();
        } else {
            pcntl_alarm(self::BUFFER_TIMER);
        }
    }

    private function batchBuffer(): void
    {
        if (0 === \count($this->buffer)) {
            return;
        }

        $translationBatch = new TranslationBatch($this->buffer);
        $this->messageBus->dispatch($translationBatch);
        $this->buffer = [];

    }
}

Здесь мы имеем дело с сообщением Messenger, которое отправляется каждый раз, когда у нас появляется обновленный текст на перевод (тот же принцип можно применить и к любым другим сообщениям).

Наш обработчик сообщений будет получать все сообщения и помещать их в буфер, представляющий собой массив. Если количество элементов в буфере достигает 100 или если не появляется новых элементов в течение десяти секунд, срабатывает метод batchBuffer.

Для реализации десятисекундного таймера мы используем функцию pcntl_alarm, которая позволяет асинхронно вызывать метод batchBuffer по мере необходимости.

Для обработки системных сигналов в нашем PHP-коде мы используем функции PCNTL (прочитать о них подробнее можно в документации PHP, а также в нашем блоге, если владеете французским). Мы установили таймер, который будет посылать процессу сигнал SIGALRM через заданное количество секунд. Затем, когда сигнал будет принят процессом, запустится функция обратного вызова, которую мы указали в качестве второго аргумента pcntl_signal. Обратный вызов установлен для всего приложения, поэтому мы можем использовать этот трюк с объединением сообщений в пакеты только один раз.

Затем в методе batchBuffer мы используем новую передачу в Messenger (см. вызов dispatch), чтобы отслеживать сообщения на случай возникновения проблем, а поскольку метод реализован через PCNTL, компонент Messenger не будет повторять попытку обработки при исключении.

class TranslationBatch
{
    /**
     * @param TranslationUpdate[] $notifications
     */
    public function __construct(
        private array $notifications,
    ) {
    }
}
class TranslationBatchHandler implements MessageHandlerInterface
{
    public function __invoke(TranslationBatch $message): void
    {
      // handle all our messages
    }
}

Итак, теперь у нас есть обработчик пакетов, который всегда будет получать список сообщений для отправки. С его помощью мы можем легко объединять наши сообщения Messenger в пакеты, не прибегая к использованию cron.

Дополнение. Этот подход — всего лишь доказательство концепции. Если вы хотите применить его в рабочей среде, рекомендую использовать более устойчивое хранилище для реализации буфера, такое как Redis.


Перевод материала подготовлен в рамках курса "Symfony Framework". Всех желающих приглашаем на двухдневный онлайн-интенсив «Создание системы статистики для онлайн-магазина». На интенсиве мы:
— начнем знакомство с Symfony и ClickHouse (если точнее, то построим систему сбора статистики в ClickHouse). На базе подобной системы в будущем вы сможете строить и развивать решения Business Intelligence-систем и операционной статистики,
— затем развернем API,
— и с его помощью посмотрим на инструменты самой статистики.
Регистрация на первый день здесь.

Источник: https://habr.com/ru/company/otus/blog/560496/


Интересные статьи

Интересные статьи

Пошаговое руководство о том, как в TypeScript написать такой generic-тип, который рекурсивно объединяет произвольные вложенные key-value структуры данных. Это может оказа...
На работе я занимаюсь поддержкой пользователей и обслуживанием коробочной версии CRM Битрикс24, в том числе и написанием бизнес-процессов. Нужно отметить, что на самом деле я не «чист...
Задача При разработке нашей игры The Unliving, мы поставили перед собой задачу по отображению различных сообщений, таких, как нанесенный урон, нехватка здоровья или энергии, величина награды...
Добрый день, в данной статье я покажу как развернуть Symfony 4 приложение на AWS. В официальной документации есть пример подобного процесса, однако мой вариант не столь тривиален, как загрузка zi...
Эта статья посвящена одному из способов сделать в 1с-Битрикс форму в всплывающем окне. Достоинства метода: - можно использовать любые формы 1с-Битрикс, которые выводятся компонентом. Например, добавле...