PySpark для аналитика. Как выгружать данные с помощью toPandas и его альтернатив

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Привет! Меня зовут Александр Ледовский. Я тимлид команды аналитики и DS, строю рекламные аукционы в Авито. В работе мы активно используем Apache Spark. Одна из типовых задач аналитика — посчитать что-то на pySpark, а потом выгрузить это. Например:

  • маленькую табличку в экселе, чтобы сделать отчёт или презентацию;

  • большую таблицу в экселе или csv, чтобы отправить коллегам — до нескольких Гб;

  • большой датасет для обучения ML-модели — до 100 Гб.

Данная статья о том, как это правильно делать. В том числе, как правильно использовать функцию toPandas вместе с библиотекой pyArrow, когда toPandas использовать не стоит и какие ещё есть варианты.

Немного о toPandas

toPandas — основной инструмент получения семплов в PySpark и основной инструмент аналитика для просмотра данных. Вероятно, большинство из вас пользуется функцией df.limit(10).toPandas(). Она же подходит, чтобы сохранить таблицу в Excel. Например:

df = spark.read.orc('path_to_your_dataset')
pdf = df.toPandas()
pdf.to_excel('my_dataset.xlsx', index=False)

Всё нормально, если работать с небольшими датасетами. Когда они становятся большими, начинаются проблемы: toPandas тормозит, падает с ошибками.

Другая проблема: toPandas загружает данные в оперативную память Jupyter notebook. Если сделать toPandas большого датасета, это займёт много ресурсов и повредит работе целого сервера. А там ещё и могут работать коллеги.

Разберём, как правильно работать с toPandas.

Включите Arrow-оптимизацию

TL;DR Есть настройка, которая на порядок увеличит скорость работы toPandas. Она называется spark.sql.execution.arrow.pyspark.enabled

PySpark состоит из параллельно работающих JVM- (Java Virtual Machine) и Python-процессов. Подробнее об этом я писал в статье про выделение ресурсов →

Python и Java обмениваются друг с другом данными. Но из-за того, что каждый хранит их в своём формате, это происходит не быстро.

Arrow — формат хранения данных в памяти. Эта библиотека поддерживается одновременно и в Python, и в Java/Spark-процессах. Формат поддерживает быструю передачу данных по сети. Прирост скорости оказывается огромным по сравнению со стандартным механизмом обмена данных между Python и Java.

Чтобы включить Arrow-оптимизацию, нужно прописать параметры в конфиге:

app_name = "Your App"

spark_conf = {
    # another params

		# если у вас спарк >= 3.0
    'spark.sql.execution.arrow.pyspark.enabled': 'true',

	  # если у вас спарк < 3.0
		'spark.sql.execution.arrow.enabled': 'true'
}


builder = (
    SparkSession
    .builder
    .appName(app_name)
)

for k, v in spark_conf.items():
    builder.config(k, v)
    
spark = builder.getOrCreate()
spark.sql.execution.arrow.enabled

Подробная документация по использованию Arrow с PySpark →

Ограничения Arrow и как их обходить

У Arrow есть только один недостаток. Он не поддерживает некоторые типы данных. Вот выдержка из документации (на момент версии 3.3.2):

Currently, all Spark SQL data types are supported by Arrow-based conversion except ArrayType of TimestampType, and nested StructType. MapType is only supported when using PyArrow 2.0.0 and above.

Если использовать toPandas без Arrow, то в колонке получатся довольно неприятные питоновские типы. Например, в случае с MapType — pyspark.sql.types.Row

В случае со свежим Arrow MapType даст обычный dict. А при вложенной структуре вы получите предупреждение и pySpark откатится к неоптимизированному варианту:

UserWarning: toPandas attempted Arrow optimization because ...

Если у вас большой датасет, а отказываться от Arrow не хочется, можно использовать строковый формат:

pdf = (
    spark.read.orc()
    .select('col1', 'col2', F.to_json('bad_col_3').alias('bad_col_3'))
    .toPandas()
)
pdf['bad_col_3'] = pdf['bad_col_3'].apply(lambda x: json.loads(x))

