Привет! Меня зовут Александра, я работаю в отделе тестирования производительности Тинькофф. Мы продолжаем наш цикл статей, посвященных работе Gatling с различными протоколами. Ранее мы уже рассмотрели работу с HTTP, JDBC и gRPC. В этой статье поговорим о работе Gatling с Kafka. Если вы еще не прочли нашу вводную статью, рекомендую это сделать: в ней мы поделились базовой информацией о работе с Gatling.
Дисклеймер
На момент написания статьи используемые плагины поддерживают версию Gatling не выше 3.7.6. Для получения работающего скрипта в будущем достаточно обновить версии плагинов в соответствии с их документацией.
Пишем скрипты Gatling для Kafka
Kafka — распределенный брокер сообщений с высокой пропускной способностью. Он состоит из нескольких основных элементов:
Broker — сервер.
Producer — отвечает за отправку сообщений в топик.
Consumer — отвечает за считывание обращений.
ZooKeeper — хранилище метаданных и логов.
Также у Kafka особая структура данных: каждое сообщение содержит ключ, значение, временную метку и опционально может содержать метаданные. Сообщения отправляются в топики, которые могут состоять из нескольких партиций. Узнать о Kafka больше можно в официальной документации.
Разворачиваем тестовую Kafka
Для разработки скрипта Gatling развернем тестовую Kafka и Zookeeper. Для локального развертывания сервисов используем файл docker-compose.yml. Актуальные версии можно посмотреть на странице в DockerHub.
services:
zookeeper:
image: wurstmeister/zookeeper
hostname: zoo1
environment:
ZOO_MY_ID: "1"
ZOO_PORT: "2181"
ZOO_SERVERS: server.1=zoo1:2888:3888
ports:
- '2181:2181'
volumes:
- "./zookeeper/data:/data"
- "./zookeeper/logs:/datalog"
restart: on-failure
kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_LISTENERS: BROKER://:9092,EXTERNAL://:9093
KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:9092,EXTERNAL://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
KAFKA_BROKER_ID: "1"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CREATE_TOPICS: myTopic:1:1
ports:
- '9093:9093'
- '9092:9092'
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Для запуска создаем файл docker-compose.yml и выполняем команду docker-compose up. Обратите внимание: для выполнения команды нужно установить и запустить Docker.
После выполнения команды будет развернута тестовая Kafka. Чтобы проверить ее доступность, устанавливаем KafkaTool и подключаемся к ней. Для этого создаем новое подключение. В открывшемся окне произвольно называем кластер, на вкладке Advanced вводим адрес localhost:9093.
Разрабатываем скрипт
Для написания скриптов берем тот же шаблон, что и в предыдущей статье. В качестве IDE для разработки скриптов будем использовать IntelliJ IDEA.
Шаг 1. Обновление зависимостей
В файле project/Dependencies.scala добавим плагин для работы Gatling с Kafka и его зависимости. Актуальную версию можно посмотреть в GitHub.
lazy val gatlingKafka: Seq[ModuleID] = Seq(
"ru.tinkoff" %% "gatling-kafka-plugin",
).map(_ % "<current version>")
lazy val avro4s: Seq[ModuleID] = Seq(
"com.sksamuel.avro4s" %% "avro4s-core")
.map(_ % "<current version>" % "provided")
В файле build.sbt добавляем новые зависимости.
libraryDependencies ++= gatlingKafka,
libraryDependencies ++= avro4s,
Для обновления зависимостей нажимаем Load sbt Changes через интерфейс IntelliJ IDEA.
Шаг 2. Переменные сервиса
Для локального подключения к Kafka указываем ее адрес в файле src/test/resources/simulation.conf.
kafkaBroker: "localhost:9093"
Шаг 3. Запросы
В директории cases создаем новый scala-класс, где будем писать запросы. Назовем его KafkaActions. Пишем запрос для отправки сообщения в топик myTopic, который мы создали в момент разворачивания Kafka.
package ru.tinkoff.load.myKafkaSample.cases
import io.gatling.core.Predef.
import ru.tinkoff.gatling.kafka.Predef.kafka
import ru.tinkoff.gatling.kafka.request.builder.RequestBuilder
object kafkaActions {
val sendMyMessage: RequestBuilder[_, Any] =
// Указываем имя запроса
kafka("my message")
// Указываем ключ и сообщение
.send("myMessage", "Hello!")
val sendOtherMessage: RequestBuilder[_, Any] = kafka("my other message")
.send("myMessage", "Hello, ${name}!")
}
Для отправки сообщения sendOtherMessage используем feeder, создаем для него директорию feeders. И в ней же — object Feeders.
package ru.tinkoff.load.myKafkaSample.feeders
import ru.tinkoff.gatling.feeders.RandomStringFeeder
object Feeders {
// Создаем фидер для генерации строки 6 случайных символов
val myRandomStringFeeder = RandomStringFeeder("name", 6)
}
Шаг 4. Сценарий теста
В CommonScenario меняем код для выполнения наших запросов. При такой реализации запросы будут выполняться последовательно в указанном порядке.
package ru.tinkoff.load.myKafkaSample.scenarios
import io.gatling.core.Predef.
import io.gatling.core.structure.ScenarioBuilder
import ru.tinkoff.load.myKafkaSample.cases.
import ru.tinkoff.gatling.kafka.Predef.
import ru.tinkoff.load.myKafkaSample.feeders.Feeders.myRandomStringFeeder
object CommonScenario {
def apply(): ScenarioBuilder = new CommonScenario().scn
}
class CommonScenario {
val scn: ScenarioBuilder =
// Указываем название сценария
scenario("Common Scenario")
// Подключаем feeder
.feed(myRandomStringFeeder)
// Отправляем сообщения
.exec(kafkaActions.sendMyMessage)
.exec(kafkaActions.sendOtherMessage)
}
Шаг 5. Описание Kafka-протокола
В файле myKafkaSample.scala описываем протокол для работы с Kafka.
package ru.tinkoff.load
import org.apache.kafka.clients.producer.ProducerConfig
import ru.tinkoff.gatling.config.SimulationConfig.
import ru.tinkoff.gatling.kafka.Predef.kafka
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilder
package object myKafkaSample {
val kafkaProtocol: KafkaProtocolBuilder = kafka
// Указываем название топика
.topic("myTopic")
.properties(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
// Указываем kafka brokers
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> getStringParam("kafkaBroker"),
// Указываем тип ключа и сообщения
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
),
)
}
Шаг 6. Нагрузочные тесты
Описываем, как будем подавать нагрузку, в файле Debug.scala.
package ru.tinkoff.load.myKafkaSample
import io.gatling.core.Predef.
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.load.myKafkaSample.scenarios.CommonScenario
class Debug extends Simulation {
setUp(
// Вызываем сценарий
CommonScenario()
// Задаем интенсивность
.inject(atOnceUsers(1)),
).protocols(
// Указываем протокол
kafkaProtocol,
)
}
По аналогии изменяем протокол в MaxPerformance и Stability.
Шаг 7. Запуск Debug-теста
Для запуска теста используем GatlingRunner. После выполнения скрипта можно подключиться к нашей тестовой Kafka, используя KafkaTool, и проверить, что сообщения появились в топике.
package ru.tinkoff.load.myKafkaSample
import io.gatling.app.Gatling
import io.gatling.core.config.GatlingPropertiesBuilder
object GatlingRunner {
def main(args: Array[String]): Unit = {
// Указывает имя симуляции Debug (можно использовать любую симуляцию)
val simulationClass = classOf[Debug].getName
val props = new GatlingPropertiesBuilder
props.simulationClass(simulationClass)
Gatling.fromMap(props.build)
}
}
Заключение
В этой статье цикла о Gatling мы узнали, как с помощью плагина можно формировать и отправлять сообщения в Kafka. Если вам нужно использовать avro-схемы, скачать их можно, используя плагин SBT Schema Registry. Мы планируем расширять функционал плагина — в следующих версиях с его помощью можно будет читать сообщения из Kafka.
Все плагины, использованные в цикле статей о Gatling, есть в открытом доступе. Если вы хотите внести свой вклад в развитие плагинов, которые упоминались в нашем цикле статей, мы будем рады получить PR или issue на GitHub. В следующей статье мы разберемся, как писать скрипты Gatling для протокола AMQP.
Полезные ссылки
Gatling Kafka Plugin
SBT Schema Registry Plugin
Официальная документация Kafka
Gatling Feeders