Таблица-справочник – генератор DAG? А что так можно было?

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Оглавление

  1. Предистория

  2. Используемые технологии

  3. Подготовка стенда

  4. Теперь всё готово для работы над проектом

  5. Создание SCD-таблицы

  6. Создание инструмента для наполнения SCD-таблицы

  7. Генерация DAG через SCD-таблицу

  8. Заключение

Предистория

Здесь будет рассказано почему это всё получилось и коротко про бизнес-кейс, если не интересно, то переходи сразу к следующему разделу :)

Существуют такие метрики, которые выглядят просто, но очень хорошо отображают суть бизнеса в какой-то момент.

Такие метрики как: MAU, DAU, сумма продаж и прочее дают стейкхолдером хорошее понимание что сейчас с бизнесом.

Для дата-аналитика это выглядит так: дата, название метрики, значение.

Для дата-инженера это выглядит как семь кругов ада: извлечение из источника, очистка, обогащение, агрегация, расчет бизнес-логики, снова агрегация, опять очистка и только тогда метрика готова попасть "на стол". (количество кругов ада различается от команды к команде и от бизнес-задачи, но суть вы поняли :))

Я постоянно пытаюсь как-то автоматизировать свою работу и упростить жизнь и себе, и людям.

И тут появился хороший шанс это сделать.

Пришел бизнес с примерно таким запросом: "У нас существует более ста метрик, которые имеют вид: дата, название метрики, значение. Мы хотели бы это всё автоматизировать, потому что это собирается частично руками, частично какими-то скриптами и у этого всего разные владельцы. А мы хотим всё хранить в одном месте. Ну и самое главное – мы хотим версионировать все свои метрики, чтобы мы знали когда, зачем и почему изменилась та или иная метрика" – на самом этот запрос звучал так: "нам нужно хранить метрики и знать когда, что и почему с ними происходило" :)

И я начал думать.

На ум пришла сразу же идея – использовать SCD.

Сразу захотелось создать такую таблицу с типом "II", в которой будет метрика, логика, комментарии и прочее. У каждой строки будет дата создания, дата "удаления".

А ещё я очень хотел не только создать такую таблицу, но и иметь возможность генерировать DAG через эту таблицу

И я это сделал.

Об этом и пойдет дальше речь.

Если что, то весь код, все коммиты и прочее находятся в данном git-репозитории.

Используемые технологии:

  • MacBook Air (M1, 2020)

    • Оперативная память 16 Gb

    • macOS Monterey 12.6

  • Docker 4.22.0

    • Docker resources:

      • CPUs: 2

      • Memory: 4 Gb

      • Swap: 4 Gb

  • Python3.11

Подготовка стенда

Для начала скачаем себе docker-compose.yaml с официального сайт Airflow:

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'

Если его запустить, то мы получим там Python3.7. Но хотелось бы что-то побыстрее, поэтому давайте сделаем себе Python3.11. Для этого нужно создать Dockerfile со следующей командой:

FROM apache/airflow:2.6.3-python3.11

Затем в docker-compose.yaml изменяем стандартный образ на наш (строка 53):

  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.3}
  build: .

Всё, готово. Теперь давайте соберем наш проект. Для этого выполним команду:

docker-compose up -d

* Если вы уже выполняли команду docker-compose up -d, то необходимо сначала выполнить команду docker-compose build и затем снова docker-compose up -d.

Ждем, когда скачаются все образы и поднимутся все контейнеры. После сборки Airflow он будет доступен по адресу http://0.0.0.0:8080/home

Создание виртуального окружения

Теперь нужно создать виртуальное окружение, чтобы корректно работала подсветка синтаксиса и работали все подсказки.

Для этого нужно выполнить команду:

python3.11 -m venv venv && \                                                                                                                                                     ~/_code/github/scd_dag_factory  
source venv/bin/activate && \
pip install --upgrade pip && \
pip install -r requirements.txt

В наше виртуальное окружение установятся все зависимости, которые необходимы для работы данного проекта.

Создание сервиса с базой данных в docker-compose

