Как создать легко воспроизводимый DS проект

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Аналитику или исследователю данных приходится разрабатывать множество алгоритмов по обработке и анализу различных данных. Большинство алгоритмов разрабатываются для многоразового использования, а значит, код либо запускается разработчиком с определенной периодичностью, либо код передается другим пользователям для обработки своих данных. При этом алгоритмы имеют множество параметров и зависимостей, которые необходимо индивидуально настраивать под определенные данные.

Для того чтобы сделать процесс развертывания, использования и доработки алгоритма интуитивно понятным воспользуемся инструментом Kedro. Основная концепция kedro заключается в модульной структуре, где весь цикл работы с данными формируется из отдельных блоков в единый рабочий процесс. Проект на kedro имеет следующую структуру:

Где директория conf содержит конфигурационные файлы для подключения к внешним источникам, учетные данные, настройки ml-алгоритма и прочие параметры. data необходима для хранения данных на различных этапах обработки, docs предназначена для документации проекта, а logs для хранения журнала событий работы алгоритмов. Notebooks – предназначен для хранения jupyter-блокнотов и src, в которой размещается основной код для формирования pipeline проекта.

Для примера, возьмем хорошо известную задачу: предсказание выживших при кораблекрушении Титаника. Полный код, представленный в статье, можно найти в репозитории.

В первую очередь инициализируем git, создадим виртуальное окружение и установим kedro. Далее создадим файл config.yml со следующим содержимым:

output_dir: .
project_name: titanic
repo_name: titanic
python_package: titanic

А затем выполним команду kedro new —config config.yml для формирования оболочки будущего проекта.

Чтобы использовать функционал kedro необходимо установить зависимости с помощью команды: kedro install. Следующий шаг – регистрация входных данных, которые будут анализироваться. Для этого поместим данные в titanic\data\01_raw. Все данные, которые будут загружаться kedro, необходимо указать в конфигурационном файле titanic\conf\base\catalog.yml следующим образом:

input_data:
    type: pandas.CSVDataSet
    filepath: data/01_raw/train.csv

Где,

input_data – имя объекта данных, по которому будем обращаться в kedro;

type – модуль kedro, с помощью которого будем загружать данные;

filepath – расположение исходных данных.

Kedro поддерживает множество типов данных: csv, xlsx, json, parquet, sql-query и т.д. Следующий этап – настройка pipeline, который будет выполнять весь цикл работы алгоритма, от предобработки данных до оценки качества модели. Pipeline в Kedro выполняется с помощью nodes – блока модулей, которые выполняются последовательно, а полученные данные передаются от одного модуля к другому. Для начала напишем модуль предобработки данных. В директории src\titanic\pipelines создадим папку с именем data_processing, в которую поместим файл nodes.py со следующим содержимым:

import pandas as pd

def cat_to_num(arr):
    return {i:c for c, i in enumerate(arr)}
    
def preprocess_dataset(df: pd.DataFrame) -> pd.DataFrame:
    df = df.dropna(subset=["Embarked"])
    d_sex = cat_to_num(df["Sex"].unique())
    d_embarked = cat_to_num(df["Embarked"].unique()) 
    df["Sex"] = df["Sex"].map(d_sex)
    df["Embarked"]= df["Embarked"].map(d_embarked)
    return df

Для примера преобразовали 2 столбца («Sex» и «Embarked») к числовому типу и удалили строки, с пропущенными значениями по столбцу Embarked. Далее для инициализации обработки данных в pipeline нам необходимо создать файл pipeline.py рядом с nodes.py в директории src\titanic\pipelines\data_processing со следующим содержимым:

from kedro.pipeline import Pipeline, node
from .nodes import preprocess_dataset

def create_pipeline(**kwargs):
    return Pipeline(
                    [node(
                          func=preprocess_dataset,
                          inputs="input_data",
                          outputs="preprocessed_input_data",
                          name="preprocessing_input_data_node"
                          )
                    ]
                   )

Где,

func – метод для обработки входных данных, который реализован в nodes.py;

inputs – входные данные, имя указано в conf\base\catalog.yml;

outputs – задаем имя для обработанных данных;

name – задаем имя для модуля в pipeline. А также инициализируем pipeline в проектном пространстве, для этого создадим файл __init__.py в директории src\titanic\pipelines\data_processing\, в котором пропишем метод create_pipeline:

from .pipeline import create_pipeline

Заключительный момент создания отдельного модуля pipeline – регистрация в файле src\titanic\ pipeline_registry.py:

from typing import Dict
from kedro.pipeline import Pipeline
from titanic.pipelines import data_processing as dp

