В статье будет рассмотрен процесс экспорта данных в Hadoop из различных РСУБД посредством фреймворка Spark. Для взаимодействия с фреймворком Spark будет использован язык программирования Python с применением api pySpark.
Основные понятия
Реляционная СУБД (Relational Database Management System, RDBMS) – это система, обеспечивающая доступ к реляционной базе данных, в основе которой лежат данные, хранящиеся в табличном виде;
Apache Hadoop – это коллекция утилит, библиотек и фреймворк с открытым исходным кодом, предназначенных для разработки и выполнения распределенных программ, работающих на кластерах из множества узлов;
Apache Spark – фреймворк для распределенной пакетной и потоковой обработки больших неструктурированных и слабоструктурированных данных, с открытым исходным кодом, входящий в экосистему Hadoop.
Взаимодействие SPARK с Реляционными СУБД
В основе взаимодействия фреймворка Spark с реляционными базами данных лежит JDBC (Java Database Connectivity) Driver, реализованный на языке Java. При использовании JDBC происходит подключение к базе данных по специальному URL, который включает в себя адрес сервера и порт, по которому происходит отправка запросов.
Для того, чтобы приступить к реализации взаимодействия Spark и RDBMS, сначала необходимо сконфигурировать базовые свойства создаваемого application при создании SparkSession:
executors (количество исполнителей) – параллельные процессы, отвечающие за выполнение Spark Tasks.
executor cores (количество ядер для каждого исполнителя) – максимальное количество параллельно выполняемых задач, на одном исполнителе.
executor memory (память, выделяемая для каждого исполнителя) – количество ОЗУ, выделяемое системой на одного исполнителя.
driver memory (память, выделяемая для драйвера) – количество ОЗУ, выделяемое для драйвера Spark.
SparkSession – это способ инициализации базовой функциональности API (точка входа) для программного создания RDD, Dataframe и Dataset.
Рассмотрим пример, отвечающий за конфигурирование базовых свойств создаваемого application и создание SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.executor.instances", "2") \
.config("spark.executor.cores", "2") \
.config("spark.executor.memory", "2g") \
.config("spark.driver.memory", "4g") \
.appName("application_name") \
.getOrCreate()
Теперь, когда SparkSession создана, можно приступать к отправлению запросов в базу данных.
Основные опции JDBC для взаимодействия с базой данных:
№ | Название опции | Описание | Ограничения |
1 | url | URL-адрес для подключения к базе данных по JDBC | |
2 | dbtable | Таблица или запрос, обернутый в скобки, который может быть добавлен в блок FROM SQL- запроса | |
3 | query | Запрос, который будет обернут в скобки и вставлен в блок FROM в качестве подзапроса. | |
4 | partitionColumn | Описание того, как разделить таблицу при параллельном чтении. Название колонки по которой будет проводиться разделение объекта. | Идет в группе с опциями 5, 6 и 7. Столбец раздела должен быть цифровым столбцом, датой или меткой времени из рассматриваемой таблицы. |
5 | lowerBound | Описание того, как разделить таблицу при параллельном чтении. Нижняя граница колонки по которой будет проводить разделение объекта. Используется для задания шага раздела, а не фильтрации данных. P.S.: более подробное описание опций будет рассмотрено на примерах в статье. | Идет в группе с опциями 4, 6 и 7 |
6 | upperBound | Описание того, как разделить таблицу при параллельном чтении. Верхняя граница колонки по которой будет проводить разделение объекта. Используется для задания шага раздела, а не фильтрации данных. P.S.: более подробное описание опций будет рассмотрено на примерах в статье. | Идет в группе с опциями 4, 5 и 7 |
7 | numPartitions | Максимальное количество разделов, которые можно использовать для параллелизма при чтении таблицы. Это также определяет максимальное количество одновременных подключений JDBC. P.S.: более подробное описание опций будет рассмотрено на примерах в статье. | Идет в группе с опциями 4, 5 и 6 |
8 | fetchsize | Размер выборки JDBC, который определяет, сколько строк нужно извлекать за один проход. Это может повысить производительность драйверов JDBC, которые по умолчанию имеют низкий размер выборки (например, для ORACLE с 10 строками) | |
9 | user | Пользователь, из-под которого происходит подключение к базе данных. | |
10 | password | Пароль к пользователю (из опции 9) |
Для того, чтобы отправить запрос в базу, необходимо выбрать формат соединения. Форматом, отвечающим за соединения с базой данных является JDBC. После того, как был выбран формат, необходимо добавить опции, которые предоставят нужную информацию для подключения к БД, инициализируют требуемый запрос и, при необходимости, сообщат драйверу о том, каким образом необходимо произвести чтение данных из БД.
Рассмотрим пример, иллюстрирующий отправку запроса в базу данных:
dataframe = spark.read.format("jdbc") \
.option("url", "jdbc:oracle:thin:@127.0.0.1:1521/testdb") \
.option("dbtable", "myOwner.myTableName") \
.option("partitionColumn", "ID") \
.option("lowerBound", 1) \
.option("upperBound", 1000) \
.option("numPartitions", 10) \
.option("fetchsize", 100) \
.option("user", "myUser") \
.option("password", "myPass") \
.load()
Мы видим, что spark
, используя JDBC
драйвер, пытается подключиться к БД jdbc:oracle:thin:@127.0.0.1:1521/testdb
и прочитать из нее myOwner.myTableName
, при этом используя разделение таблицы по колонке ID
на 10
разделов, для возможности параллельного чтения, где нижняя граница колонки ID = 1
, верхняя граница = 1000
. При этом размер выборки JDBC будет равняться 100
строкам.
После того, как выполнится выше представленный код, мы получим НЕ данные из этой таблицы, а фрейм данных (dataframe). Dataframe – это абстракция данных в SparkSQL, которые организованы в строки и типизированы по столбцам. Получая фрейм данных, мы можем узнать информацию о:
названиях полей объекта;
типах полей объекта;
свойстве nullable полей объекта.
fields = dataframe.schema.fields
names = [_.name.lower() for _ in fields] #названия полей объекта
types = [_.dataType for _ in fields] # типы полей объекта
nullable = [_.nullable for _ in fields] # свойство nullable полей объекта
Когда фрейм данных получен, мы можем перейти к записи данных в Hadoop Distributed File System (HDFS). Для этого необходимо вызвать у dataframe метод write, после чего объявить формат выходных данных (parquet, orc, csv, avro, etc), при необходимости добавить опции для записи и вызвать метод save().
Рассмотрим наиболее часто встречаемые опции/методы для записи:
№ | Название | Тип | Описание | Ограничения |
1 | compression | опция | кодек сжатия, используемый при сохранении фрейма данных в файл. Наиболее распространенные кодеки: none, bzip2, gzip, snappy и др. Самым популярным кодеком для сжатия является snappy: он не нацелен на максимальное сжатие или совместимость с любой другой библиотекой сжатия. Вместо этого, он нацелен на увеличение скорости при работе со сжатыми файлами и разумное сжатие. Более подробно можно почитать по ссылке: http://google.github.io/snappy/ | |
2 | partitionBy(*cols) | метод | метод, применяемый для партицирования по заданным наименованиям колонок | |
3 | mode(str) | метод | режим записи данных. Наиболее часто используемые: overwrite (перезапись)/append (дозапись) | |
4 | delimiter | опция | задает один символ в качестве разделителя для каждого поля и значения. Если значение не задано, используется значение по умолчанию "," | Применяется для формата csv |
5 | header | опция | использует первую строку в качестве имен столбцов. Если значение не задано, используется значение по умолчанию false. | Применяется для формата csv |
6 | inferSchema | опция | автоматически формирует схему из входных данных. Для определения будет выполняться один дополнительный проход по данным. Если значение не задано, используется значение по умолчанию false. | Применяется для формата csv |
7 | repartitoin(numPartitions, *cols) | метод | возвращает новый фрейм данных, который разделен по хешу. Целочисленный параметр numPartitions указывает целевое количество разделов. Строковый параметр *cols задает столбец, по которому будет произведено деление данных. Метод используется для увеличения или уменьшения выходных разделов. При использовании будет произведено полное перемешивание данных и создаются разделы одинакового размера. | |
8 | coalesce(numPartitions) | метод | возвращает новый фрейм данных, разделенный на numPartitions разделов. Целочисленный параметр numPartitions указывает целевое количество разделов. Метод используется для увеличения или уменьшения выходных разделов, но при этом создает разделы с разным объемом данных за счет объединения уже имеющихся разделов. При использовании не будет произведено полное перемешивание данных. |
Когда дело доходит до выбора формата хранения данных, разработчики часто бросают свой взгляд в сторону parquet файлов. Parquet – это бинарный, колоночно-ориентированный формат хранения данных, изначально созданный для экосистемы hadoop.
Достоинства хранения данных в Parquet:
это файлы, а значит с ними легко работать, перемещать, бэкапить и реплицировать;
колончатый вид позволяет значительно ускорить работу аналитика, если ему не нужны все колонки сразу;
нативная поддержка в Spark из коробки обеспечивает возможность просто взять и сохранить файл в любимое хранилище;
эффективное хранение с точки зрения занимаемого места;
обеспечивает самую быструю работу чтения по сравнению с использованием других файловых форматов.
Более подробную информацию о формате PARQUET можно почитать по ссылке: https://parquet.apache.org/documentation/latest/
Рассмотрим пример записи полученного фрейма данных в HDFS в формат Parquet:
dataframe.repartition(2).write \
.mode("overwrite") \
.partitionBy("myColumn") \
.format("parquet") \
.option("compression", "snappy") \
.save("/path/to/target/dir")
Из примера выше мы видим, что полученный dataframe
будет разделен на 2
равномерных раздела, после чего перед записью сотрет все данные (overwrite
), имеющиеся по пути /path/to/target/dir, произведет партицирование по колонке myColumn
и сохранит данные в формате parquet
по пути /path/to/target/dir
, применив сжатие snappy
.
Анализ процесса миграции данных из различных RDBMS в HADOOP
На наших проектах часто приходится решать задачи переноса больших данных между системами.
Например, есть таблицы, содержащие десятки Тб данных, на основании которых необходимо построить различные аналитические витрины. Первым шагом является вынос данных из продуктивной среды бизнес-приложений в среду аналитических хранилищ, чтобы работа с этими данными не повлекла деградацию сервиса бизнес-приложений. Самый распространенный пример, когда бизнес приложения работают с некоторой РСУБД, а анализируются в Hadoop. Как можно выполнить процесс выноса данных в Hadoop с помощью Spark, мы увидим ниже.
Проанализируем процесс выполнения следующего кода:
Запрос создания/структура таблицы "GRIGOREVDS.myTable" в БД:
create table GRIGOREVDS.myTable (
id NUMBER,
field1 VARCHAR2(1000),
field2 NUMBER,
field3 VARCHAR2(1000),
field4 NUMBER,
field5 VARCHAR2(1000),
field6 NUMBER
);
Заполнение таблицы "GRIGOREVDS.myTable" 100 строками, где колонка "ID" имеет целочисленный тип данных с шагом 1:
INSERT INTO GRIGOREVDS.myTable VALUES (1,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (2,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (3,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (4,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (5,'field1',2,'field3',4,'field5',6);
...
INSERT INTO GRIGOREVDS.myTable VALUES (99,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (100,'field1',2,'field3',4,'field5',6);
Python скрипт "migration.py" экспорта данных из RDBMS в HDFS, использующий API pyspark:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.dynamicAllocation.enabled", "false") \
.config("spark.executor.instances", "1") \
.config("spark.executor.cores", "2") \
.config("spark.executor.memory", "1g") \
.config("spark.driver.memory", "1g") \
.appName("test_migration") \
.getOrCreate()
df = spark.read.format("jdbc") \
.option("url", "jdbc:oracle:thin:@localhost:1521/oradb") \
.option("user", "GRIGOREVDS") \
.option("password", "***") \
.option("dbtable", "GRIGOREVDS.myTable") \
.option("partitionColumn", "id") \
.option("lowerBound", 1) \
.option("upperBound", 100) \
.option("numPartitions", 10) \
.load()
df.repartition(2).write \
.mode("overwrite") \
.partitionBy("field1") \
.format("parquet") \
.option("compression", "snappy") \
.save("/tmp/migration/test")
После того, как скрипт будет запущен через YARN (например: spark-submit --master yarn ~/migration.py
) создастся Spark application, где можно будет отследить процесс выполнения запущенного скрипта в Spark History (Web сервер для визуализации создаваемых Jobs, Stages, Tasks, а также отображения логов выполнения приложений Spark в удобном виде).
Любое приложение в момент своей работы, состоит из драйвера – программы, которая выполняет основную функцию приложения (точка входа в приложение (main)), и исполнителей, которым на вход поступают данные и инструкция, по которой будет производиться выполнение на узлах кластера. После чего, исполнители отдают результат драйверу, чтобы получить новую задачу. При этом каждый исполнитель может выполнять обработку более чем в один поток, где каждый поток будет работать со своим блоком данных независимо друг от друга. Таким образом, если посмотреть на наш код выше, то можно заметить, что при запуске нашего приложения мы запросили у менеджера кластера одного исполнителя с двумя ядрами (потоками). Это говорит о том, что нам будет доступно (spark.executor.instances * spark.executor.cores
) 1 * 2 = 2 потока и в лучшем случае мы сможем обрабатывать два блока данных одновременно. При необходимости данные показатели можно увеличить.
Ниже приведен снимок экрана, на котором видна общая информация о создаваемых Jobs, после запуска нашего скрипта:
Можно заметить, что запущенный процесс состоит из 1-ого Job (save), который, в свою очередь, делится на 12 задач.
На скриншоте выше, можно увидеть, что созданный Job был разбит на 2-а этапа:
Чтение данных из базы данных (Stage Id: 0), который состоит из 10 задач;
Запись данных в HDFS (Stage Id: 1), который состоит из двух задач;
Перейдя в этап чтения данных можно увидеть, что были созданы и успешно выполнены 10 Tasks (tasks: 0 9).
1 - количество созданных задач, в рамках рассматриваемого этапа
2 - созданные задачи и их детальная информация
3 - детальная информация о задействованных исполнителях, в рамках рассматриваемого этапа
Разберемся, откуда взялись 10 задач, если чтение производилось одной таблицы (GRIGOREVDS.myTable). Посмотрев внимательно на код, можно заметить, что были применены дополнительные опции (partitionColumn, lowerBound, upperBound, numPartitions
), которые позволили распараллелить чтение данных из нашей таблицы. В результате, после применения данных опций мы получили 10 (numPartitions
), сгенерированных JDBC драйвером, запросов в БД.
Посмотрим детальнее на процесс генерации запросов. Когда задаются данные опций, часто возникает вопрос: А что же делают опции lowerBound и upperBound? Могу ли я наложить таким образом фильтр на данные и выгрузить только те, которые попали в диапазон данных опций? Ответ - нет. При указании данных опций мы лишь задаем крайние границы значений выбранной колонки partitionColumn
, чтобы в дальнейшем получить наиболее равномерное распределение данных, с помощью запросов выборки. В рассматриваемом примере мы увидим, что у нас имеется в таблице 100 записей, которые должны быть разбиты на 10 блоков по колонке "ID". В результате мы получим:
Блок | Запрос выборки | Накладываемый фильтр на данные |
1 | SELECT | WHERE "ID" < 11 or "ID" is null |
2 | WHERE "ID" >= 11 and "ID" < 21 | |
3 | WHERE "ID" >= 21 and "ID" < 31 | |
4 | WHERE "ID" >= 31 and "ID" < 41 | |
5 | WHERE "ID" >= 41 and "ID" < 51 | |
6 | WHERE "ID" >= 51 and "ID" < 61 | |
7 | WHERE "ID" >= 61 and "ID" < 71 | |
8 | WHERE "ID" >= 71 and "ID" < 81 | |
9 | WHERE "ID" >= 81 and "ID" < 91 | |
10 | WHERE "ID" >= 91 |
Примеры отправленных запросов в Oracle представлены на скриншотах ниже:
Посмотреть информацию о том, было ли применено параллельное чтение данных, сколько строк было обработано в процессе работы каждого из этапов, количество выходных файлов, размер выходных данных и др., можно на вкладке SQL, перейдя в интересующий Query ID:
1 - SQL блок
2 - колонка для идентификации Queries
3 - ссылка на конкретный Query ID
1 - время выполнения блока
2 - параллельное чтение данных (разбиение на разделы) + количество разделов
3 - количество выходных файлов
4 - размер выходных данных
5 - количество выходных строк
После того, как фрейм данных получен, запускается блок записи в HDFS. В рамках рассматриваемого приложения в Spark History - это Stage Id: 1.
Так как все выходные данные решено было поделить на две, близкие к равенству по размеру части repartition(2)
, то в Spark History мы, соответственно, увидим две созданные задачи на запись. См. скриншот ниже:
1 - количество созданных задач, в рамках рассматриваемого этапа
2 - созданные задачи и их детальная информация
3 - детальная информация о задействованных исполнителях в рамках рассматриваемого этапа
В итоге, после успешного завершения работы скрипта, в директории HDFS (/tmp/migration/test
) мы увидим
файл "_SUCCESS", который будет сигнализировать пользователю о том, что запись была произведена успешно. А также директорию, которая была создана в результате выполнения партицирования по колонке "field1" (partitionBy("field1")
), в которой будут лежать два файла repartition(2)
, содержащие в себе экспортируемые из БД данные в формате parquet с применением кодека сжатия snappy.
1 - файл, сигнализирующий успешную запись
2 - директория, создавшаяся в результате партицирования
3 - два файла с данными
Название директории, по которой проводится партицирование, будет состоять из двух частей: название колонки, по которой проводится партицирование и сгруппированные значения из этой колонки.
Заключение
В рамках данной статьи мы рассмотрели вариант экспорта данных из реляционной базы данных в HDFS, с применением фреймворка Apache Spark. Кроме того, проанализировали поэтапно процесс выполнения исполняемого кода, а также рассмотрели возможные методы и опции для оптимизации и ускорения процесса выгрузки данных.