Бенчмарки toPandas с Arrow-оптимизацией и без

Для сравнения я получил бенчмарки на одном из наших датасетов. Используемые ресурсы Spark: 60 экзекьюторов по 2 ядра.

Количество данных

Репартиционирование

Arrow

no Arrow

4m

Нет

9s

10s

40m

Нет

9s

15s

400m

Нет

9s

1m 10s

4g

Да

46s

11m 48s

16g

Да

2m 29s

45m 23s

40g

Да

6m 9s

??

Бенчмарки показывают, что Arrow дает прирост скорости примерно в 10 раз.

Я, конечно, показал, что можно скачать себе 40 Гб, но делать так не рекомендую, это довольно много. Во-первых, вам стоит предварительно согласовать с вашими дата-инженерами, что вы будете работать с большими датасетами на одной ноде. Во-вторых, большие датасеты надёжнее и безопаснее скачивать в виде файлов.

Используйте sample вместо limit для семплов

При выгрузке данных приходится формировать семплы. Если бы данные можно было выкачать целиком, то Spark не был бы нужен.

Для семплов мы часто используем limit. Но его можно использовать только для небольших семплов, примерно до 100 строк. Нам нравится limit, потому что мы можем чётко указать количество строк, которые хотим получить. Это удобно, но такую операцию непросто сделать параллельной.

Возьмём пример запроса:

df = spark.read.orc('some_path').limit(10).toPandas()

Запустим запрос и посмотрим его план на вкладке SQL в SparkUI. Подробнее о том, как смотреть план запроса, я расскажу в одной из следующих статей.

+- == Initial Plan ==
   Project (11)
   +- GlobalLimit (10)
      +- Exchange (9)
         +- LocalLimit (8)
            +- Scan orc  (1)

Как работает limit:

  • LocalLimit — limit(10) выполняется на каждой партиции ваших данных;

  • Exchange — результат сливается вместе в одну партицию (операция Exchange);

  • GlobalLimit — у результата опять выполняется limit(10)

То есть, если у вас 200 партиций и вы сказали limit(1000), то Spark соберет 200 семплов по 1000 строк, объединит их, выделит из них 1000 строк и отдаст вам.

Sample работает по-другому. Вы указываете долю строк, которую хотите получить в семпле. Эта операция параллелится и работает эффективно.

df.sample(fraction=0.01, seed=42).toPandas()

Умейте исправлять ошибки нехватки памяти при toPandas

Ошибка нехватка памяти на драйвере

Все данные toPandas идут в Python через JVM-процесс драйвера. Нужно, чтобы он смог принять весь объём данных. Если вам не хватит памяти, то вы получите следующую ошибку

23/03/06 22:19:05 ERROR TaskSetManager: Total size of serialized results of 1 tasks (1536.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
/var/jhub/venv/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:201: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
  An error occurred while calling o75.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1536.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
...
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)

Решение. Увеличивать параметры spark.driver.memory и spark.driver.maxResultSize

Ошибка нехватки памяти на экзекьюторе

toPandas требует больше памяти, чем занимают ваши данные, поэтому может получиться так, что вам не хватит памяти на экзекьюторе. Например, вы можете увидеть ошибки в логах драйвера:

23/03/14 13:45:10 ERROR YarnScheduler: Lost executor 2 on avi-ix-spark41.msk.avito.ru: Container from a bad node: container_1676537640029_9205_01_000003 on host: avi-ix-spark41.msk.avito.ru. Exit status: 143. Diagnostics: [2023-03-14 13:45:08.771]Container killed on request. Exit code is 143
[2023-03-14 13:45:08.771]Container exited with a non-zero exit code 143. 
[2023-03-14 13:45:08.772]Killed by external signal

В логах экзекьютора:

