Как построить MVP системы для удобной работы аналитика без Docker, Kubernetes и Airflow

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Всем привет!

Меня зовут Максим Шептяков, и я занимаюсь продуктовой аналитикой уже больше четырёх лет. Так получилось, что я несколько раз приходил в компании или проекты, где (почти) совсем не было аналитики до меня, так что приходилось настраивать удобную работу с данными с нуля. И сегодня я вам расскажу, как быстро построить себе МВП аналитической системы без Docker, Kubernetes и Airflow, ведь часто аналитики не обладают знаниями этих систем.

Для понимания статьи нужно базовое знание Python и SQL.

Что есть в компании, куда вы пришли, с самого начала

Обычно в компании, куда вы пришли, уже есть какой-то рабочий продукт, необходимые данные о котором хранятся в боевой (то есть production) базе, на которой настроена репликация в базу-реплику. Первому аналитику выдаётся право ходить в реплику и получать данные оттуда. Но есть несколько нюансов:

  • Нельзя кидать слишком тяжелые запросы, ведь идёт постоянная репликация

  • Нельзя модифицировать схему данных, ведь это реплика production-базы

  • Часто нельзя коннектиться сторонними сервисами для визуализации данных

  • Некоторые данные нужно получать не из реплики, а из каких-то других источников (другие БД, API, etc.)

Подход, когда для каждой задачи отдельно подготавливаются данные возможен, но от него начинает тошнить уже на первой неделе такой работы, поэтому возникает желание поднять себе удобную аналитическую систему.

Как выглядит удобная аналитическая система

По моему опыту, для удобной работы аналитическая система должна удовлетворять следующим критериям:

  • Все данные, необходимые для работы, лежат в одном месте и автоматически обновляются

  • Аналитик имеет доступ ко всем данным, необходимым для работы, и может запускать на них тяжелые запросы

  • Аналитик может создавать промежуточные таблицы, витрины данных, вьюхи, etc

  • Аналитик может быстро добавить новый источник данных или новую витрину данных внутри системы

  • Аналитик может подключить систему визуализации к аналитической системе и выдать доступ команде

Схема аналитической системы
Схема аналитической системы

Кажется, что построить такую систему с нуля, да ещё и когда ты не разработчик, а аналитик, довольно сложно. Но я покажу, что для создания MVP (Minimal Viable Product), не нужно прикладывать больших усилий и что создать рабочий прототип для удобной работы можно всего за несколько часов.

В этом гайде сфокусируемся на части с загрузкой и обновлением данных в БД и на доступе аналитика ко всем данным.

Где будут храниться данные

Для хранения данных можно выбрать бесплатную реляционную БД, например, Postgres. Надеюсь, что в компании, куда вы пришли, есть ребята, ответственные за инфраструктуру, которые могут вам поднять БД и выдать все доступы для неё.

Но если это не так, есть несколько способов, как можно поднять себе аналитическую базу:

  • Бесплатно поднять базу локально на своём компьютере (Скачать Postgres-клиент можно с официального сайта PostgreSQL)

  • Платно поднять БД в облаке (Гайд, как поднять Postgres в AWS)

Не буду останавливаться на том, как поднимать базы данных, а лучше расскажу про то, как можно организовать данные внутри аналитической БД.

Схема данных для аналитической БД
Схема данных для аналитической БД

Я разделил аналитической БД на несколько схем, у каждой из которых своя роль:

  • etl — схема для загрузки сырых данных из различных источников

  • arch — схема для хранения старых сырых данных, которые могут понадобиться в будущем

  • public — схема для работы аналитика, то есть схема для проведения исследований, складывания временных данных, дизайна новых витрин данных

  • pre_dash — схема для предобработанных данных, которые будут использоваться для подготовки витрин для визуализации и для работы аналитика, но не будут доступны для доступа не из БД

  • dash — схема для данных, подготовленных для визуализации (то есть для небольших витрин данных, которые можно быстро достать из БД)

Для удобной работы с такой системой, нам пригодится 3 пользователя:

  • loader — системный пользователь для загрузки данных из внешних источников в аналитическую БД и обработки данных внутри БД

  • analyst — пользователь, у которого есть доступ ко всем данным и возможность работать в схеме public

  • viewer — системный пользователь для визуализации, у которого есть доступ только к схеме DASH с данными, готовыми для отрисовки

Код создания схем, пользователей и выдачи доступов в Postgres:

