Отправка сообщения в несколько слушателей Kafka одновременно

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

В этой статье описывается способ отправки сообщения в несколько слушателей Kafka одновременно. Несколько слушателей будут получать одно и то же сообщение, от одного и того же отправителя сообщений, в данной реализации решения.

Логика реализована с использованием Java, Spring и Kafka.

План

  1. Создать слушатели Kafka

  2. Создать отправителя сообщений Kafka

  3. Создать решение для тестирования

  4. Провести тестирование

  5. Создать список определений для используемых в решении механизмов

1. Создать слушатели Kafka

Создадим 2 слушателя через аннотацию KafkaListener. У этих слушателей должно быть одно и то же название топика(topic) и разные название группы(group).

@Service
public class KafkaTester {

    //Listener 1
    @KafkaListener(id = "id1",
            groupId = "group-one",
            topics = "topic-one")
    public void listenServiceCall(@Payload String message) {
        //Logging
        System.out.println("GROUP ONE MESSAGE " + message);
    }

    //Listener 2
    @KafkaListener(id = "id2",
            groupId = "group-two",
            topics = "topic-one")
    public void listenServiceCall2(@Payload String message) {
        //Logging
        System.out.println("GROUP TWO MESSAGE " + message);
    }
}

Логирование сообщения в слушателях необходимо для проведения тестирования решения.

2. Создать отправителя сообщений Kafka

Создадим отправителя сообщений с применением класса KafkaTemplate.

@Service
public class KafkaTester {

    @Autowired
    private  KafkaTemplate<String, String> kafkaTemplate;

    //... LISTENERS FROM PREVIOUS EXAMPLE
}

Объект KafkaTemplate внедряется в сервис KafkTester через аннотацию Autowired.

3. Создать решение для тестирования

Создадим метод, который будет включаться, в определенные нами промежутки времени. Назовем метод scheduledSend(). Далее, напишем логику отправки сообщения через Kafka, в рамках метода scheduledSend().

Для реализации метода, который будет периодически включаться(scheduledSend()), используется аннотация Schedulled. Период для включения метода установлен в 10 секунд.

@Service
public class KafkaScheduler {

    @Autowired
    private  KafkaTemplate<String, String> kafkaTemplate;

    //10 seconds period
    @Scheduled(cron = "*/10 * * * * *")
    public void send() {
        kafkaTemplate.send("topic-one", "kafkaMessage " + new Date());
        System.out.println("MESSAGE WAS SENT");
    }

    @KafkaListener(id = "id1",
            groupId = "group-one",
            topics = "topic-one")
    public void listenServiceCall(@Payload String message) {
        System.out.println("GROUP ONE MESSAGE " + message);
    }

    @KafkaListener(id = "id2",
            groupId = "group-two",
            topics = "topic-one")
    public void listenServiceCall2(@Payload String message) {
        System.out.println("GROUP TWO MESSAGE " + message);
    }
}

В данном коде видно, что необходимо отправлять сообщение с указанием такого же названия топика(topic), как у слушателей. В данном случае, слушатели примут одно и то же сообщение.

4. Провести тестирование

  1. Запустить приложение с использованием кодом класса KafkaScheduler.

  2. Получить лог программы с одним и тем же временем в сообщениях, с точностью до секунды.

SENT MESSAGE
GROUP ONE MESSAGE kafkaMessage Tue Sep 12 15:17:40 EDT 2023
GROUP TWO MESSAGE kafkaMessage Tue Sep 12 15:17:40 EDT 2023
SENT MESSAGE
GROUP ONE MESSAGE kafkaMessage Tue Sep 12 15:17:50 EDT 2023
GROUP TWO MESSAGE kafkaMessage Tue Sep 12 15:17:50 EDT 2023

5. Список определений

Название топика(topic) - метка для группировки сообщений.

Идентификатор группы(groupId) - метка для группировки слушателей.

Источник: https://habr.com/ru/articles/811087/


Интересные статьи

Интересные статьи

Россияне стали в несколько раз чаще пользоваться отечественными сервисами для видеоконференций, в то время как активность в зарубежных Zoom и Skype в 2023 году снизилась ...
Буквально на секунду представьте, что у вас есть парочка здоровенных кластеров Apache Kafka, каждый из которых держит по нескольку миллионов rps. И тут вас попросили зеркалировать топик из одного клас...
Я (arynme) и teleX уже давно сидим в Telegram и разбираемся в нём.Около года назад мы начали тестировать лимит на сообщения в различных видах чатов Telegram.Так мы и узнали об описанной проблеме и пос...
Я учусь иллюстрировать сложные процессы с помощью комиксов. Нашла себе в копилку крутой кейс: как с помощью комиксов про милых выдр можно ребенку объяснить такую сложную штуку как Apa...
Привет, Хабр! Меня зовут Саша, я лид-разработчик в GlowByte Consulting. Мы с командой сделали неплохой стриминговый движок для одного крупного банка. Сейчас в продакшене крутится о...