Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
В предыдущей статье мы достаточно подробно рассмотрели архитектуру Apache Kafka, из каких компонентов состоит данное решение, что для чего предназначено. И в этой статье мы рассмотрим процесс развертывания данного решения.
Для запуска Kafka можно использовать решения для управления кластером, такие как ZooKeeper или KRaft. В статье мы рассмотрим работу с ZooKeeper. Zookeper это распределенное приложение для управления кластером, состоящим из большого количества узлов.
Перед установкой Zookeeper не забудьте поставить Java, например вот так
apt -y install openjdk-8-jre
Для установки zookeeper сначала перейдем на официальную страницу данного решения https://zookeeper.apache.org/releases.html и выберем последнюю версию.
Загрузим данную версию с помощью wget
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
Далее создадим каталог и распакуем в него архив:
mkdir /opt/zookeeper
tar zxvf apache-zookeeper-3.8.1.tar.gz -C /opt/zookeeper --strip-components=1
Для работы нам также потребуются дополнительные каталоги
mkdir -p /opt/zookeeper/data /var/log/zookeeper
Далее мы переходим к конфигурированию системы. Для этого нам потребуется файл /opt/zookeeper/conf/zoo.cfg.
Этот файл необходимо наполнить следующим содержимым:
tickTime = 2000
maxSessionTimeout = 50000
syncLimit = 5
initLimit = 300
autopurge.purgeInterval = 1
autopurge.snapRetainCount = 5
snapCount = 200000
clientPort = 2181
maxClientCnxns = 100
4lw.commands.whitelist=stat
dataDir = /opt/zookeeper/data
dataLogDir = /var/log/zookeeper
Мы не будем в этой статье подробно останавливаться на значении каждого из параметров, так как это несколько выходит за рамки статьи.
Далее мы настроим запуск приложения в качестве сервиса. Для этого создадим специальную учетку, настроим ей права и создадим файл юнита в systemd.
useradd -r -c 'Zookeeper service' zookeeper
chown -R zookeeper:zookeeper /opt/zookeeper /var/log/zookeeper
Файл юнита /etc/systemd/system/zookeeper.service будет иметь следующую структуру:
[Unit]
Description=ZooKeeper Service
Documentation=https://zookeeper.apache.org/
Requires=network.target
After=network.target
[Service]
Type=forking
User=zookeeper
Group=zookeeper
WorkingDirectory=/opt/zookeeper
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeperbin/zkServer.sh restart
TimeoutSec=30
Restart=on-failure
[Install]
WantedBy=default.target
Затем нам необходимо просто перезапустить system и запустить zookeper
systemctl daemon-reload
systemctl enable zookeeper --now
Проверим статус zookepeer
systemctl status zookeeper
Базовые настройки Zookeeper мы выполнили. Настройка управления отдельными узлами кластера опять таки не входит в тему данной статьи, поэтому далее мы перейдем к установке Kafka.
Ставим Кафку
Для начала нам потребуется загрузить последнюю версию продукта. Далее распаковываем архив и переходим в созданный каталог.
$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0
Далее нам необходимо запустить Zookeper с настройками, необходимыми для работы Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
При успешном запуске мы получим примерно такую "портянку".
Далее откроем еще одну SSH сессию в которой собственно запустим Kafka
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Здесь нас никакой особой псевдографикой не порадовали, так что просто смотрим вывод на предмет ошибок.
Как видно, мы создали базовую конфигурацию для работы Kafka. Для того, чтобы не плодить окна командной строки или SSH сессии я бы посоветовал перезапустить две последние команды в фоновом режиме, т.е. добавить &.
Создаем топик
Напомню из материалов предыдущей статьи, что Kafka - это платформа распределенной потоковой передачи событий, которая позволяет вам читать, записывать, хранить и обрабатывать сообщения на многих узлах.
Для начала в качестве примера создадим топик quickstart-events с помощью команды:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Давайте посмотрим статистику по созданному топику:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Как видно, пока наш топик пуст и самое время начать записывать в него сообщения. Для записи сообщений в топик quickstart-events воспользуемся следующей командой:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
После запуска в интерактивном режиме вводим строки (Ctrl+C для завершения):
Как видите, я не переключил кодировку в первой строке и отправил строку на кириллице. Посмотрим как ее сохранит Kafka.
Для чтения событий из топика quickstart-events применим команду:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Как видите кириллица нормально сохранилась и прочиталась, это говорит о том, что у нас не должно быть проблем при использовании различных кодировок в сообщениях. Сообщения будут храниться в Kafka столько времени, сколько потребуется. При повторных подключениях мы можем прочитать текущий поток сообщений, а также добавлять новые.
Подключаем источники
Естественно, представленный вариант работы с сообщениями “вручную” на практике мало применим. Гораздо интереснее попробовать в автоматическом режиме прочитать сообщения из источника и поместить их в топик Kafka. В качестве примера мы импортируем данные из файла в раздел Kafka и затем экспортируем данные из раздела Kafka в файл.
Для этого нам потребуется указать в свойствах plugin.path файла config/connect-standalone.properties файл connect-file-3.4.0.jar. Откроем это файл и добавим в него строку
plugin.path=libs/connect-file-3.4.0.jar
Собственно все, далее создаем текстовый файл с сообщениями, которые мы будем читать
echo -e "event1\nevent2\nevent3" > test.txt
Далее нам необходимо запустить коннектор, который будет читать сообщения из этого файла и коннектор, который будет сохранять сообщения в sink файле. В наборе файлов, идущих вместе с Kafka есть файлы config/connect-file-source.properties и config/connect-file-sink.properties. В первом файле указаны настройки для считывания файла test txt, а втором настройки для файла хранения сообщений test.sink.txt. Когда команда ниже будет выполнена коннектор, считывающий сообщения из исходного файла test.txt будет загружать их в топик connect-test (который также есть в конфигурационных файлах), а sink коннектор начнет читать сообщения из топика connect-test и запишет их в файл test.sink.txt.
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Убедиться в работе коннектора можно с помощью просмотра содержимого test.sink.txt
Заключение
На этом, я полагаю сегодняшнюю статью можно завершить. Мы развернули Apache Kafka и Zookeeper, посмотрели как можно вручную создавать топики, добавлять и читать сообщения, а также развернули коннектор, читающий данные из текстового файла. В следующей статье мы продолжим изучение практических аспектов работы с Kafka. А сейчас хочу пригласить вас на бесплатный урок, в рамках которого рассмотрим как в приложениях на Spring Boot можно работать с Kafka. Узнаем, что предоставляет платформа Spring для ускоренной разработки приложений, работающих с Kafka. Посмотрим, какие есть настройки, как это все конфигурируется. Проведем границу между "родным функционалом" Kafka api и "добавками" от Spring Boot.
Зарегистрироваться на бесплатный урок