Курсы валют и аналитика – использование обменных курсов в Хранилище Данных

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Привет! На связи Артемий – Analytics Engineer из Wheely.

Сегодня хотел бы поговорить о вопросах конвертирования финансовых показателей в разные валюты. Вопрос достаточно актуальный, так как большое количество компаний имеют мультинациональные зоны присутствия, строят аналитику глобального масштаба, готовят отчетность по международным стандартам.

Покажу как этот вопрос решается с помощью современных подходов на примере кейса Wheely:

  • Расширение списка базовых валют

  • Регулярное обновление и получения актуальных курсов

  • Обеспечение корректности исторических показателей

  • Максимальное удобство и простота использования в аналитических инструментах

Велком под кат для разбора решения проблемы учета мультивалютных метрик и показателей: Open Exchange Rate, Airflow, Redshift Spectrum, dbt.


Новые требования к сервису валютных курсов

В качестве legacy-источника использовался веб-сервис ЦБ РФ. Однако с изменяющимися требованиями и расширением зон присутствия компании его стало недостаточно. Например, по причине отсутствия котировки AED (дирхам ОАЭ). Для кого-то могут быть актуальны курсы криптовалют BTC, ETH, которые в веб-сервисе ЦБ РФ тоже отсутствуют.

Новые требования можно суммировать следующим образом:

  • Поддержка расширенного набора базовых валют, которые отсутствуют в API ЦБ РФ

  • Получение самых актуальных котировок, включая внутридневные курсы

  • Минимизация трансформаций данных вне Хранилища Данных (лучше если их вообще нет)

Матрица новых требований к работе с курсами валют
Матрица новых требований к работе с курсами валют

Задачи, которые предстоит решить легко визуализировать в виде матрицы. Красным помечены области, поддержку которых предстоит добавить:

  • Интеграция нового API для уже использующихся курсов

  • Добавление новых базовых валют в выгрузку

  • Получение ретроспективных (исторических) данных по новым валютам за прошлые периоды

  • Архивирование курсов из legacy-источника

Легаси приложение по выгрузке курсов валют формировало pivot-таблицу с коэффициентом для каждой пары в отдельном столбце. Это удобно, когда у нас есть строго фиксированный набор валют и наименования колонок, но превращается в головную боль если список валют необходимо расширить. 

Появилось желание уйти от всех трансформаций и формирований таблиц в pandas до того как данные попадают в Хранилище. Здесь я придерживаюсь принципа применения всех трансформаций (T в ELT) в одном месте, и помогает мне в этом замечательный инструмент dbt.

Интеграция с новым поставщиком данных

Как уже стало понятно, без внешнего поставщика данных обойтись не получится, поэтому предлагаю рассмотреть один из ряда провайдеров курсов валют – https://openexchangerates.org/

Минимальный необходимый план Developer включает в себя:

  • 10.000 запросов ежемесячно (более чем достаточно)

  • Ежечасные внутридневные обновления курсов

  • Широкий набор базовых валют, включая криптовалюты

Доступные методы API:

Для получения актуальных курсов валют воспользуемся API endpoint /latest.json

Простой запрос-ответ может выглядеть следующим образом:

Установка на расписание в Airflow

Для регулярного получения актуальных курсов валют я воспользуюсь инструментом Airflow. Apache Airflow – де-факто стандарт в области оркестрации данных, data engineering и управления пайплайнами. 

Смысловая составляющая графа задачи (DAG):

  • Сделать запрос к API

  • Сохранить полученный ответ (например, в виде уникального ключа на S3)

  • Уведомить в Slack в случае ошибки

Конфигурация DAG:

  • Базовые валюты (base currency), от которых отсчитываем курсы

  • Синхронизация расписание запусков с расчетом витрин в Хранилище Данных

  • Токен доступа к сервису

Самый простой DAG состоит из одного таска с вызовом простого shell-скрипта:

TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"`
 
curl -H "Authorization: Token $OXR_TOKEN" \
 "https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \
 | aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json

Вот как выглядит результат регулярной работы скрипта в S3:

Сегодня в штатном режиме выполняется около 25 обращений к сервису в сутки, статистика выглядит следующим образом:

Выгрузка истории по новым валютам

После обеспечения регулярной выгрузки всех необходимых валют, можно приступить к формированию истории по новым базовым валютам (которой, очевидно, нет). Это позволит переводить в новые валюты суммы транзакций прошлых периодов.

К сожалению, план Developer не включает обращения к API endpoint /time-series.json, и только ради этой разовой задачи не имеет смысла делать upgrade на более дорогостоящую версию.

