Привет, Хабр. Для будущих студентов курса «Экосистема Hadoop, Spark, Hive» подготовили перевод материала.
Также приглашаем всех желающих на вебинар «Тестирование Spark приложений». На этом открытом уроке рассмотрим проблемы в тестировании Spark приложений: стат данные, частичную проверку и запуск/остановку тяжелых систем. Изучим библиотеки для решения и напишем тесты.
Эта статья посвящена исключительно операции Join (соединение) в Apache Spark и дает общее представление о фундаменте, на котором построена технология Spark Join.
Операции Join часто используются в типовых потоках анализа данных для корреляции двух наборов данных. Apache Spark, будучи унифицированным аналитическим движком, также обеспечил прочную основу для выполнения широкого спектра сценариев Join.
На очень высоком уровне Join работает с двумя наборами входных данных, операция выполняется путем сопоставления каждой записи данных, принадлежащей одному из наборов входных данных, с каждой другой записью, принадлежащей другому набору входных данных. При обнаружении совпадения или несовпадения (в соответствии с заданным условием) операция Join может либо вывести отдельную сопоставляемую запись из любого из двух наборов данных, либо объединенную (Joined) запись. Объединенная запись представляет собой комбинацию отдельных сопоставляемых записей из обоих наборов данных.
Важные аспекты операции Join:
Теперь давайте разберемся в трех важных аспектах, которые влияют на выполнение операции Join в Apache Spark. К ним относятся:
1) Размер наборов входных данных: Размер наборов входных данных напрямую влияет на эффективность выполнения и надежность операции Join. Кроме этого, сравнительный размер наборов входных данных влияет на выбор механизма Join, что может еще сильнее сказаться на эффективности и надежности Join.
2) Условие соединения (Join Condition): Условие или положение, на основании которого объединяются входные наборы данных, называется условием соединения (Join Condition). Условие обычно включает логическое сравнение(я) между атрибутами, принадлежащими входным наборам данных. Исходя из условия соединения, объединения классифицируются на две широкие категории: эквивалентные соединения (Equi Join) и неэквивалентные соединения ( Non-Equi Joins).
Эквивалентные соединения включают одно условие равенства, либо несколько, которые должны выполняться одновременно. Каждое условие равенства применяется к атрибутам двух входных наборов данных. Например, (A.x == B.x) или ((A.x == B.x) и (A.y == B.y)) - это два примера условий эквивалентного соединения для атрибутов x, y двух входных наборов данных A и B, участвующих в операции Join.
Неэквивалентные соединения не подразумевают условий равенства. Однако они могут допускать несколько условий равенства, которые не должны выполняться одновременно. Например, (A.x < B.x) или ((A.x == B.x) или (A.y == B.y)) - это два примера условий неравнозначного соединения для атрибутов x, y двух входных наборов данных A и B, участвующих в операции Join.
3) Тип соединения (Join type): Тип Join влияет на результат операции Join после применения условия Join между записями входных наборов данных. Ниже приведена общая классификация различных типов Join:
Внутреннее соединение (Inner Join): Inner Join выводит только совпадающие записи Joined (по условию Join) из входных наборов данных.
Внешнее соединение (Outer Join): Outer Join выводит не только совпадающие записи, но и не совпадающие. Внешнее соединение дополнительно классифицируется на левое, правое и полное внешнее соединение на основе выбора набора (наборов) входных данных для вывода несовпадающих записей.
Полусоединение (Semi Join): Semi Join выводит отдельные записи, принадлежащие только одному из двух входных наборов данных, в совпадающем, либо в несовпадающем экземпляре. Если запись, принадлежащая одному из входных наборов данных, выводится на несовпадающий экземпляр, то полусоединение (Semi Join) также называется антисоединением (Anti Join).
Перекрестное соединение (Cross Join): Cross Join выводит все объединенные записи, которые возможны при объединении каждой записи из одного набора входных данных с каждой записью из другого набора входных данных.
Основываясь на трех вышеперечисленных важных аспектах выполнения Join, Apache Spark выбирает правильный механизм для выполнения Join.
Различные механизмы выполнения операции Join
После того, как мы поняли различные аспекты осуществления операции Join, давайте теперь разберемся в различных механизмах ее выполнения.
Apache Spark предоставляет в общей сложности пять механизмов для выполнения операций Join. К ним относятся:
Перемешанный хеш (Shuffle Hash Join)
Широковещательный хеш (Broadcast Hash Join)
Сортировка через слияние (Sort Merge Join)
Декартово соединение (Cartesian Join)
Широковещательное соединение вложенного цикла (Broadcast Nested Loop Join)
Broadcast Hash Join: в механизме «Broadcast Hash Join» один из двух входных наборов данных (участвующих в Join) транслируется всем исполнителям. Хеш-таблица строится для всех исполнителей из транслируемого набора данных, после чего каждый раздел не транслируемого входного набора данных присоединяется независимо к другому набору данных, доступному как локальная хеш-таблица.
“Broadcast Hash Join" не требует этапа перемешивания и является наиболее эффективным. Единственным требованием к надежности является то, что исполнители должны иметь достаточно памяти для размещения транслируемого набора данных. Поэтому Spark избегает этого механизма, когда оба входных набора данных достаточно велики и превышают настраиваемый порог.
Shuffle Hash Join: В механизме 'Shuffle Hash Join' сначала два набора входных данных выравниваются в соответствии с выбранной схемой разделения (партиционирования) выходных данных (Чтобы узнать больше о выбранной схеме разделения выходных данных, вы можете обратиться к моей недавней книге под названием ”Guide to Spark Partitioning (Руководство по партиционированию Spark)”. В случае, если один или оба набора входных данных не соответствуют выбранной схеме разбиения, для достижения соответствия используется операция перемешивания (shuffle) перед фактическим выполнением Join.
После того, как соответствие выбранной схеме разбиения выходных данных обеспечено для обоих входных наборов данных, Shuffle Hash выполняет операцию Join, для каждого выходного раздела, используя стандартный подход Hash Join. То есть, для каждого выходного раздела сначала строится хэш-таблица из соответствующего раздела меньшего входного набора данных, а затем соответствующий раздел большего входного набора данных соединяется с построенной хэш-таблицей.
Требования к памяти исполнителей относительно меньше в случае "Shuffle Hash Join" по сравнению с "Broadcast Hash Join". Это связано с тем, что хэш-таблица строится только на определенном разделе меньшего набора входных данных. Таким образом, если вы предоставляете большое количество выходных разделов и у вас большое количество исполнителей с подходящей конфигурацией памяти, вы можете достичь более высокой производительности для операции Join с помощью 'Shuffle Hash Join'. Однако эффективность будет ниже, чем у 'Broadcast Hash Join', если Spark потребуется выполнить дополнительную операцию перемешивания на одном или обоих входных наборах данных для соответствия выходному разделению.
Sort Merge Join: Начальная часть 'Sort Merge Join' аналогична 'Shuffle Hash Join'. Здесь также сначала два набора входных данных выравниваются в соответствии с выбранной схемой разбиения (партиционирования) выходных данных. В случае, если один или оба набора входных данных не соответствуют выбранной схеме разбиения, то для достижения соответствия используется операция перемешивания (shuffle) перед выполнением операции Join.
После того, как соответствие выбранной схеме разбиения выходных данных обеспечено для обоих входных наборов данных, Sort Merge выполняет операцию Join для каждого выходного раздела, используя стандартный подход Sort Merge Join.
'Sort Merge Join' менее эффективен в вычислительном плане по сравнению с 'Shuffle Hash Join' и 'Broadcast Hash Join', однако, требования к памяти исполнителей для выполнения 'Sort Merge Join' значительно ниже, чем для 'Shuffle Hash' и 'Broadcast Hash'. Также, как и в случае с 'Shuffle Hash Join', если входные наборы данных не соответствуют желаемому разделению на выходе, то операция перемешивания (shuffle) одного или обоих входных наборов данных, в зависимости от ситуации, увеличивает нагрузку на выполнение 'Sort Merge Join'.
Cartesian Join: Cartesian Join используется исключительно для выполнения перекрестного соединения между двумя наборами входных данных. Количество выходных разделов всегда равно произведению количества разделов входного набора данных. Каждый выходной раздел сопоставляется с уникальной парой разделов, которая состоит из одного раздела одного и другого раздела второго набора входных данных. Для каждого из выходных разделов результат вычисляется как декартово произведение данных из двух входных разделов, сопоставленных с выходным разделом.
Недостатком Cartesian Join является увеличение количества выходных разделов. Но если вам требуется перекрестное Join (соединение), Cartesian - единственный подходящий механизм.
Broadcast Nested Loop Join: В 'Broadcast Nested Loop Join' один из наборов входных данных транслируется всем исполнителям. После этого каждый раздел не транслируемого набора входных данных присоединяется к транслируемому набору с помощью стандартной процедуры Nested Loop Join для получения выходных объединенных данных.
«Broadcast Nested Loop Join» наименее эффективен с точки зрения вычислений, поскольку для сравнения двух наборов данных выполняется вложенный цикл. Кроме того, это требует большого объема памяти, поскольку один из наборов входных данных должен транслироваться всем исполнителям.
Как Spark выбирает механизм Join?
Рассмотрев важные аспекты операции Join и различные механизмы выполнения Join, давайте теперь посмотрим, как Spark выбирает тот или иной механизм:
Spark выбирает конкретный механизм для выполнения операции Join, основываясь на следующих факторах:
Параметры конфигурации
Подсказки для Join (соединения)
Размер наборов входных данных
Тип Join (соединения)
Эквивалентное или неэквивалентное соединение (Equi or Non-Equi Join)
Spark обеспечил гибкость в API Join для указания дополнительных подсказок Join с целью завершения механизма Join. Подсказки Join, такие как 'broadcast', 'merge', 'shuffle_hash' и 'shuffle_replicate_nl', могут быть предоставлены вместе с наборами данных, принимающими участие в Join.
Ниже приводится полное описание того, как Spark выбирает различные механизмы Join с учетом вышеперечисленных факторов:
'Broadcast Hash Join'
Обязательные условия
Применимо только к условию Equi Join
Не применимо к типу соединения 'Full Outer' Join
Помимо обязательного условия, одно из следующих условий должно быть истинным:
Для левого набора входных данных предоставлена подсказка 'Broadcast', а тип Join - 'Right Outer', 'Right Semi' или 'Inner'.
Подсказка не предоставлена, но левый входной набор данных является широковещательным согласно конфигурации '
spark.sql.autoBroadcastJoinThreshold
(по умолчанию 10 МБ)' и тип Join - 'Right Outer', 'Right Semi', or 'Inner'.Предоставлена подсказка 'Broadcast' для правого набора входных данных, а тип Join - 'Left Outer', 'Left Semi' или 'Inner'.
Подсказка не предоставляется, но правый входной набор данных является широковещательным согласно конфигурации '
spark.sql.autoBroadcastJoinThreshold
(по умолчанию 10 МБ)' и тип Join - 'Left Outer', 'Left Semi', or 'Inner'.Предоставлена подсказка 'Broadcast' для обоих наборов входных данных, а тип Join - 'Left Outer', 'Left Semi', 'Right Outer', 'Right Semi' или 'Inner'.
Подсказка не предоставляется, но оба входных набора данных являются широковещательными согласно конфигурации '
spark.sql.autoBroadcastJoinThreshold
(по умолчанию 10 МБ)' и тип Join - 'Left Outer', 'Left Semi', 'Right Outer', 'Right Semi' или 'Inner'.
'Shuffle Hash Join'
Обязательные условия
Применимо только к условию Equi Join
Не применимо к типу соединения 'Full Outer' Join
Конфигурация '
spark.sql.join.prefersortmergeJoin
(по умолчанию true)' имеет значение false
Помимо обязательного условия, одно из следующих условий должно быть истинным:
Предоставлена подсказка 'shuffle_hash' для левого набора входных данных, а тип Join - 'Right Outer', 'Right Semi', или 'Inner'.
Подсказка не предоставляется, но левый входной набор данных значительно меньше правого входного набора данных, а тип Join - 'Right Outer', 'Right Semi' или 'Inner'.
Подсказка 'shuffle_hash' предоставлена для правого набора входных данных, а тип Join - 'Left Outer', 'Left Semi', или 'Inner'.
Подсказка не предоставляется, но правый входной набор данных значительно меньше левого, а тип Join - 'Left Outer', 'Left Semi', или 'Inner'.
Подсказка 'shuffle_hash' предоставлена для обоих входных наборов данных, а тип Join - 'Left Outer', 'Left Semi', 'Right Outer', 'Right Semi', или 'Inner'.
Подсказка не предоставляется, но оба набора данных существенно малы, а тип Join - 'Left Outer', 'Left Semi', 'Right Outer', 'Right Semi', или 'Inner'.
'Sort Merge Join'
Обязательные условия
Применимо только к условию Equi Join
Ключи Join Keys, определенные из условия Equi Join, поддаются сортировке
Конфигурация 'spark.sql.join.prefersortmergeJoin (по умолчанию true)' имеет значение true.
Помимо обязательных условий, одно из следующих условий должно быть истинным:
Подсказка 'merge' предоставляется для любого набора входных данных, а тип Join может быть любым.
Подсказка не предоставлена, а тип Join может быть любым.
'Cartesian Join'
Обязательные условия
Тип соединения 'Inner'
Помимо обязательного условия, должно выполняться одно из следующих условий:
Подсказка 'shuffle_replicate_nl' предоставлена для любого из входных наборов данных, условие Join может быть Equi или Non-Equi.
Подсказка не предоставлена, условие Join может быть Equi или Non-Equi.
'Broadcast Nested Loop Join'
'Broadcast Nested Loop Join' - является механизмом Join по умолчанию; когда нельзя выбрать другие механизмы, тогда 'Broadcast Nested Loop Join' выбирается как окончательный механизм для выполнения любого типа Join для любого условия Join.
В случае, если более одного механизма Join становятся пригодными для выполнения, тогда выбирается предпочтительный в следующем порядке 'Broadcast Hash Join', 'Sort Merge Join', 'Shuffle Hash Join', 'Cartesian Join'.
Среди Cartesian и Broadcast Nested Loop Join, Broadcast Nested Loop предпочтительнее для Inner, Non-Equi Joins, чем Cartesian Join, в случае, когда один из входных наборов данных может быть транслирован.
И последнее, но не менее важное: партиционирование также играет очень важную роль в эффективности выполнения данного механизма Join. Чтобы узнать больше о партиционировании, вы можете обратиться к предыдущей ссылке.
Надеемся, что эта статья прояснила все ваши сомнения и замешательства в отношении выполнения Join в Apache Spark. Если какие-то вопросы еще остались, пожалуйста, напишите в разделе комментариев или отправьте мне сообщение.
Подробнее о курсе «Экосистема Hadoop, Spark, Hive»
Смотреть открытый урок «Тестирование Spark приложений»