Построение пайплайна анализа данных в реальном времени с помощью Airbyte, Kafka и Pinot

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Как правило, пайплайн аналитики в реальном времени состоит из нескольких компонентов, включая пайплайны ELT, платформу потоковой передачи событий и аналитическую базу данных, способную давать ответы на многочисленные запросы. Однако создание и поддержка такой платформы стоит дорого, а также требует значительных инженерных усилий и времени.

Наличие в вашем распоряжении инструментов с открытым исходным кодом для дата инжиниринга и аналитики помогает снизить стоимость запуска пайплайна аналитики в реальном времени. Доступ к исходному коду позволяет избежать зависимости от поставщика, что предоставляет вам гибкость при настройке решения в соответствии с потребностями вашей организации. В этой статье рассматриваются три популярных продукта с открытым исходным кодом в области данных — Airbyte, Apache Kafka и Apache Pinot — для создания пользовательского дашборда электронной коммерции, который обновляется в режиме реального времени.

Airbyte — это платформа интеграции данных с открытым исходным кодом, способная перемещать данные из OLTP (On-Line Transaction Processing) баз данных, таких как MySQL, в места назначения, такие как Apache Kafka, используя захват измененных данных (CDC) с низкой задержкой. Apache Pinot — это база данных OLAP (On-Line Analytical Processing) с открытым исходным кодом, способная принимать потоковые данные из Kafka и делать их доступными для запросов в течение нескольких секунд.

Архитектура аналитического пайплайна реального времени
Архитектура аналитического пайплайна реального времени

Архитектура аналитического пайплайна реального времени

В этом разделе рассматриваются ключевые компоненты решения, а также обоснование их выбора для данного проекта.

Почему бы не сделать прямой запрос к оперативной базе данных OLTP?

Мы получаем аналитику дашборда продавца, анализируя заказы электронной коммерции. В настоящее время заказы хранятся в базе данных MySQL, что затрудняет выполнение аналитических запросов. OLAP-запросы требуют агрегирования и фильтрации большого количества записей для получения метрик, что приводит к снижению производительности MySQL, которая не предназначена для обработки таких запросов.

Поэтому нам нужно извлечь заказы из MySQL и переместить их в Apache Pinot для дальнейшего анализа. Для этого мы используем Airbyte.

CDC-пайплайн для перемещения заказов из MySQL в Kafka

Источник CDC для MySQL от Airbyte извлекает заказы из MySQL с помощью захвата измененных данных CDC (Change Data Capture).

Реляционная база данных, такая как MySQL, ведет лог транзакций для записи каждой операции, изменяющей состояние, такой как вставка, обновление и удаление. Механизм CDC обрабатывает этот лог для обнаружения объектов базы данных, которые были изменены, и передает их в виде событий изменения. Таким образом, мы можем получить инкрементные обновления, чтобы избежать дорогостоящего извлечения всей таблицы.

Airbyte запускает такое извлечение через запланированный интервал времени, например, каждый час, день, неделю или около того. Давайте зададим его через каждые пять минут, чтобы получить больше свежих данных. Извлеченные заказы записываются в тему Kafka (orders) как события в формате JSON, а оттуда передаются в Pinot.

Apache Pinot для быстрого выполнения OLAP-запросов на потоковых данных

Пайплайн Airbyte ELT, запускаемый каждые пять минут, генерирует огромное количество необработанных данных, которые необходимо проанализировать. Кроме того, проанализированные данные будут доступны для всех продавцов платформы, что потребует от нас высокой пропускной способности запросов и задержки в миллисекундах. Следовательно, движок аналитики должен быть способен выполнять аналитические запросы и возвращать результаты в режиме реального времени, чтобы обеспечить хороший пользовательский опыт. Поэтому мы будем использовать Apache Pinot в качестве движка аналитики для удовлетворения вышеперечисленных потребностей.

Надеюсь, теперь у вас есть твердое понимание того, что мы будем строить дальше. Теперь можно продолжать чтение статьи или ознакомиться с готовым решением в этом репозитории GitHub.

Шаг 1: Необходимые условия

