Пишем первый ML-пайплайн на Airflow: подробный туториал

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Ждешь, когда обновятся данные, чтобы запустить переобучение модели
Ждешь, когда обновятся данные, чтобы запустить переобучение модели

В этом туториале мы пошагово разберем, как создать с нуля и запустить локально свой первый пайплайн на Airflow.

Данный пайплайн специально адаптирован под задачи машинного обучения. В этом примере мы будем загружать новости из открытого источника и использовать NLP-модель для их классификации (zero-shot classification).

План:

  1. Примеры применения Airflow в проектах с машинным обучением.

  2. Знакомство с Airflow: основные понятия и инструменты.

  3. Написание тасок для загрузки данных и получения предсказания модели.

  4. Запуск Airflow локально через Docker Compose.

  5. Знакомство с веб-интерфейсом Airflow.

Код доступен на GitHub.

Как Airflow используется в проектах с машинным обучением

Airflow хорошо подходит для задач, которые запускаются в указанное время или каждый заданный интервал времени.

Apache Airflow - популярный инструмент в проектах с машинным обучением. Он позволяет создавать эффективные и масштабируемые пайплайны, связанные с обработкой данных, обучением и развертыванием моделей.

Примеры задач, которые решает Airflow:

  • Регулярное переобучение моделей машинного обучения на новых данных.

  • Получение предсказаний модели в пакетном режиме. Например, раз в час или раз в день.

  • ETL пайплайны, которые загружают данные из разных источников и преобразуют их.

  • Автоматическая генерация отчетов.

Ниже приведен пример архитектуры рекомендательной системы, где каждая из 3 частей запускается на Airflow:

  1. Feature Engineering: данные от пользователей накапливаются и обрабатываются единоразово в заданное время.

  2. Training Pipeline: на основе актуальных данных модель переобучается, например, раз в день.

  3. Batch Prediction Pipeline: последняя версия модели используется, чтобы рассчитать новые рекомендации по всем пользователям и сохранить в базу.

Архитектура рекомендательной системы на Airflow
Архитектура рекомендательной системы на Airflow

Шаг 3 в этом пайплайне используется получение предсказания модели в пакетном формате. Когда это имеет смысл?

Преимущество пакетной обработки: нам не надо беспокоиться о latency модели. Работа с моделью в пакетном режиме, как правило, не требует дополнительной оптимизации и позволяет быстрее довести модель до прода.

Недостаток: предсказания имеют запаздывание. Например, рекомендации не будут учитывать real-time фидбек от пользователя.

Если для вашей задачи критична работа модели в real-time формате подойдут другие инструменты. В прошлом туториале мы разбирали, как написать ML веб-сервис на FastAPI, который работает с моделью в формате запрос-ответ.

Знакомство с Airflow: основные понятия и инструменты

Прежде чем начать практическую часть, познакомимся с основными понятиями Apache Airflow.

Airflow DAG - пример
Airflow DAG - пример
  1. DAG (Directed Acyclic Graph) - удобный способ организации и визуализации пайплайна. Это структура, где каждая задача представлена узлом, а зависимости между задачами представлены направленными стрелками, указывающими порядок выполнения задач.
    Граф является направленным, потому что задачи выполняются последовательно в определенном порядке, определяемом зависимостями.

  2. Task: "кирпичи", из которых состоит DAG. Задачи представляют собой конкретные шаги или операции, которые должны быть выполнены в вашем пайплайне.

  3. Operator: Каждая задача выполняется некоторым кодом или скриптом, который вы определяете. Операторы (Operators) представляют собой классы или функции, которые определяют, как будет выполнена каждая задача в пайплайне.
    Например, есть операторы, отвечающие за выполнение кода на питоне, bash-команд и SQL-запросов.

  4. Metadata Database: в этой базе хранятся метаданные о структуре пайплайнов, зависимостях между задачами, расписании выполнения и других параметрах.

  5. Webserver: удобный пользовательский интерфейс (UI), позволяющий запускать, мониторить и отлаживать пайплайны.
    Через веб-интерфейс можно просматривать статус выполнения задач, проверять логи, а также управлять пайплайнами, вносить изменения и контролировать их работу.

  6. Scheduler: следит за графом пайплайна, определяет, какие задачи уже могут быть выполнены на основе зависимостей и расписания, и запускает их в соответствии с этой логикой. Он также отслеживает выполнение задач и обновляет их статусы в метаданных.

