Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Введение
На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.
Все примеры используют: Apache Spark 3.0.1.
Подготовка
Необходимо установить:
- Apache Spark 3.0.x
- Python 3.7 и виртуальное окружение для него
- Conda 4.y
- scikit-learn 0.22.z
- Maven 3.v
- В примерах для Scala используется версия 2.12.10.
- Загрузить Apache Spark
- Распаковать: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
- Создать окружение, к примеру, с помощью conda: conda create -n sp python=3.7
Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.
SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;
Тесты
Пример с scikit-learn
При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.
Для написания тестов будет использоваться следующий пример: LinearRegression.
Итак, пусть код для тестирования использует следующий "шаблон" для Python:
class XService:
def __init__(self):
# Инициализация
def train(self, ds):
# Обучение
def predict(self, ds):
# Предсказание и вывод результатов
Для Scala шаблон выглядит соответственно.
Полный пример:
from sklearn import linear_model
class LocalService:
def __init__(self):
self.model = linear_model.LinearRegression()
def train(self, ds):
X, y = ds
self.model.fit(X, y)
def predict(self, ds):
r = self.model.predict(ds)
print(r)
Тест.
Импорт:
import unittest
import numpy as np
Основной класс:
class RunTest(unittest.TestCase):
Запуск тестов:
if __name__ == "__main__":
unittest.main()
Подготовка данных:
X = np.array([
[1, 1], # 6
[1, 2], # 8
[2, 2], # 9
[2, 3] # 11
])
y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3
Создание модели и обучение:
service = local_service.LocalService()
service.train((X, y))
Получение результатов:
service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))
Ответ:
[16.]
[19.]
Все вместе:
import unittest
import numpy as np
from spark_streaming_pp import local_service
class RunTest(unittest.TestCase):
def test_run(self):
# Prepare data.
X = np.array([
[1, 1], # 6
[1, 2], # 8
[2, 2], # 9
[2, 3] # 11
])
y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3
# Create model and train.
service = local_service.LocalService()
service.train((X, y))
# Predict and results.
service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))
# [16.]
# [19.]
if __name__ == "__main__":
unittest.main()
Пример с Spark и Python
Будет использован аналогичный алгоритм – LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.
Инициализация:
self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None
Обучение:
self.model = self.service.fit(ds)
Получение результатов:
transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q
Все вместе:
from pyspark.ml.regression import LinearRegression
class StructuredStreamingService:
def __init__(self):
self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None
def train(self, ds):
self.model = self.service.fit(ds)
def predict(self, ds):
transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q
Сам тест.
Обычно в тестах можно использовать данные, которые создаются прямо в тестах.
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
Это очень удобно и код получается компактным.
Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.
def test_stream_read_options_overwrite(self):
bad_schema = StructType([StructField("test", IntegerType(), False)])
schema = StructType([StructField("data", StringType(), False)])
df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
.schema(bad_schema)\
.load(path='python/test_support/sql/streaming', schema=schema, format='text')
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
И так.
Создается контекст для работы:
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
Подготовка данных для обучения (можно сделать обычным способом):
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
Обучение:
service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)
Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.
def extract_features(x):
values = x.split(",")
features_ = []
for i in values[1:]:
features_.append(float(i))
features = Vectors.dense(features_)
return features
extract_features_udf = udf(extract_features, VectorUDT())
def extract_label(x):
values = x.split(",")
label = float(values[0])
return label
extract_label_udf = udf(extract_label, FloatType())
predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
.withColumn("features", extract_features_udf(col("value"))) \
.withColumn("label", extract_label_udf(col("value")))
service.predict(predict_ds).awaitTermination(3)
Ответ:
15.96699
18.96138
Все вместе:
import unittest
import warnings
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import Vectors, VectorUDT
from spark_streaming_pp import structure_streaming_service
class RunTest(unittest.TestCase):
def test_run(self):
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Prepare data.
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
# Create model and train.
service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)
# Predict and results.
def extract_features(x):
values = x.split(",")
features_ = []
for i in values[1:]:
features_.append(float(i))
features = Vectors.dense(features_)
return features
extract_features_udf = udf(extract_features, VectorUDT())
def extract_label(x):
values = x.split(",")
label = float(values[0])
return label
extract_label_udf = udf(extract_label, FloatType())
predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
.withColumn("features", extract_features_udf(col("value"))) \
.withColumn("label", extract_label_udf(col("value")))
service.predict(predict_ds).awaitTermination(3)
# +-----+------------------+
# |label| prediction|
# +-----+------------------+
# | 1.0|15.966990887541273|
# | 2.0|18.961384020443553|
# +-----+------------------+
def setUp(self):
warnings.filterwarnings("ignore", category=ResourceWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
if __name__ == "__main__":
unittest.main()
Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)
Полный пример на Scala (здесь, для разнообразия, не используется sql):
package aaa.abc.dd.spark_streaming_pr.cluster
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.StreamingQuery
class StructuredStreamingService {
var service: LinearRegression = _
var model: LinearRegressionModel = _
def train(ds: DataFrame): Unit = {
service = new LinearRegression().setMaxIter(10).setRegParam(0.01)
model = service.fit(ds)
}
def predict(ds: DataFrame): StreamingQuery = {
val m = ds.sparkSession.sparkContext.broadcast(model)
def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {
m.value.predict(features)
}
val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun
val toUpperUdf = udf(transform)
val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))
predictionDs
.writeStream
.foreachBatch((r: DataFrame, i: Long) => {
r.show()
// scalastyle:off println
println(s"$i")
// scalastyle:on println
})
.start()
}
}
Тест:
package aaa.abc.dd.spark_streaming_pr.cluster
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.{Matchers, Outcome, fixture}
class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {
test("run") { spark =>
// Prepare data.
val trainDs = spark.createDataFrame(Seq(
(6.0, Vectors.dense(1.0, 1.0)),
(8.0, Vectors.dense(1.0, 2.0)),
(9.0, Vectors.dense(2.0, 2.0)),
(11.0, Vectors.dense(2.0, 3.0))
)).toDF("label", "features")
// Create model and train.
val service = new StructuredStreamingService()
service.train(trainDs)
// Predict and results.
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)
// +-----+---------+------------------+
// |label| features| prediction|
// +-----+---------+------------------+
// | 1.0|[3.0,5.0]|15.966990887541273|
// | 2.0|[4.0,6.0]|18.961384020443553|
// +-----+---------+------------------+
}
override protected def withFixture(test: OneArgTest): Outcome = {
val spark = SparkSession.builder().master("local[2]").getOrCreate()
try withFixture(test.toNoArgTest(spark))
finally spark.stop()
}
override type FixtureParam = SparkSession
case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)
}
Выводы
При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.
Такие абстракции как “DataFrame” позволяют это сделать легко и просто.
При использовании Python данные придется хранить в файлах.
Ссылки и ресурсы
- Исходный код — Python
- Исходный код — Scala