Всем привет!
В этой статье возьмем за основу пару таблиц и пройдемся по планам запросов по нарастающей: от обычного селекта до джойнов, оконок и репартиционирования. Посмотрим, чем отличаются виды планов друг от друга, что в них изменяется от запроса к запросу и разберем каждую строчку на примере партиционированной и непартиционированной таблицы.
Исходные данные
Будем работать с 3 таблицами:
1) campaigns – партиционирована по полю loading_id
2) campaigns_not_partitioned – точно такая же таблица, но не партиционирована
3) stats – таблица, с которой будем тестить джойны
Коротко про виды планов
Всего существует 4 плана:
Parsed Logical Plan – план после парсинга ячейки с кодом, отлавливает синтаксические ошибки
Analyzed Logical Plan – план после семантического анализа, подтягиваются конкретные таблички и столбцы с типами данных
Optimized Logical Plan – оптимизации к предыдущему плану, например, упрощает лишние операции для повышения производительности
Physical Plan – как физически будет исполнен запрос на кластере, конкретные файлики, пути, форматы, партиции и т. д.
Далее мы будем смотреть только на физический план, т.к. он отражает фактический алгоритм обработки данных. Чтобы чтение было комфортным, есть оглавление с быстрыми ссылками на сами запросы.
Погнали!
Оглавление
Самый обычный селект
Самый обычный фильтр
Селект одного столбца
Селект одного столбца + фильтр
Кэширование
Переименование, добавление нового столбца
Селект с функциями
Группировка
Distinct
Sort
Агрегирующие функции
DropDuplicates
Window functions
Union
Join
Repartition
Комплексные условия
1. Самый обычный селект
Что ж, пойдемте изучать план:
1) FileScan parquet
Наша таблица читается из схемы, перечисляются все поля, формат данных – паркет, который хранится на hdfs по указанному пути, партиций нет (в данном случае берем все), фильтров нет, указывается схема данных.
2) ColumnarToRow
Исходные данные хранятся в паркет-файлах поколоночно, но в спарке датафрейм по сути содержит множество строк, поэтому нам нужно преобразование колонок в строки.
Напомню, что первая таблица была партиционированной по полю loading_id. Теперь посмотрим на непартиционированную таблицу:
Что поменялось?
Location вместо CatalogFileIndex стал InMemoryFileIndex.
CatalogFileIndex используется, когда мы читаем партиционированную таблицу полностью.
InMemoryFileIndex используется, когда мы читаем непартиционированную таблицу или отдельные партиции.
К оглавлению⬆️
2. Самый обычный фильтр
Как мы видим, заполнилось поле PartitionFilters – мы отобрали конкретные партиции. Вообще оптимизатор Catalyst в фильтрах всегда добавляет еще один – isnotnull, т.к., чтобы отфильтровать по конкретным значениям, поле точно должно быть не пустым.
В непартиционированной таблице:
PartitionFilters переместились на уровень DataFilters и PushedFilters.
DataFilters – это фильтры на непартиционированных столбцах.
PushedFilters – фильтры, которые мы можем пробросить на уровень источника данных и применить прямо при чтении файла.
При этом эти два параметра могут не совпадать. Здесь я добавила чисто технический фильтр:
В DataFilters фильтр с concat() есть, а в PushedFilters уже нет, потому что мы не можем применить эту сложную конструкцию на источнике.
Также в план запроса добавляется еще один степ – Filter. Он нужен, чтобы окончательно отфильтровать данные по указанным условиям. Потому что на этапе PushedFilters мы берем не нужные строки, а мы берем файлы, содержащие нужные строки. Соответственно, в них запросто может попасть что-то лишнее.
К оглавлению⬆️
3. Селект одного столбца
В случае партиционированной таблицы мы всегда будем таскать за собой поле партиционирования, а потом выполнять Project – это и есть select.
К оглавлению⬆️
4. Селект одного столбца + фильтр
Помимо разницы между PartitionFilters и PushedFilters, которые мы уже обсудили в п. 2, еще добавилась разница в параметре ReadSchema. Если таблица партиционирована, то поле партиционирования не хранится в виде столбца в данных, оно выносится на уровень файловой системы: поэтому в первом пути есть папка /loading_id=40678148. Во втором случае loading_id хранится прямо в файле, поэтому нам нужно сначала его достать.
К оглавлению⬆️
5. Кэширование
Здесь добавились 2 операции: InMemoryRelation, InMemoryTableScan, которые всегда будут сопровождать любые трансформации с закэшированной таблицей.
К оглавлению⬆️
Итак, самые подкапотные штуки мы посмотрели, различий между таблицами больше не будет. Далее я буду вставлять только сами действия, чтобы не забивать экран однотипной информацией, а набор колонок, схемы и прочее оставлю за скобками.
6. Переименование, добавление нового столбца
Обе пары запросов приводят к одному результату, операции выполняются на этапе Project:
К оглавлению⬆️
7. Селект с функциями, case when
Применение функций, case when так же, как и изменение нейминга полей, происходит на этапе Project.
К оглавлению⬆️
8. Группировка
Так как для группировки нам нужно только одно поле account_id, а спарк на каждый последующий этап по возможности хочет передать как можно меньше данных, то в FileScan мы берем только нужные поля. Операция Project появилась из-за того, что мы еще тянем за собой поле партиционирования.
Теперь посмотрим на новые операции:
HashAggregate – агрегация, keys – поля группировки, functions – агрегирующая функция. Здесь используется partial_count, потому что спарк старается делать агрегацию в 2 подхода:
1 – агрегация в рамках каждой партиции.
Exchange hashpartitioning – это шафл, 200 – количество партиций после шафла. Все одинаковые ключи собираются в рамках одной партиции на основе вычисления хеш-кода, и происходит обмен парами ключ-partial_count.
2 – вторая итерация агрегации, суммируются все partial_count.
К оглавлению⬆️
9. Distinct
Единственное отличие distinct от groupBy – это отсутствие агрегирующей функции.
К оглавлению⬆️
10. Sort
Exchange rangepartitioning – на этом этапе происходит распределение данных на основе диапазона. Например, если account_id от 1 до 100 и мы хотим поделить на 3 партиции, то account_id с 1 по 33 попадут в первую партицию, с 34 по 66 – во вторую и далее по аналогии.
К оглавлению⬆️
11. Агрегирующие функции
Здесь мы видим новый стейдж – SortAggregate. Он используется, когда HashAggregate невозможен из-за ограничений по памяти или когда он не поддерживает агрегирующие функции или ключи (например, неизменяемые типы данных, а у нас в примере оба поля типа string). Этот метод включает предварительную сортировку, поэтому работает медленнее. SortAggregate так же, как и HashAggregate, выполняется в 2 подхода: до шафла локально на каждом маппере и после шафла.
И так как мы считаем минимальное значение, то используются соответствующие функции partial_min и min.
Попробуем убрать группировку:
У нас изменился один из этапов: появился Exchange SinglePartition. Это означает, что все данные перемещаются в одну партицию и будут обработаны на одном ядре. Он используется при вычислении, например, min, max, avg или с оконкой без ключа партиционирования (плохо!).
К оглавлению⬆️
12. DropDuplicates
Функция dropDuplicates при наличии дубликатов по умолчанию оставляет первый элемент, поэтому сначала считается partial_first в рамках каждой партиции, а после шафла first для каждого ключа. Все как всегда.
К оглавлению⬆️
13. Window functions
Кажется, что с учетом предыдущих пунктов тут уже все довольно просто: нам не нужно предварительно агрегировать или сортировать, поэтому мы начинаем сразу с этапа шафла. Затем выполняется часть с .orderBy(), рассчитывается оконная функция, и берется указанная выборка полей. Все остальные оконки аналогичны, меняется только этап Window.
К оглавлению⬆️
14. Union
Оба запроса приводят к одному результату, считываются две таблицы, которые затем объединяются на этапе Union:
К оглавлению⬆️
15. Join
SortMergeJoin
SMJ работает, когда есть условие на равенство и когда ключи сортируемы.
Что происходит?
Filter isnotnull: так как у нас inner join, то ключи содержать null не могут, поэтому спарк фильтрует как можно раньше, чтобы обрабатывать меньшее количество данных.
Небольшая табличка по типам джойнов:
Тип джойна | Наличие левого фильтра | Наличие правого фильтра | Комментарий |
inner | + | + | оба ключа не null |
left | - | + | левая таблица может содержать null |
right | + | - | правая таблица может содержать null |
full | - | - | обе таблицы могут содержать null |
Exchange hashpartitioning – оба датафрейма репатиционируются в 200 партиций по ключам джойна.
Sort – сортировка внутри партиции по ключам джойна.
SortMergeJoin – в цикле обходится каждая пара партиций, и с помощью сравнения левого и правого ключей соединяются строки с одинаковыми ключами.
ShuffledHashJoin
SHJ работает только при наличии хинта, т.к. по умолчанию используется SMJ. В данном случае у нас отсутствует этап Sort и изменяется вид джойна.
Exchange hashpartitioning – датафреймы с одинаковым ключом джойна перемещаются на один экзекьютор.
ShuffledHashJoin – на экзекьюторе создается хеш-таблица для меньшего датафрейма, где ключ – это кортеж из полей джойна (в нашем примере id). Затем происходит итерация по большему датафрейму внутри каждой партиции, и проверяется наличие ключей в хеш-таблице.
BroadcastHashJoin
BHJ работает, когда есть условие на равенство и когда один из датафреймов небольшой и полностью помещается в память экзекьютора.
BroadcastExchange – это копирование правого датафрейма на каждый экзекьютор.
Hash join происходит аналогично, основное различие – в использовании стратегии обмена данными: шафл для SHJ и броадкаст для BHJ.
BroadcastNestedLoopJoin
BNLJ работает, когда есть условие на неравенство и когда один из датафреймов небольшой и полностью помещается в память экзекьютора.
BroadcastNestedLoopJoin – во вложенном цикле проходимся по элементам каждой партиции левого датафрейма и копии правого датафрейма и проверяем условие.
CartesianProduct
CPJ работает, когда есть условие на неравенство, но BNLJ не может быть применен.
CartesianProduct – спарк создает пары из каждой партиции левого датафрейма с каждой партицией правого датафрейма. Затем перемещает каждую пару на один экзекьютор и проверяет условие джойна.
К оглавлению⬆️
16. Repartition
Напоследок посмотрим на еще один вид шафла – Exchange RoundRobinPartitioning. Именно этот алгоритм позволяет получить партиции примерно одного размера: он равномерно распределяет данные и предотвращает перекосы (data skew).
К оглавлению⬆️
17. Комплексные условия
Совмещаем все!
(ну, почти)
Надеюсь, здесь вам все понятно)
На этом все, спасибо за прочтение!
Контакты: дата инженеретта