2023-03-14 13:44:50,112 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
2023-03-14 13:44:50,114 ERROR executor.Executor: Exception in task 51.0 in stage 5.0 (TID 266)
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
	at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
	at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:244)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:53)
	at org.apache.spark.scheduler.DirectTaskResult$$Lambda$801/1910985021.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1470)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:657)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Подробнее о том, как смотреть логи экзекьютора, я расскажу в одной из следующих статей.

Решение. Не стоит сразу увеличивать память экзекьюторов. Скорее всего, у вас большие партиции. Сделайте репартиционирование:

# Было
spark.read.orc().toPandas()

# Стало
spark.read.orc().repartition(200).toPandas()

Выгружайте большие датасеты консольной утилитой HDFS DFS

В моих бенчмарках на toPandas вы видели, что я смог быстро выгрузить 40 Гб в локальный процесс. Но выгрузка больших файлов ненадёжна. И самое главное: вы можете навредить серверу и всем людям и процессам на нём.

Я советую использовать для выгрузки больших датасетов консольную утилиту HDFS. Алгоритм состоит из следующих шагов:

  1. Сохранить ваш датасет на HDFS в csv-формате, несколькими файлами по несколько Гб каждый.

  2. Скачать их на локальную файловую систему через консоль: в папку с вашим ноутбуком или куда-нибудь рядом.

  3. Прочитать датасет через pandas батчами.

df.repartition(10).write.mode('overwrite').csv('path_in_hdfs')
hdfs dfs -get path_in_hdfs .
path_list = os.listdir(local_path)
# pdfs = []
for path in path_list:
    pdf = pd.read_csv(path)
    # do something
.   # pdfs.append(pdf)

Существует питоновские библиотеки для работы с HDFS (например, HdfsCLI), но я не рекомендую ими пользоваться, т.к. они используют WebHDFS, который работает менее эффективно, чем нативный драйвер HDFS, который используется в консольной утилите.

Рекомендации

Я рассказал про подходы к работе с toPandas и альтернативный способ выгрузки данных через консоль. Подведём итоги:

  • если вы выгружаете только маленькие таблички для презентаций, то материалы этой статьи вам не понадобятся;

  • если вам периодически нужно выгрузить семплы от 100 Мб до нескольких Гб, то вам стоит использовать toPandas с включенным Arrow;

  • если вам необходимо выгрузить большой датасет, то лучше не лениться и сохранить его через HDFS и консольную утилиту. Используйте toPandas, только если знаете, что делаете.

Немного обо мне

Я занимаюсь разработкой аукционных механик в компании Авито, в том числе автобиддингом. Этот функционал используется в алгоритмах продвижения. Для обработки сложных вложенных структур поисковых логов мы в основном используем Spark, хотя основной DWH в Авито построен на Vertica и ClickHouse. До этого я работал в Сбере, где создал дата-команду для трайба малого и микробизнеса с собственным промышленным Hadoop-кластером.

Периодически я делюсь своими инсайтами и впечатлениями в своём телеграм-канале https://t.me/big_ledovsky. Буду рад ответить на вопросы по статье и вообще обсудить Apache Spark и алгоритмы анализа данных.

Предыдущая статья: Как я очень захотел перейти из фронтенда в бэкенд — и перешёл

Источник: https://habr.com/ru/companies/avito/articles/740232/


Интересные статьи

Интересные статьи

Kubernetes (k8s) является де-факто стандартом для развёртывания приложений. В терминах K8s приложение представляет собой набор сконфигурированных ресурсов нескольких типов — pod, service, deployment, ...
Выпускник курса Python для инженеров Денис Алексеев рассказал о своем проекте, который защитил на итоговой презентации.Проект помогает упростить экспорт статистики из сообществ в социальных сетях и по...
Практическое руководство по написанию лаконичного кода и повторному использованию вспомогательных функций JS в проектах.
Привет, Хабр! Меня зовут Екатерина Герт. Вот уже больше 10 лет я работаю системным аналитиком в проектах по заказной разработке ПО для компаний из разных отраслей и госсе...
Как показывает практика, декларируемый образ и реальный характер взаимодействия владельцев VPN-сервисов со спецслужбами различных стран могут иметь существенные различия....