-- Создаём схемы данных
create schema etl;
create schema pre_dash;
create schema dash;
create schema arch;

-- Создаём пользователей
create role loader login password '*loader_password*';
create role analyst login password '*analyst_password*';
create role viewer login password '*viewer_password*';

-- Выдаём доступы пользователям к нужным схемам и таблицам в них 
-- После создания таблиц нужно будет заного выдать доступы к таблицам пользователям
grant usage, create on schema etl, arch, pre_dash, dash to loader;

grant usage on schema etl, arch, pre_dash, dash to analyst;
grant usage, create on schema public to analyst;
grant select on all tables in schema etl, arch, pre_dash, dash, public to analyst;


grant usage on schema dash to viewer;
grant select on all tables in schema dash to viewer;

Аналитическая БД готова к работе, можно начинать наполнять её данными!

ETL для аналитической БД

Теперь перейдём к сложной части, настройке ETL для автоматической загрузки и подготовки данных.

Давайте для начала определимся с задачами, которые должна выполнять ETL-система:

  • Загрузка данных из разных источников данных в аналитическую БД

  • Быстрое добавление новых источников данных для загрузки

  • Обработка данных внутри аналитической БД

  • Быстрое добавление новых обработок данных внутри БД

  • Учёт зависимостей данных при обработке

  • Работа по расписанию (мы рассмотрим работу раз в день)

Для начала создадим класс, который будет отвечать за работу ETL-системы. Сразу учтём то, что работать он будет раз в день.

from datetime import date


class TransferData:
    """
    Класс, ответственный за сбор данных из внешних источников и сохранение их в аналитическую БД
    """
    def __init__(self, work_date=date.today()):
        self.work_date = work_date

Все загрузки и обработки данных будут представлены как JSON-файлы (джобы), по которым будет проходиться класс TransferData. В этом классе должна быть информация о

  • Типе проводимой работы (загрузка или обработка данных)

  • Названии источника данных для загрузки

  • Необходимых параметрах для загрузки/обработки данных

Создадим первую джобу, которая будет собирать данные из открытого API https://api.publicapis.org/entries и перекладывать их к нам в аналитическую базу. Для описания этого процесса мы можем создать словарь в Python:

load_job = {
    # Тип джобы
    'job_type': 'load', 
    # Название источника данных
    'db_from':'open_api', 
    # DDL таблицы, в которую будут заливаться данные
    'ddl': '''create table if not exists etl.open_apis
(
    api text,
    description text,
    auth text,
    https text,
    cors text,
    link text,
    category text,
    load_date date
);''',
    # Название таблицы, в которую будут заливаться данные.
    'product': 'etl.open_apis',
    # Название промежуточного файла, через который будут загружаться данные
    'file_name': 'open_apis.txt',
}

И теперь напишем код, который при использовании нашего JSON создаст в аналитической БД таблицу и загрузит в неё данные из другой БД.

# Добавим импорты в начало файла
import psycopg2
import os

# .....

# Добавим 2 функции в класс TransferData
class TransferData:

# .....
  
    def load_data(self, load_job):
        """
        Функция для загрузки данных из внешних источников на диск.
        Должна по итогу создавать csv-файл на диске с форматом sep='|', quotechar="^".

        :param load_job: словарь с описанием джобы для загрузки данных
        """
        if load_job.get('db_from') == 'open_api':
            # Специфичная для источника логика сбора данных
            import requests
            import pandas as pd
            data = requests.get('https://api.publicapis.org/entries').json()
            data = pd.DataFrame(data['entries'])
            data['work_date'] = self.work_date
            data = data.loc[:, ['API', 'Description', 'Auth', 'HTTPS', 'Cors', 
                                'Link', 'Category', 'work_date']]
            # Сохраняем в специальный формат для уменьшения количества ошибок при загрузке
            data.to_csv(load_job.get('file_name'), index=False, 
                        sep='|', quotechar="^")

    def upload_data(self, load_job):
        """
        Функция для загрузки данных с диска в таблицы в аналитической БД.
        Для корректной работы необходим файл в формате csv с параметрами DELIMITER '|', QUOTE '^'
        ANALYTICS_DB_CONN_STRING - переменная окружения, в которой содержится connect-строка для
        нашей аналитической БД для юзера loader.
        Например, postgres://loader:*loader_password*@localhost:5432/postgres

        :param load_job: словарь с описанием джобы для загрузки данных
        """
        #
        with psycopg2.connect(os.environ['ANALYTICS_DB_CONN_STRING']) as conn:
            curs = conn.cursor()
            file_name = load_job.get('file_name')
            with open(file_name, 'r') as f:
                product = load_job.get('product')
                curs.execute(load_job.get('ddl'))
                curs.execute(f'truncate {product};')
                copy_query = f"""COPY {product} from STDIN with (FORMAT csv, DELIMITER '|', QUOTE '^', HEADER True)"""
                curs.copy_expert(copy_query, f)
                conn.commit()
            os.remove(file_name)

