Как Apache Spark 3.0 увеличивает производительность ваших SQL рабочих нагрузок

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Практически в каждом секторе, работающем со сложными данными, Spark "де-факто" быстро стал средой распределенных вычислений для команд на всех этапах жизненного цикла данных и аналитики. Одна из наиболее ожидаемых функций Spark 3.0 - это новая платформа Adaptive Query Execution (AQE), устраняющая проблемы, которые возникают при многих рабочих нагрузках Spark SQL. Они были задокументированы в начале 2018 года командой специалистов Intel и Baidu. Для более глубокого изучения фреймворка можно пройти наш обновленный курс по тюнингу производительности Apache Spark (Apache Spark Performance Tuning).

Наш опыт работы с Workload XM, безусловно, подтверждает реальность и серьезность этих проблем.

AQE был впервые представлен в Spark 2.4, но в Spark 3.0 и 3.1 он стал намного более развитым. Для начала, давайте посмотрим, какие проблемы решает AQE.

Недостаток первоначальной архитектуры Catalyst

На диаграмме ниже представлен вид распределенной обработки, которая происходит, когда вы выполняете простой group-by-count запрос с использованием DataFrames.

Spark определяет подходящее количество партиций для первого этапа, но для второго этапа использует по умолчанию "магическое число" - 200.

И это плохо по трем причинам:

1. 200 вряд ли будет идеальным количеством партиций, а именно их количество является одним из критических факторов, влияющих на производительность;

2. Если вы запишете результат этого второго этапа на диск, у вас может получиться 200 маленьких файлов;

3. Оптимизация и ее отсутствие имеют косвенный эффект: если обработка должна продолжаться после второго этапа, то вы можете упустить потенциальные возможности дополнительной оптимизации.

Что можно сделать? Вручную установить значение этого свойства перед выполнением запроса с помощью такого оператора:

spark.conf.set(“spark.sql.shuffle.partitions”,”2″)

Но это также создает некоторые проблемы:

  • Задавать данный параметр перед каждым запросом утомительно.

  • Эти значения станут устаревшими по мере эволюции ваших данных.

  • Этот параметр будет применяться ко всем шаффлингах в вашем запросе.

Перед первым этапом в предыдущем примере известно распределение и объем данных, и Spark может предложить разумное значение для количества партиций. Однако для второго этапа эта информация еще не известна, так как цена, которую нужно заплатить за ее получение, - это выполнение фактической обработки на первом этапе: отсюда и обращение к магическому числу.

Принцип работы Adaptive Query Execution

Основная идея AQE состоит в том, чтобы сделать план выполнения не окончательным и перепроверять статус после каждого этапа. Таким образом, план выполнения разбивается на новые абстракции «этапов запроса», разделенных этапами.

Catalyst теперь останавливается на границе каждого этапа, чтобы попытаться применить дополнительную оптимизацию с учетом информации, доступной на основе промежуточных данных.

Поэтому AQE можно определить как слой поверх Spark Catalyst, который будет изменять план Spark "на лету".

Есть недостатки? Некоторые есть, но они второстепенные:

  • Выполнение останавливается на границе каждого этапа, чтобы Spark проверил свой план, но это компенсируется увеличением производительности.

  • Пользовательский интерфейс Spark труднее читать, потому что Spark создает для данного приложения больше заданий, и эти задания не подхватывают группу заданий и описание, которое вы задали.

Адаптивное количество перемешиваемых партиций

Эта функция AQE доступна, начиная с версии Spark 2.4.

Чтобы включить ее, вам нужно установить для spark.sql.adaptive.enabled значение true, значение по умолчанию - false. Когда AQE включено, количество партиций в случайном порядке регулируется автоматически и больше не равно 200 по умолчанию или заданному вручную значению.

Вот как выглядит выполнение первого запроса TPC-DS до и после включения AQE:

Динамическая конвертация Sort Merge Joins в Broadcast Joins

AQE преобразует соединения sort-merge в broadcast хэш-соединения, если статистика времени выполнения любой из сторон соединения меньше порога broadcast хэш-соединения.

Вот как выглядят последние этапы выполнения второго запроса TPC-DS до и после включения AQE:

Динамическое объединение shuffle партиций

Если количество разделов в случайном порядке больше, чем количество групп по ключам, то много циклов ЦП теряется из-за несбалансированного распределения ключей.

