First touch of Kafka

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике

И так приступим!

Единственный брокер Kafka и необходимый для его работы ZooKeeper я буду запускать в Docker

Сперва создам отдельную сеть kafkanet

docker network create kafkanet

Запуск контейнера с ZooKeeper

docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper

Запуск контейнера с Kafka

docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka

Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafka

Далее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потребление

Для этого сценария подключусь к контейнеру kafka

docker exec -it kafka bash

Создам топик demo-topic

/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092

Выведу список всех топиков

/bin/kafka-topics --list --zookeeper zookeeper:2181

И выведу описание созданного топика

/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092

Сгенерирую несколько сообщений

/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092

И после прочитаю эти сообщения

/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092

Далее я создам два небольших .NET приложения: KafkaProducer, которое будет генерировать сообщения, и KafkaConsumer, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting

В проект KafkaProducer добавлю класс KafkaProducerService

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaProducer
{
    public class KafkaProducerService : IHostedService
    {
        private readonly ILogger<KafkaProducerService> _logger;
        private readonly IProducer<Null, string> _producer;

        public KafkaProducerService(ILogger<KafkaProducerService> logger)
        {
            _logger = logger;
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092"
            };
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            for (var i = 0; i < 5; i++)
            {
                var value = $"Event N {i}";
                _logger.LogInformation($"Sending >> {value}");
                await _producer.ProduceAsync(
                    "demo-topic",
                    new Message<Null, string> { Value = value },
                    cancellationToken);
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _producer?.Dispose();
            _logger.LogInformation($"{nameof(KafkaProducerService)} stopped");
            return Task.CompletedTask;
        }
    }
}

Изменю файл Program.cs

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

namespace KafkaProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
            Console.ReadKey();
        }

        private static IHostBuilder CreateHostBuilder(string[] args) =>
            Host
                .CreateDefaultBuilder(args)
                .ConfigureServices((context, collection) =>
                    collection.AddHostedService<KafkaProducerService>());
    }
}

В проект KafkaConsumer добавлю класс KafkaConsumerService

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaConsumer
{
    public class KafkaConsumerService : IHostedService
    {
        private readonly ILogger<KafkaConsumerService> _logger;
        private readonly IConsumer<Ignore, string> _consumer;

        public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
        {
            _logger = logger;
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "demo-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            _consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _consumer.Subscribe("demo-topic");
            while (!cancellationToken.IsCancellationRequested)
            {
                var consumeResult = _consumer.Consume(cancellationToken);
                _logger.LogInformation($"Received >> {consumeResult.Message.Value}");
            }
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _consumer?.Dispose();
            _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");
            return Task.CompletedTask;
        }
    }
}

Изменю файл Program.cs

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

namespace KafkaConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
            Console.ReadKey();
        }

        private static IHostBuilder CreateHostBuilder(string[] args) =>
            Host
                .CreateDefaultBuilder(args)
                .ConfigureServices((context, collection) =>
                    collection.AddHostedService<KafkaConsumerService>());
    }
}

Результат работы приложений (ссылка на репозиторий)

Источник: https://habr.com/ru/post/543732/


Интересные статьи

Интересные статьи

Введение Для анализа потоковых данных необходимы источники этих данных. Так же важна сама информация, которая предоставляется источниками. А источники с текстовой информацией, к примеру,...
Привет, Хабр! Меня зовут Саша, я лид-разработчик в GlowByte Consulting. Мы с командой сделали неплохой стриминговый движок для одного крупного банка. Сейчас в продакшене крутится о...
Компании растут и меняются. Если для небольшого бизнеса легко прогнозировать последствия любых изменений, то у крупного для такого предвидения — необходимо изучение деталей.
Получить трафик для интернет-магазина сегодня не проблема. Есть много каналов его привлечения: органическая выдача, контекстная реклама, контент-маркетинг, RTB-сети и т. д. Вопрос в том, как вы распор...
На сегодняшний день у сервиса «Битрикс24» нет сотен гигабит трафика, нет огромного парка серверов (хотя и существующих, конечно, немало). Но для многих клиентов он является основным инструментом ...