Работаем с большими наборами данных в Spark3.2.0 с использованием Pandas

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Благодаря недавнему релизу spark3.2.0 у нас появилась возможность масштабировать данные с помощью pandas.

1. Введение

13 октября 2021 г. команда Apache Spark зарелизила spark3.2.0. На этот раз в Spark, помимо все прочего, был добавлен API Pandas. Pandas — мощный и хорошо известный среди дата-сайентистов пакет. Однако у Pandas есть свои ограничения при работе с большими объемами данных, потому что он обрабатывает данные на одной машине. Несколько лет назад databricks выпустили библиотеку ‘Koalas’, чтобы решить эту проблему.

Добавление API Pandas в spark3.2.0 избавляет нас от необходимости использовать сторонние библиотеки. Пользователи Pandas теперь могут не отказывать себе в удовольствии использовать Pandas и масштабировать процессы до многоузловых кластеров Spark.

2. Цель

В этой статье говорится непосредственно о способах использования API Pandas в Spark:

  • Чтение данных в виде датафреймов pandas-spark;

  • Чтение данных в виде датафреймов spark и преобразование в датафреймы pandas-spark;

  • Создание датафреймов pandas-spark;

  • Применение SQL-запросов непосредственно к датафреймам pandas-spark;

  • Построение графиков на основе датафреймов pandas-spark;

  • Переход от koalas к API pandas в Spark.

3. Данные

CSV-файл и Jupyter Notebook, упомянутые в этой статье, можно найти на моей странице GitHub. Наборы данных там небольшие, однако проиллюстрированные здесь подходы могут быть применимы в больших наборах.

4. Требуется установка

Прежде чем продолжить, сначала скачайте spark3.2.0 (установочный файл можно найти здесь) и правильно настройте PySpark. Вам также понадобятся библиотеки pyarrow и plotly, которые можно установить через интерфейс jupyter notebook, как показано ниже:

  • pyarrow (!conda install -c conda-forge — yes pyarrow)

  • plotly (!conda install — yes plotly)

Прекрасно! Если ваш PySpark готов к труду и обороне, то давайте перейдем к следующему разделу.

5. Импорт библиотек и запуск сессий Spark

Теперь начнем импортировать PySpark и запустим сессию с помощью блока кода, приведенного ниже.

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark

Spark info сообщает нам, что используется версия 3.2.0.

Изображение 1. Spark info.
Изображение 1. Spark info.

Также будет не лишним проверить версию python и pyspark, как показано ниже. В моем случае, я использовал Spark версии 3.2.0 и python версии 3.8.8.

print('Version of python: ') 
!python -V
print('Version of pyspark :', pyspark.__version__)
Изображение 2: Используемые версии.
Изображение 2: Используемые версии.

Хорошо! Теперь с помощью pyspark.pandas импортируем функцию read_csv для чтения данных CSV файла в виде датафрейма pandas-spark.

Если появится варнинг, как показано на изображении 3, то можно перед запуском pyspark.pandas import read_csv установить для переменной среды (т.е. PYARROW_IGNORE_TIMEZON) значение 1.

from pyspark.pandas import read_csv
Изображение 3: Варнинг при импорте pyspark.pandas.
Изображение 3: Варнинг при импорте pyspark.pandas.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
sparkprint('Version of python: ') 
!python -V
print('Version of pyspark :', pyspark.__version__)from pyspark.pandas import read_csv
# To get rid of error set the environ variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"from pyspark.pandas import read_csv


# Чтобы избавиться от сообщения об ошибке, установите значение для переменной среды,как показано ниже
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"

from pyspark.pandas import read_csv

6.1 Чтение данных из csv-файла в виде датафрейма pandas-spark 

Для того, чтобы продемонстрировать различные варианты использования API pandas spark, мы воспользуемся файлом ‘example_csv.csv’. Функция read_csv возвращает датафрейм pandas-spark (назовите его: psdf).

# Читаем в качестве датафрейма pandas-spark  
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)
Изображение 4: Датафрейм Pandas-Spark.
Изображение 4: Датафрейм Pandas-Spark.

Замечательно! Мы только что создали с вами датафрейм pandas-spark и теперь можем перейти к использованию функций pandas для выполнения последующих задач. Например, psdf.head(2) и psdf.shape можно использовать для получения двух верхних строк и размерности данных соответственно. В отличие от стандартного датафрейма pandas в python, здесь мы располагаем очень крутой возможностью распараллеливания, что является неоспоримым преимуществом.

# получаем тип данных
# получаем размерность данных
# получаем имена столбцов данных

print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)
Изображение 5. Использование функций pandas на датафремах pandas-spark.
Изображение 5. Использование функций pandas на датафремах pandas-spark.

