Всем привет, меня зовут Максим Крупенин, я работаю Data & Analytics Solution Architect в EPAM Systems. За 4 года работы в EPAM мне пришлось поработать в разных проектах, связанных с BI, Big Data, Data warehouse и другими технологиями. В этой статье поделюсь одним из клиентских проектов, где мы реализовали кастомное решение для near real-time аналитики на базе Snowflake. Надеюсь, статья будет полезной, оставляйте фидбек в комментариях.
Если вы интересуетесь аналитической платформой Snowflake, то вы, вероятно, знаете, что этот продукт сейчас в тренде. Компания Snowflake Inc. вышла на IPO в сентябре 2020 года, что стало крупнейшим IPO в истории среди компаний-разработчиков программного обеспечения. Даже инвестиционный гуру Уоррен Баффет инвестировал в Snowflake (это его первое участие в IPO со времен Ford в 1956 году). Тем не менее, эта статья не об инвестициях, а о технологиях. Snowflake — действительно отличная аналитическая платформа, которая позволяет реализовывать современные интегрированные приложения для работы с данными.
За последние несколько лет компания EPAM внедрила много интересных решений, в которых мы использовали Snowflake для различных бизнес-задач наших клиентов. Об архитектуре одной из таких систем мы и расскажем в этой статье.
Бизнес-требования
Наш клиент принял решение разработать аналитический продукт нового поколения для своих клиентов в отрасли образования. Данный продукт собирает и обрабатывает данные из различных источников, позволяя клиентам принимать обоснованные решения. Функции аналитической платформы включают настраиваемые интерактивные дашборды, подключение к различным типам источников данных, self-service data discovery и аналитику в режиме реального времени.
Архитектура
Не все исходные системы, с которыми нам необходимо интегрироваться, обеспечивают механизмы для извлечения данных в режиме реального времени. Поэтому нам пришлось разработать решение, позволяющее комбинировать пакетную и потоковую обработку данных.
Snowflake и Tableau, которые являются основными технологиями нашей аналитической платформы, должны были быть дополнены другими инструментами, чтобы реализовать все функциональные и нефункциональные требования. Также важно отметить, что продукт необходимо было разработать на облачной платформе Amazon Web Services (AWS). В результате, наша команда спроектировала и внедрила решение, схожее по дизайну с подходом Lambda Architecture:
Если обобщить, то архитектура состоит из следующих трёх логических уровней:
BATCH LAYER извлекает и обрабатывает данные один раз в день, используя API систем источников. Данный слой нацелен на подготовку точных и качественных данных за предыдущий день. Это означает, что на этом уровне исправляются ошибки, данные согласовываются с другими системами-источниками, и проверяется качество данных. Выходные данные хранятся в Snowflake в слоях Core DWH и Data Mart и заменяют данные предыдущего дня, которые были сформированы в Speed Layer.
SPEED LAYER обрабатывает потоки данных из исходных систем в режиме реального времени (где это возможно), без требований к полноте и высокому качеству данных. Этот уровень направлен на минимизацию задержек, предоставляя доступ к данным в реальном времени. По сути, Speed Layer отвечает за заполнение «пробелов» — данных за текущий день отсутствующих в Batch Layer. Представления (table views) этого уровня могут быть не такими точными и полными, как те, которые, в конечном счёте, создаются на Batch Layer, но данные для аналитики доступны почти сразу по мере обработки событий в системах источниках.
SERVING LAYER объединяет данные из Batch Layer и Speed Layer через Snowflake DB views и отвечает на ad-hoc запросы конечных пользователей, поступающие из Tableau.
Детали реализации нашего решения
Далее, я подробнее остановлюсь на реализации каждого из уровней.
Извлечение и обработка пакетных данных (Batch Layer)
Ниже представлен пошаговый процесс работы с данными:
Извлечение данных (data ingestion). В качестве первого шага, данные поступают из исходной системы в Data Lake (AWS S3), используя API исходной системы. Скрипты Python запускаются с помощью Apache Airflow, который является инструментом оркестрации для всего пайплайна обработки данных. Amazon Elastic Kubernetes Service (EKS) используется для управления ресурсами для сервисов Airflow.
Загрузка данных в Snowflake. «Сырые» (raw) данные из Data Lake копируются в Snowflake в слой Landing Area. Для этого Apache Airflow запускает команды Snowflake copy, которые выполняются на Snowflake Virtual Warehouse.
Преобразование в слой Core DWH. Исходные данные из Snowflake Landing Area преобразуются в Core DWH слой, где у нас строится каноническая доменная модель данных (здесь используется подход Data Vault). Apache Airflow запускает SQL-скрипты преобразования данных, которые выполняются на Snowflake Virtual Warehouse.
Преобразование в слой Data Mart. Данные из слоя Core DWH преобразуются в data marts (тут моделируем данные по схеме “звезда”) в соответствии с конкретными бизнес-требованиями. Здесь есть два варианта в зависимости от сложности преобразований:
Виртуальные витрины данных с использованием представлений (Snowflake DB views): в этом варианте данные не сохраняются физически.
Snowflake таблицы: Apache Airflow запускает SQL-скрипты, которые выполняются на Snowflake Virtual Warehouse. Скрипты вычисляют сложные бизнес-правила и записывают информацию в таблицы витрин данных.
Обработка данных в реальном времени (Speed Layer)
На Speed Layer данные проходят следующие этапы:
Сервис асинхронных событий системы источника выступает как фильтр для захвата трафика из исходной системы в виде сообщений в формате JSON. Затем он направляет сообщения в топики (topics) Amazon MSK (сервис Kafka в AWS).
Инструмент StreamSets Data Collector (SDC) используется для извлечения событий из топиков Kafka и их обработки:
фильтрует необходимые сообщения JSON;
обогащает данные на лету с помощью API исходной системы (например, получает имя объекта, передав его ID);
применяет другие необходимые преобразования к данным (например, маскирование конфиденциальных данных и фильтрацию);
конвертирует данные в CSV-формат и помещает файл в Data Lake S3 bucket – Online Data (current date) на диаграмме;
параллельно отфильтрованные исходные сообщения JSON помещаются в архив (Data Lake S3 bucket – Online Raw Data Archive).
Используется Snowflake external table (Live View на диаграмме), которая позволяет нам запрашивать информацию непосредственно из файлов в Data Lake S3 buckets. Таким образом, данные не хранятся в базе данных Snowflake, а проходят через Snowflake Virtual Warehouse только при запросе данных с сервера Tableau.
Serving Layer
Serving Layer реализован в виде набора представлений (Snowflake DB views), которые объединяют (union SQL operator) информацию из витрин данных (подготовленных на этапе пакетной обработки) и из Snowflake external table (Live View на диаграмме).
В результате, фактические данные готовы для запросов пользователей. Tableau сервер в режиме Live Connection формирует запросы непосредственно к Snowflake.
Альтернативные варианты реализации аналитики в реальном времени с помощью Snowflake
Если у вас есть опыт работы со Snowflake, вам может быть интересно, почему мы не использовали Snowpipe для реализации непрерывной загрузки данных в базу данных. Действительно, Snowpipe позволяет загружать данные небольшими пакетами (micro-batches) из файлов, как только они становятся доступными в AWS S3. Но при таком варианте данные попадают в базу с задержкой около минуты.
В нашем же случае было требование сделать данные доступными для пользователей с меньшей задержкой (до 5 секунд). Еще одним аргументом в пользу Snowflake external tables была стоимость непрерывной загрузки данных в Snowflake. С помощью модели бессерверных (serverless) вычислений Snowpipe, Snowflake хоть и управляет эффективно выделением необходимых ресурсов за вас, но итоговая стоимость может оказаться достаточно высокой, если события формируются непрерывно.
В случае с external tables, мы не загружаем данные в Snowflake (они хранятся в S3 bucket). Соответственно, нам не нужно платить за загрузку данных. Однако также важно помнить, что по мере добавления файлов с новыми событиями в S3, внешние таблицы должны обновляться (alter external table … refresh).
Выводы
Сегодня всё больше организаций стремятся стать data-driven. В результате, требования бизнеса усложняются. Например, компании больше не хотят ждать дни, чтобы анализировать события в операционных системах, а хотят действовать проактивно, основываясь на информации, полученной почти в реальном времени.
Это приводит к необходимости внедрять новые возможности анализа данных. Snowflake — аналитическая платформа нового поколения, которая позволяет эффективно реализовывать различные сложные сценарии, об одном из которых вы узнали из этой статьи.