0. Описание задачи и проектирование пайплайна

Главная цель этого туториала - знакомство с Airflow и самостоятельный запуск пайплайна. То есть, по итогу туториала вы научитесь локально запускать Airflow.

Но, для использования Airflow в продакшене необходимы другие инструменты, например, Kubernetes.

Apache Airflow предоставляет возможности для создания сложных пайплайнов с множеством задач и зависимостей между ними. Однако, здесь мы ограничимся простым примером графа пайплайна.

Наш пайплайн
Наш пайплайн

Пайплайн будет состоять из 3 задач:

Task 1: Подготовка данных - загрузка данных, препроцессинг и сохранение в локальную директорию.

Task 2: Предсказание модели - загрузка модели, инференс модели на сохраненных данных.

Task 3: Подготовка отчета на основе предсказаний.

Task 1 и Task 2 мы хотим запустить в отдельных Docker-контейнерах. Для этого будем использовать DockerOperator.

Зачем может понадобиться запускать таски в отдельных контейнерах?

Task 3 содержит простой код агрегации, поэтому будем использовать PythonOperator.

В данном примере пайплайна мы выбрали упрощенный подход и не использовали дополнительные инструменты, чтобы не усложнять настройку Airflow:

Таким образом, в данном примере мы использовали упрощенные варианты для целей наглядности и понимания основных концепций. В реальных проектах нужно настраивать и использовать дополнительные инструменты и хранилища данных.

С учетом вышесказанного код проекта будет выглядеть следующим образом:

Структура проекта
Структура проекта

1. Загрузка данных, используем Docker

Для получения данных о финансовых новостях в этом примере мы будем использовать фид от cnbc.com. Когда мы запрашиваем информацию, он предоставляет нам 30 самых новых новостей из мира финансов. Таким образом, запуская пайплайн раз в день, мы сможем получать каждый день свежие новости.

Простейший код для загрузки данных в формате csv будет выглядеть следующим образом:

import feedparser
import pandas as pd

NEWS_FEED_URL = "https://www.cnbc.com/id/19746125/device/rss/rss.xml"


def data_load(data_path: str) -> None:
    news_feed = feedparser.parse(NEWS_FEED_URL)
    df = pd.DataFrame(news_feed.entries)
    df.to_csv(data_path, sep="\t", index=False)

Помимо этого в финальную версию data_load.py добавим:

Финальная версия data_load.py выглядит так:

import html
import logging

import click
import feedparser
import pandas as pd

NEWS_FEED_URL = "https://www.cnbc.com/id/19746125/device/rss/rss.xml"
COLUMNS_TO_SAVE = ["id", "published", "title", "summary"]

logging.basicConfig(level=logging.INFO)


@click.command()
@click.option("--data_path", help="Path to the input data CSV file")
def data_load(data_path: str) -> None:
    logging.info("Fetching financial news from the RSS feed...")
    news_feed = feedparser.parse(NEWS_FEED_URL)
    logging.info("News fetched successfully.")

    df = pd.DataFrame(news_feed.entries)[COLUMNS_TO_SAVE]
    df["published"] = pd.to_datetime(df["published"])
    df["title"] = df["title"].map(html.unescape)
    df["summary"] = df["summary"].map(html.unescape)

    logging.info(f"Saving the processed data to '{data_path}'...")
    df.to_csv(data_path, sep="\t", index=False)
    logging.info("Data saved successfully.")


if __name__ == "__main__":
    data_load()

Для подготовки Docker-образа осталось создать:

feedparser==6.0.10
click==8.1.3
pandas==2.0.1
FROM python:3.11

COPY requirements.txt data_load.py /workdir/
WORKDIR /workdir

