Привет! Меня зовут Андрей Серебрянский, я дата инженер в команде Data Operations. Наша команда отвечает за наполнение нашего хранилища на Snowflake, а также за то, чтобы у остальных команд были данные в реальном времени. Например, лента операций (это покупки клиентов, их переводы, полученный ими кешбек) наполняется на основе наших данных.
Для всех этих задач мы используем Kafka, а главное Kafka Streams. Сегодня я расскажу про то, для каких задач можно применять Kafka Streams и покажу код для наших простых примеров. Это будет полезно тем, кто использует Kafka, но еще не пробовал Kafka Streams. Если вы бы хотели сохранять состояние при обработке Kafka топиков или искали простой синтаксис для обогащения одних топиков информацией из других, то сегодня я покажу, как это можно делать легко и практически из коробки.
План статьи
Немного о Kafka Streams
Зачем нам вообще Kafka Streams
Кейс №1. Обогащаем покупки наших клиентов информацией о бренде
Кейс №2. Забираем данные о клиенте из команды Origination к нам в хранилище
Как все это запустить?
Немного о масштабируемости Kafka Streams
Выводы
Немного о Kafka Streams
Kafka Streams - это библиотека на Java. Для ее работы вам нужна только поднятая Kafka и приложение на Java/Scala.
Она позволяет вам в одну строчку в конфиге включить exactly once processing за счет kafka transactions.
С помощью Kafka Streams вы всего в несколько строчек может сохранять состояние, чтобы выполнять stateful операции (например, искать максимум или сравнивать сообщение с его предыдущей версией).
Зачем на вообще Kafka Streams?
Давайте возьмем одну из наших ежедневных задач: к нам приходит информация о том, что клиент что-то купил, а нам нужно добавить к ней информацию о магазине, о кэшбеке, который клиент получит, о состоянии его счета и о многом другом.
Мы могли бы опрашивать каждый из источников с этими данными самостоятельно один за другим или даже параллельно, когда бизнес-логика позволяет. Этот подход вполне сработает, если источников немного и объем данных, которые приходят в секунду невелик. Но если сообщений много, а источников, в которые надо сходить, десятки, тогда будут появляться задержки, а поддерживать код для такого количества интеграций станет сложно.
Эту ситуацию можно представить на таком примере: представьте, вы собираетесь в путешествие на машине и собираете друзей. Вы заезжаете за одним, за другим, третий слишком долго искал второй носок и задержался на полчаса, затем нужно забрать еще несколько человек.
Все это работает, но не очень быстро и при маленьком объеме друзей.
Теперь представьте, что вместо того, чтобы забирать каждого друга у него дома, вы просите их собраться к определенному времени на платформе скоростного поезда. И в назначенное время просто забираете всех разом. Именно так работает Kafka Streams. При запуске приложения, библиотека заранее загружает все нужные данные из других топиков , а затем поддерживает их в актуальном состоянии
Давайте посмотрим, как сделать такое обогащение в коде.
Кейс №1. Обогащаем покупки наших клиентов информацией о бренде
Итак, у нас есть топик с брендами. В нем по ключу (brand_id) лежит информация о бренде (в нашем примере там будет только имя).
И у нас есть топик с авторизациями клиентов.
Наша задача при получении каждого сообщения дополнить его именем бренда.
Код для такой операции выглядит так
builder.streams("authorization-events")
.join(
builder.globalTable("brands"),
auth -> auth.get("brand_id"), // функция, достающая ключ для джоина из сообщения
(brand, auth) -> auth.set("brandName", brand.get("name")) // функция джоина
);
Но что это за объект builder? Это инстанс библиотечного класса для описания логики ваших преобразований. Его можно создать вот так:
import org.apache.kafka.streams.StreamsBuilder;
...
StreamsBuilder builder = new StreamsBuilder();
Ну и еще раз хотелось бы напомнить, что Kafka Streams заранее загрузило себе все бренды по их id в память (или часть на диск, если брендов слишком много).
Как работает поиск по id бренда?
Kafka Streams может представить любой топик в виде таблицы, где ключом является ключ сообщения в кафке, а значением - само сообщение. Именно это делает код builder.globalTable(topicName)
.
Давайте рассмотрим на примере. У нас есть топик, в котором ключом является имя человека, а значением простой счетчик. Каждый раз когда нам приходит новый ключ, мы добавляем его в таблицу. Если же нам приходит сообщение с ключом, который мы уже видели, мы просто перезаписываем сообщение в таблице.
Теперь давайте посмотрим, что еще умеет Kafka Streams и как это помогает нам решать задачу поставки данных в реальном времени.
Кейс №2. Забираем данные о клиенте из команды Origination к нам в хранилище
Когда клиент впервые скачивает приложение Vivid Money, он проходит небольшой онбординг, во время которого заполняет информацию о себе. Все эти данные сохраняются в базе данных команды Origination - они занимаются процессом регистрации нового клиента в Vivid.
Мы с помощью Kafka Connect и немного доработанного open-source коннектора к dynamodb забираем эти данные к себе в формате JSON.
Но нам важно, чтобы данные были в фиксированном формате. Чтобы мы точно заранее знали, какие поля есть в сообщение, а каких точно нет. Для этой цели идеально подойдет формат Apache AVRO. С его помощью можно описать схему нашего сообщения.
Avro схема нашего сообщения
{
"type": "record",
"name": "OriginationClient",
"namespace": "datahub",
"fields": [
{
"name": "firstName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "lastName",
"type": [
"null",
"string"
],
"default": null
},
...
]
}
Чтобы преобразовать каждое сообщение нашего топика из одного формата в другой, достаточно вот такого кода:
Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);
builder.stream("origination-json-topic")
.mapValues(val -> avroConverter.convert(val))
.to("origination-avro-topic");
AvroConverter в данном случае - это класс, который преобразует сообщение в заданную заранее схему. В open source есть вот такой https://github.com/allegro/json-avro-converter конвертер. Мы используем его доработанную версию.
Окей, у нас есть данные в фиксированном формате. Но было бы круто прежде, чем сложить информацию о клиенте в базу, выделить, какое именно изменение произошло с нашим пользователем в этом сообщении. Эта разница (diff) может быть полезна. Например, изменение поля можно использовать как триггер для других событий.
Для этого нам нужно достать предыдущее сообщение по этому ключу, сравнить их и записать разницу дальше. Достать сообщение по ключу. Звучит знакомо. Да, Kafka Streams конечно же так умеет. На этот раз напишем отдельный трансформер, чтобы показать, что и так можно.
Наш код:
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...
var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore**("state-store"**), // указываем имя для нашего стора
Serdes.Bytes(), // указываем тип ключа сообщения во входящем топике
new GenericAvroSerde() // указываем тип значения сообщения во входящем топике
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") // используем имя, заданное раньше
.to(outputTopic);
А ChangeTransformer выглядит вот так:
public class ChangeTransformer implements Transformer {
private KeyValueStore<Bytes, GenericRecord> stateStore;
@Override
public void init(ProcessorContext processorContext) {
this.stateStore = processorContext.getStateStore("state-store");
}
@Override
public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
GenericRecord prevState = stateStore.get(recordKey);
return extractDiff(prevState, record);
}
...
}
Как все это запустить?
StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
.filter(...)
.map(...)
.to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); // ассинхронно
...
kafkaStreams.stop();
Немного о масштабируемости Kafka Streams
Из коробки Kafka Streams позволяет параллельно обработать данные. Максимальный уровень параллельности ограничен только количеством партиций во входящем топике. Если во входящем топике 16 партиций, вы можете поднять вплоть до 16 инстансов вашего приложения. Каждое из них будет повторять ту логику, которую вы заложили.
Больше того, если вы используете state-store (как в примере выше с ChangeTransformer-ом), то стейт будет сохраняться только для той партиции, которую приложение обрабатывает! Это позволит снизить нагрузку на память и диск.
Подробнее об этом можно прочитать вот здесь: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model
Выводы
Kafka Streams это:
Удобные stateful операции (join, get previous state). Мы видели это на примере обогащения операции покупки клиента информацией о бренде магазина, где была совершена покупку.
Понятный и короткий код. Чтобы делать преобразования типа map, filter, join есть простой и короткий DSL. Если нужны более сложные преобразования, мы всегда можем воспользоваться методом
transform()
и написать свой класс трансформера. Мы видели это на примере своего собственного ChangeTransformer-а, который находил разницу между текущим и предыдущим состоянием клиента.Простая масштабируемость. Вы можете эффективно обрабатывать данные параллельно. Вы ограничены только количеством партиций во входящем в ваше приложение топике.
P.S. Это моя первая статья на хабре) Я буду рад вашим замечаниям, комментариям и вопросам!