Более того, если вы хотите преобразовать датафрейм pandas-spark в датафрейм spark, это можно осуществить с помощью функции to_spark(). На выходе мы получим датафрейм spark (назовите его: sdf), и теперь можем использовать все функции pyspark на этом датафрейме. Например, sdf.show(5) и sdf.printSchema() выводят пять верхних строк и схему данных датафрейма spark соответственно.

# Преобразование из датафрейма pandas-spark в датафрейм spark 

# Вывод пяти верхних строк датафрейма spark 
sdf = psdf.to_spark()
sdf.show(5)
Изображение 6: Вывод пяти верхних строк датафрейма spark. 
Изображение 6: Вывод пяти верхних строк датафрейма spark. 
# Вывод схемы
sdf.printSchema()
Изображение 7: Вывод схемы датафрейма spark.
Изображение 7: Вывод схемы датафрейма spark.

6.2. Чтение данных из csv-файла в виде датафрейма spark и их преобразование в датафрейм pandas-spark

Мы можем преобразовать датафрейм spark в датафрейм pandas-spark с помощью команды to_pandas_on_spark(). Она принимает на вход датафрейм spark, на выходе мы получаем датафрейм pandas-spark (как вы могли и сами догадаться). Ниже, мы читаем данные в виде датафрейма spark (назовем его: sdf1). Чтобы подтвердить, что это датафрейм spark, мы можем использовать type(sdf1), который определяет, что это точно датафhейм spark, т.е. ‘pyspark.sql.dataframe.DataFrame’.

# Чтение данных с помощью spark
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)
Изображение 8. Результирующий тип - датафрейм spark. 
Изображение 8. Результирующий тип - датафрейм spark. 

После преобразования в датафрейм pandas-spark (psdf1) результирующим типом будет “pyspark.pandas.frame.DataFrame”. Мы можем использовать функцию pandas, например, .head(), чтобы убедиться, что это все таки датафрейм pandas-spark.

# Преобразование в датафрейм pandas-spark
psdf1 = sdf1.to_pandas_on_spark()
# Вывод двух верхних строк
psdf1.head(2)
Изображение 9. Вывод двух верхних строк датафрейма pandas-spark. 
Изображение 9. Вывод двух верхних строк датафрейма pandas-spark. 
# Проверка типа psdf1
type(psdf1)
Изображение 10. Результирующий тип - датафрейм pandas-spark.
Изображение 10. Результирующий тип - датафрейм pandas-spark.

6.3 Создание датафрейма pandas-spark

В этом разделе мы разберем, как вместо создания датафрейма pandas-spark из CSV-файла сделать это напрямую, импортировав pyspark.pandas как ps. Ниже с помощью ps.DataFrame() мы создали датафрейм pandas-spark (psdf2). У psdf2 два признака и три строки.

import pandas as pd
import pyspark.pandas as ps
# Создание датафрейма pandas-spark
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()
Изображение 11. Созданный нами датафрейм pandas -spark.
Изображение 11. Созданный нами датафрейм pandas -spark.

Если мы хотим преобразовать датафрейм pandas-spark (psdf2) обратно в датафрейм spark, то для этого у нас есть функция to_spark(), о которой мы ранее уже упоминали. Синтаксис обеспечивает гибкость при смене типов датафрейм, что может оказаться довольно полезным в зависимости от функций (pandas или spark), которые вы хотите использовать в своем анализе.

# Обратное преобразование датафрейма pandas-spark в датафрейм spark
sdf2 = psdf2.to_spark()
sdf2.show(2)
Изображение 12. Датафрейм spark, преобразованный из датафрейма pandas-spark
Изображение 12. Датафрейм spark, преобразованный из датафрейма pandas-spark

7. Применение SQL-запросов непосредственно к датафреймам pandas-spark

Еще одна замечательная тема для обсуждения в рамках pandas-spark API — это функция sql. Давайте используем эту функцию на созданном ранее датафрейме pandas-spark (psdf2) для извлечения некоторой информации. По сути для выполнения SQL-запроса нам просто нужно запустить функцию ps.sql() поверх датафрейма pandas-spark. Как показано ниже, функция count(*) для данных psdf2 результат равный трем. Точно так же второй запрос выводит отфильтрованные данные со score болеьше 80.

# Реализация SQL-запроса. Входные данные: датафрейм pandas-spark (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")
Изображение 13. Отображение результатов sql-запроса: всего три записи.
Изображение 13. Отображение результатов sql-запроса: всего три записи.
# Возвращает датафрейм pandas-spark
selected_data = ps.sql("SELECT id, score  FROM {psdf2} WHERE score>80")
selected_data.head()
Изображение 14. Отображение результатов sql-запроса: score> 80
Изображение 14. Отображение результатов sql-запроса: score> 80