Теперь можно запуском 2 функций выгрузить данные из одного источника и загрузить их в аналитическую БД:

# Убедитесь, что находитесь в одной директории с файлом simple_etl.py
# и что установлены все зависимости из requirements.txt: pip install -r requirements.txt
import simple_etl as se

tr = se.TransferData()
tr.load_data(load_query)
tr.upload_data(load_query)

Теперь нам нужно создать обработку данных внутри БД. Создадим такую витрину, которая будет содержать в себе данные о количестве открытых API в https://api.publicapis.org/entries на каждый день:

process_job = {
    'job_type': 'process',
    # SQL-запрос, чтобы получить требуемые данные. load_date подставится в коде:
    'query': '''create table if not exists dash.openapi_daily_count
(
    work_date date,
    public_api_count int
);

delete from dash.openapi_daily_count
where work_date = '{work_date}';

insert into dash.openapi_daily_count(work_date, public_api_count) 
select work_date, count(distinct link) public_api_count 
from etl.open_apis 
where work_date = '{work_date}'
group by 1;''',
    # Название таблицы, в которую будут заливаться данные. Нужно для правильной работы зависимостей:
    'product': 'dash.openapi_daily_count',
    # Список таблиц, от которых зависит обработка данных
    'dependencies': ['etl.open_public_apis'],
}

И добавим код, который обработает данные внутри БД:

class TransferData:

# .....

    def process_data(self, process_job):
        """
        Функция для обработки данных внутри БД.

        :param process_job: Описание джобы по обработке данных.
        """
        with psycopg2.connect(os.environ['ANALYTICS_DB_CONN_STRING']) as conn:
            curs = conn.cursor()
            curs.execute(process_job.get('query').format(work_date=self.work_date))
            conn.commit()

Теперь мы можем обработать собранные данные одной командой:

import simple_etl as se
tr = se.TransferData()
tr.process_data(process_job)

Давайте выполним требование по быстрому добавлению новых источников данных и новых обработок данных. Для этого можно создать папку, в которой будут сохранены все джобы, которые нам нужны, в виде JSON-файлов.

import os
import json 

os.mkdir('etl_jobs')

with open('etl_jobs/dash_revenue_daily.json', 'w') as f:
    json.dump(process_job, f, indent=4)
    
with open('etl_jobs/etl_payments_daily.json', 'w') as f:
    json.dump(load_job, f, indent=4)

Теперь мы можем быстро добавлять в папку JSON-файлы с описанием джобов, которые нужно выполнить в течение дня.

Запуск всех джобов с учётом зависимостей

Теперь создадим функцию, которая поможет нам запускать все джобы разом, с учётом зависимостей.

# ...
import json

class TransferData:
    # .....
    # Обновим __init__ метод: 
    def __init__(self, work_date=date.today(), jobs_dir='etl_jobs'):
        self.work_date = work_date
        self.jobs_dir = jobs_dir
        self.loaded_dependencies = set()

    # .....

    def launch_job(self, job):
        """
        Функция, которая запускает джобы в зависимости от их типа

        :param job: Джоба по сбору или обработке данных
        """
        job_type = job.get('job_type')
        if job_type == 'load':
            self.load_data(job)
            self.upload_data(job)
        elif job_type == 'process':
            self.process_data(job)
        else:
            print('Unknown job type for job', job)

    def launch_all_jobs(self):
        """
        Запускаем все джобы с учётом наличия зависимостей в них
        """
        # Загружаем все джобы в память
        job_paths = os.listdir(self.jobs_dir)
        jobs = {}
        for job_path in job_paths:
            with open(os.path.join(self.jobs_dir, job_path), 'r') as f:
                jobs[job_path] = json.load(f)
        # Итерируемся по джобам, проверяя, загружены ли зависимости для них.
        # Осторожно, если зависимости не найдутся, цикл будет вечным
        while True:
            jobs_local = jobs.copy() # Для удаления элементов словаря во время итерации по нему
            for job_name, job in jobs_local.items():
                job_dependencies = job.get('dependencies')
                if job_dependencies is None or len(set(job_dependencies) - self.loaded_dependencies) == 0:
                    print(f'Started {job_name}')
                    self.launch_job(job)
                    product = job.get('product')
                    if product is not None:
                        self.loaded_dependencies.add(product)
                    jobs.pop(job_name)
                    print(f'Finished {job_name}')
            if len(jobs) == 0:
                break
        print('All jobs processed')

