Apache Flink ML – прогнозирование в реальном времени

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Всем привет!

В этой статье рассмотрим применение библиотеки Apache Flink ML для построения конвейеров машинного обучения. Затем реализуем простой проект по прогнозированию поведения системы, а также ответим на вопросы: какие задачи Machine Learning подходят для Flink и какие особенности Flink делают его подходящим для использования в задачах Machine Learning.

Задача

Давайте представим, что у нас есть некая система, за эффективностью работы которой нам необходимо следить в реальном времени, чтобы вовремя обнаруживать тенденцию к ее деградации и иметь возможность заранее предпринимать меры для поддержания ее работы на должном уровне.

Допустим, что нам предоставляют данные о значениях нескольких показателей системы в формате csv:

0.5,10.0,4.3,1.2,2.5,0.0
1.2,9.0,4.0,1.1,5.2,0.0
..

Эти параметры мы получаем в топик Kafka, далее мы передаем их в Flink на вход нашей модели и на выходе получаем оценку.

Условимся, что максимальное значение оценки может быть равно 10.0. Если полученная оценка находится в диапазоне 0.0 - 5.0, то все в порядке, если же начинает превышать 5.0, то стоит напрячься и начать рассылать оповещение в службу поддержки системы. В конкретном случае, будем формировать и отправлять сообщение с оповещением в топик Kafka.

Ниже изображена схема нашей системы:

Программируем

Стек используемых технологий:

1.     Java 8

2.     Flink 1.16.1

3.     Flink ML 2.2.0

Сначала создаем файл pom.xml, в котором опишем все необходимые нам зависимости. Теперь создаем основной класс RealTimePrediction. В методе main получаем окружение для доступа к Flink и устанавливаем режим потоковой обработки.

public class RealTimePrediction {
    public static void main(String[] args) {
      //получаем окружение для доступа к Flink
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      //устанавливаем режим потоковой обработки
      env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
  }
}

API Flink ML основан на Table API этого фреймворка, который представляет собой интегрированный в язык API запросов для Java, Scala и Python, позволяющий составлять запросы из реляционных операторов, включая выборку, фильтрацию и объединение. Дополнительно к набору типов данных, который поддерживает Table API, Flink ML также обеспечивает поддержку векторов с двойными значениями, включая плотные (DenseVector) и разреженные (SparseVector). Мы будем использовать DenseVector.

Как описали ранее, данные мы будем получать из топика Kafka. Тут можно пойти несколькими путями, например, создать источник данных из Kafka, получить поток данных DataStream, а потом преобразовать его в Table, но мы воспользуемся более простым способом, получим окружение StreamTableEnvironment для доступа к Flink Table API. И теперь создадим источник данных из Kafka сразу в виде Table:

// получаем окружение для доступа к Flink Table API
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// создаем дескриптор, описывающий коннект к брокеру Kafka
TableDescriptor tableDescriptor = TableDescriptor
                .forConnector("kafka")
                // описываем структуру нашей таблицы
                .schema(
                        Schema.newBuilder()
                                .column("f1", DataTypes.DOUBLE())
                                .column("f2", DataTypes.DOUBLE())
                                .column("f3", DataTypes.DOUBLE())
                                .column("f4", DataTypes.DOUBLE())
                                .column("f5", DataTypes.DOUBLE())
                                .column("label", DataTypes.DOUBLE())
                                .build()
                )
                // указываем формат входящих данных для парсинга
                .format("csv")
                .option("topic", "input-topic")
                .option("properties.bootstrap.servers", "localhost:9092")
                .option("scan.startup.mode", "latest-offset")
                .build();
// создаем таблицу с указанным именем, передав дескриптор в качестве аргумента
tableEnv.createTable("features_table", tableDescriptor);
// получаем Table с именем features
Table predictTable = tableEnv.sqlQuery("SELECT * FROM features")

Для любителей SQL то же самое можно было сделать следующим образом:

tableEnv.executeSql("CREATE TABLE features_table " +
         "(" +
         "f1 DOUBLE," +
         "f2 DOUBLE," +
         "f3 DOUBLE," +
         "f4 DOUBLE," +
         "f5 DOUBLE," +
         "label DOUBLE" +
         ") WITH (" +
         "'connector'='kafka'," +
         "'format'='csv'," +
         "'topic'='input-topic'," +
         "'properties.bootstrap.servers'='localhost:9092'," +
         "'scan.startup.mode'='latest-offset'" +
         ")"
);

