Данная статья будет полезна тем, кто столкнулся с проблемой интеграции kafka и rabbitmq. Материал не претендует на подробный туториал, но поможет помочь настроить рабочий процесс. Я расскажу, как отправить сообщение в rabbitmq и получить его на стороне kafka, а также обратный процесс, с которым, спойлер, всё оказалось сложнее.
Путешествие «туда и обратно».
Зачем?
Ситуация максимально банальна и проста. Когда разные команды, делают свои проекты с использованием разных message broker’ов, рано или поздно наступает момент, когда появляется необходимость интеграции (и да, я в курсе, что kafka - нечто большее, чем просто mb). Первое, что приходит на ум – это сделать интеграцию через REST, но мы получаем сразу несколько значительных минусов. Например, long pulling. Как поставить на расчет долгий процесс? Как дождаться ответа? Итог: реализации на REST очень далеки от идеала. Внедрение REST так же повлечет танцы с бубном, так как текущая архитектура уже заточена под message broker.
Следующей технологией для интеграции можно рассмотреть grpc. Хороший вариант, но тоже имеет изъян, так как не очень хорошо работает с асинхронными запросами, а нас интересовали именно длительные запросы.
Итак, мы пришли к решению использовать брокер сообщений, но тут появилась главная проблема: одна команда использовала rabbitmq, а другая apache kafka. Первой мыслью было выбрать одну технологию и использовать ее, но оценив трудозатраты по переписыванию pipleline взаимодействия в любой из систем решено было искать альтернативные варианты. Да, конечно, apache kafka – это нечто большее, чем просто брокер сообщений, но в данной ситуации нам требовалась именно эта его функция.
Просматривая интернеты, я наткнулся на интересную статью, в которой описывалась именно та ситуация, в которую я попал. Воодушевленный находкой, я подумал, вот она, «моя прелесть», начал изучать туториал и пытаться развернуть сервис локально.
Был собран и использован контейнер
from kafka-connect:6.2.0
Далее во всех туториалах была обозначена schema-registry
schema-registry:
image: confluentinc/cp-schema-registry:6.2.0
container_name: schema-registry
ports:
- 8081:8081
hostname: schema-registry
depends_on:
- kafka
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://server-name.local:9092
SCHEMA_REGISTRY_CUB_KAFKA_TIMEOUT: 300
Ее я тоже поднял, но в дальнейшем она оказалась для меня бесполезной, ее предназначение – это использование дополнительных сериализаторов, меня полностью устроили дефолтные.
Из kafka в rabbitmq
Следующим этапом была настройка коннекторов. Для успешного создания коннектора нужно сначала подготовить kafka и rabbitmq.
На стороне kafka создаем топик kafka_result. Со стороны rabbitmq нужно создать очередь kafka_to_rabbit, еще понадобиться exchange kafka_to_rabbit_exch и, конечно, выполнить binding (операцию привязки очереди к exchange) с ключом kafka-connectors.
Kafka, в отличие от rabbitmq, не удаляет сообщение из топиков, и для контроля прочитанных сообщений нам необходимы конфигурационные топики, которые наш сервис создаст самостоятельно, нам остается только при старте в окружение добавить названия этих топиков. В некоторых ситуациях создание топиков запрещено извне, поэтому придется обратиться к администраторам kafka.
В нашем случае это
CONNECT_OFFSET_STORAGE_TOPIC _kafka-connect-group-01-offsets
CONNECT_STATUS_STORAGE_TOPIC _kafka-connect-group-01-status
CONNECT_CONFIG_STORAGE_TOPIC _kafka-connect-group-01-configs
После чего, запускаем наш сервис и выполняем POST запрос.
https://service_addres/connectors
{
"name": "kafka-to-rabbit",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
"tasks.max" : "1",
"topics" : "kafka_result",
"rabbitmq.queue" : "kafka_to_rabbit",
"rabbitmq.username": "guest",
"rabbitmq.password": "guest",
"rabbitmq.host": "host-rabbitmq",
"rabbitmq.port": "5672",
"rabbitmq.exchange": "kafka-to-rabbit-exch",
"rabbitmq.routing.key": "kafka-connectors",
"rabbitmq.delivery.mode": "PERSISTENT",
"confluent.topic.bootstrap.servers":"addres_kafka_server:9092",
"rabbitmq.virtual.host": "/",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
В случае удачного выполнения запроса, вернется 201 ответ. Проверить создавшийся коннектор можно в браузере по тому же адресу (или через get запрос)
https://service_addres/connectors
Можно увидеть массив коннекторов.
Отлично, если все получилось. Отправляем сообщение в топик kafka и получаем это сообщение на стороне rabbitmq, при чем конфигурационные топики будут контролировать, чтобы нам не приходило дубликатов, и сообщения не потерялись.
Прекрасно, но есть один нюанс. Почти все англоязычные туториалы на этом заканчиваются, как будто всех интересует только перепушивание сообщений из kafka rabbitmq.
А обратно то как? Как перепушить сообщение из rabbitmq в kafka?
Из rabbitmq в kafka
Вернемся к созданию нашего коннектора. Там есть строчка.
"connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector"
RabbitMQSinkConnector – отвечает за перепушивание из rabbitmq в kafka.
Теперь нам нужен RabbitMQSourceConnector. Пытаемся создать новый коннектор и получаем ошибку, что такой коннектор не найден в нашем контейнере. Неприятно.
Решение проблемы заключается в том, что при сборке нашего контейнера нужно дополнительно поставить плагин. Исправляем наш докер файл и добавляем туда строчкуconfluent-hub install --no-prompt --verbose confluentinc/kafka-connect-rabbitmq:1.5.2
Пересобираем контейнер и пробуем создать обратный коннектор
{
"name": " rabbit-to-kafka",
"config": {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"kafka.topic" : " kafka_task",
"rabbitmq.queue" : "rabbitmq_to_kafka",
"rabbitmq.username": "guest",
"rabbitmq.password": "guest",
"rabbitmq.host": "hostname",
"rabbitmq.port": "5672",
"rabbitmq.virtual.host": "/",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"kafka_host:9092",
"confluent.topic.replication.factor":1,
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Выполняем POST запрос и дожидаемся 201 ответа, проверяем коннекторы и видим успешно создавшийся
Отлично, теперь визуализируем нашу схему взаимодействия rabbitmq с kafka.
1. Отправляем сообщение в очередь rabbit_to_kafka и получаем его в топике kafka_task
2. Отправляем сообщение в топик kafka_result и получаем сообщение в очереди kafka_to_rabbit
В данной статье описано двусторонне взаимодействие, из rabbitMQ в kafka и обратно, в большинстве примеров найденных в сети описаны ситуации одностороннего взаимодействия, я потратил значительное время чтобы соединить все в одно.