Предыдущие: ч.1 Основы и расписания, ч.2 Операторы и датчики
3. Проектирование DAG
Поскольку Airflow — это на 100% код, знание основ Python - это все, что нужно, чтобы начать писать DAG. Однако написание эффективных, безопасных и масштабируемых DAG требует учета некоторых моментов, специфичных для Airflow. В этом разделе мы рассмотрим некоторые передовые методы разработки DAG, которые максимально используют возможности Airflow.
В целом, большинство лучших практик, которые мы здесь рассматриваем, относятся к одной из двух категорий:
Проектирование DAG
Использование Airflow в качестве оркестратора
Проверка идемпотентности
Прежде чем мы перейдем к рекомендациям, специфичным для Airflow, нам нужно рассмотреть одну концепцию, которая применима ко всем конвейерам данных.
Идемпотентность является основой для многих методов вычислений, включая рекомендации Airflow, приведенные в этом разделе. Это особое качество: вычислительная операция считается идемпотентной, если она всегда выдает один и тот же результат.
В контексте Airflow DAG считается идемпотентным, если каждый запуск DAG генерирует одни и те же результаты, даже при многократном запуске. Проектирование идемпотентных DAG сокращает время восстановления после сбоев и предотвращает потерю данных.
Проектирование DAG
Следующие принципы проектирования DAG помогут сделать ваши DAG удобными, эффективными и удобочитаемыми.
Придерживайтесь правила атомарности каждой задачи
При разбивке вашего конвейера на отдельные задачи в идеале каждая задача должна быть атомарной. Это означает, что каждая задача должна отвечать за одну операцию, которая может быть повторно запущена независимо от других. Другими словами, в автоматизированной задаче успех в части задачи означает успех всей задачи.
Например, в конвейере ETL вы в идеале хотели бы, чтобы ваши Extract, Transform, и Load - операции покрывались бы тремя отдельными задачами. Атомизация, разделение этих задач позволяет вам повторно запускать каждую операцию в конвейере независимо, что поддерживает идемпотентность.
Используйте поля шаблона, переменные и макросы
С помощью полей шаблона в Airflow вы можете извлекать значения в DAG с помощью переменных среды и шаблонов jinja. По сравнению с использованием функций Python, использование полей шаблона помогает сохранить ваши DAG идемпотентными и гарантирует, что вы не выполняете функции при каждом событии планировщика (подробнее об оптимизации планировщика см. “Избегайте кода верхнего уровня в вашем DAG” ниже).
Вопреки нашим рекомендациям, в следующем примере переменные определяются на основе datetime
-функций Python :
# Variables used by tasks
# Bad example - Define today’s and yesterday’s date using date-time module
today = datetime.today()
yesterday = datetime.today() - timedelta(1)
Если этот код находится в файле DAG, эти функции будут выполняться при каждом событии планировщика, что может ухудшить производительность. Более того, это не приводит к идемпотентносьи DAG: если бы вам нужно было повторно запустить ранее неудачный запуск DAG для прошлой даты, вы бы не смогли этого сделать, потому что date-time.today()
относится к текущей дате, а не к дате выполнения DAG.
Лучшим способом реализации этого является использование переменной Airflow:
# Variables used by tasks
# Good example - Define yesterday’s date with an Airflow variable
yesterday = {{ yesterday_ds_nodash }}
Вы можете использовать одну из множества встроенных переменных и макросов Airflow или создать собственное поле шаблона для передачи информации во время выполнения. Для получения дополнительной информации по этой теме ознакомьтесь с нашим руководством по созданию руководством по созданию шаблонов и макросам в Airflow.
Инкрементальная фильтрация записей
Идеально разбивать ваши конвейеры на инкрементные извлечения и загрузки везде, где это возможно. Например, если у вас есть DAG, который выполняется ежечасно, каждый запуск DAG должен обрабатывать только записи за этот час, а не весь набор данных. Когда результаты каждого запуска DAG представляют собой лишь небольшое подмножество вашего общего набора данных, сбой в одном подмножестве данных не помешает остальной части вашего DAG запуститься и успешно завершиться. И если ваши DAG являются идемпотентными, вы можете повторно запустить DAG только для данных, которые завершились с ошибкой, вместо повторной обработки всего набора данных.
Существует несколько способов создания инкрементных конвейеров. Два лучших и наиболее распространенных метода описаны ниже.
Дата последнего изменения
Использование даты “последнего изменения” является золотым стандартом для инкрементных загрузок. В идеале каждая запись в вашей исходной системе должна иметь столбец, содержащий время последнего изменения записи. При таком дизайне прогон DAG ищет записи, которые были обновлены в течение определенных дат из этого столбца.
Например, при ежечасном запуске DAG каждый запуск DAG будет отвечать за загрузку любых записей, которые попадают между началом и концом его часа. Если какой-либо из этих запусков завершится неудачей, это не повлияет на другие запуски.
Идентификаторы (ID) последовательностей
Если дата последнего изменения недоступна, ID последовательностей или приращения может использоваться для инкрементных нагрузок. Эта логика лучше всего работает, когда исходные записи только добавляются и никогда не обновляются. Хотя мы рекомендуем по возможности использовать в ваших записях систему дат “последнего изменения”, основывать вашу инкрементную логику на идентификаторе последовательности может быть разумным способом фильтрации записей конвейера без даты последнего изменения.
Избегайте использования кода верхнего уровня в Вашем файле DAG
В контексте Airflow мы используем “код верхнего уровня” для обозначения любого кода, который не является частью ваших DAG или экземпляров операторов.
Airflow выполняет весь код в папке dags_folder
при каждом min_file_process_interval
, который по умолчанию равен 30 секундам (подробнее об этом параметре можно прочитать в документации Airflow). Из-за этого код верхнего уровня, который выполняет запросы к внешним системам, таким как API или база данных, или вызовы функций вне ваших задач, могут вызвать проблемы с производительностью. Кроме того, включение в файл DAG кода, который не является частью вашего DAG или экземпляров операторов, затрудняет чтение, обслуживание и обновление DAG.
Относитесь к своему DAG-файлу как к конфигурационному файлу и оставьте всю тяжелую работу хукам и операторам, которые вы создаете в файле. Если вашим DAG требуется доступ к дополнительному коду, такому как SQL-скрипт или функция Python, сохраните этот код в отдельном файле, который можно прочитать при запуске DAG.
Как пример того, чего не следует делать, в приведенном ниже DAG PostgresOperator
выполняет SQL-запрос, который был сброшен непосредственно в файл DAG:
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
#Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
#Instantiate DAG
with DAG('bad_practices_dag_1',
start_date=datetime(2021, 1, 1),
max_active_runs=3,
schedule_interval='@daily',
default_args=default_args,
catchup=False
) as dag:
t0 = DummyOperator(task_id='start')
#Bad example with top level SQL code in the DAG file
query_1 = PostgresOperator(
task_id='covid_query_wa',
postgres_conn_id='postgres_default',
sql='''with yesterday_covid_data as (
SELECT * FROM covid_state_data
WHERE date = {{ params.today }}
AND state = ‘WA’
),
two_day_rolling_avg as (
SELECT AVG(a.state, b.state) as two_day_avg
FROM yesterday_covid_data a
JOIN yesterday_covid_data b
ON a.state = b.state
)
SELECT a.state, b.state, c.two_day_avg
FROM yesterday_covid_data a
JOIN today_covid_data b
ON a.state=b.state
JOIN two_day_rolling_avg c
ON a.state=b.two_day_avg;''',
params={‘today’: today, ‘yesterday’:yesterday}
)
Сохранение запроса в файле DAG затрудняет чтение и обслуживание DAG. Вместо этого в приведенном ниже DAG мы вызываем файл с именем covid_state_query.sql в нашем экземпляре оператора Postgres, - и это является наилучшим способом:
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
#Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
#Instantiate DAG
with DAG('good_practices_dag_1',
start_date=datetime(2021, 1, 1),
max_active_runs=3,
schedule_interval='@daily',
default_args=default_args,
catchup=False,
template_searchpath='/usr/local/airflow/include' #include path to look for external files
) as dag:
query = PostgresOperator(
task_id='covid_query_{0}'.format(state),
postgres_conn_id='postgres_default',
sql='covid_state_query.sql', #reference query kept in separate file
params={'state': "'" + state + "'"}
)
Используйте определенный метод для зависимостей задач
В Airflow зависимости задач могут быть установлены несколькими способами. Вы можете использовать функцииset_upstream()
и set_downstream()
, либо вы можете использовать операторы << и >>. Какой метод вы используете, зависит от личных предпочтений, но для удобства чтения лучше всего выбрать один метод и придерживаться его.
Например, вместо смешивания методов, подобных этому:
task_1.set_downstream(task_2)
task_3.set_upstream(task_2)
task_3 >> task_4
Постарайтесь быть последовательным - например, так:
task_1 >> task_2 >> [task_3, task_4]
Используйте возможности Airflow
Следующая категория рекомендаций относится к использованию Airflow для чего он был изначально разработан: оркестратор. Использование Airflow в качестве оркестратора упрощает масштабирование и использование нужных инструментов в зависимости от ваших потребностей.
Используйте пакеты поставщиков
Один из основных факторов в пользу использования Airflow - это его надежное и активное сообщество, которое помогло интегрировать Airflow с другими инструментами, известными как пакеты поставщиков.
Пакеты поставщиков позволяют вам организовывать задания по обработке данных сторонних производителей непосредственно из Airflow. Везде, где это возможно, лучше всего использовать эти интеграции, а не писать функции Python самостоятельно (не нужно изобретать велосипед). Это упрощает внедрение Airflow для команд, использующих существующие инструменты, - вам придется писать меньше кода.
Чтобы найти все доступные пакеты провайдеров, загляните в Реестр.
Определите, где выполнять Задания по обработке данных
Поскольку DAG написаны на Python, у вас есть много вариантов реализации обработки данных. Для рабочих нагрузок малого и среднего масштаба обычно безопасно выполнять обработку данных в рамках Airflow, если вы выделяете достаточно ресурсов для своей инфраструктуры Airflow. Большие задания по обработке данных, как правило, лучше всего выгружать в среду, специально оптимизированную для этих случаев использования, такую как Apache Spark. Затем вы можете использовать Airflow для организации этих заданий.
Мы рекомендуем вам учитывать размер ваших данных сейчас и в будущем при принятии решения о том, следует ли обрабатывать данные в Airflow или загружать их во внешний инструмент. Если ваш вариант использования хорошо подходит для обработки данных в рамках Airflow, то мы бы рекомендовали следующее:
Убедитесь, что ваша инфраструктура воздушного потока располагает необходимыми ресурсами.
Используйте Kubernetes Executor для изоляции обработки задач и большего контроля над ресурсами на уровне задач.
Используйте пользовательский серверный интерфейс XCom, если вам нужно передавать какие-либо данные между задачами, чтобы не перегружать базу данных метаданных.
Используйте Промежуточное Хранилище Данных
Поскольку для этого требуется меньше кода и меньше фрагментов, может возникнуть соблазн написать свои DAG для перемещения данных непосредственно из источника в пункт назначения. Однако это означает, что вы не можете по отдельности повторно запускать части извлечения или загрузки конвейера. Разместив промежуточный уровень хранения, такой как промежуточные таблицы S3 или SQL, между источником и местом назначения, вы можете разделить тестирование и повторный запуск извлечения и загрузки.
В зависимости от вашей политики хранения данных вы можете изменить логику загрузки и повторно запустить весь конвейер без необходимости повторного запуска извлечений. Это также полезно в ситуациях, когда у вас больше нет доступа к исходной системе (например, вы достигли лимита обращений к API).
Используйте фреймворк ELT
Всякий раз, когда это возможно, старайтесь реализовать шаблон конвейера данных ELT (извлечение, загрузка, преобразование) с вашими DAG. Это означает, что вы должны стремиться загрузить как можно больше логики преобразования в исходные системы или системы назначения, что позволяет использовать сильные стороны всех инструментов в вашей экосистеме данных. Многие современные инструменты хранилища данных, такие как Snowflake, обеспечивают легкий доступ к вычислениям для поддержки платформы ELT и легко используются в сочетании с Airflow.
Еще лучшие практики
Наконец, вот несколько других заслуживающих внимания лучших практик, которые не подпадают под две вышеперечисленные категории.
Используйте согласованную файловую структуру
Наличие согласованной файловой структуры для проектов Airflow позволяет организовать работу и упростить ее внедрение. В Astronomer мы используем:
├── dags/ # Where your DAGs go
│ ├── example-dag.py # An example dag that comes with the initialized project
├── Dockerfile # For Astronomer’s Docker image and runtime overrides
├── include/ # For any other files you’d like to include
├── plugins/ # For any custom or community Airflow plugins
├── packages.txt # For OS-level packages
└── requirements.txt # For any Python packages
Правильно используйте Имя DAG и Начальную дату
Вы всегда должны использовать статический start_date
с вашими DAG. Динамическое значение start_date вводит в заблуждение и может привести к сбоям при очистке неудачных экземпляров задач и пропущенных повторных запусков.
Кроме того, если вы измените start_date
своего DAG, вам также следует изменить имя DAG. Изменение start_date
DAG создает новую запись в базе данных Airflow, что может сбить с толку планировщика, поскольку будут два DAG с одинаковым именем, но разными расписаниями.
Изменение имени DAG также создает новую запись в базе данных, которая приводит в действие панель мониторинга, поэтому следуйте согласованному соглашению об именовании, поскольку изменение имени DAG не приводит к удалению записи в базе данных со старым именем.
Установите повторные попытки на уровне DAG
Даже если ваш код идеален, случаются сбои. В распределенной среде, где контейнеры задач выполняются на общих хостах, возможно неожиданное завершение выполнения задач. Когда это произойдет, вы можете увидеть, что в журналах Airflow упоминается процесс-зомби.
Подобные проблемы можно решить с помощью установки повторных попыток выполнения задачи. Наилучшей практикой является установка повторных попыток в качестве default_arg
, чтобы они применялись на уровне DAG и были более детализированы для конкретных задач только там, где это необходимо. Рекомендуется установить 2-4 повторных попытки.