PVbase: compacted topic в Apache Kafka

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Подумайте о проблеме, которую вы можете решить с помощью Big Data, и задайте себе вопрос: «Что будет, если мы захотим увеличить масштаб в 100 000 раз?» Ответ прост: producer продолжит закачивать сообщения, в итоге на диске закончится место для их хранения.

Log Compaction (сжатие журнала) – стратегия, с помощью которой вы можете решить эту проблему в Apache Kafka – программном брокере сообщений, используемом для ведения журналов событий, чтения данных в непрерывном режиме с периферийных устройств, сбора информации о поведении посетителей на сайте. Большинство систем, использующих Kafka, являются распределёнными и обрабатывают большие объёмы сообщений в реальном времени.

Здесь я постараюсь описать некоторые нюансы работы compacted topic. Если вы хотите разобраться с механизмом log compaction, эта статья для вас.

Суть сжатия журнала заключается в выборочном удалении записей, в которых у нас есть последнее обновление с тем же первичным ключом.

Для простоты понимания процесса приведём такую аналогию из жизни: допустим, некий человек (Иван Иванович) каждый день смотрит на градусник и вносит записи о температуре в электронный журнал (в раздел «Температура воздуха»):

Иван Иванович

+8 градусов

2 ноября 2021, 14:40

Иван Иванович

+9 градусов

3 ноября 2021, 10:10

Иван Иванович

+10 градусов

4 ноября 2021, 15:30

Другие люди с некой периодичностью смотрят эти записи и используют их в своей деятельности. Причём в практическом смысле их интересует только самая последняя температура, данные по прошедшим дням значения не имеют.

Со временем журнал может быть переполнен ненужными записями, поэтому раз в неделю приходит Василий Петрович и чистит его, удаляя все сообщения и оставляя там только последнюю запись (потому что остальные не имеют смысла):

Иван Иванович

+10 °C

4 ноября 2021, 15:30

Таким образом, в приведённом выше описании по терминологии Kafka:

  • Ключ (key): Иван Иванович;

  • Значение (value): +10 градусов;

  • Метка времени (timestamp): 4 ноября 2021, 15:30;

  • Топик (topic): способ группировки потоков сообщений по категориям [4] – раздел журнала «Температура воздуха»;

  • Раздел (partition): последовательность сообщений топика, которые упорядочены в порядке их поступления [4];

Зная всё это, сообщения от Ивана Ивановича можно представить в следующем виде:

 

ЖУРНАЛ СООБЩЕНИЙ

Топик (topic): раздел журнала «Температура воздуха»

 

Раздел (partition)

Ключ (key)

Иван Иванович

Иван Иванович

Иван Иванович

Значение (value)

+8 градусов

+9 градусов

+10 градусов

Метка времени (timestamp)

2 ноября 2021, 14:40

3 ноября 2021, 10:10

4 ноября 2021, 15:30

 

Cтратегия Log Compaction гарантирует, что Kafka всегда будет сохранять по крайней мере последнее известное значение для каждого ключа сообщения в журнале для одного раздела топика. В основном он используется для таких сценариев, как восстановление к состоянию до сбоя приложения или системы, а также при перезагрузке кеша после перезапуска приложения. Проще говоря, Apache Kafka будет хранить последнюю версию записи и удалять старые версии с тем же ключом.

Сжатие журнала Kafka позволяет потребителям (consumer-ам) восстановить своё состояние из записи compacted topic. Этот процесс не изменит порядок сообщений, но удалит некоторые из них. Кроме того, смещение (offset) раздела для сообщения не изменится. Журнал данных состоит из головы (head) и хвоста (tail). Каждое новое сообщение добавляется в конец head. Все записи log compaction находятся в хвосте уплотнённого блока.

Рис. 1. Архитектура Kafka – смещение разделов
Рис. 1. Архитектура Kafka – смещение разделов

Базовый алгоритм удаления неактуальных данных в топиках основан на следующих свойствах:

  • cleanup.policy=delete – включает механизм удаления неактуальных сообщений;

  • retention.bytes – определяет размер партиции, после превышения которого следует начать удалять сообщения;

  • retention.ms – определяет максимальный возраст сообщения, после превышения которого следует его удалить.

Важная особенность, которую все упускают! Брокер Kafka хранит сообщения в сегментах. Они будут удаляться только тогда, когда все сообщения в конкретном сегменте соответствуют критериям retention policy.

Но если установить cleanup.policy=compact, то стратегия будет compacted topic. Чтобы понять механику, определим важные термины:

  • LogCleaner – пул потоков, которые осуществляют сжатие лога;

  • LogTail – часть лога, над которой уже была произведена процедура сжатия;

  • LogHead – часть лога, над которой процедура сжатия не производилась.

