Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Цель/введение
Реактивные паттерны программирования становятся всё более востребованы при реализации высоконагруженных сервисов. Реактивные фреймворки предоставляют инструменты, позволяющие с минимальными затратами на кодирование использовать механизмы асинхронности и многопоточности.
В качестве примера, предлагаю рассмотреть реализацию сервис индексации данных в ElasticSearch. Данные хранятся в MongoDB, ключевые атрибуты которых синхронизируются с ElasticSearch (функционально похоже на Logstash). В проекте используется стек: Java/Spring Boot/Reactor/WebFlux/WebClient/RabbitMQ/MongoDB. На выбор RabbitMQ и MongoDB повлияло, в том числе, наличие реактивных драйверов.
Описание задачи
Сервис должен принимать поток данных из очереди, выбирать связанные данные из базы и передавать их ElasticSearch. Формат данных очереди: действие (index/delete); id документа; имя индекса; тип индекса (опционально).
Через web-интерфейс должен быть реализован функционал добавления, удаления и перестроения индекса.
Должна быть возможность формирования агрегированных полей, содержащих данные из нескольких исходных полей документов, и добавление данных в индекс из связанных коллекций.
Описание индексируемых данных должно быть в формате JSON.
DFD-диаграмма процесса индексации
Схема процесса индексации запросов, поступающих из очереди, выглядит следующим образом:
Алгоритм перестроения индекса выглядит практически также, за исключением того, что в нем отсутствует обработка запросов, отложенных из-за ошибок.
Описание функционала
Описание функционала коснется только работы реактивной части сервиса. Конфигурационные настройки, обработка формата описания индексируемых данных, формирование данных для запросов к ElasticSearch вынесены за рамки данной статьи, но вы можете посмотреть код на GitHub, по ссылке.
Теперь попробуем реализовать эту схему сквозным потоком Reactor, не используя подписки на отдельные элементы, в том числе отправку через WebClient HTTP-запросов и обработку полученных ответов. Отдадим, почти полностью, синхронизацию выполнения Reactor.
Код, запускающий процесс переиндексации выглядит следующим образом:
Task task = new Task(mongoElasticIndex);
ParallelFlux dataEventsFlux = reactorRepositoryMongoDB
.findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())
.parallel(appConfig.getIndexParallelism())
.runOn(Schedulers.boundedElastic());
Flux<Tuple2<String,Document>> processingData = processingData(dataEventsFlux, (p) -> "index",
(p) -> (Document)p,
(p) -> mongoElasticIndex,
Flux.just(),
task);
task.setDispose(subscribe(processedData, task));
Получаем поток данных из коллекции, настраиваем параллелизм, формируем объект обработки потока и подписываемся на поток. Здесь класс Task – внутренний класс, назначение которого: собирать статистику и предоставлять информацию о выполняемых задачах индексации.
Метод processingData возвращает поток запросов и ответов, отправленных WebClient’ом:
private <T> Flux<Tuple2<String,Document>>
processingData(ParallelFlux<T> events,
Function<T, String> getAction,
Function<T, Document> getDocument,
Function<T, MongoElasticIndex> getMongoElasticIndex,
Flux<String> mergeFlux,
Task task) {
return events
// Добавление данных к исходному документу из присоединяемых коллекций
.transform(joinData(getDocument, getMongoElasticIndex))
// Генерация данных для передачи в ElasticSearch
.transform(document2ElasticJson(getAction, getDocument, getMongoElasticIndex))
.sequential()
// Агрегирование данных для _bulk
.transform(grouping(task))
// Добавление потока данных, на которые не получен ответ от ElasticSearch
.mergeWith(mergeFlux)
// Отправка запросов в ElasticSearch
.transform(postBulk(task))
.subscribeOn(Schedulers.single())
.doOnNext(testAliveResponses(task))
.doOnSubscribe(p-> p.request(appConfig.getMaxSizeBuffer() * 2))
.doOnComplete(() -> { logger.info("Start: {} End: {} read {} write {}",
formatDate(task.getStartDate()),
formatDate(new Date()),
task.getDocumentsRead(),
task.getIndexesWrite(), getMaxProcessingRequest());
fileStorage.writeCollection2Files(waitingForResponse);
removeTask(task);
});
}
Методом transform Reactor соединяем отдельные обработчики потоков. Здесь есть одно существенное ограничение: входящий и исходящий потоки должны быть однотипными (Flux или ParallelFlux). Нельзя, например, с помощью transform встроить обработчик у которого вход Flux, а выход ParallelFlux.
В метод subscribe сервиса инкапсулирована подписка на поток. Ниже приведена его реализация:
private Disposable subscribe(Flux<Tuple2<String,Document>> events, Task task) {
return events
.subscribe(
p -> {
if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
waitingForResponse.remove(p.getT1());
}
int count = Optional.ofNullable(p.getT2().get("items", List.class))
.map(List::size)
.orElse(0);
task.addIndexesWrite(count);
},
e -> {
if(task != rabbitMQTask)removeTask(task);
fileStorage.writeCollection2Files(waitingForResponse);
logger.error("Error: {}", e.getMessage());
}
);
}
Далее коротко об отдельных функциях обработки потока.
Загрузка документов
Имеются два варианта загрузки:
Для всех документов основной коллекции индекса
ParallelFlux dataEventsFlux = reactorRepositoryMongoDB
.findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())
.parallel(appConfig.getIndexParallelism())
.runOn(Schedulers.boundedElastic());
Метод findAll возвращает поток для всех документов коллекции. Parallel и runOn настраивают многопоточность для выборки и дальнейшей обработки.
Для единичного запроса, приходящему из очереди
ParallelFlux dataEventsFlux = reactiveQueue.inboundFlux()
.parallel(appConfig.getIndexParallelism())
.runOn(Schedulers.boundedElastic())
.map(msg -> {
IndexEvent indexEvent = reactiveQueue.msg2IndexEvent(msg);
try {
return CreateIndexItem(indexEvent);
} catch (IllegalObjectIdException | IOException | ConvertDataException e) {
logger.error("{} For message: {}", String.join(", ",throwable2ListMessage(e)),
new String(msg.getBody(), StandardCharsets.UTF_8));
return new IndexItem(null, null, null);
}
})
.filter(e -> nonNull(e.getAction()))
.flatMap(item ->
Flux.zip("delete".equals(item.getAction())
// Для операции удаления создаётся Document, содержащий _id удаляемого документа
? Flux.just(new Document().append("_id", item.getIdDocument().get("_id")))
// Для операции обновления индекса Document загружается из базы данных
: reactorRepositoryMongoDB.find(
item.getMongoElasticIndex().getCollection(),
item.getIdDocument(),
item.getMongoElasticIndex().getProjection()),
Flux.just(item)
)
.map(d -> new EventDocument(d.getT2().getAction(),
d.getT1(),
d.getT2().getMongoElasticIndex()))
);
Метод inboundFlux интерфейса reactiveQueue возвращает поток для очереди. Parallel и runOn идентичны предыдущему варианту. Далее событие преобразуется из JSON в объект IndexEvent, по содержимому которого документ извлекаются из базы, или создаётся объект для удаления документа из ElasticSearch.
Добавление данных к исходному документу из присоединяемых коллекций
private <T> Function<ParallelFlux<T>, ParallelFlux<T>>
joinData(Function<T, Document> getDocument,
Function<T, MongoElasticIndex> getMongoElasticIndex) {
return (ParallelFlux<T> items) ->
items.flatMap(p -> {
if(getDocument.apply(p).size() == 1) {
return Flux.just(p);
}
return
Flux.fromIterable(getMongoElasticIndex.apply((T) p).getJoinConditions(getDocument.apply(p)))
.flatMap(it -> Flux.zip(Flux.just(it.getCollection().getJoinedFieldName()),
reactorRepositoryMongoDB.find(getMongoElasticIndex.apply((T) p).getCollection(),
it.getCondition(),
it.getCollection().getProjection())))
.reduce(p, (acc, t) -> {
getDocument.apply(acc).put(t.getT1(), t.getT2());
return acc;
});
}
);
}
Метод joinData возвращает функциональный объект, добавляющий данные к исходному документу из документов присоединяемых коллекций. Использование flatMap и Flux.zip позволяет асинхронно запускать и обрабатывать потоки, в том числе и потоки, создаваемые запросами к базе данных mongodb. Все вопросы, связанные с синхронизацией, берет на себя Reactor.
Генерация JSON для ElasticSearch
private <T> Function<ParallelFlux<T>, ParallelFlux<String>>
document2ElasticJson(
Function<T, String> getAction,
Function<T, Document> getDocument,
Function<T, MongoElasticIndex> getMongoElasticIndex) {
return (ParallelFlux<T> items) -> items.map(item -> {
String elasticSend;
try {
Document document = getDocument.apply(item);
MongoElasticIndex mongoElasticIndex = getMongoElasticIndex.apply(item);
elasticSend = "delete".equals(getAction.apply(item))
? mongoElasticIndex.deleteBuild(document)
: mongoElasticIndex.indexBuild(document);
} catch (ConvertDataException e) {
throw new RuntimeException(e);
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
return elasticSend;
});
}
Из полученного документа формируется JSON-объект модификации индекса в ElasticSearch. Контролируемые исключения приходится конвертировать в неконтролируемые.
Агрегирование данных для _bulk-запроса
Function<Flux<String>, Flux<String>> grouping(Task task) {
return (Flux<String> source) -> source
.bufferTimeout(appConfig.getMaxSizeBuffer(),
Duration.ofMillis(appConfig.getMaxDurationBuffer()))
.doOnNext(p -> task.addDocumentsRead(p.size()))
.map(p -> String.join("\n", p)
);
}
Использование _bulk-запроса к ElasticSearch позволяет существенно снизить трафик и повысить производительность индексации. Объединение отправляемых данных несложно сделать при помощи bufferTimeout. Значениями максимального размера буфера и времени ожидания можно найти компромисс между оперативностью обновления данных в ElasticSearch, размером запроса и производительностью.
Отправка запросов ElasticSearch
public Function<Flux<String>, Flux<Tuple2<String, Document>>> postBulk(Task task) {
return (Flux<String> source) -> source
.flatMap(buffer -> {
if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
waitingForResponse.add(buffer);
}
return Flux.zip(Flux.just(buffer),
webClientElastic.post()
.uri("/_bulk")
.body(BodyInserters.fromValue(buffer))
.retrieve()
.onStatus(httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),
response -> Mono.error(new HttpServiceException("System is overloaded",
response.rawStatusCode())))
.onStatus(httpStatus -> httpStatus.is4xxClientError() && !httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),
response -> Mono.error(new RuntimeException("API not found")))
.onStatus(HttpStatus::is5xxServerError,
response -> Mono.error(new HttpServiceException("Server is not responding",
response.rawStatusCode())))
.bodyToFlux(Document.class)
.retryWhen(Retry.backoff(appConfig.getWebClientRetryMaxAttempts(),
Duration.ofSeconds(appConfig.getWebClientRetryMinBackoff()))
.filter(throwable -> throwable instanceof HttpServiceException)
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
throw new HttpServiceException("External Service failed to process after max retries",
HttpStatus.SERVICE_UNAVAILABLE.value());
}))
);
});
}
Создаётся поток, отправляющий через WebClient запросы к ElasticSearch. Поток, формируемый методом post WebClient’а, Flux.zip объединяет с запросом, это позволяет при обработке ответа связать полученный ответ с отправленным запросом. С помощью retryWhen, Retry.backoff настроена обработка некоторых ошибок.
Обработка ответов ElasticSearch
private Disposable subscribe(Flux<Tuple2<String,Document>> events, Task task) {
return events
.subscribe(
p -> {
if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
waitingForResponse.remove(p.getT1());
}
int count = Optional.ofNullable(p.getT2().get("items", List.class))
.map(List::size)
.orElse(0);
task.addIndexesWrite(count);
},
e -> {
if(task != rabbitMQTask)removeTask(task);
fileStorage.writeCollection2Files(waitingForResponse);
logger.error("Error: {}", e.getMessage());
}
);
}
Обработка ответов ElasticSearch минимальна. Если ответ получен на контролируемый запрос (не запрос на переиндексацию), то запрос удаляется из множества запросов, для которых контролируется получение ответа. Ответы на переиндексацию не контролируются. В полученном ответе атрибут items должен быть списком, содержащим информацию об обработанных документах. На количество элементов в списке увеличивается счетчик обработанных документов.
Настройка WebClient
Основная часть настройки делается в конфигурационном классе, бин возвращает объект WebClient.Builder:
@Bean
@Qualifier("elastic")
public WebClient.Builder webClientWithTimeout() {
final TcpClient tcpClient = TcpClient
.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
.doOnConnected(connection -> {
connection.addHandlerLast(new ReadTimeoutHandler(timeout, TimeUnit.MILLISECONDS));
connection.addHandlerLast(new WriteTimeoutHandler(timeout, TimeUnit.MILLISECONDS));
});
return WebClient.builder()
.baseUrl(baseUrl +":" + port.toString())
.filter(basicAuthentication(user, password))
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
}
В конструкторе сервиса добавляются фильтры, вызываемые при отправке запроса и получении ответа:
this.webClientElastic = webClientElastic
.filter(onRequest())
.filter(onResponse())
.build();
Методы, возвращающие фильтры:
private ExchangeFilterFunction onRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
addSendRequest();
int sleepCycleCount = 0;
while (getProcessingRequest() > getMaxProcessingRequest()) {
try {
logger.info("Sleep: {} ProcessingRequest reached {} (MaxProcessingRequest {})", getSleepOverRequest(),
getProcessingRequest() - 1, getMaxProcessingRequest());
sleep(getSleepOverRequest());
if (sleepCycleCount++ > appConfig.getSleepCycleCountMax()) {
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
return Mono.just(clientRequest);
});
}
private ExchangeFilterFunction onResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
addReceiveResponse();
logger.info("Response Status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
Фильтры выводят информацию об отправке запросов, получении ответов и модифицируют счетчики отправленных запросов и полученных ответов. Перед отправкой запроса, если превышено количество не полученных ответов, процесс “засыпает” на некоторое время.
Настройка среды выполнения
Для того чтобы запустить этот сервис нам нужны: rabbitmq, mongodb и elasticsearch. Всё это проще установить в Docker. Ещё в самом начале проекта установил Docker Desktop и настроил контейнеры для запуска нужных cервисов. Как это делается можно посмотреть, например, в этой статье. По аналогии установил rabbitmq, mongodb. Добавил конфигурационные файлы и внес изменения в файл docker-compose.yml. Получившиеся настройки Docker можно найти в папке проекта docker-elk. Ниже скриншот запущенного контейнера:
Запуск сервиса
Для тестирования загрузил в базу mongodb 1000 документов. Из Postman и отправляю запрос:
В полученном логе видно, что обработка выполняется в разных потоках:
2022-11-02 15:23:17.396 INFO 8336 --- [ Thread-6] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.418 INFO 8336 --- [ Thread-42] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.447 INFO 8336 --- [ Thread-5] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.462 INFO 8336 --- [ Thread-7] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.475 INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.477 INFO 8336 --- [ Thread-33] org.mongodb.driver.connection : Opened connection [connectionId{localValue:855, serverValue:83}] to localhost:27017
2022-11-02 15:23:17.484 INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.485 INFO 8336 --- [ Thread-5] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.489 INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.493 INFO 8336 --- [ Thread-4] org.mongodb.driver.connection : Opened connection [connectionId{localValue:856, serverValue:84}] to localhost:27017
2022-11-02 15:23:17.566 INFO 8336 --- [ Thread-6] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.588 INFO 8336 --- [ Thread-7] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.622 INFO 8336 --- [ Thread-4] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.651 INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.651 INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.658 INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.658 INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.673 INFO 8336 --- [ Thread-52] org.mongodb.driver.connection : Opened connection [connectionId{localValue:940, serverValue:85}] to localhost:27017
2022-11-02 15:23:17.676 INFO 8336 --- [ Thread-7] org.mongodb.driver.connection : Opened connection [connectionId{localValue:941, serverValue:86}] to localhost:27017
2022-11-02 15:23:17.715 INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.738 INFO 8336 --- [ Thread-5] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.748 INFO 8336 --- [ Thread-34] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.772 INFO 8336 --- [ Thread-7] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.790 INFO 8336 --- [ Thread-34] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.811 INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.818 INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.849 INFO 8336 --- [ Thread-36] org.mongodb.driver.connection : Opened connection [connectionId{localValue:993, serverValue:87}] to localhost:27017
2022-11-02 15:23:17.851 INFO 8336 --- [ Thread-36] org.mongodb.driver.connection : Opened connection [connectionId{localValue:994, serverValue:88}] to localhost:27017
2022-11-02 15:23:17.895 INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.898 INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:17.911 INFO 8336 --- [ Thread-6] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.926 INFO 8336 --- [ Thread-36] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.944 INFO 8336 --- [ Thread-36] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.966 INFO 8336 --- [ Thread-36] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.993 INFO 8336 --- [ Thread-36] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1078, serverValue:90}] to localhost:27017
2022-11-02 15:23:18.002 INFO 8336 --- [ Thread-48] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1079, serverValue:89}] to localhost:27017
2022-11-02 15:23:18.041 INFO 8336 --- [ Thread-34] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.044 INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:18.044 INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:18.044 INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:18.044 INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:18.059 INFO 8336 --- [ Thread-31] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.076 INFO 8336 --- [ Thread-7] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.083 INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:18.096 INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:18.135 INFO 8336 --- [ Thread-34] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1156, serverValue:92}] to localhost:27017
2022-11-02 15:23:18.138 INFO 8336 --- [ Thread-39] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1155, serverValue:91}] to localhost:27017
2022-11-02 15:23:18.140 INFO 8336 --- [ Thread-39] ru.mvz.elasticsearch.service.Indexer : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.180 INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:18.180 INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer : Response Status 200 OK
2022-11-02 15:23:18.181 INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer : Start: 2022-11-02 15:23:17.250 End: 2022-11-02 15:23:18.181 read 1000 write 1000
Теперь проверим, что загрузилось.
Запрос к ElasticSearch показывает наличие индекса с 1000 документами:
И попробуем найти что-то в ElasticSearch:
Получен ответ ElasticSearch с найденным документом!
Заключение
В этом материале мне хотелось привести пример сервиса, реализованного с использованием Spring Boot, WebFlux, WebClient, Reactor - надеюсь, что у меня это получилось.
Несколько выводов:
Реактивные фреймворки, в том числе и Reactor, делает за нас существенную часть работы по реализации асинхронных многопоточных алгоритмов, позволяя сосредоточиться на предметной области.
С их помощью можно, достаточно просто, создавать высоконагруженные сервисы.
Для получения максимального эффекта от перехода на реактивные паттерны программирование нужно чтобы вся цепочка вычислений была реактивной, начиная с драйверов доступа к базам данных, очередям, файлам и т.д.
Ещё раз, репозиторий с кодом и настройками находится здесь
Несколько ссылок на используемые материалы:
Шпаргалка по Spring Boot WebClient
Reactive Programming: Reactor и Spring WebFlux — часть 2
Реактивное программирование со Spring, часть 2 Project Reactor