Параллелизм, асинхронность, многопоточность – Reactor почти всё сделает за Вас

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Цель/введение

Реактивные паттерны программирования становятся всё более востребованы при реализации высоконагруженных сервисов. Реактивные фреймворки предоставляют инструменты, позволяющие с минимальными затратами на кодирование использовать механизмы асинхронности и многопоточности.

В качестве примера, предлагаю рассмотреть реализацию сервис индексации данных в ElasticSearch. Данные хранятся в MongoDB, ключевые атрибуты которых синхронизируются с ElasticSearch (функционально похоже на Logstash). В проекте используется стек: Java/Spring Boot/Reactor/WebFlux/WebClient/RabbitMQ/MongoDB. На выбор RabbitMQ и MongoDB повлияло, в том числе, наличие реактивных драйверов.

Описание задачи

  1. Сервис должен принимать поток данных из очереди, выбирать связанные данные из базы и передавать их ElasticSearch. Формат данных очереди: действие (index/delete); id документа; имя индекса; тип индекса (опционально).

  2. Через web-интерфейс должен быть реализован функционал добавления, удаления и перестроения индекса.

  3. Должна быть возможность формирования агрегированных полей, содержащих данные из нескольких исходных полей документов, и добавление данных в индекс из связанных коллекций.

  4. Описание индексируемых данных должно быть в формате JSON.

DFD-диаграмма процесса индексации

Схема процесса индексации запросов, поступающих из очереди, выглядит следующим образом:

Алгоритм перестроения индекса выглядит практически также, за исключением того, что в нем отсутствует обработка запросов, отложенных из-за ошибок.

Описание функционала

Описание функционала коснется только работы реактивной части сервиса. Конфигурационные настройки, обработка формата описания индексируемых данных, формирование данных для запросов к ElasticSearch вынесены за рамки данной статьи, но вы можете посмотреть код на GitHub, по ссылке.

Теперь попробуем реализовать эту схему сквозным потоком Reactor, не используя подписки на отдельные элементы, в том числе отправку через WebClient HTTP-запросов и обработку полученных ответов. Отдадим, почти полностью, синхронизацию выполнения Reactor.

Код, запускающий процесс переиндексации выглядит следующим образом:

Task task = new Task(mongoElasticIndex);
ParallelFlux dataEventsFlux = reactorRepositoryMongoDB
        .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())
        .parallel(appConfig.getIndexParallelism())
        .runOn(Schedulers.boundedElastic());
Flux<Tuple2<String,Document>> processingData = processingData(dataEventsFlux, (p) -> "index",
        (p) -> (Document)p,
        (p) -> mongoElasticIndex,
        Flux.just(),
        task);

task.setDispose(subscribe(processedData, task));

Получаем поток данных из коллекции, настраиваем параллелизм, формируем объект обработки потока и подписываемся на поток. Здесь класс Task – внутренний класс, назначение которого: собирать статистику и предоставлять информацию о выполняемых задачах индексации.

Метод processingData возвращает поток запросов и ответов, отправленных WebClient’ом:

private <T> Flux<Tuple2<String,Document>>
    processingData(ParallelFlux<T> events,
            Function<T, String> getAction,
                   Function<T, Document> getDocument,
                   Function<T, MongoElasticIndex> getMongoElasticIndex,
                   Flux<String> mergeFlux,
                   Task task) {
    return  events
            // Добавление данных к исходному документу из присоединяемых коллекций
        .transform(joinData(getDocument, getMongoElasticIndex))
            // Генерация данных для передачи в ElasticSearch
        .transform(document2ElasticJson(getAction, getDocument, getMongoElasticIndex))
        .sequential()
            // Агрегирование данных для _bulk
        .transform(grouping(task))
            // Добавление потока данных, на которые не получен ответ от ElasticSearch
        .mergeWith(mergeFlux)
            // Отправка запросов в ElasticSearch
        .transform(postBulk(task))
        .subscribeOn(Schedulers.single())
        .doOnNext(testAliveResponses(task))
        .doOnSubscribe(p-> p.request(appConfig.getMaxSizeBuffer() * 2))
        .doOnComplete(() -> { logger.info("Start: {} End: {} read {} write {}",
                formatDate(task.getStartDate()),
                formatDate(new Date()),
                task.getDocumentsRead(),
                task.getIndexesWrite(), getMaxProcessingRequest());
            fileStorage.writeCollection2Files(waitingForResponse);
            removeTask(task);
        });
}