def register_pipelines() -> Dict[str, Pipeline]:
    
    data_processing_pipeline = dp.create_pipeline()
    
    return {"__default__": data_processing_pipeline,
            "dp": data_processing_pipeline}

Запустим проверку, чтобы убедиться в правильном составлении модуля обработки входных данных, для этого в терминале запустим команду, что запустит весь pipeline:

kedro run

или запустим отдельный модуль, указав имя через аргумент –node:

kedro run --node=preprocessing_input_data_node

В результате должны получить подобный вывод:

Обработка выполнена, однако результат обработки хранится в памяти (MemoryDataSet) до завершения pipeline. Для того чтобы зафиксировать промежуточный результат необходимо дополнить файл titanic\conf\base\catalog.yml:

………………………….
preprocessed_input_data:
    type: pandas.CSVDataSet
    filepath: data/02_intermediate/preprocessed_input_data.csv

Таким образом, результат метода preprocess_dataset будет сохранен в csv файл, в директорию data/02_intermediate/, к которому можно будет обратиться в любой момент. Далее приступим к DS составляющей pipeline. Для начала необходимо установить пакеты по работе с ml-алгоритмами, для этого отредактируем файл titanic\src\requirements.in:

………
scikit-learn==0.23.1

И вызовем в терминале команду:

kedro install –build-reqs

По подобию с обработкой данных, создадим папку data_science в директории src\titanic\pipelines\ и добавим файл nodes.py со следующим содержимым:

import logging
from typing import Dict, Tuple

import pandas as pd
import numpy as np

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn import metrics

def get_score(name_metric: str, y_true: pd.Series, y_pred: (np.ndarray, np.array)) -> float:
    ## проверка на правильность названия метрики
    name_metrics = [i for i in dir(metrics) if callable(getattr(metrics, i)) and not i.startswith("__")]
    if name_metric not in name_metrics:
        print("Invalid metric name")
        return np.nan
    score = getattr(metrics, name_metric)(y_true, y_pred)
    return round(score, 2)

def split_data(df: pd.DataFrame, parameters: Dict) -> Tuple:
    X = df[parameters["features"]]
    y = df["Survived"]
    X_train, X_test, y_train, y_test = train_test_split(X, 
                                                        y,
                                                        test_size=parameters["test_size"],
                                                        random_state=parameters["random_state"])
    return X_train, X_test, y_train, y_test
    
def train_model(X_train: pd.DataFrame, y_train: pd.Series, parameters: Dict) -> RandomForestClassifier:
    rndm_forest = RandomForestClassifier(n_estimators=parameters["n_estimators"])
    rndm_forest.fit(X_train, y_train)
    return rndm_forest

