Gatling. Тестирование Kafka

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Привет! Меня зовут Александра, я работаю в отделе тестирования производительности Тинькофф. Мы продолжаем наш цикл статей, посвященных работе Gatling с различными протоколами. Ранее мы уже рассмотрели работу с HTTP, JDBC и gRPC. В этой статье поговорим о работе Gatling с Kafka. Если вы еще не прочли нашу вводную статью, рекомендую это сделать: в ней мы поделились базовой информацией о работе с Gatling.

Дисклеймер

На момент написания статьи используемые плагины поддерживают версию Gatling не выше 3.7.6. Для получения работающего скрипта в будущем достаточно обновить версии плагинов в соответствии с их документацией.

Пишем скрипты Gatling для Kafka

Kafka — распределенный брокер сообщений с высокой пропускной способностью. Он состоит из нескольких основных элементов:

  1. Broker — сервер.

  2. Producer — отвечает за отправку сообщений в топик.

  3. Consumer — отвечает за считывание обращений.

  4. ZooKeeper — хранилище метаданных и логов.

Также у Kafka особая структура данных: каждое сообщение содержит ключ, значение, временную метку и опционально может содержать метаданные. Сообщения отправляются в топики, которые могут состоять из нескольких партиций. Узнать о Kafka больше можно в официальной документации.

Разворачиваем тестовую Kafka

Для разработки скрипта Gatling развернем тестовую Kafka и Zookeeper. Для локального развертывания сервисов используем файл docker-compose.yml. Актуальные версии можно посмотреть на странице в DockerHub.

services:
 zookeeper:
   image: wurstmeister/zookeeper
   hostname: zoo1
   environment:
     ZOO_MY_ID: "1"
     ZOO_PORT: "2181"
     ZOO_SERVERS: server.1=zoo1:2888:3888
   ports:
     - '2181:2181'
   volumes:
     - "./zookeeper/data:/data"
     - "./zookeeper/logs:/datalog"
   restart: on-failure
 kafka:
   image: wurstmeister/kafka
   depends_on:
     - zookeeper
   environment:
     KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     KAFKA_ADVERTISED_HOST_NAME: kafka
     KAFKA_LISTENERS: BROKER://:9092,EXTERNAL://:9093
     KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:9092,EXTERNAL://localhost:9093
     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
     KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
     KAFKA_BROKER_ID: "1"
     KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
     KAFKA_CREATE_TOPICS: myTopic:1:1
   ports:
     - '9093:9093'
     - '9092:9092'
   volumes:
     - /var/run/docker.sock:/var/run/docker.sock

 Для запуска создаем файл docker-compose.yml и выполняем команду docker-compose up. Обратите внимание: для выполнения команды нужно установить и запустить Docker.

После выполнения команды будет развернута тестовая Kafka. Чтобы проверить ее доступность, устанавливаем KafkaTool и подключаемся к ней. Для этого создаем новое подключение. В открывшемся окне произвольно называем кластер, на вкладке Advanced вводим адрес localhost:9093.

Разрабатываем скрипт

Для написания скриптов берем тот же шаблон, что и в предыдущей статье. В качестве IDE для разработки скриптов будем использовать IntelliJ IDEA. 

Шаг 1. Обновление зависимостей

В файле project/Dependencies.scala добавим плагин для работы Gatling с Kafka и его зависимости. Актуальную версию можно посмотреть в GitHub.

lazy val gatlingKafka: Seq[ModuleID] = Seq(
 "ru.tinkoff" %% "gatling-kafka-plugin",
).map(_ % "<current version>")

lazy val avro4s: Seq[ModuleID] = Seq(
  "com.sksamuel.avro4s" %% "avro4s-core")
.map(_ % "<current version>" % "provided")

 В файле build.sbt добавляем новые зависимости.

libraryDependencies ++= gatlingKafka,
libraryDependencies ++= avro4s,

Для обновления зависимостей нажимаем Load sbt Changes через интерфейс IntelliJ IDEA.

Шаг 2. Переменные сервиса

Для локального подключения к Kafka указываем ее адрес в файле src/test/resources/simulation.conf.

kafkaBroker: "localhost:9093"

Шаг 3. Запросы

В директории cases создаем новый scala-класс, где будем писать запросы. Назовем его KafkaActions. Пишем запрос для отправки сообщения в топик myTopic, который мы создали в момент разворачивания Kafka.

package ru.tinkoff.load.myKafkaSample.cases

import io.gatling.core.Predef.
import ru.tinkoff.gatling.kafka.Predef.kafka
import ru.tinkoff.gatling.kafka.request.builder.RequestBuilder

object kafkaActions {
 val sendMyMessage: RequestBuilder[_, Any] = 
  // Указываем имя запроса
  kafka("my message")
  // Указываем ключ и сообщение
   .send("myMessage", "Hello!")
  
 val sendOtherMessage: RequestBuilder[_, Any] = kafka("my other message")
   .send("myMessage", "Hello, ${name}!")
}

Для отправки сообщения sendOtherMessage используем feeder, создаем для него директорию feeders. И в ней же — object Feeders.

package ru.tinkoff.load.myKafkaSample.feeders