Методом transform Reactor соединяем отдельные обработчики потоков. Здесь есть одно существенное ограничение: входящий и исходящий потоки должны быть однотипными (Flux или ParallelFlux). Нельзя, например, с помощью transform встроить обработчик у которого вход Flux, а выход ParallelFlux.

В метод subscribe сервиса инкапсулирована подписка на поток. Ниже приведена его реализация:

private Disposable subscribe(Flux<Tuple2<String,Document>> events, Task task) {
    return  events
        .subscribe(
            p -> {
                if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
                    waitingForResponse.remove(p.getT1());
                }
                int count = Optional.ofNullable(p.getT2().get("items", List.class))
                        .map(List::size)
                        .orElse(0);
                task.addIndexesWrite(count);
            },
            e -> {
                if(task != rabbitMQTask)removeTask(task);
                fileStorage.writeCollection2Files(waitingForResponse);
                logger.error("Error: {}", e.getMessage());
            }
        );
}

Далее коротко об отдельных функциях обработки потока.

Загрузка документов

Имеются два варианта загрузки:

  • Для всех документов основной коллекции индекса

ParallelFlux dataEventsFlux = reactorRepositoryMongoDB
        .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())
        .parallel(appConfig.getIndexParallelism())
        .runOn(Schedulers.boundedElastic());

Метод findAll возвращает поток для всех документов коллекции. Parallel и runOn настраивают многопоточность для выборки и дальнейшей обработки.

  • Для единичного запроса, приходящему из очереди

ParallelFlux dataEventsFlux = reactiveQueue.inboundFlux()
        .parallel(appConfig.getIndexParallelism())
        .runOn(Schedulers.boundedElastic())
        .map(msg -> {
            IndexEvent indexEvent = reactiveQueue.msg2IndexEvent(msg);
            try {
                return CreateIndexItem(indexEvent);
            } catch (IllegalObjectIdException | IOException | ConvertDataException e) {
                logger.error("{} For message: {}", String.join(", ",throwable2ListMessage(e)),
                        new String(msg.getBody(), StandardCharsets.UTF_8));
                return new IndexItem(null, null, null);
            }
        })
        .filter(e -> nonNull(e.getAction()))
        .flatMap(item ->
            Flux.zip("delete".equals(item.getAction())
                        // Для операции удаления создаётся Document, содержащий _id удаляемого документа
                    ? Flux.just(new Document().append("_id", item.getIdDocument().get("_id")))
                        // Для операции обновления индекса Document загружается из базы данных
                    : reactorRepositoryMongoDB.find(
                        item.getMongoElasticIndex().getCollection(),
                        item.getIdDocument(),
                        item.getMongoElasticIndex().getProjection()),
                Flux.just(item)
            )
            .map(d -> new EventDocument(d.getT2().getAction(),
                    d.getT1(),
                    d.getT2().getMongoElasticIndex()))
        );

Метод inboundFlux интерфейса reactiveQueue возвращает поток для очереди. Parallel и runOn идентичны предыдущему варианту. Далее событие преобразуется из JSON в объект IndexEvent, по содержимому которого документ извлекаются из базы, или создаётся объект для удаления документа из ElasticSearch.

Добавление данных к исходному документу из присоединяемых коллекций