После того, как мы подготовили входные данные, приступим к построению конвейера с помощью Flink ML. Сначала необходимо определиться с используемым алгоритмом Machine Learning. В библиотеке Flink ML уже собрано множество алгоритмов под разные задачи, поэтому можно воспользоваться готовым или разработать свой собственный алгоритм.

Так как нам нужно предсказывать поведение системы, то в конкретном примере будем использовать простейший алгоритм классификации –kNN (расшифровывается как k Nearest Neighbor или k Ближайших Соседей). При использовании этого алгоритма для классификации объект присваивается тому классу, который является наиболее схожим с ним среди k соседей данного элемента, классы которых уже известны.

//создадим объект реализующий алгоритм kNN и установим количество ближайших объектов для классификации равным, например, 4
Knn knn = new Knn().setK(4);

Далее получаем модель, которую будем использовать для предсказаний, предварительно обучив ее. Для этого подготавливаем данные для ее обучения. После этого вызываем метод .fit(...) и передаем их в качестве аргумента.

//создаем поток данных для обучения
DataStream<Row> trainStream = env.fromElements(
  Row.of(Vectors.dense(0.7, 0.1, 0.0, 2.0, 1.0), 1.0),
  ...
  Row.of(Vectors.dense(1.0, 1.5, 3.0, 2.0, 1.0), 3.0)
);
//преобразуем DataStream в Table, используя окружение Flink Table Api
Table trainTable = tableEnv.fromDataStream(trainStream).as("features", "label");

//обучаем нашу Knn модель
KnnModel knnModel = knn.fit(trainTable);

Теперь проверим схемы таблицы trainTable с данными для обучения и predictTable – с данными из топика Kafka, для этого воспользуемся методом .printSchema():

trainTable.printSchema();
predictTable.printSchema();

Получаем следующий результат:

// таблица для обучения
(
  `features` RAW('org.apache.flink.ml.linalg.DenseVector', '...'),
  `label` DOUBLE
)
// таблица с данными из топика Kafka
(
  `f1` DOUBLE,
  `f2` DOUBLE,
  `f3` DOUBLE,
  `f4` DOUBLE,
  `f5` DOUBLE,
  `label` DOUBLE
)

Давайте приведем таблицу predictTable к формату, который будет понятен нашей модели. Для этого внесем изменения в строку, в которой мы получаем эту таблицу:

Table predictTable = tableEnv
  // объединим входные параметры в массив
  .sqlQuery("SELECT ARRAY[f1, f2, f3, f4, f5] AS features, label FROM features_table")
  // преобразуем массив features в DenseVector          
  .select(arrayToVector($("features")).as("features"), $("label"));

Получаем схему таблицы predictTable вызовом метода .printSchema():

// схема таблицы predictTable
(
  `features` RAW('org.apache.flink.ml.linalg.DenseVector', '...'),
  `label` DOUBLE
)

Теперь, когда схемы таблиц trainTable и predictTable полностью соответствуют, подаем на вход нашей модели таблицу predictTable в качестве аргумента метода .transform(...)и получаем таблицу outputTable с результатами.

// используем нашу обученную kNN модель для прогнозирования поведения системы
Table outputTable = knnModel.transform(predictTable)[0];

Далее нужно проанализировать полученные результаты.

//получаем результаты из таблицы outputTable
outputTable.execute().collect().forEachRemaining(row -> {
  DenseVector features = (DenseVector) row.getField(knn.getFeaturesCol());
  double predictionResult = (Double) row.getField(knn.getPredictionCol());
  System.out.printf(
    "Features: %-15s \tPrediction Result: %s\n",
    features, predictionResult);
});

Осталось добавить отправку сообщений с оповещением в топик Kafka. Для этого создадим KafkaProducer, который будет отправлять сообщения типа String:

Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Напишем метод, формирующий текст отправляемого сообщения, в зависимости от полученного результата. По условию задачи мы не должны отправлять сообщение, если результат < 5.0, поэтому в качестве типа возвращаемого результата будем использовать Optional<String>:

