Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике
И так приступим!
Единственный брокер Kafka и необходимый для его работы ZooKeeper я буду запускать в Docker
Сперва создам отдельную сеть kafkanet
docker network create kafkanet
Запуск контейнера с ZooKeeper
docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
Запуск контейнера с Kafka
docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka
Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafka
Далее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потребление
Для этого сценария подключусь к контейнеру kafka
docker exec -it kafka bash
Создам топик demo-topic
/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
Выведу список всех топиков
/bin/kafka-topics --list --zookeeper zookeeper:2181
И выведу описание созданного топика
/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092
Сгенерирую несколько сообщений
/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
И после прочитаю эти сообщения
/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092
Далее я создам два небольших .NET приложения: KafkaProducer
, которое будет генерировать сообщения, и KafkaConsumer
, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting
В проект KafkaProducer
добавлю класс KafkaProducerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaProducer
{
public class KafkaProducerService : IHostedService
{
private readonly ILogger<KafkaProducerService> _logger;
private readonly IProducer<Null, string> _producer;
public KafkaProducerService(ILogger<KafkaProducerService> logger)
{
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
_producer = new ProducerBuilder<Null, string>(config).Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
for (var i = 0; i < 5; i++)
{
var value = $"Event N {i}";
_logger.LogInformation($"Sending >> {value}");
await _producer.ProduceAsync(
"demo-topic",
new Message<Null, string> { Value = value },
cancellationToken);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_producer?.Dispose();
_logger.LogInformation($"{nameof(KafkaProducerService)} stopped");
return Task.CompletedTask;
}
}
}
Изменю файл Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaProducer
{
class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
Console.ReadKey();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host
.CreateDefaultBuilder(args)
.ConfigureServices((context, collection) =>
collection.AddHostedService<KafkaProducerService>());
}
}
В проект KafkaConsumer
добавлю класс KafkaConsumerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaConsumer
{
public class KafkaConsumerService : IHostedService
{
private readonly ILogger<KafkaConsumerService> _logger;
private readonly IConsumer<Ignore, string> _consumer;
public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
{
_logger = logger;
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "demo-group",
AutoOffsetReset = AutoOffsetReset.Earliest
};
_consumer = new ConsumerBuilder<Ignore, string>(config).Build();
}
public Task StartAsync(CancellationToken cancellationToken)
{
_consumer.Subscribe("demo-topic");
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = _consumer.Consume(cancellationToken);
_logger.LogInformation($"Received >> {consumeResult.Message.Value}");
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_consumer?.Dispose();
_logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");
return Task.CompletedTask;
}
}
}
Изменю файл Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
Console.ReadKey();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host
.CreateDefaultBuilder(args)
.ConfigureServices((context, collection) =>
collection.AddHostedService<KafkaConsumerService>());
}
}
Результат работы приложений (ссылка на репозиторий)