Благодаря недавнему релизу spark3.2.0 у нас появилась возможность масштабировать данные с помощью pandas.
1. Введение
13 октября 2021 г. команда Apache Spark зарелизила spark3.2.0. На этот раз в Spark, помимо все прочего, был добавлен API Pandas. Pandas — мощный и хорошо известный среди дата-сайентистов пакет. Однако у Pandas есть свои ограничения при работе с большими объемами данных, потому что он обрабатывает данные на одной машине. Несколько лет назад databricks выпустили библиотеку ‘Koalas’, чтобы решить эту проблему.
Добавление API Pandas в spark3.2.0 избавляет нас от необходимости использовать сторонние библиотеки. Пользователи Pandas теперь могут не отказывать себе в удовольствии использовать Pandas и масштабировать процессы до многоузловых кластеров Spark.
2. Цель
В этой статье говорится непосредственно о способах использования API Pandas в Spark:
Чтение данных в виде датафреймов pandas-spark;
Чтение данных в виде датафреймов spark и преобразование в датафреймы pandas-spark;
Создание датафреймов pandas-spark;
Применение SQL-запросов непосредственно к датафреймам pandas-spark;
Построение графиков на основе датафреймов pandas-spark;
Переход от koalas к API pandas в Spark.
3. Данные
CSV-файл и Jupyter Notebook, упомянутые в этой статье, можно найти на моей странице GitHub. Наборы данных там небольшие, однако проиллюстрированные здесь подходы могут быть применимы в больших наборах.
4. Требуется установка
Прежде чем продолжить, сначала скачайте spark3.2.0 (установочный файл можно найти здесь) и правильно настройте PySpark. Вам также понадобятся библиотеки pyarrow и plotly, которые можно установить через интерфейс jupyter notebook, как показано ниже:
pyarrow (!conda install -c conda-forge — yes pyarrow)
plotly (!conda install — yes plotly)
Прекрасно! Если ваш PySpark готов к труду и обороне, то давайте перейдем к следующему разделу.
5. Импорт библиотек и запуск сессий Spark
Теперь начнем импортировать PySpark и запустим сессию с помощью блока кода, приведенного ниже.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark
Spark info сообщает нам, что используется версия 3.2.0.
Также будет не лишним проверить версию python и pyspark, как показано ниже. В моем случае, я использовал Spark версии 3.2.0 и python версии 3.8.8.
print('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)
Хорошо! Теперь с помощью pyspark.pandas
импортируем функцию read_csv
для чтения данных CSV файла в виде датафрейма pandas-spark.
Если появится варнинг, как показано на изображении 3, то можно перед запуском pyspark.pandas import read_csv
установить для переменной среды (т.е. PYARROW_IGNORE_TIMEZON
) значение 1.
from pyspark.pandas import read_csv
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
sparkprint('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)from pyspark.pandas import read_csv
# To get rid of error set the environ variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"from pyspark.pandas import read_csv
# Чтобы избавиться от сообщения об ошибке, установите значение для переменной среды,как показано ниже
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"
from pyspark.pandas import read_csv
6.1 Чтение данных из csv-файла в виде датафрейма pandas-spark
Для того, чтобы продемонстрировать различные варианты использования API pandas spark, мы воспользуемся файлом ‘example_csv.csv’. Функция read_csv возвращает датафрейм pandas-spark (назовите его: psdf).
# Читаем в качестве датафрейма pandas-spark
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)
Замечательно! Мы только что создали с вами датафрейм pandas-spark и теперь можем перейти к использованию функций pandas для выполнения последующих задач. Например, psdf.head(2)
и psdf.shape
можно использовать для получения двух верхних строк и размерности данных соответственно. В отличие от стандартного датафрейма pandas в python, здесь мы располагаем очень крутой возможностью распараллеливания, что является неоспоримым преимуществом.
# получаем тип данных
# получаем размерность данных
# получаем имена столбцов данных
print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)
Более того, если вы хотите преобразовать датафрейм pandas-spark в датафрейм spark, это можно осуществить с помощью функции to_spark()
. На выходе мы получим датафрейм spark (назовите его: sdf), и теперь можем использовать все функции pyspark на этом датафрейме. Например, sdf.show(5)
и sdf.printSchema()
выводят пять верхних строк и схему данных датафрейма spark соответственно.
# Преобразование из датафрейма pandas-spark в датафрейм spark
# Вывод пяти верхних строк датафрейма spark
sdf = psdf.to_spark()
sdf.show(5)
# Вывод схемы
sdf.printSchema()
6.2. Чтение данных из csv-файла в виде датафрейма spark и их преобразование в датафрейм pandas-spark
Мы можем преобразовать датафрейм spark в датафрейм pandas-spark с помощью команды to_pandas_on_spark()
. Она принимает на вход датафрейм spark, на выходе мы получаем датафрейм pandas-spark (как вы могли и сами догадаться). Ниже, мы читаем данные в виде датафрейма spark (назовем его: sdf1). Чтобы подтвердить, что это датафрейм spark, мы можем использовать type(sdf1)
, который определяет, что это точно датафhейм spark, т.е. ‘pyspark.sql.dataframe.DataFrame’
.
# Чтение данных с помощью spark
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)
После преобразования в датафрейм pandas-spark (psdf1) результирующим типом будет “pyspark.pandas.frame.DataFrame”
. Мы можем использовать функцию pandas
, например, .head()
, чтобы убедиться, что это все таки датафрейм pandas-spark.
# Преобразование в датафрейм pandas-spark
psdf1 = sdf1.to_pandas_on_spark()
# Вывод двух верхних строк
psdf1.head(2)
# Проверка типа psdf1
type(psdf1)
6.3 Создание датафрейма pandas-spark
В этом разделе мы разберем, как вместо создания датафрейма pandas-spark из CSV-файла сделать это напрямую, импортировав pyspark.pandas
как ps
. Ниже с помощью ps.DataFrame()
мы создали датафрейм pandas-spark (psdf2). У psdf2 два признака и три строки.
import pandas as pd
import pyspark.pandas as ps
# Создание датафрейма pandas-spark
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()
Если мы хотим преобразовать датафрейм pandas-spark (psdf2) обратно в датафрейм spark, то для этого у нас есть функция to_spark()
, о которой мы ранее уже упоминали. Синтаксис обеспечивает гибкость при смене типов датафрейм, что может оказаться довольно полезным в зависимости от функций (pandas или spark), которые вы хотите использовать в своем анализе.
# Обратное преобразование датафрейма pandas-spark в датафрейм spark
sdf2 = psdf2.to_spark()
sdf2.show(2)
7. Применение SQL-запросов непосредственно к датафреймам pandas-spark
Еще одна замечательная тема для обсуждения в рамках pandas-spark API — это функция sql
. Давайте используем эту функцию на созданном ранее датафрейме pandas-spark (psdf2) для извлечения некоторой информации. По сути для выполнения SQL-запроса нам просто нужно запустить функцию ps.sql()
поверх датафрейма pandas-spark. Как показано ниже, функция count(*)
для данных psdf2 результат равный трем. Точно так же второй запрос выводит отфильтрованные данные со score болеьше 80.
# Реализация SQL-запроса. Входные данные: датафрейм pandas-spark (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")
# Возвращает датафрейм pandas-spark
selected_data = ps.sql("SELECT id, score FROM {psdf2} WHERE score>80")
selected_data.head()
8. Графики датафреймов pandas и pandas-spark
Супер! Рад, что вы дошли до этого момента. Теперь предлагаю кратко затронуть возможности построения графиков нашего нового API pandas-spark. В отличие от статического графика по умолчанию в стандартном API Python pandas, график по умолчанию в API pandas-spark является интерактивным, поскольку по умолчанию он использует plotly. Сейчас мы импортируем данные в виде датафрейма pandas и pandas-spark и построим гистограмму по переменной зарплаты (salary) для каждого из типов данных.
# Чтение данных в виде датафрейма pandas
pddf = pd.read_csv(datapath+'example_csv.csv')
type(pddf)
#pandas.core.frame.DataFrame
pddf.head(2)
На изображении ниже показана гистограмма зарплаты из датафрейма pandas.
# Чтение данных в виде датафрейма pandas-spark
pdsdf = read_csv(datapath+'example_csv.csv')
type(pdsdf)
# pyspark.pandas.frame.DataFrame
# постороение гистограмма по датафрейму pandas
pddf['salary'].hist(bins=3)
Ниже пример гистограммы по той же переменной на основе датафрейма pandas-spark, которая в сущности является интерактивным графиком.
Примечание: Приведенный ниже график вставлен как изображение, поэтому он статичен. Если вы запустите приведенный ниже синтаксис в jupyter notebook у вас будет возможность увеличивать/уменьшать масштаб (сделать его интерактивным).
# построение гистограммы по датафрейму pandas-spark
import plotly
pdsdf['salary'].hist(bins=3)
9. Переход от Koalas в API Pandas
Напоследок давайте поговорим о том, какие изменения требуются при переходе от библиотеки Koalas в API pandas-spark. В таблице ниже показаны некоторые изменения синтаксиса: что было в Koalas, и как это выглядит в новом API pandas-spark.
10. Заключение
В этой статье вы узнали о способах использования недавно добавленной API pandas в spark3.2.0 с целью чтения данных, создания датафрейма, использования SQL непосредственно во фреймворке pandas-spark и перехода от существующей библиотеки Koalas в API pandas-spark.
Спасибо внимание! Подписывайтесь на мой аккаунт в LinkedIn, чтобы быть в курсе обновлений по полезным навыкам работы в датасайенс.
11. Ссылки
https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html
https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html
https://www.datanami.com/2021/10/26/spark-gets-closer-hooks-to-pandas-sql-with-version-3-2/
https://www.datamechanics.co/blog-post/apache-spark-3-2-release-main-features-whats-new-for-spark-on-kubernetes
https://databricks.com/blog/2020/08/11/interoperability-between-koalas-and-apache-spark.html
Материал подготовлен в рамках курса «Spark Developer».
Всех желающих приглашаем на бесплатное demo-занятие «Написание коннекторов для Spark». На занятии мы разберем подключение к внешним системам из коробки и создание кастомного коннектора для подключения к нестандартным БД.
>> РЕГИСТРАЦИЯ