На новом проекте, на котором я работаю в качестве PHP Tech Lead. Команда столкнулась с вопросом наведения разного род:
единый тип реляционной базы данных
единый брокер сообщений
единый фреймворк в разрезе языка программирования
etc.
В этой статье пойдет речь о выборе брокера сообщений.
Kafka на данный момент уже используется в компании и эта технология является более масштабируемой чем тот же RabbitMQ, поэтому мы и смотрим в ее сторону. Перед тем как начать внедрять Kafka в наши проекты, мы протестируем закрывает ли эта технология наши потребности.
Предъявляемые требования к брокеру сообщений:
Совместимость с PHP
Стабильная работа, отказоустойчивость, высокий SLA(>99,95%)
Масштабируемость
Быстрая скорость отправки сообщения в очередь
Уверенность в доставке сообщения
Уверенность в получении и обработке сообщения(ack/nack)
По сути это все все и так есть и отлично работает(на малых и средних объемах) у RabbitMQ.
Но как я уже писал выше - мы не хотим плодить зоопарк технологий.
1. Выбор клиента для PHP
На официальном сайте Kafka есть несколько ссылок на репозиториев интеграции.
https://github.com/EVODelavega/phpkafka
https://github.com/arnaud-lb/php-rdkafka
https://github.com/quipo/kafka-php
https://github.com/michal-harish/kafka-php
Только 1н репозиторий "живой" на текущий момент и имеет большое количество звезд на GitHub - arnaud-lb/php-rdkafka.
2. Сетап окружения
В 21м году Docker для разработчика это как кнут для Индианы Джонс.
Я зашел на hub.docker.com и нашел самый популярный официальный образ kafka.
Дальше поиск в гугл готового примера docker-compose файла с PHP, возможно даже с kafka. И о сюрприз - первая строка выдачи поискового результата - Phillaf/php-kafka-demo. Тут уже присутствует образ из первого пункта.
git clone <https://github.com/Phillaf/php-kafka-demo>
После клонирования репозитория, запускаем:
docker-compose up -d
С первого раза конечно же проект не собрался(как минимум у меня на MacOs BigSur). Для того чтобы испраить ошибку было достаточно просто убрать чётко заданую версию Kafka.
Все готово для работы приожения, но а как же без доп. бонуса? Для того чтобы лучше понимать что происходит с неизвестным мне инструментом, я решил сразу же поискать визуальный менеджер. Было несколько платных версий которые нужно устанавливать на ПК. Но это же не трушно. Поэтому я установил opensource admin panel
Для этого добавляем несколько строк в наш docker-compose.yml
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8080:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
- KAFKA_CLUSTERS_0_READONLY=false
На старт, внимание, пуск....
docker-compose up -d
Go to http://localhost - Добавляет сообщения в нашу очередь(топик).
Go to http://localhost:8080 - откроет нашу admin panel - где мы увидим наш топик и первые сообщения в нем.
заходим в контейнер (лично я предпочитаю пользоватьтся docker в phpStorm), но можно и черз GUI docker так и с помощью консоли(не буду тут на этом останавливаться, думаю что тем кому уже интересна Kafka - точно умеют смогут зайти в контейнер Docker) и запускаем consumer
php ./public/consumer_low.php
В консоли мы увидим наши сообщения.
Поздравляю вас теперь вы адепты Kafka.
3. Настраиваем конфигурацию под наши требования
Как я писал выше, нас интересует полный контроль над сообщениями, наше приложение хочет получать сообщение ровно 1н раз, не больше и не меньше. При этом мы хотим быть уверенными что если в процессе обработки сообщения консюмер внезапно завершит свою работу(в следствии деплоя, возникновения ошибки или падения сервера, ...) то после перезапуска - это сообщение будет обработано, а не канет в небытие.
Для этого нам потребуется отключить автоматическую синхронизацию кафки о сохраненном оффсете.
$conf->set('enable.auto.commit', 'false');
Для того чтобы севрвер мог сохранять offset мы должны назначить consumer'у уникальный идентификатор чтобы при переподключении кафка знала на каком месте мы остановлись.
$conf->set('group.id', 'group_1');
Начинаем прослушивать очередь:
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
1й параметр - номер партиции для чтения(по дефолту топик имеет 1у партицию и значение переменной будет = 0). 2й параметр - число(int) - offset c которого стоит начать читать сообщения. Существует 3 предустановленных режима:
RD_KAFKA_OFFSET_BEGINNING - начать чтение с первого сообщения в партиции
RD_KAFKA_OFFSET_END - читать только новые сообщения, которые появятся в очереди только после подключения консюмера
RD_KAFKA_OFFSET_STORED - читать с того места где остановились в прошлый раз(этот флаг возможно использовать только в случае указания - 'group.id')
Получение сообщения из очереди
$msg = $topic->consume($partition, 1000);
После успешной обработки сообщения(сохранения в БД или тп) помечаем сообщение как прочитанное(инкрементируем оффсет)
$topic->offsetStore($partition, $msg->offset);
Вуаля, мы реализовали консистентную работу c Apache Kafka.
Ссылка на репозиторий.
Буду рад вашим комментариям.