Воспользуемся методом /historical/*.json и простым опросом API в цикле для формирования исторической выгрузки:

#!/bin/bash
 
d=2011-01-01
while [ "$d" != 2021-02-19 ]; do
 echo $d
 curl -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json
 d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d)
done

Пиковая нагрузка вызвала вопросы у коллег, которые тоже пользуются сервисом, но это была разовая акция:

Архивирование исторических курсов валют

Вся история обменных курсов полученная из legacy-источника ЦБ РФ до даты X (перехода на новый сервис-провайдер) подлежит архивированию в неизменном виде.

Я хочу сохранить все те курсы, которые мы показывали в своих аналитических инструментах без изменений. То есть чтобы суммы в дашбордах и отчетах бизнес-пользователей не были изменены ни на копейку.

Для этого я выполню выгрузку накопленных значений обменных курсов за весь исторический период в Data Lake. Более детально, я произведу:

  • Трансформацию legacy pivot-таблицы в двумерную

  • Запись в колоночный формат PARQUET в AWS S3

Формирование архива в S3 в формате PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrf
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
   SELECT 'EUR' AS base_currency
   UNION ALL
   SELECT 'GBP'
   UNION ALL
   SELECT 'RUB'
   UNION ALL
   SELECT 'USD'
)
SELECT
   "day" AS business_dt
   ,b.base_currency
   ,CASE b.base_currency
       WHEN 'EUR' THEN 1
       WHEN 'GBP' THEN gbp_to_eur
       WHEN 'RUB' THEN rub_to_eur
       WHEN 'USD' THEN usd_to_eur
       ELSE NULL
     END AS eur
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_gbp
       WHEN 'GBP' THEN 1
       WHEN 'RUB' THEN rub_to_gbp
       WHEN 'USD' THEN usd_to_gbp
       ELSE NULL
     END AS gbp
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_rub
       WHEN 'GBP' THEN gbp_to_rub
       WHEN 'RUB' THEN 1
       WHEN 'USD' THEN usd_to_rub
       ELSE NULL
     END AS rub
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_usd
       WHEN 'GBP' THEN gbp_to_usd
       WHEN 'RUB' THEN rub_to_usd
       WHEN 'USD' THEN 1
       ELSE NULL
     END AS usd     
FROM ext.currencies c
   CROSS JOIN base b
;

Таким образом, в хранилище S3 у меня теперь есть статический снимок всех обменных курсов, когда-либо использованных в аналитических приложениях, сериализованный в оптимизированный колоночный формат со сжатием. В случае необходимости пересчета витрин и исторических данных я запросто смогу воспользоваться этими курсами.

Доступ к данным из DWH через S3 External Table

А теперь самое интересное – из своего аналитического движка Amazon Redshift я хочу иметь возможность просто и быстро обращаться к самым актуальным курсам валют, использовать их в своих трансформациях.

Оптимальное решение – создание внешних таблиц EXTERNAL TABLE, которые обеспечивают SQL-доступ к данным, хранящимся в S3. При этом нам доступно чтение полуструктурированных данных в формате JSON, бинарных данных в форматах AVRO, ORC, PARQUET и другие опции. Продукт имеет название Redshift Spectrum и тесно связан с SQL-движком Amazon Athena, который имеет много общего с Presto.

CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
   "timestamp" bigint
   , base varchar(3)
   , rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
)
ROW format serde 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<BUCKET>/dwh/currencies/'
;

Обратите внимание на обращение ко вложенному документу rates с помощью создания типа данных struct.

Теперь добавим к этой задаче секретную силу dbt. Модуль dbt-external-tables позволяет автоматизировать создание EXTERNAL TABLES и зарегистрировать их в качестве источников данных:

   - name: external
     schema: spectrum
     tags: ["spectrum"]
     loader: S3
     description: "External data stored in S3 accessed vith Redshift Spectrum"
     tables:
       - name: currencies_oxr
         description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
         freshness:
           error_after: {count: 15, period: hour}
         loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
         external:
           location: "s3://<BUCKET>/dwh/currencies/"
           row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
         columns:
           - name: timestamp
             data_type: bigint
           - name: base
             data_type: varchar(3)
           - name: rates
             data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>

Немаловажным элементом является проверка своевременности данных – source freshness test на курсы валют. Тем самым мы будем постоянно держать руку на пульсе поступления актуальных данных в Хранилище. Очень важно рассчитывать все финансовые метрики корректно и в срок, а без актуальных значений курсов задачу решить невозможно.

В случае отставания данных – более 15 часов без свежих обменных курсов – мы тут же получаем уведомление в Slack.

Для прозрачности и простоты пользователей объединим исторические данные (архив) и постоянно поступающие актуальные курсы (новый API) в одну модель currencies:

Объединение исторических и новых данных в единый справочник
{{
   config(
       materialized='table',
       dist='all',
       sort=["business_dt", "base_currency"]
   )
}}
 
with cbrf as (
 
 select
 
     business_dt
   , null as business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
 
 from {{ source('external', 'currencies_cbrf') }}
 where business_dt <= '2021-02-18'
 ),
 
oxr_all as (
 
   select
 
     (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
   , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
   , o.base as base_currency
   , o.rates.aed::decimal(10,4) as aed
   , o.rates.eur::decimal(10,4) as eur
   , o.rates.gbp::decimal(10,4) as gbp
   , o.rates.rub::decimal(10,4) as rub
   , o.rates.usd::decimal(10,4) as usd
   , row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn
 
   from {{ source('external', 'currencies_oxr') }} as o
   where business_dt > '2021-02-18'
 
),
 
oxr as (
 
 select
 
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
 
 from {{ ref('stg_currencies_oxr_all') }}
 where rn = 1
 ),
 
united as (
 
 select
 
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
  from cbrf
 
 union all
 
 select
 
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
  from oxr
 
)
 
select
 
   business_dt
 , business_ts
 , base_currency
 , aed
 , eur
 , gbp
 , rub
 , usd
 
from united

При этом физически справочник с курсами валют копируется на каждую ноду аналитического кластера Redshift и хранится в отсортированном по дате и базовой валюте  виде для ускорения работы запросов.

Использование курсов в моделировании данных

В целом, работа с курсами валют для аналитиков и инженеров, которые развивают Хранилище Данных не изменилась и осталась весьма простой. Все детали использования нового API, обращения к внешним полу-структурированным документам JSON в S3, объединению с архивными данными скрыты  . В своих трансформациях достаточно сделать простой джоин на таблицу с курсами валют:

   select
 
       -- price_details
       , r.currency
       , {{ convert_currency('price', 'currency') }}
       , {{ convert_currency('discount', 'currency') }}
       , {{ convert_currency('insurance', 'currency') }}
       , {{ convert_currency('tips', 'currency') }}
       , {{ convert_currency('parking', 'currency') }}
       , {{ convert_currency('toll_road', 'currency') }}
 
   from {{ ref('requests') }} r
       left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
           and r.currency = currencies.base_currency

Сами метрики конвертируются при помощи простого макроса, который на вход принимает колонку с исходной суммой и колонку с исходным кодом валюты:

-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
 
     ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
 
{%- endmacro %}

Практико-ориентированное развитие

Работа с данными – одно из наиболее востребованных и бурно развивающихся направлений. Каждый день я нахожу новые интересные задачи и придумываю решения для них. Это захватывающий и интересный путь, расширяющий горизонты.

В конце мая состоится юбилейный запуск курса Data Engineer в ОТУС, в котором я принимаю участие в роли преподавателя.

По прошествии двух лет программа постоянно менялась, адаптировалась. Ближайший запуск принесет ряд нововведений и будет построен вокруг кейсов – реальных прикладных проблем инженеров:

  • Data Architecture

  • Data Lake

  • Data Warehouse

  • NoSQL / NewSQL

  • MLOps

Детально с программой можно ознакомиться на лендинге курса.

Также я делюсь своими авторскими заметками и планами в телеграм-канале Technology Enthusiast.

Благодарю за внимание.

Источник: https://habr.com/ru/company/otus/blog/558238/


Интересные статьи

Интересные статьи

Носить виртуальные аксессуары – это весело, но до их ношения в реальной жизни всего один шаг. Мы могли бы легко создать приложение, которое позволяет виртуально примерять...
Сегодня мы рассмотрим пример базы данных и различные команды агрегации, группировки, сортировки, соединения таблиц и другое на примере MySQL. Сами данные представляют соб...
Когда тот, кто работает в сфере Data Science, собирается показать результаты своей деятельности другим людям, оказывается, что таблиц и отчётов, полных текстов, недостаточно для того чтоб...
Недавно перед нами встала задача развернуть Dgraph в кластере Kubernetes. В этой статье я поделюсь полученным опытом: с чем мы столкнулись во время деплоя и последующего использования...
Привет! Меня зовут Никита Учителев. Я представляю отдел Research & Development компании Lamoda. Нас 20+ человек, и мы работаем над различными рекомендациями на сайте и в приложениях, разрабат...