Как spring-kafka обрабатывает сообщения и не мешает ли этому auto-commit?

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

В предыдущей статье мы рассмотрели как работает KafkaConsumer и как реализован механизм auto-commit.

В этой статье я хочу остановиться на том как получает и обрабатываются сообщения spring-kafka.

Стоит оговориться, что сейчас мы рассматриваем ситуацию с enable.auto.commit = true. Согласно документации начиная с версии 2.3 настройка auto.commit по-умолчанию выставлена в false, хотя раньше это значение было аналогично значению по-умолчанию в kafka-clients, т.е. true. Это связанно с тем что контейнер KafkaMessageListenerContainer имеет собственные механизмы управления коммитом. Насколько это удобнее и какие тут есть плюсы и минусы - пожалуй тема отдельной статьи.

Because the listener container has it’s own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false. Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container’s consumer property overrides.

Я постараюсь ответить на следующие вопросы:

  1. Как в spring-kafka построена работа с KafkaConsumer?

  2. Какие есть возможности для параллельной обработки сообщений?

  3. Что происходит при возникновении ошибок при обработки сообщений?

Рассмотрим типичный минимальный пример для получения сообщений из топика. Прежде всего нам понадобится конфигурация для подключения к топику:

@Configuration
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        myListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
	  	factory.setConcurrency(3);
	  	...
      return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
       Map<String, Object> props = new HashMap<>();
	  	props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “true”);
        ...
        return props;
    }
}

И код который будет совершать какую-то работу с полученными сообщениями:

@KafkaListener(topics = "myTopic", containerFactory=”myListenerContainerFactory”)
public void listen(String data) {
    ...
}

Сперва остановимся подробнее на конфигурации. Тут в игру вступают две сущности:

  1. DefaultKafkaConsumerFactory

  2. ConcurrentKafkaListenerContainerFactory

Первая фабрика по сути своей принимает от нас все необходимые свойства конфигурации и создает уже знакомый нам KafkaConsumer из библиотеки kafka-clients в методе createRowConsumer(...):

protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(),
	this.valueDeserializerSupplier.get());
}

ConcurrentKafkaListenerContainerFactory также прост по своей сути. Он создаёт объект ConcurrentKafkaListenerContainer, который в свою очередь создает KafkaMessageListenerContainer в количестве указанном в поле concurrent. Если наш топик имеет партиций меньше чем указанно в поле concurrent, то значение поля изменяется на количество партиций. Это сделано ввиду бесполезности создания большего числа слушателей, чем у топика есть партиций, т.к. kafka на своей стороне позволяет подключиться к одной партиции только одному слушателю в пределах одной группы слушателей, все остальные слушатели из этой группы будут распределены по другим партициям либо останутся незадействоваными. Все это можно наглядно увидеть в методе doStart()

Итак, мы добрались наконец добрались до KafkaMessageListenerContainer.

В методе doStart() этого класса мы получаем ссылку на метод в классе на который мы повесили аннотацию @KafkaListener и указали в параметре containerFactory название нашего ConcurrentKafkaListenerContainerFactory из конфигурации.

 Object messageListener = containerProperties.getMessageListener();   

Далее в коде мы видим получение объекта AsyncListenableTaskExecutor и если этого объекта не существует будет создан объект с типом SimpleAsyncTaskExecutor, который создаст отдельный поток для нашего слушателя.

AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
if (consumerExecutor == null) {
	consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
  containerProperties.setConsumerTaskExecutor(consumerExecutor);
}

В конце концов мы создаем обертку над полученным слушателем из класса ListenerConsumer() объявленного тут же в KafkaMessageListenerContainer, а вот он уже в свою очередь создаст в конструкторе экземпляр KafkaConsumer с помощью фабрики DefaultKafkaConsumerFactory объявленной нами в конфигурации

this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
   this.consumerGroupId,
   this.containerProperties.getClientId(),
   KafkaMessageListenerContainer.this.clientIdSuffix,
   consumerProperties);

Мы установили связь между кодом, который мы пометили аннотацией @KafkaListener и непосредственно классом KafkaConsumer и разобрались для чего используются классы в конфигурации.

Теперь посмотрим как же происходит получение и обработка сообщения.

