Организация обработки асинхронных событий с Spring Events и Spring AMQP

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Привет, Хабр!

Асинхронная обработка событий – один из базовых инструментов на сегодняшний день, позволяющий создавать масштабируемые и отзывчивые приложения. Сегодня мы рассмотрим два инструмента из Spring Framework – Spring Events и Spring AMQP, которые помогают управлять асинхронными задачами.

Spring Events

Spring Events – это встроенный механизм для публикации и обработки событий внутри приложения. Spring Events основаны на паттерне проектирования "издатель-подписчик".

Издатель (writer) -подписчик(reader)
Издатель (writer) -подписчик(reader)

Для работы с событиями потребуется три основных компонента:

  1. Класс события: описывает само событие.

  2. Издатель события: компонент, который генерирует и публикует события.

  3. Подписчик на событие: компонент, который обрабатывает опубликованные события.

Начнем с создания класса события. Этот класс должен наследоваться от ApplicationEvent:

public class CustomSpringEvent extends ApplicationEvent {
    private String message;

    public CustomSpringEvent(Object source, String message) {
        super(source);
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}

Для публикации события потребуется класс-издатель, который будет использовать ApplicationEventPublisher для публикации событий:

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;

@Component
public class CustomSpringEventPublisher implements ApplicationEventPublisherAware {

    private ApplicationEventPublisher applicationEventPublisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public void publishEvent(final String message) {
        System.out.println("Publishing custom event. ");
        CustomSpringEvent customSpringEvent = new CustomSpringEvent(this, message);
        applicationEventPublisher.publishEvent(customSpringEvent);
    }
}

Теперь создадим компонент, который будет обрабатывать события. Для этого можно использовать аннотацию @EventListener:

import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class CustomSpringEventListener {

    @EventListener
    public void handleCustomSpringEvent(CustomSpringEvent event) {
        System.out.println("Received spring custom event - " + event.getMessage());
    }
}

Чтобы события обрабатывались асинхронно нужно включить поддержку асинхронности в конфигурации Spring и пометить метод-обработчик аннотацией @Async.

Для этого необходимо добавить аннотацию @EnableAsync в конфигурационный класс:

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
public class AsyncConfig {
}

Теперь изменим слушатель событий, чтобы он обрабатывал события асинхронно:

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.context.event.EventListener;

@Component
public class CustomSpringEventListener {

    @Async
    @EventListener
    public void handleCustomSpringEvent(CustomSpringEvent event) {
        System.out.println("Received spring custom event - " + event.getMessage());
        // доп. логика обработки события
    }
}

Примеры использования

Кэширование

Предположим, есть приложение, которое кэширует данные. Когда данные изменяются, нужно обновить кэш. Можно юзать событие для уведомления всех компонентов о необходимости обновления кэша:

// событие изменения данных
public class DataChangeEvent extends ApplicationEvent {
    private String dataId;

    public DataChangeEvent(Object source, String dataId) {
        super(source);
        this.dataId = dataId;
    }

    public String getDataId() {
        return dataId;
    }
}

// публикация события изменения данных
@Component
public class DataService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    public void updateData(String dataId) {
        // логика обновления данных
        eventPublisher.publishEvent(new DataChangeEvent(this, dataId));
    }
}

// обработчик события изменения данных
@Component
public class CacheService {

    @EventListener
    @Async
    public void handleDataChangeEvent(DataChangeEvent event) {
        System.out.println("Updating cache for dataId: " + event.getDataId());
        // логика обновления кэша
    }
}

Уведомление о завершении задач

Для этой задачи можно использовать событие для уведомления других компонентов о завершении длительных задач:

// событие завершения задачи
public class TaskCompleteEvent extends ApplicationEvent {
    private String taskId;

    public TaskCompleteEvent(Object source, String taskId) {
        super(source);
        this.taskId = taskId;
    }

    public String getTaskId() {
        return taskId;
    }
}

// публикация события завершения задачи
@Component
public class TaskService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Async
    public void executeTask(String taskId) {
        // логика выполнения задачи
        eventPublisher.publishEvent(new TaskCompleteEvent(this, taskId));
    }
}

// обработчик события завершения задачи
@Component
public class NotificationService {

    @EventListener
    @Async
    public void handleTaskCompleteEvent(TaskCompleteEvent event) {
        System.out.println("Task completed: " + event.getTaskId());
        // логика уведомления
    }
}

Организация обработки сообщений с помощью Spring AMQP

AMQP — это протокол уровня приложений для мидлвэр-систем, предназначенных для передачи сообщений. Имеет функциональные возможности для межпрограммного взаимодействия и включает концепции очередей, маршрутизации, обменов и привязок.

RabbitMQ — это брокер сообщений, реализующий AMQP. Он позволяет обмениваться сообщениями и управлять их очередями.

Для старта работ с Spring AMQP в проекте Spring Boot необходимо добавить соответствующую зависимость в файл pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Далее необходимо настроить параметры подключения к RabbitMQ в файле application.properties.

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Создание и отправка сообщений с помощью AmqpTemplate

Для отправки сообщений используется AmqpTemplate.

Для начала создадим конфигурационный класс, который определит все необходимые бины:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    static final String queueName = "myQueue";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