RUN pip install -r requirements.txt

Таким образом, у нас готовы все файлы для создания Docker-образа. Мы соберем образ и создадим контейнер позже - в разделе про Docker Compose.

2. Предсказание модели, zero-shot classification

В этом примере будем использовать NLP-модель для задачи zero-shot classification.

Что такое zero-shot classification?

Для начала определим список классов - тем, на которые мы будем разделять финансовые новости:

LABELS = [
    "Crypto",
    "SEC",
    "Dividend",
    "Economics",
    "Oil or Gas",
    "IPO",
    "Politics",
    "Buffet",
    "Stock",
    "Other",
]

Для получения предсказаний загрузим модель valhalla/distilbart-mnli-12-1 из Hugging Face Hub. device=-1 означает, что модель запускается на CPU.

from transformers import pipeline

model_hf = pipeline(model="valhalla/distilbart-mnli-12-1", device=-1)

Далее загрузим csv файл с новостями, который мы подготовили в предыдущем пункте. Для предсказания будем использовать объединение текста из заголовка новости (title) и ее краткого описания (summary).

import pandas as pd

df = pd.read_csv(data_path, sep="\t")
texts_for_pred = (df.title + ". " + df.summary).tolist()

Для получения предсказаний передадим модели список текстов texts_for_pred и классы LABELS:

pred = model_hf(texts_for_pred, LABELS, multi_label=False)

Флаг multi_label определяет, может ли объект быть отнесен к одному или более классам. Когда multi_label=True, модель может присваивать объектам несколько классов одновременно.

Таким образом, мы сами придумали название классов на свое усмотрение. Модель делает предсказание по нашим классам без необходимости предварительного обучения. В этом преимущество zero-shot моделей.

Благодаря библиотеки transformers код занял всего несколько строк.

Выберем предсказание лучшего класса и сохраним результат в json-файл:

df["label"] = [x["labels"][0] for x in pred]
df.T.to_json(pred_path)

На этом код предсказания готов.

Добавим логирование и использование Click по аналогии с кодом загрузки данных. Тогда финальная версия model_predict.py будет выглядеть следующим образом:

import logging

import click
import pandas as pd
from transformers import pipeline

LABELS = [
    "Crypto",
    "SEC",
    "Dividend",
    "Economics",
    "Oil or Gas",
    "IPO",
    "Politics",
    "Buffet",
    "Stock",
    "Other",
]

logging.basicConfig(level=logging.INFO)


@click.command()
@click.option("--data_path", help="Path to the input data CSV file")
@click.option("--pred_path", help="Path to save the output JSON file")
def model_predict(data_path: str, pred_path: str) -> None:
    logging.info("Loading the model...")
    model_hf = pipeline(model="valhalla/distilbart-mnli-12-1", device=-1)
    logging.info("Model loaded successfully.")

    logging.info(f"Reading data from '{data_path}'...")
    df = pd.read_csv(data_path, sep="\t")
    logging.info("Data read successfully.")
    
    texts_for_pred = (df.title + ". " + df.summary).tolist()

    logging.info("Performing model prediction...")
    pred = model_hf(texts_for_pred, LABELS, multi_label=False)
    logging.info("Prediction completed successfully.")

    df["label"] = [x["labels"][0] for x in pred]

    logging.info(f"Saving the predictions to '{pred_path}'...")
    df.T.to_json(pred_path)
    logging.info("Predictions saved successfully.")


if __name__ == "__main__":
    model_predict()

Код предсказания модели будет также запускаться в отдельном Docker-контейнере. Поэтому аналогично предыдущему пункту мы добавили свои requirements.txt и Dockerfile.

Итак, мы подготовили код для 2 компонент нашего пайплайна: загрузки данных и получения предсказания модели. Каждый будет выполняться в отдельном Docker-контейнере. Теперь все готово, чтобы мы перешли к написанию DAG на Airflow.

3. Пишем DAG