Сам процесс будет выглядеть следующим образом:

  1. LogCleaner определяет лог с наибольшим отношением logHead/logTail.

  2. Он определяет все offset'ы по каждому ключу в logHead.

  3. Копирует самый старый сегмент, удаляя сообщения, ключи которых присутствуют дальше в логе.

  4. Подменяет оригинальный сегмент на сжатый.

Важные параметры настройки logCleaner:

  • log.cleanup.policy=compact – основное свойство, которое включает log compaction;

  • log.cleaner.max.compaction.lag.ms – время жизни сообщения, ключ которого существует дальше по логу;

  • log.cleaner.threads – кол-во потоков logCleaner.

Эта стратегия удаляет не только повторяющиеся записи, но также ключи с нулевым значением.

Детали алгоритма работы

Log compaction гарантирует, что Kafka всегда будет сохранять по крайней мере последнее сообщение в каждом разделе топиĸа. Этот ĸейс применим в случае key-value-сообщения.
В доĸументации рассматривается пример user_id -> email.

Базовый алгоритм удаления неактуальных данных в топиĸах основан на следующих свойствах:

  • cleanup.policy=delete – включает механизм удаления неактуальных сообщений;

  • retention.bytes – определяет размер раздела, после превышения которого следует начать удалять сообщения;

  • retention.ms – определяет максимальный возраст сообщения, после превышения которого нужно его удалить.

Посмотрим на живом примере, ĸаĸ работает log compaction.

Сĸачав последнюю версию Apache Kafka (в моём случае 2.7.0), поднимем zookeeper и броĸера с default-настройĸами:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.propertie

Затем создадим топиĸ:

./bin/kafka-topics.sh \
--bootstrap-server localhost:9092\
  --create \
--topic test – topic-1 \
  --config "segment.bytes=1024" \
  --config "cleanup.policy=compact" \
  --config "cleaner.max.compaction.lag.ms =60000"

Запустим consumer'a:

./bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --property "print.key=true" \
  --property "key.separator=    " \
  --from-beginning \
  --topic test topic-1

Запустим producer'а:

./bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --property "parse.key=true" \
  --property "key.separator=:" \
  --topic test topic-1

В оĸне producer'а напишем сообщения:

> a:1
> b:1
> c:1
> a:2

В оĸне consumer'a вы найдёте все те же сообщения. Теперь через 60–90 секунд перезапустите consoleconsumer, и вы увидите:

b   1
c   1
a   2

Сжатые разделы требуют ресурсов памяти и ЦП ваших брокеров. Для успешного завершения сжатия журналов нужна как куча (память), так и циклы ЦП на брокерах, а неудачное сжатие журналов подвергает брокеров риску из-за неограниченного роста раздела.

Вы можете настроить log.cleaner.dedupe.buffer.size и log.cleaner.threads на своих брокерах, но имейте в виду, что эти значения влияют на использование кучи. Если брокер выдаст исключение OutOfMemoryError, он отключится и потенциально может потерять данные.

Размер буфера и количество потоков будут зависеть как от количества очищаемых разделов темы, так и от скорости передачи данных и размера ключа сообщений в этих разделах.

Сжатие журнала – хороший вариант для сценариев кеширования, из которых вы можете просто прочитать последнее состояние compacted topic'а.

Использованные источники:

1.      https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262

2.      https://towardsdatascience.com/log-compacted-topics-in-apache-kafka-b1aa1e4665a7

3.      http://cloudurable.com/blog/kafka-architecture-log-compaction/index.html

4.      https://kafka.apache.org/documentation/#intro_concepts_and_terms

Источник: https://habr.com/ru/company/sberbank/blog/590045/


Интересные статьи

Интересные статьи

Статья о том, как упорядочить найм1. Информируем о вакансии2. Ведём до найма3. Автоматизируем скучное4. Оформляем и выводим на работу5. Отчитываемся по итогам6. Помогаем с адаптацией...
У некоторых бизнес-тренеров в области е-коммерса и консультантов по увеличению интернет-продаж на многие вопросы часто можно слышать универсальную отмазку — «надо тестировать» или другую (чтобы не...
Если в вашей компании хотя бы два сотрудника, отвечающих за работу со сделками в Битрикс24, рано или поздно возникает вопрос распределения лидов между ними.
Эта статья для тех, кто собирается открыть интернет-магазин, но еще рассматривает варианты и думает по какому пути пойти, заказать разработку магазина в студии, у фрилансера или выбрать облачный серви...
В 1С Битрикс есть специальные сущности под названием “Информационные блоки, сокращенно (инфоблоки)“, я думаю каждый с ними знаком, но не каждый понимает, что это такое и для чего они нужны