Для нашего проекта понадобится база данных (БД). Я по классике выбрал PostgreSQL (в оригинальном проекте был развернут инстанс GreenPlum 6).

Для того, чтобы корректно работала БД в нашей сборке мы добавим в конец docker-compose.yaml следующую команду:

  test_db:
    image: postgres
    restart: always
    ports:
      - "1:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres

После добавления этой команды необходимо заново перезапустить сборку командой:

docker-compose up -d

Теперь всё готово для работы над проектом

Для начала напишем простой DAG, который позволит проверить работу коннекта к нашей ранее созданной БД.

DAG, который проверяет коннект к БД в docker-сборке
from datetime import datetime, timedelta

import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Конфигурация DAG
OWNER = 'korsak0v'
DAG_ID = 'check_pg'
LOCAL_TZ = pendulum.timezone('Europe/Moscow')

# Описание возможных ключей для default_args
# https://github.com/apache/airflow/blob/343d38af380afad2b202838317a47a7b1687f14f/airflow/example_dags/tutorial.py#L39
args = {
    'owner': OWNER,
    'start_date': datetime(2023, 1, 1, tzinfo=LOCAL_TZ),
    'catchup': True,
    'retries': 3,
    'retry_delay': timedelta(hours=1),
}


def check_pg_connect(**context):
    """"""
    pg = PostgresHook('test_db')

    df = pg.get_pandas_df('SELECT 1 AS one')

    if len(df) == 1:
        print(True)

with DAG(
        dag_id=DAG_ID,
        schedule_interval='10 0 * * *',
        default_args=args,
        tags=['check_pg_connect', 'test'],
        concurrency=1,
        max_active_tasks=1,
        max_active_runs=1,
) as dag:

    start = EmptyOperator(
        task_id='start',
    )

    check_pg_connect = PythonOperator(
        task_id='check_pg_connect',
        python_callable=check_pg_connect,
    )

    end = EmptyOperator(
        task_id='end',
    )

    start >> check_pg_connect >> end

Запустим его и посмотрим на него. Если раны горят зеленым, то всё идет по плану.

Далее давайте напишем наш типовой DAG, который будет собирать метрики по типу: дата, метрика, значение.

Типовой DAG, который будет собирать какую-то метрику (simple_dag)
from datetime import datetime, timedelta

import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


# Конфигурация DAG
OWNER = 'korsak0v'
DAG_ID = 'simple_dag'
LOCAL_TZ = pendulum.timezone('Europe/Moscow')

# Названия коннекторов к PG
PG_CONNECT = 'test_db'

# Используемые таблицы в DAG
PG_TARGET_SCHEMA = 'dm'
PG_TARGET_TABLE = 'fct_sales'
PG_TMP_SCHEMA = 'stg'
PG_TMP_TABLE = f'tmp_{PG_TARGET_TABLE}_{{{{ data_interval_start.format("YYYY_MM_DD") }}}}'
INDEX_KPI = 1

sql_query = '''
SELECT 
	('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
	(random()*100)::int AS value,
	1 AS kpi_id
'''

LONG_DESCRIPTION = '# LONG_DESCRIPTION'

SHORT_DESCRIPTION = 'SHORT_DESCRIPTION'


args = {
    'owner': OWNER,
    'start_date': datetime(2023, 1, 1, tzinfo=LOCAL_TZ),
    'catchup': True,
    'retries': 3,
    'retry_delay': timedelta(hours=1),
}