Вспомним основные компоненты нашего пайплайна:

  1. 3 последовательные таски, первые 2 из которых должны запускаться в отдельных Docker-контейнерах.

  2. Локальная директория data, которую мы будем использовать для сохранения результатов и обмена данными между тасками.

Разберем основные шаги при написании нашего DAG:

from docker.types import Mount

dockerops_kwargs = {
    "mount_tmp_dir": False,
    "mounts": [
        Mount(
            source="<path_to_your_airflow-ml_repo>/data",
            target="/opt/airflow/data/",
            type="bind",
        )
    ],
    ...
}
raw_data_path = "/opt/airflow/data/raw/data__{{ ds }}.csv"
pred_data_path = "/opt/airflow/data/predict/labels__{{ ds }}.json"
result_data_path = "/opt/airflow/data/predict/result__{{ ds }}.json"
from airflow.decorators import dag
from airflow.utils.dates import days_ago

# Create DAG
@dag("financial_news", start_date=days_ago(0), schedule="@daily", catchup=False)
def taskflow():
  ...
    # Task 1
    news_load = DockerOperator(
        task_id="news_load",
        container_name="task__news_load",
        image="data-loader:latest",
        command=f"python data_load.py --data_path {raw_data_path}",
        **dockerops_kwargs,
    )

    # Task 2
    news_label = DockerOperator(
        task_id="news_label",
        container_name="task__news_label",
        image="model-prediction:latest",
        command=f"python model_predict.py --data_path {raw_data_path} --pred_path {pred_data_path}",
        **dockerops_kwargs,
    )
    # Task 3
    news_by_topic = PythonOperator(
        task_id="news_by_topic",
        python_callable=aggregate_predictions,
        op_kwargs={
            "pred_data_path": pred_data_path,
            "result_data_path": result_data_path,
        },
    )
news_load >> news_label >> news_by_topic
taskflow()

Файл с описанием DAG имеет расширение .py и лежит в директории dags. При запуске Airflow, он сканирует эту директорию (или другую настроенную директорию) в поисках файлов с определением DAG. Когда Airflow обнаруживает файл с определением DAG, он регистрирует его и делает доступным для выполнения по расписанию.

Мы закончили писать наш пайплайн, теперь перейдем к настройке и запуску Airflow.

4. Запуск Airflow с помощью Docker Compose

Для локального запуска Airflow мы будем использовать Docker Compose. Он помогает запустить Apache Airflow с минимальными усилиями, предоставляя унифицированный и изолированный способ запуска всех компонентов Airflow.

У Airflow есть отличная инструкция по запуску с помощью Docker Compose. Там же есть загрузка готового файла docker-compose.yml. Инструкция позволяет запустить Airflow в пару строк.

Файл docker-compose.yml имеет раздел services, где определены различные сервисы, которые являются частями кластера Airflow. Каждый сервис имеет свою секцию с настройками, где указывается образ Docker, команда для запуска сервиса, порты, зависимости от других сервисов и другие параметры. В частности здесь описаны сервисы для Metadata Database, Webserver и Scheduler, которые мы упомянали в разделе Знакомство с Airflow.

Модификация docker-compose.yml для запуска тасок в отдельных Docker-контейнерах

Поскольку мы усложнили настройку Airflow, когда решили запускать таски в отдельных контейнерах, будем использовать свой модифицированный docker-compose.yml. Его можно посмотреть тут.

Основные моменты, которые мы изменили для поддержки запуска тасок в Docker-контейнерах:

_PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-docker==3.6.0
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/data/:/opt/airflow/data/
    - /var/run/docker.sock:/var/run/docker.sock
  data-loader:
    build:
      context: ml_pipeline/data_loader
    image: data-loader
    restart: "no"

  model-prediction:
    build:
      context: ml_pipeline/model_prediction
    image: model-prediction
    restart: "no"
  # Required because of DockerOperator. For secure access and handling permissions.
  docker-socket-proxy:
    image: tecnativa/docker-socket-proxy:0.1.1
    environment:
      CONTAINERS: 1
      IMAGES: 1
      AUTH: 1
      POST: 1
    privileged: true
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
    restart: always