private <T> Function<ParallelFlux<T>, ParallelFlux<T>>
    joinData(Function<T, Document> getDocument,
            Function<T, MongoElasticIndex> getMongoElasticIndex) {
    return (ParallelFlux<T> items) ->
            items.flatMap(p -> {
            if(getDocument.apply(p).size() == 1) {
                return Flux.just(p);
            }
            return
                Flux.fromIterable(getMongoElasticIndex.apply((T) p).getJoinConditions(getDocument.apply(p)))
                        .flatMap(it -> Flux.zip(Flux.just(it.getCollection().getJoinedFieldName()),
                                reactorRepositoryMongoDB.find(getMongoElasticIndex.apply((T) p).getCollection(),
                                        it.getCondition(),
                                        it.getCollection().getProjection())))
                        .reduce(p, (acc, t) -> {
                            getDocument.apply(acc).put(t.getT1(), t.getT2());
                            return acc;
                        });
                }
        );
}

Метод joinData возвращает функциональный объект, добавляющий данные к исходному документу из документов присоединяемых коллекций. Использование flatMap и Flux.zip позволяет асинхронно запускать и обрабатывать потоки, в том числе и потоки, создаваемые запросами к базе данных mongodb. Все вопросы, связанные с синхронизацией, берет на себя Reactor.

Генерация JSON для ElasticSearch

private <T> Function<ParallelFlux<T>, ParallelFlux<String>>
    document2ElasticJson(
            Function<T, String> getAction,
            Function<T, Document> getDocument,
            Function<T, MongoElasticIndex> getMongoElasticIndex) {
    return (ParallelFlux<T> items) -> items.map(item -> {
        String elasticSend;
        try {
            Document document = getDocument.apply(item);
            MongoElasticIndex mongoElasticIndex = getMongoElasticIndex.apply(item);
            elasticSend = "delete".equals(getAction.apply(item))
                    ? mongoElasticIndex.deleteBuild(document)
                    : mongoElasticIndex.indexBuild(document);
        } catch (ConvertDataException e) {
            throw new RuntimeException(e);
        } catch (JsonProcessingException e) {
            throw new UncheckedIOException(e);
        }
        return elasticSend;
    });
}

Из полученного документа формируется JSON-объект модификации индекса в ElasticSearch. Контролируемые исключения приходится конвертировать в неконтролируемые.

Агрегирование данных для _bulk-запроса

Function<Flux<String>, Flux<String>> grouping(Task task) {
    return (Flux<String> source) -> source
            .bufferTimeout(appConfig.getMaxSizeBuffer(),
                    Duration.ofMillis(appConfig.getMaxDurationBuffer()))
            .doOnNext(p -> task.addDocumentsRead(p.size()))
            .map(p -> String.join("\n", p)
            );
}

Использование _bulk-запроса к ElasticSearch позволяет существенно снизить трафик и повысить производительность индексации. Объединение отправляемых данных несложно сделать при помощи bufferTimeout. Значениями максимального размера буфера и времени ожидания можно найти компромисс между оперативностью обновления данных в ElasticSearch, размером запроса и производительностью.

Отправка запросов ElasticSearch

public Function<Flux<String>, Flux<Tuple2<String, Document>>> postBulk(Task task) {
    return (Flux<String> source) -> source
        .flatMap(buffer -> {
            if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
                waitingForResponse.add(buffer);
            }
            return Flux.zip(Flux.just(buffer),
                webClientElastic.post()
                    .uri("/_bulk")
                    .body(BodyInserters.fromValue(buffer))
                    .retrieve()
                    .onStatus(httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),
                            response -> Mono.error(new HttpServiceException("System is overloaded",
                                    response.rawStatusCode())))
                    .onStatus(httpStatus -> httpStatus.is4xxClientError() && !httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),
                            response -> Mono.error(new RuntimeException("API not found")))
                    .onStatus(HttpStatus::is5xxServerError,
                            response -> Mono.error(new HttpServiceException("Server is not responding",
                                    response.rawStatusCode())))
                    .bodyToFlux(Document.class)
                    .retryWhen(Retry.backoff(appConfig.getWebClientRetryMaxAttempts(),
                                Duration.ofSeconds(appConfig.getWebClientRetryMinBackoff()))
                        .filter(throwable -> throwable instanceof HttpServiceException)
                        .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
                            throw new HttpServiceException("External Service failed to process after max retries",
                                    HttpStatus.SERVICE_UNAVAILABLE.value());
                        }))
            );
        });

}

