Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Для будущих студентов курсов "Data Engineer" и "Экосистема Hadoop, Spark, Hive" подготовили еще один перевод полезной статьи.
Criteo — это компания, работа которой основана на данных. Каждый день через наши системы проходят десятки терабайт новых данных для обучения моделей рекомендаций, обрабатывающих запросы в масштабах всего Интернета. Spark — наше основное средство обработки больших данных. Это мощный и гибкий инструмент, однако он отличается довольно высокой сложностью в освоении, а чтобы пользоваться им эффективно, зачастую требуется читать исходный код платформы.
Быстрая обработка больших данных имеет критическое значение для нашего бизнеса:
мы часто обновляем наши модели, повышая их производительность для наших клиентов;
мы быстро выводим новые продукты на базе машинного обучения на рынок за счет того, что можем быстро выполнять итерации;
от скорости обработки данных зависят затраты на инфраструктуру.
В этой статье я расскажу о написании эффективного кода Spark и на примерах продемонстрирую распространенные подводные камни. Я покажу, что в большинстве случаев Spark SQL (Datasets) следует отдавать предпочтение перед Spark Core API (RDD), и если сделать правильный выбор, можно повысить производительность обработки больших данных в 2–10 раз, а это очень значимо.
Конфигурация для экспериментов
Spark 2.4.6, Macbook Pro 2017 с процессором Intel Core i7 с частотой 3,5 ГГц
Измерения всегда производятся на разогретой виртуальной Java-машине (выполняется 100 прогонов кода, и берется среднее значение за последние 90 прогонов). Приведенный в этой статье код написан на Scala, но ее выводы должны быть справедливыми и для Python.
Заблуждения, связанные с обработкой больших данных
Существует распространенное мнение, что в процессах обработки больших данных есть два основных узких места, влияющих на производительность:
перетасовка данных, поскольку для ее выполнения требуется отправлять данные по сети;
дисковый ввод-вывод, поскольку доступ к данным на диске всегда намного медленнее, чем доступ к данным в ОЗУ.
Эти представления имеют под собой исторические основания — в 2006 году, когда впервые появилась библиотека Hadoop, обычные жесткие диски были медленными и ненадежными, а основной платформой для обработки больших данных была MapReduce. Именно медленная работа жестких дисков и подстегнула разработку таких средств обработки в памяти, как Spark. С того времени характеристики аппаратного обеспечения значительно улучшились.
В 2015 году в исследовании Кей Остерхаут (Kay Ousterhout) и др.¹ были проанализированы узкие места в заданиях Spark, и в результате выяснилось, что скорость их выполнения в большей степени определяется операциями, загружающими ЦП, а не вводом-выводом и передачей данных по сети. В частности, авторами этой научной работы был выполнен широкий спектр запросов к трем тестовым наборам данных, включая TPC-DS², и было определено, что:
если бы пропускная способность сети была безграничной, время выполнения заданий можно было бы сократить на 2 % (медианное значение);
если бы пропускная способность дискового ввода-вывода была безграничной, время выполнения стандартного аналитического процесса можно было бы сократить на 19 % (медианное значение).
Весьма неожиданный результат! Получается, что дисковый ввод-вывод оказывает намного большее влияние на производительность, чем передача данных по сети. Этому есть несколько причин:
Spark использует дисковый ввод-вывод не только при считывании входного набора данных и записи результата, но и в ходе выполнения заданий для кэширования и переноса на диск данных, которые не умещаются в ОЗУ.
При выполнении аналитических заданий часто требуется производить агрегацию, поэтому объем данных, передаваемых по сети, обычно меньше, чем объем данных, которые первоначально считываются с диска.
Интересно, что специалисты Databricks примерно в 2016 году³ пришли к таким же заключениям, что заставило их переориентировать вектор развития Spark на оптимизацию использования процессора. Результатом стало внедрение поддержки SQL, а также API DataFrames и позднее Datasets.
Насколько быстро работает Spark?
Давайте рассмотрим простую задачу — посчитаем наивным методом четные числа от 0 до 10⁹. Для выполнения такого задания Spark, в принципе, не требуется, поэтому для начала напишем простую программу на Scala:
var res: Long = 0L
var i: Long = 0L
while (i < 1000L * 1000 * 1000) {
if (i % 2 == 0) res += 1
i += 1L
}
Листинг 1. Наивный подсчет
А теперь давайте также вычислим этот же результат с помощью Spark RDD и Spark Datasets. Чтобы эксперимент был честным, я запускаю Spark в локальном[1] режиме:
val res = spark.sparkContext
.range(0L, 1000L * 1000 * 1000)
.filter(_ % 2 == 0)
.count()
Листинг 2. Подсчет с помощью RDD
val res = spark.range(1000L * 1000 * 1000)
.filter(col("id") % 2 === 0)
.select(count(col("id")))
.first().getAs[Long](0)
Листинг 3. Подсчет с помощью Datasets
Время выполнения всех фрагментов кода приведено ниже. Неудивительно, что написанный вручную код является самым эффективным решением. Удивительно же то, что RDD в пять раз медленнее, тогда как у Datasets время вычисления почти такое же, как у написанного вручную кода.
Парадокс Datasets
Парадокс: API-интерфейс Datasets построен на основе RDD, однако работает намного быстрее, почти так же быстро, как код, написанный вручную для конкретной задачи. Как такое вообще возможно? Дело в новой модели выполнения.
Прошлое — модель Volcano
Код, написанный с использованием RDD, выполняется с помощью модели выполнения Volcano. На практике это означает, что каждый RDD следует стандартному интерфейсу:
знает свой родительский RDD;
предоставляет посредством метода
compute
доступ к итератору Iterator[T], который перебирает элементы данного RDD (он является private и должен использоваться только разработчиками Spark).
abstract class RDD[T: ClassTag]
def compute(…): Iterator[T]
Листинг 4. RDD.scala
С учетом этих свойств упрощенная версия реализации функции подсчета для RDD, которая игнорирует разбиение, выглядит вот так:
def pseudo_rdd_count(rdd: RDD[T]): Long = {
val iter = rdd.compute
var result = 0
while (iter.hasNext) result += 1
result
}
Листинг 5. Псевдокод для действия подсчета на основе RDD
Почему этот код работает значительно медленнее, чем написанный вручную код, который приведен в листинге 1? Есть несколько причин:
Вызовы итераторов виртуальной функцией: вызовы Iterator.next() несут дополнительную нагрузку по сравнению с функциями, не являющимися виртуальными, которые могут выполняться компилятором или JIT как встроенные (inline).
Отсутствие оптимизации на уровне ЦП: виртуальная Java-машина и JIT не могут оптимизировать байт-код, образуемый листингом 5, так же хорошо, как байт-код, получаемый при использовании листинга 1. В частности, написанный вручную код позволяет виртуальной Java-машине и JIT хранить промежуточные результаты вычислений в регистре ЦП, а не помещать их в основную память.
Настоящее — формирование кода всего этапа
Код, написанный с помощью Spark SQL⁵, выполняется не так, как код, написанный с использованием RDD. Когда запускается действие, Spark генерирует код, который сворачивает несколько трансформаций данных в одну функцию. Этот процесс называется формированием кода всего этапа (Whole-Stage Code Generation)⁶. Spark пытается воспроизвести процесс написания специального кода для конкретной задачи, в котором не используются вызовы виртуальных функций. Такой код может выполняться JVM/JIT более эффективно. На самом деле Spark генерирует довольно много кода, см., например, код Spark для листинга 3.
Технически Spark только формирует высокоуровневый код, а генерация байт-кода выполняется компилятором Janino⁴. Именно это и делает Spark SQL настолько быстрым по сравнению с RDD.
Эффективное использование Spark
Сегодня в Spark есть 3 API-интерфейса Scala/Java: RDD, Datasets и DataFrames (который теперь объединен с Datasets). RDD все еще широко применяется в Spark — в частности, из-за того, что этот API используется большинством созданных ранее заданий, и перспектива «продолжать в том же духе» весьма заманчива. Однако, как показывают тесты, переход на API-интерфейс Datasets может дать громадный прирост производительности за счет оптимизированного использования ЦП.
Неправильный подход — классический способ
Самая распространенная проблема, с которой я сталкивался при использовании Spark SQL, это явное переключение на API RDD. Причина состоит в том, что программисту зачастую проще сформулировать вычисление в терминах объектов Java, чем с помощью ограниченного языка Spark SQL:
val res = spark.range(1000L * 1000 * 1000)
.rdd
.filter(_ %2 == 0)
.count()
Листинг 6. Переключение с Dataset на RDD
Этот код выполняется в течение 43 секунд вместо исходных 2,1 секунды, при этом делая абсолютно то же самое. Явное переключение на RDD останавливает формирование кода всего этапа и запускает преобразование элементов наборов данных из примитивных типов в объекты Java, что оказывается очень затратным. Если мы сравним схемы этапов выполнения кода из листингов 3 и 6 (см. ниже), то увидим, что во втором случае появляется дополнительный этап.
Неправильный подход — изысканный способ
Производительность Spark SQL является на удивление хрупкой. Это незначительное изменение приводит к увеличению времени выполнения запроса в три раза (до 6 секунд):
val res = spark
.range(1000L * 1000 * 1000)
.filter(x => x % 2 == 0) // note that the condition changed
.select(count(col("id")))
.first()
.getAs[Long](0)
Листинг 7. Замена выражения Spark SQL функцией Scala
Spark не способен генерировать эффективный код для условия в фильтре. Условие является анонимной функцией Scala, а не выражением Spark SQL, и Spark выполнит десериализацию каждой записи из оптимизированного внутреннего представления, чтобы вызвать эту функцию. Причем вот что примечательно — это изменение никак не сказывается на визуальном представлении этапов (рис. 1a), поэтому его невозможно обнаружить, анализируя направленный ациклический граф (DAG) задания в пользовательском интерфейсе Spark.
Высокая производительность Spark SQL обеспечивается за счет ограничения круга доступных операций — чем-то все равно приходится жертвовать! Чтобы получить максимальную производительность, нужно использовать преобразования, которые работают со столбцами: используйте filter(condition: Column) вместо filter(T => Boolean) и select(…) вместо map(…). При этом Spark не придется перестраивать объект, представленный одной строкой набора данных (Dataset). И, разумеется, избегайте переключения на RDD.
Заключение и итоговые замечания
Приведенные в этой статье простые примеры демонстрируют, что большая часть времени выполнения заданий обработки больших данных не тратится на полезную работу. Хорошим решением этой проблемы является компиляция запросов, которая возможна с использованием Spark SQL и обеспечивает более эффективное использование современного аппаратного обеспечения. Последние исследования свидетельствуют, что использование эффективных запросов для стандартных процессов обработки больших данных важнее, чем оптимизация использования сети и дискового ввода-вывода.
Правильное применение компиляции запросов может сократить время обработки в 2–10 раз, а это означает ускорение экспериментов, снижение затрат на инфраструктуру и громадное удовольствие от элегантного выполнения своей работы!
Образцы кода из этой статьи можно найти здесь. С помощью этого репозитория можно анализировать производительность разных запросов Spark.
Использованные материалы
Ousterhout, Kay, et al. Making sense of performance in data analytics frameworks (Анализ производительности платформ анализа данных). 12-й симпозиум {USENIX} по проектированию и реализации сетевых систем ({NSDI} 15). 2015.
http://www.tpc.org/tpcds/
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
4. https://janino-compiler.github.io/janino/
5. http://people.csail.mit.edu/matei/papers/2015/sigmodsparksql.pdf
6. https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
Узнать подробнее о курсах "Data Engineer" и "Экосистема Hadoop, Spark, Hive".