Теперь мы можем запустить все джобы, находящиеся в папке etl_jobs с помощью одной команды:

import simple_etl as se
tr = se.TransferData()
tr.launch_all_jobs()

Регулярный запуск сбора и обработки данных

Для регулярного запуска сбора и обработки данных добавим в файл с классом TransferData логику запуска обработки данных:

if __name__ == '__main__':
    """
    Запускаем обновление данных раз в день в 10.00
    """
    import schedule
    import time

    def scheduled_update():
        tr = TransferData(jobs_dir='path_to_jobs_dir/')
        tr.launch_all_jobs()

    schedule.every().day.at("10:00").do(scheduled_update)

    while True:
        schedule.run_pending()
        time.sleep(60)

Теперь мы можем запустить на своём компьютере (или на сервере, если вам его могут выдать ваши инфраструктурщики) в фоне запуск файла и наслаждаться обновлёнными данными в БД каждый день:

Как добавить новый сбор данных из нового источника

Добавить сбор данных из нового источника можно, если обновить функцию load_data и добавить новое условие в if c новым db_from. Тут можно добавлять логику сбора из любых источников: базы данных, API, документы, онлайн-таблицы...

Так можно добавить новый сбор данных в код:

    def load_data(self, load_job):
        # .....
        if load_job.get('db_from') == 'open_api':

           # .....

        elif load_job.get('db_from') == 'other_db_type':
            # DO SOME HERE AND SAVE FILE TO load_job.get('file_name')
            return

Ссылка на репозиторий с полным кодом simple_etl.

Вот и всё

Поздравляю, вы создали MVP аналитической системы! Теперь у вас есть:

  • Автоматическая загрузка и обработка данных из разных источников

  • Возможность работать со всеми данными в одной БД

  • Лёгкое и быстрое добавление новых данных (или обработка имеющихся) в систему

  • Возможность подключить визуализацию к заранее подготовленным и обновляемым данным

Наполняйте систему нужными данными и удобно работайте с ними в одной БД!
Дальше вы можете настроить автоматически обновляемые дэшборды в любой удобной системе визуализации, например, в Metabase (если интересно увидеть, как её можно просто настроить, пишите в комментарии).

Послесловие

Конечно, эта система — это лишь начало, её можно и нужно улучшать и развивать: добавлять логирование, обработку ошибок, параллельность работы, улучшать отказоустойчивость... Вообще говоря, обычно для этого существуют отдельные команды. Но это уже совсем другая история!

Источник: https://habr.com/ru/post/718670/


Интересные статьи

Интересные статьи

Праздники уже прошли, кто успел отдохнуть — тот молодец. Ну а мы продолжаем публиковать обзоры новых одноплатных компьютеров, которые могут пригодиться читателям Хабра для опытов с электроникой, раб...
Меня зовут Максим, я руководитель проектов в ИНТЕРВОЛГЕ. Мы с командой уже несколько лет развиваем сеть B2B/B2C интернет-магазинов нашего клиента. Хочу сегодня рассказать про рефакторинг. И кода,...
В погоне за кубитами Baidu Inc как лидер среди китайских поисковых систем не отстаёт от своего западного конкурента Alphabet Inc.В августе 2022 года китайский гигант поисковой системы Baidu Inc разраб...
И еще раз о передатчиках и приемниках 433 МГц. Попробуем как можно проще и дешевле подключить их к ATMEGA8, добиться дальности более 2 км и сделать крипто-защищенную беспроводную сеть-шлюз к 1-Wire с...
Наверняка те, кто хоть раз в жизни видел своими глазами северное (или южное) полярное сияние, скажут, что это просто фантастическое зрелище. Чудо природы планетарного масштаба, грандиозное явлени...