with DAG(
        dag_id=DAG_ID,
        schedule_interval='10 0 * * *',
        default_args=args,
        tags=['dm'],
        description=SHORT_DESCRIPTION,
        concurrency=1,
        max_active_tasks=1,
        max_active_runs=1,
) as dag:
    dag.doc_md = LONG_DESCRIPTION

    start = EmptyOperator(
        task_id='start',
    )

    drop_tmp_before = PostgresOperator(
        task_id='drop_tmp_before',
        sql=f'''DROP TABLE IF EXISTS {PG_TMP_SCHEMA}.{PG_TMP_TABLE}''',
        postgres_conn_id=PG_CONNECT
    )

    create_tmp = PostgresOperator(
        task_id='create_tmp',
        sql=f'''
        CREATE TABLE {PG_TMP_SCHEMA}.{PG_TMP_TABLE} AS
        {
            sql_query.format(
                start_date="{{ data_interval_start.format('YYYY-MM-DD') }}",
                end_date="{{ data_interval_end.format('YYYY-MM-DD') }}"
            )
        };
        ''',
        postgres_conn_id=PG_CONNECT
    )

    delete_from_target = PostgresOperator(
        task_id='delete_from_target',
        sql=f'''
        DELETE FROM {PG_TARGET_SCHEMA}.{PG_TARGET_TABLE}
        WHERE 
            date IN (
                SELECT 
                    date 
                FROM 
                    {PG_TMP_SCHEMA}.{PG_TMP_TABLE}
                )
        ''',
        postgres_conn_id=PG_CONNECT
    )

    insert_from_tmp_to_target = PostgresOperator(
        task_id='insert_from_tmp_to_target',
        sql=f'''
        INSERT INTO {PG_TARGET_SCHEMA}.{PG_TARGET_TABLE}("date", value, kpi_id)
        SELECT 
            "date", 
            value,
            kpi_id
        FROM 
            {PG_TMP_SCHEMA}.{PG_TMP_TABLE}
        ''',
        postgres_conn_id=PG_CONNECT
    )

    drop_tmp_after = PostgresOperator(
        task_id='drop_tmp_after',
        sql=f'''DROP TABLE IF EXISTS {PG_TMP_SCHEMA}.{PG_TMP_TABLE}''',
        postgres_conn_id=PG_CONNECT
    )

    end = EmptyOperator(
        task_id='end',
    )

    start >> drop_tmp_before >> create_tmp >> delete_from_target >> insert_from_tmp_to_target >> drop_tmp_after >> end
    

В генерируемых значениях есть уже колонка kpi_id. Пока не обращаем на неё внимание. О ней чуть позже.

Перед запуском DAG давайте создадим таблицу, в которую будут писаться метрики.

DDL код для создания таблицы фактов
CREATE SCHEMA dm;

CREATE SCHEMA stg;

DROP TABLE IF EXISTS dm.fct_sales;

CREATE TABLE dm.fct_sales (
	id bigserial PRIMARY KEY,
	"date" date NOT NULL,
	kpi_id int2 NOT NULL,
	value int4 NOT NULL
);

После этого мы можем запустить DAG и мы увидим как появляются данные в нашей таблице.

Уже хорошо. Мы, на текущий момент, имеем DAG, который собирает какую-то метрику (давайте примем за условность, что метрика реально собирается, хоть и значения рандомного генерируются).

Теперь мы можем перейти к созданию SCD-таблицы с метриками.

Создание SCD-таблицы

Немного повторюсь, я хочу в этой таблице хранить всю информацию, которая касается метрик. Соответсвенно я хотел бы иметь полную информацию о ней: название, бизнес-логику, комментарии аналитиков, комментарии дата-инженеров и прочее.

А самое главное – я хочу из этой информации генерировать DAG, поэтому нужно определить значения, которые не могут быть пустыми (NOT NULL).

Давайте создадим такую таблицу следующим скриптом:

DDL код для SCD-таблицы
CREATE TABLE public.dim_kpi_dag_gen_config (
	id serial4 NOT NULL,
	kpi_id int4 NOT NULL,
	dag_id varchar(255) NOT NULL,
	"owner" varchar(30) NOT NULL,
	start_date varchar(40) NOT NULL,
	metric_name_en varchar(255) NOT NULL,
	sql_query text NOT NULL,
	short_description_md varchar(255) NULL,
	long_description_md text NULL,
	cron varchar(50) NOT NULL,
	sensors varchar(255) NULL,
	tags varchar(255) NOT NULL,
	metric_line varchar(255) NULL,
	"source" varchar(255) NULL,
	bi_logic varchar(255) NULL,
	comment_pa text NULL,
	comment_de text NULL,
	is_actual bool NULL DEFAULT TRUE,
	created_at timestamptz NULL DEFAULT now(),
	changed_at timestamptz NULL,
	pg_environment varchar(10) NOT NULL DEFAULT 'prod'::character varying,
	airflow_environment varchar(10) NOT NULL DEFAULT 'dev'::character varying,
	CONSTRAINT dim_kpi_dag_gen_config_pkey PRIMARY KEY (id)
)