Подразумевается, что на вашей машине установлен Docker Compose. Для лучшей производительности рекомендуется иметь не менее 8 ГБ оперативной памяти и достаточный объем дискового пространства. Клонируйте следующий репозиторий GitHub на свою локальную машину и перейдите в папку проекта.

git clone https://github.com/dunithd/edu-samples
cd edu-samples/airbyte-pinot

Настройка Apache Kafka и Apache Pinot

Далее мы создадим одноузловой кластер Kafka и многоузловой кластер Apache Pinot с помощью Docker Compose. Клонированный проект содержит файл docker-compose.yml. Запустите стек Docker, выполнив команду:

docker-compose up

Мы рассмотрим эту настройку в следующих разделах.

Настройка MySQL

После запуска стека Docker давайте создадим базу данных MySQL для загрузки нескольких моков (имитация) заказов электронной коммерции. Расположение базы данных не имеет значения; это может быть как размещенный (хостируемый) экземпляр MySQL, так и локальная установка. В этой статье мы будем использовать локальную инсталляцию.

Создание базы данных электронной коммерции, таблицы заказов и вставка моков 

Подключитесь к вашему экземпляру MySQL через MySQL CLI или с помощью GUI-инструмента. Выполните следующий скрипт для создания таблицы orders (заказов) в базе данных ecommerce (электронной коммерции) и вставьте несколько моков заказов. Вы можете найти этот скрипт в каталоге mysql в сопутствующем репозитории GitHub.

mysql -u {username} -p < airbyte-pinot/mysql/ecommerce-schema.sql

После завершения работы скрипта вы можете проверить содержимое таблицы заказов, выполнив команду:

select * from orders;

Создайте выделенного пользователя с доступом к таблице заказов

Всегда рекомендуется предоставлять Airbyte широкие права доступа к MySQL. Мы можем сделать это, создав выделенного пользователя MySQL с необходимыми привилегиями. Чтобы создать выделенного пользователя базы данных, выполните следующие команды для вашей базы данных.

CREATE USER 'airbyte'@'%' IDENTIFIED BY '<password>';

Необходимые разрешения зависят от метода репликации. В то время как метод репликации STANDARD требует только разрешения SELECT, репликация CDC требует разрешений SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE и REPLICATION CLIENT.

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'airbyte'@'%';

Теперь наша база данных ecommerce готова к использованию с Airbyte.

Установка Airbyte

Мы запустим Airbyte на вашей локальной машине как отдельный проект Docker Compose. Чтобы это сделать следуйте данным инструкциям.

Шаг 2: Синхронизация данных из MySQL в Kafka с помощью Airbyte

После запуска Airbyte нам нужно указать ему, откуда читать данные (source — источник), куда их перемещать (destination — место назначения) и создать соединение. Это можно сделать с помощью пользовательского интерфейса Airbyte, который работает на localhost:8000.

Настройка источника MySQL CDC

Войдите в Airbyte UI (пользовательский интерфейс), выберите Sources > new source (источники > новый источник) и выберите MySQL в качестве типа. Убедитесь, что выбрали CDC в качестве метода репликации. В данном примере мы не будем применять протокол SSH. Однако при использовании публичной интернет-сети рекомендуется использовать SSH-туннель.

Укажите следующие значения в пользовательском интерфейсе. Я подключаюсь к своей локальной установке MySQL, которая работает на дефолтном порту. Не стесняйтесь вносить корректировки значений в зависимости от вашего окружения.

Настройки источника MySQL
Настройки источника MySQL

Настройка места назначения Kafka

Далее мы настроим место назначения Kafka в Airbyte для потоковой передачи данных о заказах MySQL в режиме, близком к реальному времени (минуты вместо секунд). Для этого мы подключимся к одноузловому экземпляру Kafka, который мы запустили ранее. Сначала создадим тему Kafka под названием 'orders', которая будет использоваться для записи данных CDC. Перейдите в клонированный проект, затем выполните следующую команду с корневого уровня.

docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh 
--bootstrap-server localhost:9092 --create -topic orders