Теперь создадим компонент, который будет отправлять сообщения:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public Sender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void send(String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.queueName, message);
        System.out.println("Sent: " + message);
    }
}

Для обработки сообщений, поступающих в очередь, используется аннотация @RabbitListener.

Добавим компонент, который будет обрабатывать входящие сообщения:

import org.springframework.stereotype.Component;

@Component
public class Receiver {

    public void receiveMessage(String message) {
        System.out.println("Received: " + message);
    }
}

С аннтоацией @RabbitListener можно декларативно указать методы, которые будут обрабатывать сообщения из очередей:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MessageListener {

    @RabbitListener(queues = RabbitConfig.queueName)
    public void processMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

Пример

Полный пример с отправкой и получением сообщений:

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableRabbit
public class SpringAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringAmqpApplication.class, args);
    }

    @Bean
    CommandLineRunner runner(Sender sender) {
        return args -> {
            System.out.println("Sending message...");
            sender.send("Hello, RabbitMQ!");
        };
    }
}

Примеры использования двух инструментов вместе

Обработка заказов с локальными событиями и асинхронной передачей

В этом примере система обрабатывает заказы локально, публикует событие о новом заказе с помощью Spring Events и затем отправляет заказ на дальнейшую обработку в другую службу через RabbitMQ:

// событие заказа
public class OrderCreatedEvent extends ApplicationEvent {
    private String orderId;

    public OrderCreatedEvent(Object source, String orderId) {
        super(source);
        this.orderId = orderId;
    }

    public String getOrderId() {
        return orderId;
    }
}

// публикация события заказа
@Component
public class OrderService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    public void createOrder(String orderId) {
        // логика создания заказа
        System.out.println("Order created: " + orderId);
        eventPublisher.publishEvent(new OrderCreatedEvent(this, orderId));
    }
}

// обработка локального события и отправка в RabbitMQ
@Component
public class OrderCreatedListener {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @EventListener
    @Async
    public void handleOrderCreatedEvent(OrderCreatedEvent event) {
        System.out.println("Handling order created event for order: " + event.getOrderId());
        // доп. логика обработки
        amqpTemplate.convertAndSend("orderQueue", event.getOrderId());
    }
}

// конфигурация RabbitMQ
@Configuration
public class RabbitConfig {
    @Bean
    public Queue orderQueue() {
        return new Queue("orderQueue", false);
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

Обновление состояния задач и уведомление через RabbitMQ

При обновлении состояния задачи генерируется локальное событие, которое затем отправляется в очередь RabbitMQ для уведомления других микросервисов о статусе задачи:

// событие обновления задачи
public class TaskStatusUpdateEvent extends ApplicationEvent {
    private String taskId;
    private String status;

    public TaskStatusUpdateEvent(Object source, String taskId, String status) {
        super(source);
        this.taskId = taskId;
        this.status = status;
    }

    public String getTaskId() {
        return taskId;
    }

    public String getStatus() {
        return status;
    }
}

// публикация события обновления задачи
@Component
public class TaskService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    public void updateTaskStatus(String taskId, String status) {
        System.out.println("Updating task status: " + taskId + " to " + status);
        eventPublisher.publishEvent(new TaskStatusUpdateEvent(this, taskId, status));
    }
}

// обработка события обновления задачи и отправка в RabbitMQ
@Component
public class TaskStatusUpdateListener {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @EventListener
    @Async
    public void handleTaskStatusUpdateEvent(TaskStatusUpdateEvent event) {
        System.out.println("Sending task status update to RabbitMQ: " + event.getTaskId() + " - " + event.getStatus());
        amqpTemplate.convertAndSend("taskStatusQueue", event.getTaskId() + " - " + event.getStatus());
    }
}

// конфигурация RabbitMQ
@Configuration
public class RabbitConfig {
    @Bean
    public Queue taskStatusQueue() {
        return new Queue("taskStatusQueue", false);
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

В заключение напоминаю об открытых уроках, которые пройдут в рамках курса «Разработчик на Spring Framework» в Otus:

  • 1 июля: Тестирование Spring приложений. Интеграционные тесты с контекстом. Тестирование слоя репозиториев и сервисов. Запись по ссылке

  • 16 июля: Тестирование Spring приложений. Интеграционные тесты контроллеров, интеграций с внешними API и безопасности. Запись по ссылке

Источник: https://habr.com/ru/companies/otus/articles/825528/


Интересные статьи

Интересные статьи

MTS Web Services (MWS) и компания Directum, разработчик интеллектуальных продуктов для управления цифровыми бизнес-процессами и документами, представляют новую систему с ИИ-инструментами для обработки...
Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.С...
Четкое разделение бизнес логики с другими сквозными задачами является обязательным условием для создания чистого и читабельного кода. И говоря о сквозных задачах я имею ввиду управление транзакциями, ...
А что, если я скажу, что подобное #application.properties spring.datasource.url=${SPRING_DATASOURCE_URL}?someProperty=${PROPERTY} содержит ошибку. Не согласны? Разбор под катом.
Сегодня, в пятом уроке курса по Vue.js для начинающих, речь пойдёт о том, как обрабатывать события. → Vue.js для начинающих, урок 1: экземпляр Vue → Vue.js для начинающих, уро...