Миграция данных из различных RDBMS в HADOOP

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

В статье будет рассмотрен процесс экспорта данных в Hadoop из различных РСУБД посредством фреймворка Spark. Для взаимодействия с фреймворком Spark будет использован язык программирования Python с применением api pySpark.

Основные понятия

  • Реляционная СУБД (Relational Database Management System, RDBMS) – это система, обеспечивающая доступ к реляционной базе данных, в основе которой лежат данные, хранящиеся в табличном виде;

  • Apache Hadoop – это коллекция утилит, библиотек и фреймворк с открытым исходным кодом, предназначенных для разработки и выполнения распределенных программ, работающих на кластерах из множества узлов;

  • Apache Spark – фреймворк для распределенной пакетной и потоковой обработки больших неструктурированных и слабоструктурированных данных, с открытым исходным кодом, входящий в экосистему Hadoop.

Взаимодействие SPARK с Реляционными СУБД

В основе взаимодействия фреймворка Spark с реляционными базами данных лежит JDBC (Java Database Connectivity) Driver, реализованный на языке Java. При использовании JDBC происходит подключение к базе данных по специальному URL, который включает в себя адрес сервера и порт, по которому происходит отправка запросов.

Для того, чтобы приступить к реализации взаимодействия Spark и RDBMS, сначала необходимо сконфигурировать базовые свойства создаваемого application при создании SparkSession:

  • executors (количество исполнителей) – параллельные процессы, отвечающие за выполнение Spark Tasks.

  • executor cores (количество ядер для каждого исполнителя) – максимальное количество параллельно выполняемых задач, на одном исполнителе.

  • executor memory (память, выделяемая для каждого исполнителя) – количество ОЗУ, выделяемое системой на одного исполнителя.

  • driver memory (память, выделяемая для драйвера) – количество ОЗУ, выделяемое для драйвера Spark.

SparkSession – это способ инициализации базовой функциональности API (точка входа) для программного создания RDD, Dataframe и Dataset.

Рассмотрим пример, отвечающий за конфигурирование базовых свойств создаваемого application и создание SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .config("spark.executor.instances", "2") \
  .config("spark.executor.cores", "2") \
  .config("spark.executor.memory", "2g") \
  .config("spark.driver.memory", "4g") \
  .appName("application_name") \
  .getOrCreate()

Теперь, когда SparkSession создана, можно приступать к отправлению запросов в базу данных.

Основные опции JDBC для взаимодействия с базой данных:

Название опции

Описание

Ограничения

1

url

URL-адрес для подключения к базе данных по JDBC

2

dbtable

Таблица или запрос, обернутый в скобки, который может быть добавлен в блок FROM SQL- запроса

3

query

Запрос, который будет обернут в скобки и вставлен в блок FROM в качестве подзапроса.

4

partitionColumn

Описание того, как разделить таблицу при параллельном чтении. Название колонки по которой будет проводиться разделение объекта.

Идет в группе с опциями 5, 6 и 7. Столбец раздела должен быть цифровым столбцом, датой или меткой времени из рассматриваемой таблицы.

5

lowerBound

Описание того, как разделить таблицу при параллельном чтении. Нижняя граница колонки по которой будет проводить разделение объекта. Используется для задания шага раздела, а не фильтрации данных. P.S.: более подробное описание опций будет рассмотрено на примерах в статье.

Идет в группе с опциями 4, 6 и 7

6

upperBound

Описание того, как разделить таблицу при параллельном чтении. Верхняя граница колонки по которой будет проводить разделение объекта. Используется для задания шага раздела, а не фильтрации данных. P.S.: более подробное описание опций будет рассмотрено на примерах в статье.

Идет в группе с опциями 4, 5 и 7

7

numPartitions

Максимальное количество разделов, которые можно использовать для параллелизма при чтении таблицы. Это также определяет максимальное количество одновременных подключений JDBC. P.S.: более подробное описание опций будет рассмотрено на примерах в статье.

Идет в группе с опциями 4, 5 и 6

8

fetchsize