И давайте сразу же создадим таблицу, в которой будут храниться все наши собранные метрики:

DDL код для таблицы фактов (таблица для всех собираемых метрик)
CREATE TABLE public.fct_dm_kpi (
	"date" date NULL,
	value float8 NULL,
	kpi_id int4 NULL
);

Создание инструмента для наполнения SCD-таблицы

Ниже я покажу MVP-вариант, который может заполнять SCD-таблицу. На своем проекте, с помощью другой команды, мы прикрутили к нему веб-интерфейс.

SCD-инструмент, который позволяет актуализировать и заполнять таблицу
from connectors_to_databases import PostgreSQL


pg = PostgreSQL(
    port=1
)


TABLE = 'dim_kpi_dag_gen_config'


def gen_insert_sql_for_kpi_id(dict_kpi: dict = None) -> str:
    """
    Генерирует скрипт для вставки данных в SCD.

    Определяет есть ли такой ключ.
    Если нет, то делает вставку с нужными данными, указанными в dict_kpi.
    Если есть, то делает вставку с нужными данными, указанными в dict_kpi и дублирует информацию из прошлых строк.

    @param dict_kpi: Словарь с описанием kpi.
    @return: Строку для вставки значений в SCD.
    """

    # Проверка наличия kpi_id в таблице
    df_check = pg.execute_to_df(f'''
    SELECT
        kpi_id
    FROM
        {TABLE}
    WHERE
        kpi_id = {dict_kpi['kpi_id']}
    ''')

    # Проверяем есть ли такой kpi_id в таблице
    if len(df_check) >= 1:
        # В запросе исключаем те поля, которые генерируется сами через `DEFAULT`
        query = f'''
            SELECT 
                column_name 
            FROM 
                information_schema.columns 
            WHERE 
                table_name = '{TABLE}'
                AND column_name NOT IN (
                    'id', 'created_at', 'changed_at', 'is_actual', 
                    {', '.join(f"'{i}'" for i in dict_kpi)}
                )
        '''

        df = pg.execute_to_df(query)  # noqa: PD901

        insert_sql_column_current = ', '.join(value for value in df.column_name)
        insert_sql_column_modified = insert_sql_column_current + f''', {', '.join(i for i in dict_kpi)}'''

        list_values = []

        for value in dict_kpi.values():
            # Обработка одинарных кавычек в значениях. Они встречаются при указании дат.
            if "'" in str(value):
                value = value.replace("'", "''")
                list_values.append(f"'{value}'")
            elif value is None:
                list_values.append('NULL')
            else:
                list_values.append(f"'{value}'")

        insert_sql_column_values = insert_sql_column_current + f''', {', '.join(list_values)}'''

        sql_insert = f'''
        INSERT INTO {TABLE}
        (
            {insert_sql_column_modified}
        )
        SELECT 
            {insert_sql_column_values} 
        FROM 
            {TABLE}
        WHERE
            is_actual IS TRUE
            AND kpi_id = {dict_kpi['kpi_id']};
        '''
    else:
        # Если нет такого kpi_id в таблице, то генерируем вставку значений из словаря
        columns = ', '.join(value for value in dict_kpi)

        list_values = []

        for value in dict_kpi.values():
            if "'" in str(value):
                value = value.replace("'", "''")
                list_values.append(f"'{value}'")
            elif value is None:
                list_values.append('NULL')
            else:
                list_values.append(f"'{value}'")

        values = ', '.join(list_values)

        sql_insert = f'''
            INSERT INTO {TABLE}({columns})
            VALUES ({values});
            '''

    return sql_insert