Когда оба: 

·         spark.sql.adaptive.enabled и

·         spark.sql.adaptive.coalescePartitions.enabled 

установлены на true, Spark объединит смежные перемешанные разделы в соответствии с целевым размером, указанным в spark.sql.adaptive.advisoryPartitionSizeInBytes. Это делается, чтобы избежать слишком большого количества мелких задач.

Динамическая оптимизация обьединений с перекосом

Skew (перекос) - это камень преткновения распределенной обработки. Это может задержать обработку буквально на несколько часов:

Без оптимизации время, необходимое для выполнения объединения, будет определяться самым большим разделом.

Оптимизация skew join, таким образом, разобъет раздел A0 на подразделы, используя значение, указанное park.sql.adaptive.advisoryPartitionSizeInBytes, и присоединит каждый из них к соответствующему разделу B0 таблицы B.

Следовательно, вам необходимо предоставить AQE свое определение перекоса.

Это включает в себя два параметра:

1.   spark.sql.adaptive.skewJoin.skewedPartitionFactor является относительным: партиция считается с пересом, если ее размер больше, чем этот коэффициент, умноженный на средний размер партиции, а также, если он больше, чем

2.   spark.sql.adaptive.skewedPartitionThresholdInBytes, который является абсолютным: это порог, ниже которого перекос будет игнорироваться.

Динамическое сокращение разделов

Идея динамического сокращения разделов (dynamic partition pruning, DPP) - один из наиболее эффективных методов оптимизации: считываются только те данные, которые вам нужны. Если в вашем запросе есть DPP, то AQE не запускается. DPP было перенесено в Spark 2.4 для CDP.

Эта оптимизация реализована как на логическом, так и на физическом уровне.

1.   На логическом уровне фильтр размера идентифицируется и распространяется через обьединение на другую часть сканирования.

2.   Затем на физическом уровне фильтр выполняется один раз в части измерения, и результат транслируется в основную таблицу, где также применяется фильтр.

DPP в действительности может работать с другими типами обьединений (например, SortMergeJoin), если вы отключите spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly.

В этом случае Spark оценит, действительно ли фильтр DPP улучшает производительность запроса.

DPP может привести к значительному увеличению производительности высокоселективных запросов, например, если ваш запрос фильтрует данные за один месяц из выборки за 5 лет.

Не все запросы получают такой впечатляющий прирост производительности, но 72 из 99 запросов TPC-DS положительно влияют на DPP.

Заключение 

Spark прошел долгий путь от своей первоначальной базовой парадигмы: неспешного выполнения оптимизированного статического плана для статического набора данных.

Анализ статического набора данных был пересмотрен из-за потоковой передачи: команда Spark сначала создала довольно неуклюжий дизайн на основе RDD, прежде чем придумать лучшее решение с использованием DataFrames.

Часть статического плана подверглась сомнению со стороны SQL, а структура адаптивного выполнения запросов в некотором роде - это то, чем является структурированная потоковая передача для исходной потоковой библиотеки: элегантное решение, которым она должна была быть с самого начала.

Благодаря фреймворку AQE, DPP, усиленной поддержке графических процессоров и Kubernetes перспективы увеличения производительности теперь весьма многообещающие, поэтому мы и наблюдаем повсеместный переход на Spark 3.1

Источник: https://habr.com/ru/company/cloudera/blog/560246/


Интересные статьи

Интересные статьи

Чтобы повысить производительность web-приложений, используйте WebAssembly в связке с AssemblyScript, чтобы переписать критически важные для производительности компоненты web-приложени...
Для частого запуска Spark-приложений, особенно в промышленной эксплуатации, необходимо максимально упростить процесс запуска задач, а также уметь гибко настраивать их кон...
Data Science требует использования статистических методов и алгоритмов машинного обучения для работы с большим объёмом данных, и для того чтобы делать это эффективно, вам потребуется мног...
Несмотря на то, что “в коробке” с Битриксом уже идут модули как для SOAP (модуль “Веб сервисы” в редакции “Бизнес” и старше), так и для REST (модуль “Rest API” во всех редакциях, начиная с...
Решил написать статью об очень простом устройстве, сделанным за два вечера. Решило оно давнюю проблему, чему я сильно рад и использую его по сей день. Отработало исправно оно около 3 месяцев. Соб...