8. Графики датафреймов pandas и pandas-spark

Супер! Рад, что вы дошли до этого момента. Теперь предлагаю кратко затронуть возможности построения графиков нашего нового API pandas-spark. В отличие от статического графика по умолчанию в стандартном API Python pandas, график по умолчанию в API pandas-spark является интерактивным, поскольку по умолчанию он использует plotly. Сейчас мы импортируем данные в виде датафрейма pandas и pandas-spark и построим гистограмму по переменной зарплаты (salary) для каждого из типов данных.

# Чтение данных в виде датафрейма pandas 
pddf = pd.read_csv(datapath+'example_csv.csv')

type(pddf)
#pandas.core.frame.DataFrame

pddf.head(2)
Изображение 15. Чтение данные в виде датафрейма pandas.
Изображение 15. Чтение данные в виде датафрейма pandas.

На изображении ниже показана гистограмма зарплаты из датафрейма pandas.

# Чтение данных в виде датафрейма pandas-spark
pdsdf = read_csv(datapath+'example_csv.csv')

type(pdsdf)
# pyspark.pandas.frame.DataFrame

# постороение гистограмма по датафрейму pandas
pddf['salary'].hist(bins=3)
Изображение 16. Дефолтная python-гистограмма датафрейма pandas 
Изображение 16. Дефолтная python-гистограмма датафрейма pandas 

Ниже пример гистограммы по той же переменной на основе датафрейма pandas-spark, которая в сущности является интерактивным графиком.

Примечание: Приведенный ниже график вставлен как изображение, поэтому он статичен. Если вы запустите приведенный ниже синтаксис в jupyter notebook у вас будет возможность увеличивать/уменьшать масштаб (сделать его интерактивным).

# построение гистограммы по датафрейму pandas-spark
import plotly
pdsdf['salary'].hist(bins=3)
Изображение 17. Скрин интерактивного графика по датафрему pandas-pyspark.
Изображение 17. Скрин интерактивного графика по датафрему pandas-pyspark.

9. Переход от Koalas в API Pandas 

Напоследок давайте поговорим о том, какие изменения требуются при переходе от библиотеки Koalas в API pandas-spark. В таблице ниже показаны некоторые изменения синтаксиса: что было в Koalas, и как это выглядит в новом API pandas-spark.

Таблица 1. Переход от Koalas в API pandas-spark 
Таблица 1. Переход от Koalas в API pandas-spark 

10. Заключение

В этой статье вы узнали о способах использования недавно добавленной API pandas в spark3.2.0 с целью чтения данных, создания датафрейма, использования SQL непосредственно во фреймворке pandas-spark и перехода от существующей библиотеки Koalas в API pandas-spark.

Спасибо внимание! Подписывайтесь на мой аккаунт в LinkedIn, чтобы быть в курсе обновлений по полезным навыкам работы в датасайенс.

11. Ссылки

https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html

https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html

https://www.datanami.com/2021/10/26/spark-gets-closer-hooks-to-pandas-sql-with-version-3-2/

https://www.datamechanics.co/blog-post/apache-spark-3-2-release-main-features-whats-new-for-spark-on-kubernetes

https://databricks.com/blog/2020/08/11/interoperability-between-koalas-and-apache-spark.html


Материал подготовлен в рамках курса «Spark Developer».

Всех желающих приглашаем на бесплатное demo-занятие «Написание коннекторов для Spark». На занятии мы разберем подключение к внешним системам из коробки и создание кастомного коннектора для подключения к нестандартным БД.
>> РЕГИСТРАЦИЯ

Источник: https://habr.com/ru/company/otus/blog/594787/


Интересные статьи

Интересные статьи

Data-science развивается очень быстро, в том числе благодаря росту объема доступных данных для анализа или построения моделей. Но для создания сложных моделей командам ан...
В жизни множества приложений наступает такой момент, когда наскоро внедренное решение перестает удовлетворять требованиям банальной пригодности к использованию вследствие...
Научные изыскания позволяют нам не только лучше понимать окружающий наш мир, но и контролировать некоторые процессы и явления. За долгие годы исследований и экспериментов мы научились...
Начну свой рассказ фразой, которая оправдывает себя на все 100: «Если информация не сохранена в трёх местах – её не существует». Не помню, сказал это кто-то из великих или это нар...
Как быстро определить, что на отдельно взятый сайт забили, и им никто не занимается? Если в подвале главной страницы в копирайте стоит не текущий год, а старый, то именно в этом году опека над са...