def scd_dim_kpi(dict_kpi: dict = None) -> None:
    """
    Основная функция, которая принимает на вход словарь с описанием kpi.

    Каждый ключ – это поле в таблице SCD.
    Каждое значение – это значение поля в таблице SCD.

    @param dict_kpi: Словарь с описанием kpi по выбранным колонкам.
    @return: Ничего не возвращает, выполняет SQL-скрипт на вставку данных в SCD.
    """

    # Обновление changed_at в предыдущей актуальной записи
    update_changed_at_for_kpi_id = f'''
    UPDATE {TABLE} 
    SET 
        changed_at = NOW() 
    WHERE 
        kpi_id = {dict_kpi['kpi_id']} 
        AND is_actual IS TRUE;
    '''

    # Вставка новой записи с обновленными значениями полей
    insert_new_values_for_kpi_id = gen_insert_sql_for_kpi_id(dict_kpi=dict_kpi)

    # Обновление is_actual для каждого kpi_id
    update_is_actual_for_kpi_id = f'''
    UPDATE {TABLE} 
    SET 
        is_actual = false 
    WHERE 
        kpi_id = {dict_kpi['kpi_id']} 
        AND id <> (
            SELECT MAX(id) 
            FROM {TABLE} 
            WHERE kpi_id = {dict_kpi['kpi_id']}
        );
    '''

    # Собираем SQL-скрипт из разных кусков, чтобы он прошел в одной транзакции
    sql_query = update_changed_at_for_kpi_id + insert_new_values_for_kpi_id + update_is_actual_for_kpi_id

    print(sql_query)  # noqa: T201
    pg.execute_script(sql_query)

И теперь давайте вызовем нашу основную функцию scd_dim_kpi, чтобы получить первую запись с метрикой в нашей таблице.

# Пример использования
new_values = {
    'kpi_id': 1,
    'dag_id': 'test_1',
    'metric_name_en': 'test_1',
    'owner': 'korsak0v',
    'start_date': '2021-01-01',
    'cron': '10 0 * * *',
    'tags': '''['dm', 'pg', 'gen_dag', 'from_pg']''',
    'sql_query': '''
    SELECT
        date,
        count(values) AS value
    FROM
        fct_some_table_with_random_values
    WHERE
        date BETWEEN '{start_date}' AND '{end_date}'
    GROUP BY
        1
    ''',
}


# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

Вот что мы увидим в БД:

Первая созданная метрика в БД
Первая созданная метрика в БД

Отлично, метрика находится в БД и с этого момента можно сказать начинается её версионирование. Каждое изменение будет фиксироваться в БД отдельной строкой и мы сможем отследить все изменения.

Давайте для примера изменим поле comment_de:

new_values = {
    'kpi_id': 1,
    'comment_de': 'Сделали так, потому что потому'
}

# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

Получим в БД такую информацию: kpi_id не изменился, изменилась информация в dim_kpi_dag_gen_config и также видно, когда была изменена метрика, ну и самое главное – флаг is_actual переместился на последнюю запись по данному kpi_id.

Теперь мы готовы генерировать DAG из этой таблицы.

Генерация DAG через SCD-таблицу

Давайте оставим simple_dag без изменений, но возьмем его за основу и сделаем из него функцию, которая будет возвращать DAG на основании полученных атрибутов.

DAG генератор DAG
from datetime import datetime, timedelta

import pendulum

from airflow import DAG

from airflow.sensors.external_task import ExternalTaskSensor

from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

from airflow.providers.postgres.hooks.postgres import PostgresHook


