Всем привет! Основным инструментом оркестрации задач для обработки данных в Леруа Мерлен является Apache Airflow, подробнее о нашем опыте работы с ним можно прочитать тут. А также мы находимся в постоянном поиске инструментов и фреймворков для упрощения работы наших дата сайентистов и дата инженеров. Один из таких инструментов – фреймворк Ray, который позволяет создавать ML пайплайны из DAGов Airflow. В статье от Astronomer подробно рассматривается, как начать его использовать и с его помощью быстро развернуть и обучить модель.
Airflow и Ray
Мы рады анонсировать сервис Ray для Apache Airflow, который позволит перенести DAG’и в масштабируемые ML-пайплайны.
Потребность в комбинации Airflow + ML
Машинное обучение стало неотъемлемой частью дата-экосистемы во всем мире в разных индустриях и компаниях. А так как коммьюнити AirFlow постоянно растет и пополняется, мы считаем необходимым наделить команды дата-сайентистов и инженеров возможностью развивать их пайплайны и достигать еще более высоких результатов. Именно поэтому мы бросили свои силы на создание оптимальной комбинации Airflow + ML.
Один из основных показателей качества современных ML-моделей это гибкость и оперативность, которыми они наделяют дата-сайентистов и инженеров. Грамотно смоделированные фреймворки и инструменты экономят время специалиста. Чтобы подготовить обучающий датасет и сделать из него работающую модель у него уйдет несколько часов вместо нескольких дней, а внести правки или дополнения не составит большого труда.
Сам по себе Airflow это ценный инструмент для запуска качественных ML-моделей. Построение моделей с помощью DAG’ов дает свои бенефиты:
легкая параметризация;
SLA и оповещения;
data watermarking;
возможности анализа происхождения данных;
эффективная и надежная масштабируемость.
Тем не менее, это не единственные преимущества. Есть и другие, которые также важны при работе:
минимальные изменения. Дата-сайентисты должны иметь возможность переносить код из Jupyter и т.д. с минимальными изменениями;
широкие возможности обработки датасетов. ML-модели состоят из больших датасетов, перемещение которых между разными тасками должны быть удобными и простыми;
масштабируемость каждой задачи. Дата-сайентист должен иметь возможность быстро и качественно аллоцировать свои ресурсы: GPUs для PyTorch, RAM для датафреймов, или ядра CPU для XGBoost.
интеграция ML-экосистем. Дата сайентист должен иметь возможность легко интегрировать инструменты для регистрации, отслеживание и воспроизведения экспериментов, а также для регистрации и деплоя полученных моделей.
К счастью существует несколько open-source фреймворков, которые можно использовать в Airflow при разработке ML-моделей. Одна из них – Ray.
Ray
Ray это фреймворк кластерных вычислений с отличной поддержкой Python, который позволяет передавать и запускать код (даже со сложными зависимостями пакетов и библиотек) на кластерах разного размера. В отличие от систем по типу Spark, которые для продакшена требуют сложную установку кластера и поддержку Java, Ray может работать с минимальным количеством настроек с любым кодом на любой машине, будь то обычный ноутбук или высокомощная виртуальная машина.
При запуске на Ray пользователь может размещать ресурсы для каждой функции (например, можно выделить 2 CPU и 1 GPU для одной функции). Если все сделано правильно, распределенные вычисления на Ray вместе с мощными оркестрацией и шедулингом в Airflow дают идеальную основу для быстрого создания и внедрения ML-модели.
Ray показывает высокую производительность, под капотом у него C++ для быстрого автоматического перемещения Python-объектов по кластеру с использованием новых функций gRPC, в тот момент, когда требуются данные предыдущих вызовов. Но все это не видно пользователю – все, что от него требуется, это писать код.
Airflow + Ray: пара, созданная в раю дата сайенс
Airflow и Ray не требуют специальных пакетов/библиотек или настроек, чтобы работать вместе. Но мы увидели недостатки в том, как большинство пользователей интегрировали эти два инструмента. Поэтому мы захотели разработать наилучший способ, который бы помог стандартизировать использование пары Airflow + Ray в части проведения тестирований и масштабирования.
Весь код инициализации/установки фреймворка мы упаковали в декораторы вместо Оператора Airflow, расширив TaskFlow API. Эти декораторы позволяют пользователям Airflow держать код на Ray в функциях Python, а также определять зависимости тасков вынося данные в функции Python.
В итоге пользователи сервиса Airflow + Ray могут видеть свой код, одновременно имея полную свободу изменять и создавать шаблоны по своим DAG’ам, при этом используя все преимущества распределенных вычислений Ray.
Чтобы показать важность такой интеграции, давайте напишем наш DS-пайплайн там, где все пайплайны берут свое начало – Jupyter notebook.
Обучение XGBoost модели с помощью Ray в Jupyter Notebook
В этом примере мы создали простую модель на Jupyter notebook, которая выводит датасет HIGGS, разделяет обучающие и тестовые данные, а также создает / валидирует модель, используя XGBoost на Ray, который распределяет вычисления по обучению модели с помощью Ray на кластер. Данный код можно запустить локально на ноутбуке или с минимальными усилиями на удаленном кластере Ray, что отлично подходит для проведения экспериментов при создании модели. Теперь мы можем погрузиться в то, как перейти от экспериментов к продакшен решению на DAG Airflow.
Запуск Ray + Airflow
Загляните в этот репозиторий. Следуйте инструкциям и у вас все получится. Нашли баг или проблему – напишите нам.
Обратите внимание, что пример в данном репозитории находится в альфа версии, он будет улучшаться по мере перехода в бету.
От Jupyter Notebook к DAG’ам Airflow – 5 шагов
Давайте поговорим о том, как мы адаптировали наш код в Jupyter notebook в первой секции к примеру в репозитории. Чтобы конвертировать наш ML notebook в Airflow DAG с помощью декораторов Ray, мы выполняем 5 шагов:
Добавить коннектор Ray к кластеру, чтобы управлять параметрами соединения с кластером Ray.
Преобразовать каждый логический блок в отдельную функцию на Python:
@ray.remote def train_model( data ): train_df, validation_df = data evallist = [(validation_df, 'eval')] evals_result = {} config = { "tree_method": "hist", "eval_metric": ["logloss", "error"], } write("Start training") bst = xgboost_ray.train( params=config, dtrain=train_df, evals_result=evals_result, ray_params=xgb.RayParams(max_actor_restarts=1, num_actors=8, cpus_per_actor=2), num_boost_round=100, evals=evallist) return bst
Добавить
@ray_task
декоратор к каждой из этих функций:@ray_task(**task_args) def train_model( data ): train_df, validation_df = data evallist = [(validation_df, 'eval')] evals_result = {} config = { "tree_method": "hist", "eval_metric": ["logloss", "error"], } bst = xgb.train( params=config, dtrain=train_df, evals_result=evals_result, ray_params=xgb.RayParams(max_actor_restarts=1, num_actors=8, cpus_per_actor=2), num_boost_round=100, evals=evallist) return bst
Создать DAG функцию с логическим потоком данных:
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['finished-modin-example']) def task_flow_xgboost_modin(): build_raw_df = load_dataframe() data = create_data(build_raw_df) trained_model = train_model(data) task_flow_xgboost_modin = task_flow_xgboost_modin()
Загрузить DAG в Airflow, теперьу у вас есть работающий Airflow DAG! Прямо из дата пайплайна Jupyter notebook!
Всего лишь за 20 минут дата-сайентист может превратить локальный скрипт на Python в масштабируемый воспроизводимый пайплайн со всей мощью распределенных вычислений Ray и оркестрацией Airflow. Также дата-инженеры могут воспользоваться преимуществами переменных, коннекторов, интервальным шедулингом и другми фичами, которые и сделали Airflow таким популярным среди дата-команд, которым необходимо промышленное решение по оркестрации и шедулингу.
Простейший workflow на Ray в Airflow UI может выглядеть примерно вот так:
Обратите внимание! В alpha версии библиотеки, ваш Airflow потребует установки Ray XCom Backend, чтобы декоратор функционировал полностью.
Прохождение данных между тасками: быстрее с Plasma
В примере выше пользователь Airflow может заметить, что мы пропускаем целые датафреймы между тасками, не направляя напрямую эти куски данных во внешнее хранилище. С традиционным XCom это было бы практически невозможно, потому что Airflow хранит каждую часть датасета между тасками в отдельных ячейках БД метаданных.
Чтобы решить эту проблему мы обратились к одной из самых крутых фич Ray – хранение объектов в оперативке. Чтобы обеспечить качественную обработку данных для ML, Ray использует систему хранения данных, которая дает возможность быстрой передачи данных и zero-copy чтения. С использованием Ray декоратор Airflow использует систему хранения данных как систему кэширования, что позволяет большим дата-объектам оставаться внутри RAM воркеров. И больше никакого написания и чтения данных из S3 между тасками!
Пока что этот альфа-релиз осуществляет Ray plasma store для передачи данных между тасками Ray, будущие релизы упростят перемещение данных между Ray и разными хранилищами данных, и может даже расширит Ray custom Xcom Backend и позволит перемещать данные между тасками с помощью множества разных операторов Airflow.
Существующие ограничения
Как и сказано выше, Ray Provider для Airflow сейчас в альфа версии. Это значит, что в нем будут свои нюансы. Мы будем рады любому фидбеку – инфо о багах, предложениям по улучшению или продвижению. Код можете найти здесь.
Смотрим вперед: будущее Airflow и Ray
Тюнинг гиперпараметров
Airflow имеет возможности шаблонизации и динамической параметризации. В связке с Ray Tune он может оркестрировать и динамически масштабировать процесс подбора гиперпараметров моделей для любого ML фреймворка – включая PyTorch, XGBoost, MXNet, and Keras – при этом легко интегрируя инструменты для записи, запуска и воспроизведения экспериментов, также как и регистрация и деплой готовых моделей.
Пример использования XGBoost и Tune может выглядеть так:
@ray_task(**task_args)
def tune_model(data):
search_space = {
# You can mix constants with search space objects.
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
"max_depth": tune.randint(1, 9),
"min_child_weight": tune.choice([1, 2, 3]),
"subsample": tune.uniform(0.5, 1.0),
"eta": tune.loguniform(1e-4, 1e-1),
}
print("enabling aggressive early stopping of bad trials")
# This will enable aggressive early stopping of bad trials.
scheduler = ASHAScheduler(
max_t=10, grace_period=1, reduction_factor=2 # 10 training iterations
)
print("Tuning")
analysis = tune.run(
tune.with_parameters(train_model, data=data),
metric="eval-logloss",
mode="min",
local_dir=LOCAL_DIR,
# You can add "gpu": 0.1 to allocate GPUs
resources_per_trial=XGB_RAY_PARAMS.get_tune_resources(),
config=search_space,
num_samples=10,
scheduler=scheduler,
)
print("Done Tuning")
return analysis
@ray_task(**task_args)
def load_best_model_checkpoint(analysis):
print("Checking Analysis")
best_bst = xgb.Booster()
print(
f"Analysis Best Result on eval-error is: {analysis.best_result['eval-error']}")
print("Loading Model with Best Params")
best_bst.load_model(os.path.join(
analysis.best_checkpoint, "model.xgb"))
accuracy = 1. - analysis.best_result["eval-error"]
print(f"Best model parameters: {analysis.best_config}")
print(f"Best model total accuracy: {accuracy:.4f}")
# We could now do further predictions with
# best_bst.predict(...)
return best_bst
analysis = tune_model(data)
best_checkpoint = load_best_model_checkpoint(analysis)
Checkpointing данных при использовании Ray
Один из основных плюсов, который Airflow дает пользователям Ray, это возможность переписывать таски на работу с надежным хранилищем. Ray использует local plasma store на каждом процессе воркера, чтобы сохранять в памяти данные для быстрой обработки. Эта система отлично работает, когда доходит до быстрой обработки данных, но может не работать, если есть проблема на кластере Ray.
Пользователи Airflow Ray могут сохранить чекпоинт на шаге с DAG’ами, на котором данные лежат во внешнем хранилище (например, S3). Эта отказоустойчивость будет значить, что если таск переписывается, а данные больше не доступны локально, таск сможет подтягивать данные из постоянного хранилища.
@dag(default_args=default_args, schedule_interval=None, start_date=datetime(2021, 1, 1, 0, 0, 0), tags=['finished-modin-example'])
def checkpoint_data_example():
@ray_task(**task_args, checkpoint=True, checkpoint_source="s3_connection_id")
def really_long_task(data):
...
return model
@ray_task(**task_args)
def deploy_model(model):
deploy(model)
data = load_data()
model = really_long_task(data)
deploy_model(model)
Перемещение данных между Airflow и Ray
В грядущих итерациях этого декоратора мы создадим функцию, которая поможет легко перемещать данные с локального таска Airflow на таск Ray и обратно. Эта система будет работать с любым Custom Xcom Backend (включая Ray Custom Xcom backend), для нативной поддержки кода на python пользователям Airflow.
Выводы
Airflow + Ray – это сильная комбинация для написания ML или ETL пайплайнов. Эта интеграция всего лишь начало, а мы с нетерпением ждем, когда сможем поделиться новыми фичами и улучшениями.
Вы можете найти демо-код, приведенный в этой статье, в этом репозитории, а код для Airflow Ray provider в этом репозитории.