Размер выборки JDBC, который определяет, сколько строк нужно извлекать за один проход. Это может повысить производительность драйверов JDBC, которые по умолчанию имеют низкий размер выборки (например, для ORACLE с 10 строками)

9

user

Пользователь, из-под которого происходит подключение к базе данных.

10

password

Пароль к пользователю (из опции 9)

Для того, чтобы отправить запрос в базу, необходимо выбрать формат соединения. Форматом, отвечающим за соединения с базой данных является JDBC. После того, как был выбран формат, необходимо добавить опции, которые предоставят нужную информацию для подключения к БД, инициализируют требуемый запрос и, при необходимости, сообщат драйверу о том, каким образом необходимо произвести чтение данных из БД.

Рассмотрим пример, иллюстрирующий отправку запроса в базу данных:

dataframe = spark.read.format("jdbc") \
    .option("url", "jdbc:oracle:thin:@127.0.0.1:1521/testdb") \
  .option("dbtable", "myOwner.myTableName") \
  .option("partitionColumn", "ID") \
  .option("lowerBound", 1) \
  .option("upperBound", 1000) \
  .option("numPartitions", 10) \
  .option("fetchsize", 100) \
  .option("user", "myUser") \
  .option("password", "myPass") \
  .load()

Мы видим, что spark, используя JDBC драйвер, пытается подключиться к БД jdbc:oracle:thin:@127.0.0.1:1521/testdb и прочитать из нее myOwner.myTableName, при этом используя разделение таблицы по колонке ID на 10 разделов, для возможности параллельного чтения, где нижняя граница колонки ID = 1, верхняя граница = 1000. При этом размер выборки JDBC будет равняться 100 строкам.

После того, как выполнится выше представленный код, мы получим НЕ данные из этой таблицы, а фрейм данных (dataframe). Dataframe – это абстракция данных в SparkSQL, которые организованы в строки и типизированы по столбцам. Получая фрейм данных, мы можем узнать информацию о:

  • названиях полей объекта;

  • типах полей объекта;

  • свойстве nullable полей объекта.

fields = dataframe.schema.fields
names = [_.name.lower() for _ in fields] #названия полей объекта
types = [_.dataType for _ in fields] # типы полей объекта
nullable = [_.nullable for _ in fields] # свойство nullable полей объекта

Когда фрейм данных получен, мы можем перейти к записи данных в Hadoop Distributed File System (HDFS). Для этого необходимо вызвать у dataframe метод write, после чего объявить формат выходных данных (parquet, orc, csv, avro, etc), при необходимости добавить опции для записи и вызвать метод save().

Рассмотрим наиболее часто встречаемые опции/методы для записи:

Название

Тип

Описание

Ограничения

1

compression

опция

кодек сжатия, используемый при сохранении фрейма данных в файл. Наиболее распространенные кодеки: none, bzip2, gzip, snappy и др. Самым популярным кодеком для сжатия является snappy: он не нацелен на максимальное сжатие или совместимость с любой другой библиотекой сжатия. Вместо этого, он нацелен на увеличение скорости при работе со сжатыми файлами и разумное сжатие. Более подробно можно почитать по ссылке: http://google.github.io/snappy/

2

partitionBy(*cols)

метод

метод, применяемый для партицирования по заданным наименованиям колонок

3

mode(str)

метод

режим записи данных. Наиболее часто используемые: overwrite (перезапись)/append (дозапись)

4

delimiter

опция

задает один символ в качестве разделителя для каждого поля и значения. Если значение не задано, используется значение по умолчанию ","

Применяется для формата csv

5

header

опция

использует первую строку в качестве имен столбцов. Если значение не задано, используется значение по умолчанию false.

Применяется для формата csv

6

inferSchema

опция

автоматически формирует схему из входных данных. Для определения будет выполняться один дополнительный проход по данным. Если значение не задано, используется значение по умолчанию false.

Применяется для формата csv

7

repartitoin(numPartitions, *cols)

метод

возвращает новый фрейм данных, который разделен по хешу. Целочисленный параметр numPartitions указывает целевое количество разделов. Строковый параметр *cols задает столбец, по которому будет произведено деление данных. Метод используется для увеличения или уменьшения выходных разделов. При использовании будет произведено полное перемешивание данных и создаются разделы одинакового размера.