private static Optional<String> maybeCreateAlertMessage(double predictionResult) {
  if (predictionResult >= 5.0 && predictionResult < 7.0) {
    return Optional.of("WARNING. The system load rating has exceeded 5.0 and is equal to " + predictionResult);
  } else if (predictionResult >= 7.0) {
    return Optional.of("CRITICAL. The system load rating has exceeded 7.0 and is equal to " + predictionResult);
  } else return Optional.empty();
}

Добавим вызов этого метода и отправку сообщения в топик Kafka в блок анализа результата.

outputTable.execute().collect().forEachRemaining(row -> {
  DenseVector features = (DenseVector) row.getField(knn.getFeaturesCol());
  double predictionResult = (Double) row.getField(knn.getPredictionCol());
  System.out.printf(
    "Features: %-15s \tPrediction Result: %s\n",
    features, predictionResult);
  //вызываем метод формирования текста сообщения
  maybeCreateAlertMessage(predictionResult)
    .ifPresent(msg -> {
      //создаем сообщение и отправляем его в топик Kafka
      ProducerRecord<String, String> message = new ProducerRecord<>("alert-topic", "message-key", msg);
      producer.send(message);
  });
});

producer.close();

Наш проект готов, осталось скомпилировать код и запустить его.

После запуска отправим пару сообщений во входной топик Kafka. В итоге получили следующие результаты:

//         входные данные               результаты прогнозирования модели 
Features: [3.0, 3.0, 1.0, 2.0, 2.0] 	Prediction Result: 1.0
Features: [8.0, 3.0, 7.0, 1.0, 1.0] 	Prediction Result: 5.0

Видим, что при обработке второго сообщения, в котором Prediction Result: 5.0 должно быть отправлено сообщение в топик Kafka. Убедимся в этом, воспользовавшись утилитой Offset Explorer.

Все получилось. И надеемся, что до службы поддержки сообщение дойдет вовремя.

Итоги

Библиотека Apache Flink ML достаточно удобна для разработки, так как все алгоритмы ML объединены в единый API, что упрощает их использование и снижает затраты на изучение, понимание алгоритмов, а также обеспечивает лучшее взаимодействие и совместимость между алгоритмами.

Как видно из проекта, работа с библиотекой Apache Flink ML сводится к нескольким этапам:

1.     Выбор алгоритма ML или создание собственного;

2.     Подготовка данных для обучения модели;

3.     Обучение модели;

4.     Подача входных данных на вход обученной модели;

5.     Анализ результатов;

Различные операторы Flink можно объединять между собой и получить более сложный, расширенный оператор, что значительно повышает эффективность разработки новых алгоритмов. Стоит отметить, что данные для модели передаются в формате таблиц с использованием унифицированного Table API, что позволяет алгоритмам, разработанными разными компаниями, быть совместимыми друг с другом, что значительно снижает стоимость разработки и повышает эффективность используемого алгоритма.

Одним из весомых преимуществ при использовании фреймворка Apache Flink и библиотеки Flink ML для решения задач прогнозирования в реальном времени является потоковый режим работы. Помимо этого, Flink содержит множество готовых коннекторов для взаимодействия с различными системами, базами данных., что положительно сказывается на скорости разработки.

Код проекта можно найти здесь

Источник: https://habr.com/ru/companies/neoflex/articles/755804/


Интересные статьи

Интересные статьи

Небольшой рассказ с картинками о проведении тематического моделирования для массива документов, на примере датасета анекдотов на русском языке. В работе применены библиотеки Gensim, Sklearn. Рассмотре...
Рассмотрим простой способ получить информацию из лога Nifi с помощью самого Nifi. Это может быть полезно при разработке, тестировании, отладке потоков. А также, поможет организовать получение событий...
Современные сети, основанные на маршрутизации IP-пакетов, а точнее сервисы, которые они предоставляют, по факту управляются протоколом BGP. Этот протокол был спроектирова...
В прошлой статье я писал, как добавить на сервер мониторинг. Вот только доступ до него не очень удобный, через порт: домен.com:8080/monitorixРешил переделать, чтобы можн...
Недавно мне на глаза попался датасет на Kaggle с данными о 45 тысячах фильмов с Full MovieLens Dataset. Данные содержали не только информацию об актерах, съемочной команде, сюжете и т.п., но и оц...