Запуск Airflow

Возможно, в вашем случае запуск потребует выделения больше памяти для Docker Engine.

Перед первым запуском Airflow нужно подготовить окружение.

Если вы работаете на Linux перед запуском нужно указать AIRFLOW_UID:

echo -e "AIRFLOW_UID=$(id -u)" > .env

Далее независимо от вашей ОС необходимо выполнить миграцию базы данных и создать первую учетную запись пользователя. Для этого выполните команду:

docker compose up airflow-init

Созданная учетная запись имеет логин airflow и пароль airflow.

Запуск и остановка Airflow

Для создания и запуска всех необходимых контейнеров, определенных в файле docker-compose.yml, используется команда:

docker compose up

В нашем случае также необходимо предварительно собрать образы data-loader и model-prediction, которые также указаны в файле docker-compose.yml. Поэтому модифицируем команду:

docker compose up --build

Когда вы закончите работу и захотите очистить свое окружение, выполните:

docker compose down --volumes --rmi all

После запуска Airflow продолжим работу в веб-интерфейсе.

5. Веб-интерфейс Airflow, запуск пайплайна

Пользовательский интерфейс Airflow упрощает мониторинг и запуск пайплайнов. Он доступен по адресу http://localhost:8080. На странице входа нужно ввести логин и пароль от учетной записи, в нашем случае airflow и airflow.

Страница авторизации
Страница авторизации

Запуск и мониторинг пайплайна

На домашней странице вы увидете список всех дагов, включая дефолтные от Airflow. Здесь можно найти и выбрать созданный нами DAG financial_news.

На странице нашего DAG доступна разная информация о пайплайне: графическое представление, время ближайшего запуска, логи запуска, код и многое другое.

Графическое представление нашего пайплайна
Графическое представление нашего пайплайна

Не дожидаясь планового запуска пайплайна, запустим DAG, нажав на кнопку старта. Для отслеживания выполнения отдельных тасок, можно нажать на нужную таску и посмотреть ее логи:

Логи таски 1
Логи таски 1

Здесь мы можем видеть логи, которые добавили специально для отслеживания выполнения отдельных шагов.

После того, как выполнение пайплайна успешно завершилось, посмотрим на результаты.

Смотрим результат пайплайна. Как классифицировались новости?

Результаты выполнения пайплайна сохранились в data/predict/result__<date>.json. Изначально мы ставили задачу написать пайплайн, которые будет автоматически загружать актуальные новости из финансового мира и группировать их по заданным нами темам. Посмотрим, что у нас получилось.

Результат работы пайплайна
Результат работы пайплайна

Таким образом, мы справились с поставленной задачей. Новости успешно загружаются и классифицируются. Напомним, что темы заданы нами произвольно, без предварительного обучения модели.

Заключение


Если формат туториалов по инструментам MLOps окажется полезным, буду планировать темы для следующих статей. А пока подписывайтесь на мой телеграм-канал. Там будут анонсы новых статей, а также советы для работы и более короткие мысли по DS/ML/AI.

Источник: https://habr.com/ru/articles/737046/


Интересные статьи

Интересные статьи

Одна из довольно сильных сторон любого программного обеспечени — это возможность единожды написанной программы быть использованной многократно как в виде отдельных частей, так и целиком, что и при...
Всем привет! Я Ruby on Rails Developer и еще совсем недавно я начинал свой путь в этой области. Я уже прошел первые шаги (о них я писал в данной статье), как выбор языка, изучение его основ, знакомств...
Аннотация JUnit 5 @BeforeAll обозначает метод, который является методом жизненного цикла. @BeforeAllэто замена @BeforeClassаннотации в JUnit 4.
Изобретаем велосипед или пишем персептрон на C++. Часть 1 Напишем простую библиотеку для реализации персептрона на C++ Читать дальше →
Балансировщики нагрузки играют в веб-архитектуре ключевую роль. Они позволяют распределять нагрузку по нескольким бэкендам, тем самым улучшая масштабируемость. А поскольку у нас сконфигурирован...