8

coalesce(numPartitions)

метод

возвращает новый фрейм данных, разделенный на numPartitions разделов. Целочисленный параметр numPartitions указывает целевое количество разделов. Метод используется для увеличения или уменьшения выходных разделов, но при этом создает разделы с разным объемом данных за счет объединения уже имеющихся разделов. При использовании не будет произведено полное перемешивание данных.

Когда дело доходит до выбора формата хранения данных, разработчики часто бросают свой взгляд в сторону parquet файлов. Parquet – это бинарный, колоночно-ориентированный формат хранения данных, изначально созданный для экосистемы hadoop.

Достоинства хранения данных в Parquet:

  • это файлы, а значит с ними легко работать, перемещать, бэкапить и реплицировать;

  • колончатый вид позволяет значительно ускорить работу аналитика, если ему не нужны все колонки сразу;

  • нативная поддержка в Spark из коробки обеспечивает возможность просто взять и сохранить файл в любимое хранилище;

  • эффективное хранение с точки зрения занимаемого места;

  • обеспечивает самую быструю работу чтения по сравнению с использованием других файловых форматов.

Более подробную информацию о формате PARQUET можно почитать по ссылке: https://parquet.apache.org/documentation/latest/

Рассмотрим пример записи полученного фрейма данных в HDFS в формат Parquet:

dataframe.repartition(2).write \
  .mode("overwrite") \
  .partitionBy("myColumn") \
  .format("parquet") \
  .option("compression", "snappy") \
  .save("/path/to/target/dir")

Из примера выше мы видим, что полученный dataframe будет разделен на 2 равномерных раздела, после чего перед записью сотрет все данные (overwrite), имеющиеся по пути /path/to/target/dir, произведет партицирование по колонке myColumn и сохранит данные в формате parquet по пути /path/to/target/dir, применив сжатие snappy.

Анализ процесса миграции данных из различных RDBMS в HADOOP

На наших проектах часто приходится решать задачи переноса больших данных между системами.
Например, есть таблицы, содержащие десятки Тб данных, на основании которых необходимо построить различные аналитические витрины. Первым шагом является вынос данных из продуктивной среды бизнес-приложений в среду аналитических хранилищ, чтобы работа с этими данными не повлекла деградацию сервиса бизнес-приложений. Самый распространенный пример, когда бизнес приложения работают с некоторой РСУБД, а анализируются в Hadoop. Как можно выполнить процесс выноса данных в Hadoop с помощью Spark, мы увидим ниже.

Проанализируем процесс выполнения следующего кода:

Запрос создания/структура таблицы "GRIGOREVDS.myTable" в БД:

create table GRIGOREVDS.myTable (
        id         NUMBER,
        field1     VARCHAR2(1000),
        field2     NUMBER,
        field3     VARCHAR2(1000),
        field4     NUMBER,
        field5     VARCHAR2(1000),
        field6     NUMBER
);

Заполнение таблицы "GRIGOREVDS.myTable" 100 строками, где колонка "ID" имеет целочисленный тип данных с шагом 1:

INSERT INTO GRIGOREVDS.myTable VALUES (1,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (2,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (3,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (4,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (5,'field1',2,'field3',4,'field5',6);
...
INSERT INTO GRIGOREVDS.myTable VALUES (99,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (100,'field1',2,'field3',4,'field5',6);

Python скрипт "migration.py" экспорта данных из RDBMS в HDFS, использующий API pyspark:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.executor.instances", "1") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "1g") \
    .config("spark.driver.memory", "1g") \
    .appName("test_migration") \
    .getOrCreate()

df = spark.read.format("jdbc") \
    .option("url", "jdbc:oracle:thin:@localhost:1521/oradb") \
    .option("user", "GRIGOREVDS") \
    .option("password", "***") \
    .option("dbtable", "GRIGOREVDS.myTable") \
    .option("partitionColumn", "id") \
    .option("lowerBound", 1) \
    .option("upperBound", 100) \
    .option("numPartitions", 10) \
    .load()

df.repartition(2).write \
    .mode("overwrite") \
    .partitionBy("field1") \
    .format("parquet") \
    .option("compression", "snappy") \
    .save("/tmp/migration/test")

После того, как скрипт будет запущен через YARN (например: spark-submit --master yarn ~/migration.py) создастся Spark application, где можно будет отследить процесс выполнения запущенного скрипта в Spark History (Web сервер для визуализации создаваемых Jobs, Stages, Tasks, а также отображения логов выполнения приложений Spark в удобном виде).

Любое приложение в момент своей работы, состоит из драйвера – программы, которая выполняет основную функцию приложения (точка входа в приложение (main)), и исполнителей, которым на вход поступают данные и инструкция, по которой будет производиться выполнение на узлах кластера. После чего, исполнители отдают результат драйверу, чтобы получить новую задачу. При этом каждый исполнитель может выполнять обработку более чем в один поток, где каждый поток будет работать со своим блоком данных независимо друг от друга. Таким образом, если посмотреть на наш код выше, то можно заметить, что при запуске нашего приложения мы запросили у менеджера кластера одного исполнителя с двумя ядрами (потоками). Это говорит о том, что нам будет доступно (spark.executor.instances * spark.executor.cores) 1 * 2 = 2 потока и в лучшем случае мы сможем обрабатывать два блока данных одновременно. При необходимости данные показатели можно увеличить.

Ниже приведен снимок экрана, на котором видна общая информация о создаваемых Jobs, после запуска нашего скрипта:

Вкладка JOBS
Вкладка JOBS
Прогресс по выполнению задач рассматриваемого Job
Прогресс по выполнению задач рассматриваемого Job

Можно заметить, что запущенный процесс состоит из 1-ого Job (save), который, в свою очередь, делится на 12 задач.

Детализация рассматриваемого Job
Детализация рассматриваемого Job

На скриншоте выше, можно увидеть, что созданный Job был разбит на 2-а этапа:

  1. Чтение данных из базы данных (Stage Id: 0), который состоит из 10 задач;

  2. Запись данных в HDFS (Stage Id: 1), который состоит из двух задач;

Перейдя в этап чтения данных можно увидеть, что были созданы и успешно выполнены 10 Tasks (tasks: 0 9).

Детализация этапа чтения данных
Детализация этапа чтения данных

1 - количество созданных задач, в рамках рассматриваемого этапа
2 - созданные задачи и их детальная информация
3 - детальная информация о задействованных исполнителях, в рамках рассматриваемого этапа

Разберемся, откуда взялись 10 задач, если чтение производилось одной таблицы (GRIGOREVDS.myTable). Посмотрев внимательно на код, можно заметить, что были применены дополнительные опции (partitionColumn, lowerBound, upperBound, numPartitions), которые позволили распараллелить чтение данных из нашей таблицы. В результате, после применения данных опций мы получили 10 (numPartitions), сгенерированных JDBC драйвером, запросов в БД.

Посмотрим детальнее на процесс генерации запросов. Когда задаются данные опций, часто возникает вопрос: А что же делают опции lowerBound и upperBound? Могу ли я наложить таким образом фильтр на данные и выгрузить только те, которые попали в диапазон данных опций? Ответ - нет. При указании данных опций мы лишь задаем крайние границы значений выбранной колонки partitionColumn, чтобы в дальнейшем получить наиболее равномерное распределение данных, с помощью запросов выборки. В рассматриваемом примере мы увидим, что у нас имеется в таблице 100 записей, которые должны быть разбиты на 10 блоков по колонке "ID". В результате мы получим:

Блок

Запрос выборки

Накладываемый фильтр на данные

1

SELECT
"ID",
"FIELD1",
"FIELD2",
"FIELD3",
"FIELD4",
"FIELD5",
"FIELD6"
FROM GRIGOREVDS.myTable

WHERE "ID" < 11 or "ID" is null

2

WHERE "ID" >= 11 and "ID" < 21

3

WHERE "ID" >= 21 and "ID" < 31

4

WHERE "ID" >= 31 and "ID" < 41

5

WHERE "ID" >= 41 and "ID" < 51

6

WHERE "ID" >= 51 and "ID" < 61

7

WHERE "ID" >= 61 and "ID" < 71

8

WHERE "ID" >= 71 and "ID" < 81

9

WHERE "ID" >= 81 and "ID" < 91

10

WHERE "ID" >= 91

Примеры отправленных запросов в Oracle представлены на скриншотах ниже:

Снимок детализации создаваемой сессии в Oracle (session 1)
Снимок детализации создаваемой сессии в Oracle (session 1)
Снимок детализации создаваемой сессии в Oracle (session 2)
Снимок детализации создаваемой сессии в Oracle (session 2)

Посмотреть информацию о том, было ли применено параллельное чтение данных, сколько строк было обработано в процессе работы каждого из этапов, количество выходных файлов, размер выходных данных и др., можно на вкладке SQL, перейдя в интересующий Query ID:

Вкладка SQL
Вкладка SQL

1 - SQL блок
2 - колонка для идентификации Queries
3 - ссылка на конкретный Query ID

Детализация рассматриваемого Query
Детализация рассматриваемого Query

1 - время выполнения блока
2 - параллельное чтение данных (разбиение на разделы) + количество разделов
3 - количество выходных файлов
4 - размер выходных данных
5 - количество выходных строк

После того, как фрейм данных получен, запускается блок записи в HDFS. В рамках рассматриваемого приложения в Spark History - это Stage Id: 1.

Так как все выходные данные решено было поделить на две, близкие к равенству по размеру части repartition(2), то в Spark History мы, соответственно, увидим две созданные задачи на запись. См. скриншот ниже:

Детализация этапа записи данных
Детализация этапа записи данных

1 - количество созданных задач, в рамках рассматриваемого этапа
2 - созданные задачи и их детальная информация
3 - детальная информация о задействованных исполнителях в рамках рассматриваемого этапа

В итоге, после успешного завершения работы скрипта, в директории HDFS (/tmp/migration/test) мы увидим
файл "_SUCCESS", который будет сигнализировать пользователю о том, что запись была произведена успешно. А также директорию, которая была создана в результате выполнения партицирования по колонке "field1" (partitionBy("field1")), в которой будут лежать два файла repartition(2), содержащие в себе экспортируемые из БД данные в формате parquet с применением кодека сжатия snappy.

1 - файл, сигнализирующий успешную запись
2 - директория, создавшаяся в результате партицирования
3 - два файла с данными

Название директории, по которой проводится партицирование, будет состоять из двух частей: название колонки, по которой проводится партицирование и сгруппированные значения из этой колонки.

Заключение

В рамках данной статьи мы рассмотрели вариант экспорта данных из реляционной базы данных в HDFS, с применением фреймворка Apache Spark. Кроме того, проанализировали поэтапно процесс выполнения исполняемого кода, а также рассмотрели возможные методы и опции для оптимизации и ускорения процесса выгрузки данных.

Источник: https://habr.com/ru/company/neoflex/blog/586056/


Интересные статьи

Интересные статьи

Весь телеком-бизнес основан на данных, и Билайн не исключение. Данные генерируются как внутри, так и снаружи: в OSS-системах (события на оборудовании, сетевой трафик), в ...
Как предсказывать временные ряды на JavaScript, зачем нам понадобилось это делать в браузере и когда это имеет смысл. Читать далее
Я всегда считал, что одна из основных проблем при использовании систем резервного копирования — свойственный этому процессу консерватизм. Причин для него огромное количество, начиная...
Всем привет. Если вы когда-либо работали с универсальными списками в Битрикс24, то, наверное, в курсе, что страница детального просмотра элемента полностью идентична странице редак...
Анамнез, так сказать: Сервер Fujitsu rx300 s6, RAID6 из 6 1Тб дисков, поднят XenServer 6.2, крутятся несколько серверов, среди них Убунта с несколькими шарами, 3,5 миллиона файлов, 1,5 Тб данн...