Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Привет! На связи Артемий – Analytics Engineer из Wheely.
В условиях постоянно растущей сложности аналитических инструментов и распределенной команды не просто возможно, но и необходимо повышать скорость поставки (T2M) и качество (Quality) выводимого в продуктив функционала. Фокус сегодняшней публикации – внедрение практик интеграционного тестирования с учетом современного аналитического стека.
С практическими примерами и рекомендациями будут рассмотрены следующие аспекты:
Специфика аналитических приложений и пространство для DevOps практик
Рецепт для внедрения Continuous Integration шаг за шагом
Slim CI: оптимизируем и ускоряем процессы
Велком под кат, если словосочетания Continuous Integration, Integration Testing, Time to Market вызывают у вас здоровый энтузиазм и интерес.
Современные аналитические приложения нуждаются в практиках DevOps
Прежде чем приступить к настройке Continuous Integration, предлагаю проникнуться спецификой современного аналитического стека и ощутить потребность в этих практиках.
Аналитические приложения сегодня – сложные системы, состоящие из набора взаимосвязанных компонент, основные группы которых включают:
Пайплайны загрузки исходных данных – Extract & Load (EL в ELT)
Хранилища данных – мощные аналитические СУБД (Redshift, Snowflake, Bigquery и другие)
Оркестратор скриптов и преобразований – T в ELT (dbt, Airflow, Dagster)
BI-инструменты – доступ к данным, дашбординг, визуализация (Looker, Metabase, Tableau)
Наиболее важным с точки зрения аналитики является этап моделирования данных и сбора витрин. Чаще всего Хранилище Данных можно представить в виде направленного ациклического графа (DAG). На листьях графа располагаются витрины данных, оптимизированные под чтение и содержащие в себе бизнес-метрики.
А теперь представьте, что при выпуске релиза была допущена ошибка в одном из узлов графа. Суть в том, что эта ошибка повлечет последствия для всех нижележащих зависимых узлов – они либо будут собраны с ошибками, либо не будут рассчитаны совсем.
Также очевидно, что потребность в аналитике никогда не уменьшается, но только усиливается с развитием проекта и внедрением новых инициатив. Результатом этого является взрывной рост витрин, метрик, дашбордов и отчетов, которые постоянно эволюционируют, дополняются, актуализируются.
Посмотрите на разнообразие метрик в одной из витрин данных. По сути это куб, который позволяет отследить динамику изменения 70+ показателей по разрезе различных измерений.
Как результат, уже само сопровождение этой бизнес-логики становится весьма непростой задачей. Добавьте к этому тот факт, что за разные группы метрик несут ответственность разные члены команд, при этом метрики могут каскадно зависеть друг от друга.
Не менее значим элемент командной работы. Аналитические приложения сегодня – плод совместного труда инженеров данных, аналитиков данных, BI-инженеров, data scientists. Лучшей практикой является работа с приложениями как кодом (code-based) и версионирование изменений (version controlled).
Процессы, основанные на работе с git-репозиториями позволяют значительно ускорить и облегчить совместную работу, однако и здесь не стоит забывать про конфликты и всевозможные баги, которые могут повлиять на работу других членов команды.
Вывод изменений в продуктив может стать непредсказуемым
Все вышеперечисленные особенности только добавляют элементы сложности в структуру аналитических приложений. Без должной уверенности и контроля, в рамках релиза изменений и фиксов, остается только надеяться на лучшее и предполагать, что патчи были протестированы должным образом.
Одно из самых распространенных и неприятных последствий – ухудшение (или отсутствие) сервиса и прямое влияние на бизнес-пользователей.
Как выяснилось в ходе разбора инцидента – ошибка в последнем патче напрямую повлияла на сбор витрин. Одна из ветвей графа была попросту исключена (skipped) из регулярного расчета по причине ошибки в вышележащем узле.
Рецепт для Continuous Integration
Итак, с остротой проблемы разобрались. Теперь ближе к практикам Continuous Integration.
Прежде всего, что такое Continuous Integration? Традиционно считается, что CI – это регулярное слияние веток разработки с новыми фичами или фиксами багов и основной ветки разработки.
Учитывая специфику аналитических приложений, чего мы хотим добиться в рамках CI?
Создание независимого окружения на каждую попытку слияния веток
Всестороннее тестирование качества новой версии кода (витрин данных)
В случае успеха – высвобождение ресурсов и удаление окружения
А теперь непосредственно сама рецептура для внедрения Continuous Integration в ваши приложения:
Запуск (trigger) в рамках Pull Request
Создание независимого окружения
Ограничение объема используемых данных
Выполнение расчетов и запуск тестов
Отчет о статусе (ok / fail)
Удаление окружения (clean up)
1. Запуск (trigger) в рамках Pull Request
Любой современный инструмент автоматизации Continuous Integration предоставляет гибкие настройки для условий срабатывания скрипта:
Запуск по расписанию
Ручной запуск
Запуск по событию (Webhook)
Предлагаю рассмотреть на примере набирающего сегодня популярность инструмента автоматизации Github Actions.
Самый простой и действенный пример – срабатывание при создании pull request к ветке master.
# Controls when the action will run.
on:
# push:
# branches: [ master ]
pull_request:
branches: [ master ]
# workflow_dispatch:
# schedule:
# - cron: '0 5 * * *'
Набор триггеров Github Actions достаточно богат и не ограничивается только pull_request
.
2. Создание независимого окружения
Что такое независимое окружение? Это полноценная среда, в рамках которой вы сможете развернуть уменьшенную копию вашего Хранилища Данных.
В моем случае достаточно создать отдельную схему в базе данных и построить в ней все необходимые объекты.
Для меня работает макрос dbt + Jinja, который автоматически создаст новую схему данных, если запуск осуществляется в рамках джобы ci. При этом название схемы будет содержать порядковый номер pull request в постфиксе.
macro generate_schema_name():
{#
Renders a schema name given a custom schema name. If the custom
schema name is none, then the resulting schema is just the "schema"
value in the specified target. If a schema override is specified, then
the resulting schema is the default schema concatenated with the
custom schema.
This macro can be overriden in projects to define different semantics
for rendering a schema name.
Arguments:
custom_schema_name: The custom schema name specified for a model, or none
node: The node the schema is being generated for
#}
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'ci' -%}
{{ default_schema }}_{{ env_var('GITHUB_PR_NUMBER') | trim }}
{%- else -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endif -%}
{%- endmacro %}
3. Ограничение объема используемых данных
Нет смысла делать расчет на всем объеме данных. Это невыгодно как с точки зрения использования ресурсов кластера, так и с точки зрения времени.
Что в таком случае можно сделать? Нам будет достаточно взять исходные таблицы-справочники без изменений, и ограниченный объем данных из таблиц-фактов, например, 3-е последних суток.
Опять же, на помощь приходит компилируемый код dbt + Jinja, который сможет ограничить выборку в случае запуска джобы под флагом ci. Макрос динамически собирает набор фильтров в конструкции WHERE запросов к таблицам с исходными данными:
{#- prepare expression to filter rows to last 'development_days_of_data' (e.g. last 3 days) -#}
{% set get_dev_watermark_query = 'select dateadd(day, ' ~ -1 * var('development_days_of_data') ~ ', current_timestamp::date)' %}
{# {{ log("get_dev_watermark_query: " ~ get_dev_watermark_query, info=True) }} #}
{% if execute %}
{% set dev_watermark = "'" ~ run_query(get_dev_watermark_query).columns[0][0] ~ "'" %}
{% endif %}
{%- set dev_rows_expression = timestamp_column ~ ' >= ' ~ dev_watermark
if target.name in ['dev', 'ci'] and last_n_days_of_data == true else '1 = 1' -%}
В результате работы макроса будет исполнен следующий sql-запрос:
select
...
from "sandbox"."fines"
where 1 = 1
and __hevo__marked_deleted is not true
and created_at >= '2021-07-11 00:00:00' -- 1 = 1 if not ci
and 1 = 1
Изменение значения переменной development_days_of_data в конфиграции проекта отразится на всех таблицах, подлежащих усечению:
vars:
development_days_of_data: 3
4. Выполнение расчетов и запуск тестов
Основная задача Continuus Integration – идентифицировать проблемы и ошибки на ранних этапах, минимизировав возможные негативные последствия. Для этого потребуется полноценная сборка уменьшенной версии Хранилища Данных и запуск тестов.
При этом тестированию могут быть подвержены любые ваши ожидания и допущения (expectations) о данных. Они могут включать уникальность (unqiueness), наличие значения (not null), ссылочную целостность (referential integrity), допустимые значения атрибута (accepted values) и многие другие. Вопроса тестирования я касался в одном из предыдущих вебинаров – Практическое Качество Данных.
Сам запуск может потребовать наличия библиотек и зависимостей, поэтому удобным может представляться создание отдельного Action который будет вызван в рамках запуска ci. Примером такого Action может служить mybi-dbt-action. По сути, это последовательный запуск команд в контейнере (см. Dockerfile) с предустановленными зависимостями и библиотеками:
#!/bin/sh -l
# set logging
set -ex
# cd to project directory which is ./ by default
cd ${INPUT_PROJECT_DIR:-.}
# perform actions
echo "dbt --version" && dbt --version
echo "dbt debug" && dbt debug
echo "dbt deps" && dbt deps
echo "dbt seed" && dbt seed
echo "dbt run" && dbt run
echo "dbt test" && dbt test
echo 'dbt run-operation "clean_up"' && dbt run-operation "clean_up"
Вызов такого действия c передачей параметров в виде переменных окружения становится тривиальным (dbt_mssql_ci.yml):
jobs:
dbt_deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: dbt mssql action # Invoke dbt action with parameters as ENV variables
uses: kzzzr/mybi-dbt-action@v2
env:
DBT_MSSQL_TARGET: ci
DBT_MSSQL_SERVER: ${{ secrets.DBT_MSSQL_SERVER }}
DBT_MSSQL_USER: ${{ secrets.DBT_MSSQL_USER }}
DBT_MSSQL_PASSWORD: ${{ secrets.DBT_MSSQL_PASSWORD }}
DBT_MSSQL_DATABASE: ${{ secrets.DBT_MSSQL_DATABASE }}
DBT_MSSQL_SCHEMA: dbo
GITHUB_PR_NUMBER: ${{ github.event.number }}
5. Отчет о статусе (ok / fail)
Любая ошибка в ходе выполнения задания будет расценена как fail, т.е. провал тестов и свидетельство о том, что такой патч не следует выводить в продуктив до того, как все ошибки будут исправлены.
При настройке джобы ci как необходимого условия для слияния с мастер-веткой Github любезно покажет статус, и не даст выполнить merge до момента, когда все тесты будут завершены успешно.
6. Удаление окружения (clean up)
После успешных тестов необходимо очистить временную схему и удалить все созданные в рамках тестов объекты. Можно назвать это специфическим garbage collection. Иначе говоря применить принцип временной схемы – transient schema, которая создается только лишь для прогона тестов.
При этом в случае возникновения ошибки схема не удаляется и существует возможность обратиться непосредственно к объектам с данными и детально разобраться, в чем суть и корневая причина бага. Это элемент debugging.
Макрос очистки – clean_up() довольно прост. Он каскадно удаляет схему, использованную в рамках запуска, и все объекты в ней:
-- Delete objects from database after CI tests
{% macro clean_up() %}
{%- if target.name in ['ci'] -%}
{% set schema = target.schema ~ '_' ~ env_var('GITHUB_PR_NUMBER') | trim %}
{% do adapter.drop_schema(api.Relation.create(database=target.database, schema=schema)) %}
{%- endif -%}
{% endmacro %}
Slim CI: оптимизируем и ускоряем
Даже после внедрения полноценного интеграционного тестирования, преждевременным будет считать работу полностью законченной. Всегда можно найти место для элементов оптимизации.
Пропорционально сложности проекта и объему графа витрин растет и время, затрачиваемое на каждый запуск Continuous Integration.
С другой стороны, чем быстрее цикл обратной связи – тем лучше. Чем быстрее мы тестируем и выводим изменения в продуктив – тем быстрее бизнес-пользователи получают результат. Речь о том самом time to market (T2M). В иных случаях это может иметь business critical приоритет.
Как мы можем ускорить время, затрачиваемое на запуски CI?
Идентифицировать, какие конкретно узлы меняются в патче
Запускать тесты только на подграфе Хранилища Данных
Внутренние механизмы dbt позволяют отследить узлы, претерпевшие изменения. В рамках этапа компиляции каждый узел графа идентифицируется хеш-суммой, которая меняется даже при малейшем изменении в коде. Это похоже на работу git, который оперирует хеш-суммами файлов.
Флаги --defer и --state в рамках запуска джобов dbt позволят запустить Continuous Integration в режиме Slim, а именно построение и тестирование только измененных узлов и их нижележащих зависимостей. Ниже краткий и лаконичный комментарий из официальной документации:
Результаты после внедрения Slim CI не могли не обрадовать команду и бизнес-пользователей. Теперь после внесения небольшого изменения в одну из витрин-листьев графа выполняется прогон только этой витрины, а не сборка всего Хранилища из 300+ моделей целиком.
Оседлать волну Modern Analytics Stack
Работа с данными – одно из наиболее востребованных и бурно развивающихся направлений. Каждый день я нахожу новые интересные задачи и придумываю решения для них. Это захватывающий и интересный путь, расширяющий горизонты.
Самые горячие, интересные и актуальные темы и технологии я включил в новый курс Analytics Engineer на платформе OTUS:
ELT – источники данных и пайплайны загрузки
Аналитические СУБД и моделирование данных в DWH
Объемлющая работа с Data Build Tool (dbt)
Аналитические витрины: KPI, Time-series, Cohort, Funnels, Retention, RFM
Установка и работа с инструментами Business Intelligence
Продвинутые техники: External & semi-structured data, Reverse ETL, UDF, ML
Тема настройки Slim Continuous Integration – лишь одна из частей, которые будут рассмотрены в модуле 5 – Продвинутые техники работы с DWH. С детальной программой можно ознакомиться на лендинге Data Warehouse Analyst.
Для тех кто дочитал до конца – запись вебинара Slim Continuous Integration для Хранилища Данных доступна на YouTube.
Обсудить публикацию и задать вопросы можно в моем телеграм-канале Technology Enthusiast.
УЗНАТЬ ПОДРОБНЕЕ О КУРСЕ "Data Warehouse Analyst"
Благодарю за внимание.