В пользовательском интерфейсе Airbyte выберите Destinations > new destination (места назначения > новое место назначения) и Kafka в качестве типа. Большинство значений можно установить по умолчанию, за исключением Topic Pattern и Bootstrap Servers.

Настройки места назначения Kafka
Настройки места назначения Kafka

Создание пайплайна MySQL CDC для Kafka

После настройки источника и места назначения вы можете создать соединение между MySQL и Kafka в Airbyte, чтобы построить пайплайн данных между ними. В разделе "select the data you want to sync" (выберите данные, которые вы хотите синхронизировать) укажите таблицу orders и отметьте Incremental в разделе Sync mode.

Настройки подключения
Настройки подключения

В настоящее время вы можете установить частоту синхронизации до пяти минут. Если вам нужна более низкая частота, можно запускать синхронизацию Airbyte через API или интегрировать ее в инструмент управления рабочим процессом, например Airflow, Prefect или Dagster. После настройки вы можете увидеть соединение на вкладке Connections.

Верификация данных в Kafka

После первого запуска задачи синхронизации она синхронизирует все заказы из MySQL в Kafka. Вы можете выполнить следующую команду, чтобы проверить содержимое темы orders (заказы).

docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh 
--bootstrap-server localhost:9092 --topic orders --from-beginning

Вы увидите записи в формате JSON, выходящие из Kafka в таком виде.

{
   "_airbyte_ab_id":"c3441ff2-fa23-49da-a7a2-82c7c4ecae4f",
   "_airbyte_stream":"orders",
   "_airbyte_emitted_at":1646400487271,
   "_airbyte_data":{
      "id":1,
      "store_id":100,
      "order_date":"2021-08-15",
      "channel":"STORE",
      "country":"Hungary",
      "total":173.0399932861328,
      "status":"ACTIVE",
      "_ab_cdc_updated_at":"1970-01-01T00:00:00Z",
      "_ab_cdc_log_file":"binlog.000021",
      "_ab_cdc_log_pos":156,
      "_ab_cdc_deleted_at":null
   }
}

Начиная с первого задания, Airbyte запускает задачу синхронизации каждые пять минут, чтобы синхронизировать новые или обновленные заказы из MySQL в Kafka.

Шаг 3: Синхронизация данных из Kafka в Pinot

Теперь наши заказы отображаются в Kafka. Далее мы передадим их в Apache Pinot, чтобы дашборд мог выполнять аналитические запросы.

Pinot — это распределенная система, состоящая из различных компонентов, отвечающих за ввод данных, их хранение и брокерскую обработку запросов. Pinot также зависит от Zookeeper для хранения метаданных и координации работы кластера.

Если вы помните, в предварительных условиях мы запустили Kafka, Zookeeper и остальные компоненты Pinot как контейнеры Docker. Это упрощает многие вещи для нас. Однако можно следовать этому руководству, если вы хотите настроить кластер Pinot вручную.

Создайте схему и таблицу для заказов

Перед тем как принять входящий поток заказов, Pinot требует, чтобы вы заранее определили его структуру. Это позволит Pinot оптимизировать стратегии хранения и индексирования, чтобы обеспечить более быстрый анализ данных.

Для этого мы создаем схему и таблицу для информации, поступающей от заказов. Схема обеспечивает логическую абстракцию для базовых данных, объявляя атрибуты, типы данных и другие ограничения. Таблица реализует схему, определяя конкретную информацию о требованиях к вводу, индексированию и хранению данных. Схему можно рассматривать как шаблон для таблицы.

Ниже приведена схема для таблицы заказов, которую вы можете найти в папке config в репозитории GitHub. Атрибуты схемы разделены на три части: поля dimensions (измерения), metrics (метрики) и datetime (значение даты и времени). Агрегирование выполняется по полям metrics, таким как подсчеты, итоги и средние значения, а временная фильтрация и сортировка выполняются по полям datetime. Остальные атрибуты относятся к dimensions.