Создаётся поток, отправляющий через WebClient запросы к ElasticSearch. Поток, формируемый методом post WebClient’а, Flux.zip объединяет с запросом, это позволяет при обработке ответа связать полученный ответ с отправленным запросом. С помощью retryWhen, Retry.backoff настроена обработка некоторых ошибок.

Обработка ответов ElasticSearch

private Disposable subscribe(Flux<Tuple2<String,Document>> events, Task task) {
    return  events
        .subscribe(
            p -> {
                if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
                    waitingForResponse.remove(p.getT1());
                }
                int count = Optional.ofNullable(p.getT2().get("items", List.class))
                        .map(List::size)
                        .orElse(0);
                task.addIndexesWrite(count);
            },
            e -> {
                if(task != rabbitMQTask)removeTask(task);
                fileStorage.writeCollection2Files(waitingForResponse);
                logger.error("Error: {}", e.getMessage());
            }
        );
}

Обработка ответов ElasticSearch минимальна. Если ответ получен на контролируемый запрос (не запрос на переиндексацию), то запрос удаляется из множества запросов, для которых контролируется получение ответа. Ответы на переиндексацию не контролируются. В полученном ответе атрибут items должен быть списком, содержащим информацию об обработанных документах. На количество элементов в списке увеличивается счетчик обработанных документов.

Настройка WebClient

Основная часть настройки делается в конфигурационном классе, бин возвращает объект WebClient.Builder:

@Bean
@Qualifier("elastic")
public WebClient.Builder webClientWithTimeout() {
    final TcpClient tcpClient = TcpClient
            .create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
            .doOnConnected(connection -> {
                connection.addHandlerLast(new ReadTimeoutHandler(timeout, TimeUnit.MILLISECONDS));
                connection.addHandlerLast(new WriteTimeoutHandler(timeout, TimeUnit.MILLISECONDS));
            });

    return WebClient.builder()
            .baseUrl(baseUrl +":" + port.toString())
            .filter(basicAuthentication(user, password))
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
}

В конструкторе сервиса добавляются фильтры, вызываемые при отправке запроса и получении ответа:

this.webClientElastic = webClientElastic
        .filter(onRequest())
        .filter(onResponse())
        .build();

Методы, возвращающие фильтры:

private ExchangeFilterFunction onRequest() {
    return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
        addSendRequest();
        int sleepCycleCount = 0;
        while (getProcessingRequest() > getMaxProcessingRequest()) {
            try {
                logger.info("Sleep: {} ProcessingRequest reached {} (MaxProcessingRequest {})", getSleepOverRequest(),
                        getProcessingRequest() - 1, getMaxProcessingRequest());
                sleep(getSleepOverRequest());
                if (sleepCycleCount++ > appConfig.getSleepCycleCountMax()) {
                    break;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
        return Mono.just(clientRequest);
    });
}

private ExchangeFilterFunction onResponse() {
    return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
        addReceiveResponse();
        logger.info("Response Status {}", clientResponse.statusCode());
        return Mono.just(clientResponse);
    });
}

Фильтры выводят информацию об отправке запросов, получении ответов и модифицируют счетчики отправленных запросов и полученных ответов. Перед отправкой запроса, если превышено количество не полученных ответов, процесс “засыпает” на некоторое время.

Настройка среды выполнения

Для того чтобы запустить этот сервис нам нужны: rabbitmq, mongodb и elasticsearch. Всё это проще установить в Docker. Ещё в самом начале проекта установил Docker Desktop и настроил контейнеры для запуска нужных cервисов. Как это делается можно посмотреть, например, в этой статье. По аналогии установил rabbitmq, mongodb. Добавил конфигурационные файлы и внес изменения в файл docker-compose.yml. Получившиеся настройки Docker можно найти в папке проекта docker-elk. Ниже скриншот запущенного контейнера:

Запуск сервиса

Для тестирования загрузил в базу mongodb 1000 документов. Из Postman и отправляю запрос:

В полученном логе видно, что обработка выполняется в разных потоках:

2022-11-02 15:23:17.396  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.418  INFO 8336 --- [      Thread-42] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.447  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.462  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.475  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.477  INFO 8336 --- [      Thread-33] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:855, serverValue:83}] to localhost:27017
2022-11-02 15:23:17.484  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.485  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.489  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.493  INFO 8336 --- [       Thread-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:856, serverValue:84}] to localhost:27017
2022-11-02 15:23:17.566  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.588  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.622  INFO 8336 --- [       Thread-4] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.651  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.651  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.658  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.658  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.673  INFO 8336 --- [      Thread-52] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:940, serverValue:85}] to localhost:27017
2022-11-02 15:23:17.676  INFO 8336 --- [       Thread-7] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:941, serverValue:86}] to localhost:27017
2022-11-02 15:23:17.715  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.738  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.748  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.772  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.790  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.811  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.818  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.849  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:993, serverValue:87}] to localhost:27017
2022-11-02 15:23:17.851  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:994, serverValue:88}] to localhost:27017
2022-11-02 15:23:17.895  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.898  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.911  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.926  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.944  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.966  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.993  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1078, serverValue:90}] to localhost:27017
2022-11-02 15:23:18.002  INFO 8336 --- [      Thread-48] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1079, serverValue:89}] to localhost:27017
2022-11-02 15:23:18.041  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.059  INFO 8336 --- [      Thread-31] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.076  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.083  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.096  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.135  INFO 8336 --- [      Thread-34] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1156, serverValue:92}] to localhost:27017
2022-11-02 15:23:18.138  INFO 8336 --- [      Thread-39] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1155, serverValue:91}] to localhost:27017
2022-11-02 15:23:18.140  INFO 8336 --- [      Thread-39] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.180  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.180  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.181  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Start: 2022-11-02 15:23:17.250 End: 2022-11-02 15:23:18.181 read 1000 write 1000

Теперь проверим, что загрузилось.

Запрос к ElasticSearch показывает наличие индекса с 1000 документами:

И попробуем найти что-то в ElasticSearch:

Получен ответ ElasticSearch с найденным документом!

Заключение

В этом материале мне хотелось привести пример сервиса, реализованного с использованием Spring Boot, WebFlux, WebClient, Reactor - надеюсь, что у меня это получилось.

Несколько выводов:

  1. Реактивные фреймворки, в том числе и Reactor, делает за нас существенную часть работы по реализации асинхронных многопоточных алгоритмов, позволяя сосредоточиться на предметной области.

  2. С их помощью можно, достаточно просто, создавать высоконагруженные сервисы.

  3. Для получения максимального эффекта от перехода на реактивные паттерны программирование нужно чтобы вся цепочка вычислений была реактивной, начиная с драйверов доступа к базам данных, очередям, файлам и т.д.

Ещё раз, репозиторий с кодом и настройками находится здесь

Несколько ссылок на используемые материалы:

Шпаргалка по Spring Boot WebClient

Reactive Programming: Reactor и Spring WebFlux — часть 2

Реактивное программирование со Spring, часть 2 Project Reactor

Источник: https://habr.com/ru/post/699112/


Интересные статьи

Интересные статьи

Я изучаю Битрикс где-то пару недель. Зачем?.. Хотелось чего-то новенького, тут подвернулась учёба. Даром, с наставниками, с возможным трудоустройством дальше хотя бы на пару месяцев - на испытательный...
На текущей работе пишем на Reactor. Технология классная, но как всегда есть много НО. Некоторые вещи раздражают, код сложнее писать и читать, с ThreadLocal совсем беда. Р...
Предыстория Когда-то у меня возникла необходимость проверять наличие неотправленных сообщений в «1С-Битрикс: Управление сайтом» (далее Битрикс) и получать уведомления об этом. Пробле...
Многие знают, что ABBYY занимается обработкой и извлечением данных из разных документов. Но у наших продуктов есть и другие интересные возможности. В частности, с помощью решения ABBY...
Я хотел бы поделиться воспоминаниями о своих попытках сделать «3D графический движок» «своими руками» практически в до-интернетную эпоху (он как бы уже где-то там конечно был, но у меня его факти...