import ru.tinkoff.gatling.feeders.RandomStringFeeder

 object Feeders {
   // Создаем фидер для генерации строки 6 случайных символов
   val myRandomStringFeeder = RandomStringFeeder("name", 6)
 }

Шаг 4. Сценарий теста

В CommonScenario меняем код для выполнения наших запросов. При такой реализации запросы будут выполняться последовательно в указанном порядке.

package ru.tinkoff.load.myKafkaSample.scenarios

import io.gatling.core.Predef.
import io.gatling.core.structure.ScenarioBuilder
import ru.tinkoff.load.myKafkaSample.cases.
import ru.tinkoff.gatling.kafka.Predef.
import ru.tinkoff.load.myKafkaSample.feeders.Feeders.myRandomStringFeeder

object CommonScenario {
 def apply(): ScenarioBuilder = new CommonScenario().scn
}

class CommonScenario {
 val scn: ScenarioBuilder = 
  // Указываем название сценария
  scenario("Common Scenario")
  // Подключаем feeder
   .feed(myRandomStringFeeder)
  // Отправляем сообщения
   .exec(kafkaActions.sendMyMessage)
   .exec(kafkaActions.sendOtherMessage)
}

Шаг 5. Описание Kafka-протокола

В файле myKafkaSample.scala описываем протокол для работы с Kafka.

package ru.tinkoff.load

import org.apache.kafka.clients.producer.ProducerConfig
import ru.tinkoff.gatling.config.SimulationConfig.
import ru.tinkoff.gatling.kafka.Predef.kafka
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilder

package object myKafkaSample {
 val kafkaProtocol: KafkaProtocolBuilder = kafka
  // Указываем название топика
   .topic("myTopic")
   .properties(
     Map(
       ProducerConfig.ACKS_CONFIG                   -> "1",
       // Указываем kafka brokers
       ProducerConfig.BOOTSTRAP_SERVERS_CONFIG      -> getStringParam("kafkaBroker"),
       // Указываем тип ключа и сообщения
       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringSerializer",
       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
     ),
   )
}

Шаг 6. Нагрузочные тесты

Описываем, как будем подавать нагрузку, в файле Debug.scala.

package ru.tinkoff.load.myKafkaSample

import io.gatling.core.Predef.
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.load.myKafkaSample.scenarios.CommonScenario

class Debug extends Simulation {
 setUp(
   // Вызываем сценарий
   CommonScenario()
   // Задаем интенсивность
   .inject(atOnceUsers(1)),
 ).protocols(
   // Указываем протокол
   kafkaProtocol,
 )
}

По аналогии изменяем протокол в MaxPerformance и Stability

Шаг 7. Запуск Debug-теста

Для запуска теста используем GatlingRunner. После выполнения скрипта можно подключиться к нашей тестовой Kafka, используя KafkaTool, и проверить, что сообщения появились в топике.

package ru.tinkoff.load.myKafkaSample

import io.gatling.app.Gatling
import io.gatling.core.config.GatlingPropertiesBuilder

object GatlingRunner {
 def main(args: Array[String]): Unit = {

   // Указывает имя симуляции Debug (можно использовать любую симуляцию)
   val simulationClass = classOf[Debug].getName

   val props = new GatlingPropertiesBuilder

   props.simulationClass(simulationClass)

   Gatling.fromMap(props.build)
 }
}

Заключение

В этой статье цикла о Gatling мы узнали, как с помощью плагина можно формировать и отправлять сообщения в Kafka. Если вам нужно использовать avro-схемы, скачать их можно, используя плагин SBT Schema Registry. Мы планируем расширять функционал плагина — в следующих версиях с его помощью можно будет читать сообщения из Kafka. 

Все плагины, использованные в цикле статей о Gatling, есть в открытом доступе. Если вы хотите внести свой вклад в развитие плагинов, которые упоминались в нашем цикле статей, мы будем рады получить PR или issue на GitHub. В следующей статье мы разберемся, как писать скрипты Gatling для протокола AMQP.

Полезные ссылки

  1. Gatling Kafka Plugin

  2. SBT Schema Registry Plugin

  3. Официальная документация Kafka

  4. Gatling Feeders

Источник: https://habr.com/ru/company/tinkoff/blog/666886/


Интересные статьи

Интересные статьи

Всем привет! Недавно X5Tech провела митап, где эксперты компании говорили о том, как работают над проектами по Big Data. Делимся записью выступлений и презентациями спикеров.
В августе Московский метрополитен запустил тестирование виртуальной «Тройки» на Московском транспорте. Суть технологии в том, что для оплаты транспорта вы можете использовать смартфон на Android, в пл...
Недавно мы с коллегами работали над задачей автоматического распознавания русского рукописного текста. В предыдущей статье была описана работа над созданием нашего датасета для обучения моде...
В основе Apache Kafka находится лог — простая структура данных, которая использует последовательные операции, работающие в симбиозе с оборудованием. Эффективное использование дискового ...
Привет! На связи команда Ad-hoc аналитики Big Data из X5 Retail Group. В этой статье мы расскажем о нашей методологии A/B-тестирования и сложностях, с которыми мы ежедневно сталкиваемся. В...