{
    "schemaName": "orders",
    "primaryKeyColumns": [
      "id"
    ],
    "dimensionFieldSpecs": [
      {
        "name": "id",
        "dataType": "INT"
      },
      {
        "name": "store_id",
        "dataType": "INT"
      },
      {
        "name": "channel",
        "dataType": "STRING"
      },
      {
        "name": "country",
        "dataType": "STRING"
      },
      {
        "name": "status",
        "dataType": "STRING"
      }
    ],
    "metricFieldSpecs": [
      {
        "name": "total",
        "dataType": "FLOAT"
      }
    ],
    "dateTimeFieldSpecs": [{
      "name": "order_date",
      "dataType": "STRING",
      "format" : "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd",
      "granularity": "1:DAYS"
    }]
  }

Ниже приведено определение таблицы, которая является таблицей REALTIME. Блок конфигурации streamConfigs определяет настройки Kafka и Zookeeper, необходимые для приема данных в реальном времени.

{
    "tableName": "orders",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "order_date",
      "schemaName": "orders",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      },
      "transformConfigs": [
        {"columnName": "id", "transformFunction": "JSONPATHLONG(_airbyte_data, '$.id')" },
        {"columnName": "store_id", "transformFunction": "JSONPATHLONG(_airbyte_data, '$.store_id')" },
        {"columnName": "channel", "transformFunction": "JSONPATHSTRING(_airbyte_data, '$.channel')" },
        {"columnName": "country", "transformFunction": "JSONPATHSTRING(_airbyte_data, '$.country')" },
        {"columnName": "total", "transformFunction": "JSONPATHDOUBLE(_airbyte_data, '$.total')" },
        {"columnName": "status", "transformFunction": "JSONPATHSTRING(_airbyte_data, '$.status')" },
        {"columnName": "order_date", "transformFunction": "JSONPATHSTRING(_airbyte_data, '$.order_date')" }
      ]
    },
    "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.topic.name": "orders",
          "stream.kafka.broker.list": "kafka:9093",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
        }
    },
    "tenants": {},
    "metadata": {},
    "routing": {
      "instanceSelectorType": "strictReplicaGroup"
    },
    "upsertConfig": {
      "mode": "FULL"
    }
  }

Преобразование поступающих данных

Записанные в Kafka результаты синхронизации форматируются в JSON в соответствии со спецификацией Airbyte. Поэтому во время получения данных каждое событие Kafka нормализуется и сопоставляется с соответствующим атрибутом схемы. Это происходит внутри блока transformConfigs.

Обработка дубликатов записей с помощью апсертов

В реальности заказ может проходить множество стадий жизненного цикла, таких как OPEN, PROCESSING, IN_TRANSIT и CANCELLED (ОТКРЫТ, ОБРАБОТКА, В_ТРАНЗИТЕ И ОТМЕНЕН). Мы определили поле статуса, чтобы отразить это в MySQL. Изменения статуса заказа фиксируются Airbyte и публикуются в Kafka, в конечном итоге попадая в Pinot в виде дублированных записей заказов. Но мы знаем, что они относятся к одному и тому же заказу.

Чтобы избежать этого, мы можем включить фичу апсертов в таблице заказов, которая позволяет осуществлять мерджинг записей данных, имеющих одинаковый первичный ключ. Мы уже определили id (идентификатор) как первичный ключ в схеме заказов. Следующий блок конфигурации в вышеприведенном определении таблицы определяет FULL (ПОЛНЫЙ) апсерт в таблице заказов, полностью перезаписывая старый заказ последней поступившей записью заказа.

"upsertConfig": {
      "mode": "FULL"
}

Наконец, выполните следующую команду, чтобы создать схему и таблицу заказов внутри Pinot.

docker-compose exec pinot-controller bin/pinot-admin.sh AddTable -schemaFile /config/orders_schema.json -tableConfigFile /config/orders_table.json -exec

Верификация процесса получения данных из Kafka в Pinot

Pinot начинает получать информацию из темы заказов сразу после завершения команды и заполняет таблицу orders входящими заказами.

Консоль запросов Pinot с таблицей заказов
Консоль запросов Pinot с таблицей заказов

Мы также можем верифицировать фичу апсертов, обновив статус заказа в MySQL.

update orders set status='CANCELLED' where id=10;

Выполнение следующего запроса в Pinot приведет к возврату записи заказа с обновленным полем статуса CANCELLED (ОТМЕНЕН).

