Пишем ETL-процесс на Python

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Привет! Меня зовут Сергей Климов, я работаю Python-разработчиком, специализируюсь на написании бэкенда на фреймворке Django. Идея данной статьи появилась из-за желания написать простой и понятный ETL-процесс без итерации по спискам и прочей “вложенности”, попутно решая вопросы рационального использования ОЗУ. Уверен, что мои наработки будут полезны для питонистов, которые столкнулись с аналогичной задачей.

Зачем это надо

Для начала обозначим предметную область. Нас интересует ETL-процесс (extract, transform, search) реализованный через паттерн “Цепочка обязанностей”. Мы разработаем в качестве примера три обработчика, которые будут передавать данные последовательно из функции в функцию. Каждый последующий обработчик решает, может ли он обработать запрос сам и стоит ли передавать запрос дальше по цепи.

Перед тем, как начать

Каждый наш обработчик будет представлять из себя функцию, которая будет являться вызывающей функцией и/или функцией-генератором.

В первом приближении наша задача создать итератор, который будет состоять из этих функций. Это мы реализуем посредством декоратора.

В вызывающем (или передающем) коде передача данных в следующий обработчик будет через метод send(). Принимать данные будем через выражение yield

Также мы будем использовать распаковку кортежей, сопоставление match/case

Поэтому рекомендую ознакомиться/повторить данные темы и термины, чтобы понимать как всё будет работать.

Постановка задачи

Вымышленная задачка, главное “погонять” данные из функции в функцию.

В базе данных есть таблица, содержащая целые числа. ETL-процесс должен пройтись по всем записям таблицы, возвести каждое число в квадрат и отобразить в консоли. Для каждого четного числа вывести информационное сообщение "the square of an even number". Если число из базы данных равно 3, то никаких действий над ним мы не делаем.

Первая функция

Задача первой функции “цепочки обязанностей” - передать значение в генератор путем вызова метода send(). В нашем вымышленном примере мы делаем sql-запрос к таблице, содержащей числовые строки и по одной передаем их в генератор.

import psycopg2

from psycopg2.extras import DictCursor

SQL = """select id, number from etl.source"""

def extract(batch):

   dbs = dict(dbname='demo', user='sergei', password='sergei', host='localhost')
   with psycopg2.connect(**dbs) as connection:

       with connection.cursor(cursor_factory=DictCursor) as cursor:

           cursor.execute(SQL)
           record = cursor.fetchone()  # можно использовать fetchmany, чтобы извлекать данные "пачками"

           while record:
               batch.send(record)
               record = cursor.fetchone()  # можно использовать fetchmany, чтобы извлекать данные "пачками"

Декоратор

Теперь пишем обертку для наших функций-генераторов для обеспечения возможности принимать данные. Задача - запустить генератор. Для этого мы используем next() встроенную функция Python, которая возвращает следующий элемент в итераторе. 

from functools import wraps

from functools import wraps

def coroutine(func):
   @wraps(func)
   def inner(*args, **kwargs):
       fn = func(*args, **kwargs)
       next(fn)
       return fn
   return inner

Вторая и все промежуточные функции

У всех промежуточных функций “цепочки обязанностей” три задачи - принять данные, обработать их и передать в следующую функцию. В продолжении нашего вымышленного примера:

@coroutine
def transform(batch):
    while record := (yield):

        new_number = record["number"] ** 2
        if record["number"] % 2 == 0:
            foo = "an even number"
        elif record["number"] == 3:
            print("skip load stage")
            continue
        else:
            foo = 0

        batch.send((new_number, foo))

Обратите внимание, что yield в скобках предполагает, что будет получен итерируемый объект. Иначе вы словите исключение StopItearation. Ветками if/elif/else показано, что можно управлять наборами данных, которые будут направлены на следующий этап. А также через continue можно вообще прервать выполнение “цепочки обязанностей”.

Заключительная функция

Технологически это функция из предыдущего раздела только без инструкции send. Т.е. мы ничего далее не отправляем. Завершаем наш вымышленную цепочку:

@coroutine
def load():
    while subject := (yield):
        match subject:
            case (int(number), str(bar)):
                print("the square of", bar, number)
            case (int(number), int(bar)):
                print(number)
            case _:
                raise SyntaxError(f"Unknown structure of {subject=}")

Собираем все вместе

Ключевое слово “собираем”, а не “запускаем”. Вот как выглядит код запуска наших функций:

extract(
    transform(
       load()
    )
)

Т.е. порядок вложенности (читаем снаружи внутрь) регулирует порядок расположения функций в итераторе. При рефакторинге кода получаем:

unloads = load()
multiplication = transform(unloads)
extract(multiplication)

Это несколько сбивает с толку, т.к. запуск кода по факту идёт “снизу вверх”. Имейте это ввиду, т.к. общепринят именно второй вариант кода. Иначе никак, просто представьте себе как будет выглядеть первый вариант кода если количество этапов будет более пяти.

Репозиторий с кодом доступен по ссылке.

Что далее

При первом знакомстве с кодом возникает ощущение его асинхронности (спасибо декоратору с говорящим названием coroutine). Но это не так. Все действия выполняются последовательно. В следующий раз мы попытаемся добавить асинхронности в наш код.

Желаю успехов и с удовольствием отвечу на ваши вопросы.

Источник: https://habr.com/ru/post/710106/


Интересные статьи

Интересные статьи

Если вы когда-нибудь чувствовали, что вы погрязли в совещаниях и обсуждениях, которые всё длятся и длятся, а решения проблемы всё нет, знайте: в mypy есть 5-летний issue, о том что целое число не явля...
ВведениеВ данной статье я бы хотел рассмотреть проблему обновления PHP в виртуальной машине BitrixVM, и действия, которые возможно применить если выполнение переезда на машину с обновленным ПО невозмо...
Предполагается, что читатель уже имеет начальные знания языка C, что-то знает о Zigbee, чипе cc2530, методах его прошивания и использования, а также знаком с такими проектами, как zigbee2mqtt...
Привет, Хабр. На днях Siemargl предложил мне перевести любопытную статью о победе над юниксовым wc при помощи хаскеля. Переводить её я, конечно же, не буду, и по нескольким причинам: автор вы...
Предположим, ваша Python-программа оказалась медленной, и вы выяснили, что это лишь отчасти обусловлено нехваткой процессорных ресурсов. Как выяснить то, какие части кода вынуждены ожидать чего-т...