Итак у ListenerConsumer есть метод pollAndInvoke() в котором происходит вызов метода в котором в свою очередь непосредственно происходит вызов метода poll() у KafkaConsumer для получения новых сообщений (и коммита offset в случае enable.auto.commit = true) Полученный сообщения передаются в метод invokeListener() для непосредственной обработки.

private void invokeListener(final ConsumerRecords<K, V> records) {
	if (this.isBatchListener) {
		invokeBatchListener(records);
	} else {
		invokeRecordListener(records);
	}
}

В методе помеченным @KafkaListener мы можем обрабатывать сообщения как по одному так и пачкой (все полученные из топика при вызове KafkaConsumer->poll()) Если мы остановились на первом варианте и обрабатываем сообщения в нашем @KafkaListener по одному, то ListenerConsumer будет просто с помощью итератора идти по всему полученному набору передавая сообщения на обработку (метод doInvokeWithRecords(...)), который в свою очередь через цепочку вызовов передает сообщение на обработку нашему методу помеченному аннотацией @KafkaListener.

А что же происходит если в нашем коде при обработке сообщения выбрасывается исключение?

При получении и обработки сообщений ListenerConsumer отлавливает все возможные типы исключений в методе doRun() При обработке неспецифичных исключений используются два разных механизма в зависимости от версии spring-kafka. В версиях младше 2.5 мы можем наблюдать следующее поведение: если мы самостоятельно не настроили обработчик ошибок, то будет создан LoggingErrorHandler, который просто напросто залогирует ошибку и обработка продолжится.

Начиная с версии 2.5 обработчиком по-умолчанию становится SeekToCurrentErrorHandler в котором произойдет 10 попыток обработать сообщение без задержки и если все они закончатся неудачей ошибка также будет залогирована и мы перейдем к обработке следующего сообщения.

Это необходимо учитывать при оценки гарантий обработки сообщений, которые поддерживает наше приложение, т.к. со своей стороны kafka выполнила все обязательства по доставке сообщения.

У нас остался еще один момент, который стоит прояснить: как часто spring-kafka будет вызывать метод poll() у KafkaConsumer ? В нашей конфигурации при создании KafkaListenerContainerFactory мы можем указать следующий параметр:

factory.getContainerProperties().setPollTimeout(3000);  

что будет означать следующее: при вызове метода poll() у KafkaConsumer ему в качестве аргумента будет передаваться это значение (само значение задает время в миллисекундах, т.е. в нашем случае 3 секунды). Именно это происходит в методе doPoll()

 this.consumer.poll(this.pollTimeout);

KafkaConsumer же в свою очередь при вызове метода poll будет ждать переданное ему количество времени пока не наберется столько сообщений сколько мы указали в параметре max.poll.records (значение по-умолчанию 500 записей). Если pollTimeout будет равен 0, то вызов метода poll будет происходить без задержек возвращая пустой результат.

Подведем итог.

  1. Под капотом spring-kafka использует все тот же KafkaConsumer из библиотеки kafka-clients и работа с ним осуществляется в отдельном потоке.

  2. Мы можем смело использовать механизм auto-commit, но этот параметр лучше всего явно прописывать в конфигурации.

  3. Следует внимательно отнестись к обработчикам ошибок по-умолчанию и учитывать их поведение при расчетах надежности нашей системы.

Источник: https://habr.com/ru/post/570674/


Интересные статьи

Интересные статьи

SWAP (своп) — это механизм виртуальной памяти, при котором часть данных из оперативной памяти (ОЗУ) перемещается на хранение на HDD (жёсткий диск), SSD (твёрдотельный накоп...
Ранее в одном из наших КП добавление задач обрабатывалось бизнес-процессами, сейчас задач стало столько, что бизнес-процессы стали неуместны, и понадобился инструмент для массовой заливки задач на КП.
Каждый лишний элемент на сайте — это кнопка «Не купить», каждая непонятность или трудность, с которой сталкивается клиент — это крестик, закрывающий в браузере вкладку с вашим интернет-магазином.
Практически все коммерческие интернет-ресурсы создаются на уникальных платформах соответствующего типа. Среди них наибольшее распространение получил Битрикс24.