select * from orders where id = 10;

Написание запросов для анализа данных о заказах

Теперь, когда мы получили информацию о заказах в Pinot, то можем сделать несколько специальных SQL-запросов, чтобы найти ответы на вопросы, которые у нас были в начале. Для этого воспользуемся встроенной консолью запросов Pinot.

В нашем примере каждый заказ имеет поле store_id для идентификации магазина, к которому он относится. Когда продавец авторизуется в дашборде, заказы могут быть отфильтрованы по его store_id. Пока что давайте используем 100 в качестве образца store_id.

Этот запрос возвращает общее количество продаж за последнюю неделю.

select sum(total) as total_sales
from orders
where store_id=100 and
ToEpochSeconds(FromDateTime(order_date, 'YYYY-MM-dd')) > ToEpochSeconds(now()- 86400000)

Этот запрос возвращает вклад каналов продаж на основе их дохода.

select channel, sum(total) as total_sales
from orders
where store_id=100 and
ToEpochSeconds(FromDateTime(order_date, 'YYYY-MM-dd')) > ToEpochSeconds(now()- 86400000)
group by channel
order by total_sales desc

Шаг 4: Постройте дашборд для аналитики, ориентированный на пользователя 

Теперь мы выполнили самую сложную часть решения — перенесли заказы из MySQL в Pinot. После того, как данные попали в Pinot, пользовательский аналитический дашборд может быть создан с помощью любой фронтенд-технологии, такой как React, Node.JS или даже Python. Затем дашборд получает данные из Pinot через REST API или соответствующие интерфейсы драйверов.

Pinot REST API позволяет вам отправить любой из SQL-запросов, которые мы выполнили выше, в виде HTTP POST-запроса. В ответ вы получите ответы в JSON-формате. Например, для запроса общего объема продаж:

curl -H "Content-Type: application/json" -X POST -d '{"sql":"select sum(total) as total_sales from orders"}' http://localhost:8000/query/sql‍

Создание дашборда выходит за рамки данной статьи. Поэтому мы рассмотрим это в будущем.

Резюме

Аналитический дашборд, ориентированный на пользователя, требует выполнения сложных OLAP-запросов к базовому датасету. Не рекомендуется запускать их на оперативных OLTP базах данных, так как это может привести к снижению производительности. Следовательно, оперативные данные должны быть перенесены в базу данных OLAP.

В этой статье мы узнали о построении аналитического пайплайна для работы в режиме реального времени с Airbyte для перемещения заказов электронной коммерции из MySQL в Kafka. Полученные заказы передаются в Apache Pinot для ответа на OLAP-запросы с низкой задержкой, поступающие с дашборда, ориентированного на пользователя. В результате продавцы получают аналитику магазина в режиме реального времени, которую можно использовать для принятия обоснованных решений.


Сегодня вечером пройдет открытый урок «Schema Registry в Apache Kafka». На встрече познакомимся с использование реестра схем при работе с Kafka. Вы узнаете, что такое Kafka, эволюция схем и как реестр схем облегчает работу с Kafka при эволюции схем.

Записаться на открытый урок можно на странице курса "Apache Kafka".

Источник: https://habr.com/ru/companies/otus/articles/739940/


Интересные статьи

Интересные статьи

Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.С...
За месяц поиска работы собрал много полезной информации и хочу с вами ею поделиться.В начале немного статистики по прохождению этапов отбора в компанию:1 этап "Собеседование с HR-ом": 120+ HR-ов (рекр...
Всем здравствуйте! Прошло уже какое-то время с моей первой статьи на хабре. Спасибо всем. Статья, насколько я могу судить, принята сообществом достаточно тепло. Сегодня хотелось бы погов...
Структурирование неструктурированных данных с помощью GROK Если вы используете стек Elastic (ELK) и заинтересованы в сопоставлении пользовательских журналов Logstash с Elasticsearch,...
Корни цифрового звука уходят глубоко в 70-е годы. Тогда японская телевизионная компания NHK проспонсировала производство PCM-рекордеров, которые задействовали во время эфиров. Уже позже с расп...