def evaluate_model(rndm_forest: RandomForestClassifier, X_test: pd.DataFrame, y_test: pd.Series, parameters: Dict):
    y_pred = rndm_forest.predict(X_test)
    score = get_score(parameters["metric"], y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info(f"Model has a value {score} for {parameters['metric']}")

Метод split_data разбивает обработанные данные на обучающую и тестовую выборку, train_model – обучение модели, evaluate_model -проверяет качество модели, метод get_score возвращает значение по заданной метрике.

Как вы уже заметили, на вход методов передается аргумент parameters, который мы не импортируем и не извлекаем из других методов. Данный объект будет подтягиваться из конфигурационного файла titanic\conf\base\parameters.yml, из которого будем брать необходимые значения для настройки гиперпараметров модели, размера тестовой выборки и т.д. Метод get_score реализован для примера, чтобы показать возможность настройки любых параметров, которые будут полезны при настройке вашего проекта. Таким образом вам не нужно искать параметры в исходном коде, нужно лишь подправить файл parameters.yml и вызвать команду kedro run, для обучения модели с новыми параметрами. Внесем необходимые значения в файл parameters.yml:

test_size: 0.2
random_state: 17
n_estimators: 100
metric: "accuracy_score"
features:
    - Pclass
    - Sex
    - SibSp
    - Parch
    - Embarked

Параметр features (используем в методе split_data) необходим для отбора только тех признаков, которые будем использовать для обучения и предсказания, также можно указать и целевой признак, если данные берутся из различных источников. Далее повторяем операции сохранения данных, и инициализации pipeline. Для начала дополним файл titanic\conf\base\catalog.yml для сохранения обученной модели:

………….
classificator:
    type: pickle.PickleDataSet
    filepath: data/06_models/classificator.pickle
    versioned: true

Отличием от предыдущих способов сохранения данных является аргумент versioned, если укажем значение true, модель будет сохраняться каждый раз при запуске pipeline. Далее создадим data_science модуль для pipeline, для этого поместим в директорию src\titanic\pipelines\data_science файл pipeline.py со следующим содержимым:

from kedro.pipeline import Pipeline, node
from .nodes import split_data, train_model, evaluate_model

def create_pipeline(**kwargs):
    return Pipeline([
                      node(
                            func=split_data,
                            inputs=["preprocessed_input_data", "parameters"],
                            outputs=["X_train", "X_test", "y_train", "y_test"]
                            name="split_data_node"
                           ),
                      node(
                            func=train_model,
                            inputs=["X_train", "y_train", "parameters"],
                            outputs="classificator",
                            name="train_model_node",
                          ),
                      node(
                            func=evaluate_model,
                            inputs=["classificator", "X_test", "y_test", "parameters"],
                            outputs=None,
                            name="evaluate_model_node"
                           )
                    ])

Данный модуль состоит из 3-х функций, где использование входных данных зависит от выполнения предыдущих функций. Не забываем подавать на вход функций объект parameters, для доступа к параметрам, влияющих на обучающую способность модели.

Обратите внимание, каким образом функции обращаются к данным. В kedro указывается не физическое расположение данных, а используется имя объекта, которое хранится либо в памяти (MemoryDataSet — пока выполняется pipeline), либо ссылается на уже существующий объект, указанный в titanic\conf\base\catalog.yml. Также инициализируем data_science модуль в проектном пространстве, для этого создадим файл __init__.py в директории src\titanic\pipelines\data_science, в котором укажем следующую строку:

from .pipeline import create_pipeline

Также зарегистрируем data_science модуль в общий pipeline проекта, для этого редактируем файл \src\titanic\ pipeline_registry.py:

from typing import Dict
from kedro.pipeline import Pipeline
from titanic.pipelines import data_processing as dp
from titanic.pipelines import data_science as ds

def register_pipelines() -> Dict[str, Pipeline]:

    data_processing_pipeline = dp.create_pipeline()
    data_science_pipeline = ds.create_pipeline()
    
    return {"__default__": data_processing_pipeline + data_science_pipeline,
            "dp": data_processing_pipeline,
            "ds": data_science_pipeline
           }

В данном случае знак «+» для двух pipelines используется для того, чтобы kedro понимал очередность выполнения модулей. Т.к. предобработку данных уже запускали раннее, можно запустить только ds часть с помощью аргумента –pipeline:

kedro run –pipeline=ds

В результате должны получить подобный вывод:

В консоли logger выводит сообщение о том, что точность модели на валидационных данных составляет 0.78 для метрики accuracy_score. По завершению pipeline, обученная модель появится в директории data/06_models/.

На этом закончим формирование pipeline. Создание модуля предсказания на тестовых данных, с лучшей моделью оставим для домашнего задания)))

Код в статье может показаться немного запутанным. Однако у kedro подробная и последовательная документация, которая ответит на любые вопросы по формированию проекта.

Подведем итоги:

Kedro – очень удобен для командной разработки, т.к. имеет определенную структуру, и не требуется поиск определенного параметра для настройки алгоритма в папке «temp» с названием файла «my_functions.py».

Для того чтобы прогнать новые данные, требуется лишь поместить данные в директорию data\01_raw и запустить команду в терминале — kedro run. Но главное – это наличие множества интересных плагинов для kedro, которые упрощают работу, как разработчика, так и исследователя данных. В следующей части статьи уделим внимание возможностям kedro, для визуального представления работы Вашего проекта.

Источник: https://habr.com/ru/post/587810/


Интересные статьи

Интересные статьи

Хедлайнерами конференции «Умные решения – умная страна», которую проводит ЛАНИТ, всегда становятся необычные эксперты. В прошлом году всех волновал постковидный мир, и его постарался описать футуролог...
Многие ли из вас использовали сторонние библиотеки при написании кода? Вопрос риторический, ведь без применения сторонних библиотек разработка некоторых продуктов затягив...
Думаю, ни для кого не секрет, что в разговорах опытных разработчиков Python, и не только, часто проскальзывают фразы о том, что Django это зло, что в Django плохая архите...
Ранее в одном из наших КП добавление задач обрабатывалось бизнес-процессами, сейчас задач стало столько, что бизнес-процессы стали неуместны, и понадобился инструмент для массовой заливки задач на КП.
SLA, оно же «service-level agreement» —соглашение-гарантия между заказчиком и поставщиком услуг о том, что получит клиент в плане обслуживания. Также в нем оговариваются компенсации в случае ...