Как мы используем Kafka Streams в команде хранилища данных Vivid Money?

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Привет! Меня зовут Андрей Серебрянский, я дата инженер в команде Data Operations. Наша команда отвечает за наполнение нашего хранилища на Snowflake, а также за то, чтобы у остальных команд были данные в реальном времени. Например, лента операций (это покупки клиентов, их переводы, полученный ими кешбек) наполняется на основе наших данных.

Для всех этих задач мы используем Kafka, а главное Kafka Streams. Сегодня я расскажу про то, для каких задач можно применять Kafka Streams и покажу код для наших простых примеров. Это будет полезно тем, кто использует Kafka, но еще не пробовал Kafka Streams. Если вы бы хотели сохранять состояние при обработке Kafka топиков или искали простой синтаксис для обогащения одних топиков информацией из других, то сегодня я покажу, как это можно делать легко и практически из коробки.

План статьи

  1. Немного о Kafka Streams

  2. Зачем нам вообще Kafka Streams

  3. Кейс №1. Обогащаем покупки наших клиентов информацией о бренде

  4. Кейс №2. Забираем данные о клиенте из команды Origination к нам в хранилище

  5. Как все это запустить?

  6. Немного о масштабируемости Kafka Streams

  7. Выводы

Немного о Kafka Streams

Kafka Streams - это библиотека на Java. Для ее работы вам нужна только поднятая Kafka и приложение на Java/Scala.

Она позволяет вам в одну строчку в конфиге включить exactly once processing за счет kafka transactions.

С помощью Kafka Streams вы всего в несколько строчек может сохранять состояние, чтобы выполнять stateful операции (например, искать максимум или сравнивать сообщение с его предыдущей версией).

Зачем на вообще Kafka Streams?

Давайте возьмем одну из наших ежедневных задач: к нам приходит информация о том, что клиент что-то купил, а нам нужно добавить к ней информацию о магазине, о кэшбеке, который клиент получит, о состоянии его счета и о многом другом.

Мы могли бы опрашивать каждый из источников с этими данными самостоятельно один за другим или даже параллельно, когда бизнес-логика позволяет. Этот подход вполне сработает, если источников немного и объем данных, которые приходят в секунду невелик. Но если сообщений много, а источников, в которые надо сходить, десятки, тогда будут появляться задержки, а поддерживать код для такого количества интеграций станет сложно.

Эту ситуацию можно представить на таком примере: представьте, вы собираетесь в путешествие на машине и собираете друзей. Вы заезжаете за одним, за другим, третий слишком долго искал второй носок и задержался на полчаса, затем нужно забрать еще несколько человек.

Мы последовательно забираем данные из разных источников, ожидая, если в источнике что-то пошло не так
Мы последовательно забираем данные из разных источников, ожидая, если в источнике что-то пошло не так

Все это работает, но не очень быстро и при маленьком объеме друзей.

Слишком много друзей
Слишком много друзей

Теперь представьте, что вместо того, чтобы забирать каждого друга у него дома, вы просите их собраться к определенному времени на платформе скоростного поезда. И в назначенное время просто забираете всех разом. Именно так работает Kafka Streams. При запуске приложения, библиотека заранее загружает все нужные данные из других топиков , а затем поддерживает их в актуальном состоянии

Kafka Streams заранее подтягивает себе нужные данные
Kafka Streams заранее подтягивает себе нужные данные

Давайте посмотрим, как сделать такое обогащение в коде.

Кейс №1. Обогащаем покупки наших клиентов информацией о бренде

Итак, у нас есть топик с брендами. В нем по ключу (brand_id) лежит информация о бренде (в нашем примере там будет только имя).

Топик брендов
Топик брендов

И у нас есть топик с авторизациями клиентов.

Топик авторизаций
Топик авторизаций

Наша задача при получении каждого сообщения дополнить его именем бренда.

Код для такой операции выглядит так

builder.streams("authorization-events")
    .join(
        builder.globalTable("brands"), 
        auth -> auth.get("brand_id"), // функция, достающая ключ для джоина из сообщения
        (brand, auth) -> auth.set("brandName", brand.get("name")) // функция джоина
    );

Но что это за объект builder? Это инстанс библиотечного класса для описания логики ваших преобразований. Его можно создать вот так:

import org.apache.kafka.streams.StreamsBuilder;
...

StreamsBuilder builder = new StreamsBuilder();

Ну и еще раз хотелось бы напомнить, что Kafka Streams заранее загрузило себе все бренды по их id в память (или часть на диск, если брендов слишком много).

Как работает поиск по id бренда?

Kafka Streams может представить любой топик в виде таблицы, где ключом является ключ сообщения в кафке, а значением - само сообщение. Именно это делает код builder.globalTable(topicName).

Давайте рассмотрим на примере. У нас есть топик, в котором ключом является имя человека, а значением простой счетчик. Каждый раз когда нам приходит новый ключ, мы добавляем его в таблицу. Если же нам приходит сообщение с ключом, который мы уже видели, мы просто перезаписываем сообщение в таблице.

https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality
https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality

Теперь давайте посмотрим, что еще умеет Kafka Streams и как это помогает нам решать задачу поставки данных в реальном времени.

Кейс №2. Забираем данные о клиенте из команды Origination к нам в хранилище