def create_kpi_of_dag(
        owner: str = None,
        dag_id: str = None,
        pg_target_schema: str = None,
        pg_target_table: str = None,
        index_kpi: str = None,
        pg_environment: str = None,
        airflow_environment: str = None,
        long_description: str = None,
        short_description: str = None,
        start_date: str = None,
        cron_expression: str = None,
        tags: str = None,
        sensors: str = None,
        sql_query: str = None,
) -> DAG:
    """
    Функция, которая генерирует типовой DAG для получения метрики.

    Вся логика описана и прокомментирована внутри функции.

    Некоторые моменты обработаны исключительно для функции, чтобы обработать какие-то атрибуты и получить желаемый
    эффект.

    @param owner: Владелец DAG.
    @param dag_id: Название DAG.
    @param pg_target_schema: Целевая схема в PostgreSQL.
    @param pg_target_table: Целевая таблица в PostgreSQL.
    @param index_kpi: ID показателя.
    @param pg_environment: Окружение PostgreSQL.
    @param airflow_environment: Окружение Airflow.
    @param long_description: Полное описание отчета.
    @param short_description: Короткое описание отчета.
    @param start_date: Дата начала работы DAG.
    @param cron_expression: Cron.
    @param tags: Tags.
    @param sensors: Sensors.
    @param sql_query: SQL-запрос для получения метрики.
    @return: Возвращает DAG.
    """
    # Конфигурация DAG
    local_tz = pendulum.timezone('Europe/Moscow')

    # Используемые таблицы в DAG
    pg_target_schema = pg_target_schema
    pg_target_table = pg_target_table
    pg_tmp_schema = 'stg'
    pg_tmp_table = f'tmp_{dag_id}_{{{{ data_interval_start.format("YYYY_MM_DD") }}}}'

    # Названия коннекторов к GP
    pg_connect = 'test_db_dev' if pg_environment == 'dev' else 'test_db'

    # Сделана заглушка атрибута. Это можно использовать для указания разных сценариев в зависимости от окружения
    airflow_environment = airflow_environment

    # Дата приходит в формате str и после парсинга, мы можем получить дату и любые элементы даты
    parse_date = pendulum.parse(start_date)

    args = {
        'owner': owner,
        'start_date': datetime(parse_date.year, parse_date.month, parse_date.day, tzinfo=local_tz),
        'catchup': True,
        'depends_on_past': True,
        'retries': 3,
        'retry_delay': timedelta(hours=1),
    }

    # Tags приходят в str формате, поэтому нужно их правильно "разобрать" и превратить в list
    raw_tags = list(tags.split(','))
    tags_ = []

    for i in raw_tags:
        tags_.append(  # noqa: PERF401
            i.replace("'", "")
            .replace(" ", '')
            .replace("[", "")
            .replace("]", "")
        )

    # Sensors приходят в str формате, поэтому нужно их правильно "разобрать" и превратить в list
    if sensors:
        raw_sensors = list(sensors.split(','))
        sensors_ = []
        for i in raw_sensors:
            sensors_.append(  # noqa: PERF401
                i.replace("'", "")
                .replace(' ', '')
                .replace("[", "")
                .replace("]", "")
            )
    else:
        sensors_ = None

    with DAG(
            dag_id=dag_id,
            schedule_interval=cron_expression,
            default_args=args,
            tags=tags_,
            description=short_description,
            concurrency=1,
            max_active_tasks=1,
            max_active_runs=1,
    ) as dag:
        dag.doc_md = long_description

        start = EmptyOperator(
            task_id='start',
        )

        # Если есть sensors, то мы создаем задачи с сенсорами, иначе создаем одну пустышку
        if sensors_:
            sensors_task = [
                ExternalTaskSensor(
                    task_id=f'sensor_{dag}',
                    external_dag_id=dag,
                    allowed_states=['success'],
                    mode='reschedule',
                    timeout=360000,  # длительность работы сенсора
                    poke_interval=600  # частота проверки
                ) for dag in sensors_
            ]
        else:
            sensors_task = [EmptyOperator(task_id=f'empty_{value}') for value in range(1)]

        drop_tmp_before = PostgresOperator(
            task_id='drop_tmp_before',
            sql=f'''DROP TABLE IF EXISTS {pg_tmp_schema}.{pg_tmp_table}''',
            postgres_conn_id=pg_connect
        )

        create_tmp = PostgresOperator(
            task_id='create_tmp',
            sql=f'''
            CREATE TABLE {pg_tmp_schema}.{pg_tmp_table} AS
            {
                sql_query.format(
                    start_date="{{ data_interval_start.format('YYYY-MM-DD') }}",
                    end_date="{{ data_interval_end.format('YYYY-MM-DD') }}"
                )
            };
            ''',
            postgres_conn_id=pg_connect
        )

        delete_from_target = PostgresOperator(
            task_id='delete_from_target',
            sql=f'''
            DELETE FROM {pg_target_schema}.{pg_target_table}
            WHERE 
                date IN (
                    SELECT 
                        date 
                    FROM 
                        {pg_tmp_schema}.{pg_tmp_table}
                    WHERE
                        kpi_id = {index_kpi}
                    )
            AND kpi_id = {index_kpi}
            ''',
            postgres_conn_id=pg_connect
        )

        insert_from_tmp_to_target = PostgresOperator(
            task_id='insert_from_tmp_to_target',
            sql=f'''
            INSERT INTO {pg_target_schema}.{pg_target_table}("date", value, kpi_id)
            SELECT 
                "date", 
                value, 
                {index_kpi} AS kpi_id 
            FROM 
                {pg_tmp_schema}.{pg_tmp_table}
            ''',
            postgres_conn_id=pg_connect
        )

        drop_tmp_after = PostgresOperator(
            task_id='drop_tmp_after',
            sql=f'''DROP TABLE IF EXISTS {pg_tmp_schema}.{pg_tmp_table}''',
            postgres_conn_id=pg_connect
        )

        end = EmptyOperator(
            task_id='end',
        )

        start >> sensors_task >> drop_tmp_before >> create_tmp >> delete_from_target >>\
        insert_from_tmp_to_target >> drop_tmp_after >> end

    return dag