Когда клиент впервые скачивает приложение Vivid Money, он проходит небольшой онбординг, во время которого заполняет информацию о себе. Все эти данные сохраняются в базе данных команды Origination - они занимаются процессом регистрации нового клиента в Vivid.

Информация об имени и фамилии попадает в базу данных команды Origination
Информация об имени и фамилии попадает в базу данных команды Origination

Мы с помощью Kafka Connect и немного доработанного open-source коннектора к dynamodb забираем эти данные к себе в формате JSON.

Мы забираем данные из dynamodb себе в кафку
Мы забираем данные из dynamodb себе в кафку

Но нам важно, чтобы данные были в фиксированном формате. Чтобы мы точно заранее знали, какие поля есть в сообщение, а каких точно нет. Для этой цели идеально подойдет формат Apache AVRO. С его помощью можно описать схему нашего сообщения.

Avro схема нашего сообщения
{
  "type": "record",
  "name": "OriginationClient",
  "namespace": "datahub",
  "fields": [
    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "lastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    ...
  ]
}

Чтобы преобразовать каждое сообщение нашего топика из одного формата в другой, достаточно вот такого кода:

Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);

builder.stream("origination-json-topic")
    .mapValues(val -> avroConverter.convert(val))
    .to("origination-avro-topic");

AvroConverter в данном случае - это класс, который преобразует сообщение в заданную заранее схему. В open source есть вот такой https://github.com/allegro/json-avro-converter конвертер. Мы используем его доработанную версию.

Окей, у нас есть данные в фиксированном формате. Но было бы круто прежде, чем сложить информацию о клиенте в базу, выделить, какое именно изменение произошло с нашим пользователем в этом сообщении. Эта разница (diff) может быть полезна. Например, изменение поля можно использовать как триггер для других событий.

Для этого нам нужно достать предыдущее сообщение по этому ключу, сравнить их и записать разницу дальше. Достать сообщение по ключу. Звучит знакомо. Да, Kafka Streams конечно же так умеет. На этот раз напишем отдельный трансформер, чтобы показать, что и так можно.

Наш код:

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...

var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore**("state-store"**), // указываем имя для нашего стора           
  Serdes.Bytes(), // указываем тип ключа сообщения во входящем топике
    new GenericAvroSerde() // указываем тип значения сообщения во входящем топике
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") // используем имя, заданное раньше
    .to(outputTopic);

А ChangeTransformer выглядит вот так:

public class ChangeTransformer implements Transformer {
  private KeyValueStore<Bytes, GenericRecord> stateStore;

  @Override
  public void init(ProcessorContext processorContext) {
     this.stateStore = processorContext.getStateStore("state-store");
  }
  @Override
  public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
    GenericRecord prevState = stateStore.get(recordKey);
    return extractDiff(prevState, record);
  }
  ...
}

Как все это запустить?

StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
        .filter(...)
        .map(...)
        .to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); // ассинхронно
...
kafkaStreams.stop();

Немного о масштабируемости Kafka Streams

Из коробки Kafka Streams позволяет параллельно обработать данные. Максимальный уровень параллельности ограничен только количеством партиций во входящем топике. Если во входящем топике 16 партиций, вы можете поднять вплоть до 16 инстансов вашего приложения. Каждое из них будет повторять ту логику, которую вы заложили.

Больше того, если вы используете state-store (как в примере выше с ChangeTransformer-ом), то стейт будет сохраняться только для той партиции, которую приложение обрабатывает! Это позволит снизить нагрузку на память и диск.

Подробнее об этом можно прочитать вот здесь: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model

Выводы

Kafka Streams это:

  • Удобные stateful операции (join, get previous state). Мы видели это на примере обогащения операции покупки клиента информацией о бренде магазина, где была совершена покупку.

  • Понятный и короткий код. Чтобы делать преобразования типа map, filter, join есть простой и короткий DSL. Если нужны более сложные преобразования, мы всегда можем воспользоваться методом transform() и написать свой класс трансформера. Мы видели это на примере своего собственного ChangeTransformer-а, который находил разницу между текущим и предыдущим состоянием клиента.

  • Простая масштабируемость. Вы можете эффективно обрабатывать данные параллельно. Вы ограничены только количеством партиций во входящем в ваше приложение топике.

P.S. Это моя первая статья на хабре) Я буду рад вашим замечаниям, комментариям и вопросам!

Источник: https://habr.com/ru/company/vivid_money/blog/562138/


Интересные статьи

Интересные статьи

Проблемы поиска данных всегда отличались особенной сложностью и зачастую нестандартностью в подходах. Сегодня я бы хотел остановиться на одной интересной задаче, которую ...
Привет, Хабр. Сегодня хотим поделиться с вами интервью с руководителем команды разработки одного из новых продуктов ABBYY. Мы поговорили с ним про найм, принципы построения команды, р...
Сотрудник, который много знает, умеет и готов потушить любой пожар на своей поляне, конечно, молодец. Но если этот герой уходит в отпуск или вообще увольняется, наступают тяжелые времена. Оказыва...
Устраивать конкурсы в инстаграме сейчас модно. И удобно. Инстаграм предоставляет достаточно обширный API, который позволяет делать практически всё, что может сделать обычный пользователь ручками.
В конце прошлого года я делал обзор цен черного рынка на российские персональные данные, и вот пришло время его обновить и дополнить. Заодно посмотрим изменение цен и предложений на этом «рынке...