# build a dag from dag config
def generator_of_morning_kpi_dag_to_gp() -> None:
    """
    Функция получает список config из БД и генерирует DAG's на основании функции `create_kpi_of_dag`.

    Итерируется по config и каждый раз выполняет функцию `create_kpi_of_dag`, которая возвращает DAG.

    @return: None
    """
    pg_hook = PostgresHook(postgres_conn_id='test_db')

    df = pg_hook.get_pandas_df(  # noqa: PD901
        '''
        SELECT 
            kpi_id,
            dag_id,
            "owner",
            sql_query,
            start_date,
            pg_environment,
            airflow_environment,
            short_description_md,
            long_description_md,
            cron,
            sensors,
            tags
        FROM 
            dim_kpi_dag_gen_config
        WHERE 
            is_actual IS TRUE 
        ORDER BY 
            id;
        '''
    )

    for i in range(len(df)):
        create_kpi_of_dag(
            owner=df.iloc[i].owner,
            dag_id=df.iloc[i].dag_id,
            pg_target_schema='public',
            pg_target_table='fct_dm_kpi',
            index_kpi=df.iloc[i].kpi_id,
            pg_environment=df.iloc[i].pg_environment,
            airflow_environment=df.iloc[i].airflow_environment,
            long_description=df.iloc[i].long_description_md,
            short_description=df.iloc[i].short_description_md,
            start_date=df.iloc[i].start_date,
            cron_expression=df.iloc[i].cron,
            tags=df.iloc[i].tags,
            sensors=df.iloc[i].sensors,
            sql_query=df.iloc[i].sql_query,
        )


generator_of_morning_kpi_dag_to_gp()

После добавления данного DAG мы сразу же видим результат – в нашем веб-интерфейсе появился DAG с ранее созданной метрикой:

Давайте запустим наш сгенерированный DAG и посмотрим на результат, но в начале отредактируем SQL-запрос под необходимый нам формат.

Изменение sql_query для DAG test_1
new_values = {
    'kpi_id': 1,
    'sql_query': '''
    SELECT 
        ('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
        (random()*100)::int AS value
    ''',
}


# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

Всё, DAG работает и что-то собирает. Давайте сделаем для примера ещё несколько DAG.

Я создал пять дополнительных DAG по следующему шаблону (для демонстрации):

Hidden text
# Пример использования
new_values = {
    'kpi_id': 6,
    'dag_id': 'test_6',
    'metric_name_en': 'test_6',
    'owner': 'korsak0v',
    'start_date': '2000-01-01',
    'cron': '15 14 1 * *',
    'tags': '''['dm', 'pg', 'gen_dag', 'from_pg']''',
    'sql_query': '''
    SELECT 
        ('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
        (random()*100)::int AS value
    ''',
}


# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

Всё работает корректно. В БД записи есть:

В Airflow DAG корректно отображаются и работают.

Давайте добавим ещё сенсоры для демонстрации. Я добавил новый DAG и если посмотреть на его граф, то мы можем увидеть EmptyOperatorempty_0. Сейчас это просто заглушка на случай если у нас нет сенсоров на другие DAG. При дагране он выполнится менее чем за секунду и наш пайплайн продолжит корректно работать.

Давайте добавим список сенсоров:

Добавление списка сенсоров для изменения графа DAG test_7
new_values = {
    'kpi_id': 7,
    'sensors': '''['test_1', 'test_2', 'test_3', 'test_4', 'test_5', 'test_6']''',
}


# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

И получим такой граф. Теперь вместо нашей заглушки мы имеем список сенсоров, которые будут проверять отработали ли какие-то DAG или нет.

Ну, и в конце давайте посмотрим, что насобирали наши сгенерированные DAG:

В дальнейшем мы сможем сделать JOIN между таблицей фактов и справочником, чтобы получить информацию по каждой из метрик.

Заключение

Я постарался описать максимально подробно свой кейс. Возможно, он является узконаправленным, но тут я рассказал об очередном генераторе DAG.

Также я не отрицаю, что это не идеально, но и идеала не существует ;)

Мог какие-то моменты упустить/упростить/не описать подробно. За всем этим можете обращаться в комментарии.

Важные моменты, на которые стоит обратить внимание: 

  1. Данная реализация не генерирует файл, а генерирует только объекты, поэтому во всех DAG будет исходный код нашего DAG, в котором происходит вся генерация. Если вам нужны физически созданные DAG, то можете прочитать мою предыдущую статью.

  2. Если некорректно передать какие-то атрибуты в функцию, то сломаются все DAG. Если к примеру вместо даты попадет какая-то строка, то произойдет исключение, функция не отработает и ни один DAG не будет создан. Но вы можете это улучшить при помощи обработки исключений и прочего.

  3. В данной реализации kpi_id нужно подставлять самостоятельно. В моем варианте не реализован автоинкремент. Но как я говорил ранее этот продукт доработан и другая команда реализовала уже это на backend.

  4. Я показал самую простую реализацию данного инструмента. Вы можете настраивать его под себя, с обработкой каждого атрибута, использовать фреймворки, использовать самописные обработчики и прочие.

  5. Можно сделать дополнительную нормализацию данных, сделать дополнительные справочники, привязать ORM-модели и реализовать сразу какой-то backend, чтобы не заполнять это руками.

  6. В демонстрационном DAG реализована идемпотентность, которая позволяет грузить метрики снапшотами за каждый день. Вы можете это регулировать также под свои нужды.

  7. Мой кейс решает определенную бизнес-задачу. Бизнес рад и я рад :)

Источник: https://habr.com/ru/articles/756978/


Интересные статьи

Интересные статьи

Камиль Камалетдинов, младший эксперт по реагированию на инциденты Angara Security, подготовил обзор полезных утилит для triage. В материале практическая польза и небольшой опрос для вас в самом конце....
Эта статья из серии постов на тему демистификации ИИ, в которых предпринимаются пытытки устранить двусмысленность жаргона и разоблачить мифы связанные с технологиями ИИ. ChatGPT и другие больши...
Команда VK Cloud перевела конспект доклада с конференции Data+AI Summit 2022. В своём выступлении Чжоу Цзян и Ааруна Годти из Apple описывают, как построили централизованный кластер Apache Spark н...
Конференция Heisenbug («по тестированию, но не только для тестировщиков») близится: программа уже готова, и пришло время поделиться с Хабром всеми подробностями. Во-первых, напоминаем формат. Кон...
AI на минималках 2: Генератор стихов на Prolog На картинке — четверостишье, сгенерированное моей программой. Оказывается "стихи" писать легко, нужно только знать несколько н...