[sobjectizer] Синхронное общение с агентами в реальном проекте

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Давненько мы ничего не писали про SObjectizer. Надо бы исправить это упущение, тем более, что представился достойный повод. Ну а чтобы было интереснее, в этот раз поговорим не об абстрактных фичах в вакууме, а о примерах использования вполне себе конкретной функциональности в коде реального проекта.

Рассказать о том, что именно это за проект мы не можем, т.к. это закрытая заказная разработка. Но о том, как SObjectizer применяется мы все-таки попробуем поговорить. Естественно, речь пойдет не обо всем, а об одном моменте, которые запомнился лично мне.

Использование синхронного взаимодействия

Сама суть SObjectizer-а -- это обмен асинхронными сообщениями. Поэтому в 99% случаев общение между сущностями происходит именно посредством асинхронного взаимодействия. Но иногда асинхронщина создает больше проблем, чем решает. В этих редких ситуациях на помощь приходят синхронные запросы...

Несколько общих слов о причинах использования синхронности

Проект состоит из нескольких взаимодействующих друг с другом процессов. Что-то в этих процессах реализовано на базе SObjectizer-а, что-то нет. В каких-то процессах основная бизнес-логика реализуется SObjectizer-составляющей, в каких-то, например, в GUI-процессе, прикладная часть кода не завязана на SObjectizer и с помощью агентов делаются только какие-то вспомогательные и/или инфраструктурные задачи.

Поэтому некоторые повторно-используемые куски кода оформлены в виде библиотек, публичная часть которых представляет из себя набор обычных C++ных классов и функций, а уже под капотом живут SObjectizer-овские диспетчеры и кооперации агентов.

Это позволяет в не-SObjectizer-части создать экземпляр некоторого класса (скажем, реализацию интерфейса shmem_ipc::consumer::abstract_consumer_t, отвечающую за взаимодействие с другими процессами) и вызвать у него обычный метод не подозревая о том, в какую машинерию из агентов это все превращается в SObjectizer-части.

При этом операции, которые в не-SObjectizer-части должны выглядеть как синхронные, например:

auto consumer = shmem_ipc::consumer::make_consumer(
    env, consumer_params, notification_mbox);
...
consumer->add_producer(shmem_ipc::area_name_t{"data_provider_1"});
consumer->add_producer(shmem_ipc::area_name_t{"data_provider_2"});

на самом деле требует кучи асинхронных операций в SObjectizer-части. И вот для того, чтобы скрыть всю эту асинхронность от не-SObjectizer-части и применяется so_5::extra::sync.

Пример простого синхронного запроса и ответа на него

Рассмотрим упомянутую выше операцию make_consumer. Эта операция должна создать специальный диспетчер и привязать к созданному диспетчеру нового агента под названием consumer_controller. Агент consumer_controller должен стартовать и успешно выполнить свою инициализацию на контексте этого специального диспетчера. У consumer_controller метод so_evt_start отрабатывает на отдельной рабочей нити, поэтому вызов make_consumer должен дождаться пока consumer_controller начнет свою работу.

Реализовано это через отправку агенту consumer_controller синхронного запроса init_status_request, а именно:

  • внутри make_consumer создается экземпляр диспетчера;

  • регистрируется кооперация с агентом consumer_controller;

  • агенту consumer_controller отсылается синхронный запрос init_status_request и make_consumer блокируется до получения ответа init_status_reply.

Если агент consumer_controller стартовал успешно, то он отвечает на init_status_request посредством сообщения init_status_reply. Если же consumer_controller не смог стартовать, то ожидание ответного init_status_reply внутри make_consumer будет прервано исключением.

Более сложный пример: нотификации о дерегистрации кооперации и складирование синхронных запросов

Суть проблемы

Рассмотрим еще несколько операций, в которых участвует упомянутый выше агент consumer_controller:

// Начать получать данные от внешнего процесса data_provider_1.
consumer->add_producer(shmem_ipc::area_name_t{"data_provider_1"});
...
// Прекратить получение данных от внешнего процесса data_provider_1.
consumer->remove_producer(shmem_ipc::area_name_t{"data_provider_1"});

Для того, чтобы получать подлежащие обработке данные нужно создать подключение к внешнему процессу и начать двусторонний обмен пакетами через установленное подключение. Если подключение некоторое время не активно, то следует проверить его жизнеспособность посредством ping-ов. Если подключение "умерло", то нужно начать попытки восстановить его.

Один consumer_controller может обслуживать сразу несколько подключений к разным внешним процессам. Для этого под каждое подключение consumer_controller создает отдельную кооперацию с агентом connection. Т.е. каждый вызов:

consumer->add_producer(...);

прячет за собой регистрацию новой кооперации. При этом создание новой кооперации происходит асинхронно, но вызов add_producer оформлен как синхронный: возврат из add_producer должен произойти только после того, как новая кооперация запущена. Для этого также применяется синхронный запрос к агенту consumer_controller.

Синхронным является и вызов:

consumer->remove_producer(...);

Но с вызовом remove_producer ситуация гораздо интереснее, чем с add_producer.

Во-первых, прекращение взаимодействия с внешним процессом -- это дерегистрация кооперации, которая отвечала за обслуживание подключения к этому процессу. А дерегистрация кооперации -- это непростая штука, которая может занять непонятно сколько времени.

Когда SObjectizer инициирует дерегистрацию кооперации, агенты из этой кооперации еще некоторое время продолжают свою обычную работу. SObjectizer дает им возможность завершить обработку тех сообщений, которые успели попасть в очередь к агентам, входящим в дерегистрируемую кооперацию. Лишь когда все агенты кооперации обработают все доставшиеся им сообщения, SObjectizer уничтожит кооперацию. Т.е. рано или поздно кооперация будет изъята из SObjectizer Environment, но когда именно это произойдет заранее неизвестно.

Т.е. вызов remove_producer должен заснуть на непонятно какое время. При этом, что важно, сам consumer_controller, инициировав асинхронную дерегистрацию кооперации, должен продолжить свою обычную работу. А потом consumer_controller как-то должен узнать, что процесс дерегистрации полностью завершен и что пора будить заснувшего remove_producer.

Во-вторых, вполне может случиться так, что на разных рабочих нитях процесса будет сделан один и тот же вызов remove_producer, например:

consumer->remove_producer(shmem_ipc::area_name_t{"data_provider_1"});

Получится, что сразу несколько нитей будут заблокированы на remove_producer в ожидании, когда же кооперация, отвечающая за общение с data_provider_1, будет уничтожена. И consumer_controller должен как-то разбудить все эти заснувшие рабочие нити.

Суть решения

Решение задачи с обслуживанием remove_producer построено на использовании двух фич: нотификаторы дерегистрации кооперации и синхронных запросов.

Сперва про нотификаторы дерегистрации.

В SObjectizer-е при регистрации кооперации можно связать с кооперацией объекты-нотификаторы двух типов. Первый тип -- это нотификаторы завершения регистрации. SObjectizer дергает эти нотификаторы когда процесс регистрации кооперации успешно завершается.

Второй тип -- это нотификаторы завершения дерегистрации. SObjectizer дергает эти нотификаторы когда кооперация завершила свою работу и уже полностью вычеркнута из SObjectizer Environment.

Нотификаторы дерегистрации могут использоваться для разных целей. Например, их можно применять как своего рода супервизоров: если кооперация была дерегистрирована из-за какой-то проблемы, то нотификатор может зарегистрировать кооперацию заново (есть даже штатный пример, демонстрирующий этот сценарий). Но обычно нотификаторы отсылают кому-то сообщение о том, что кооперация прекратила свое существование.

Вот и в случае с remove_producer для кооперации с агентом connection устанавливается нотификатор дерегистрации, который уведомляет consumer_controller о том, что кооперации больше нет.

Теперь про синхронный запрос, посредством которого инициируется удаление подключения к внешнему процессу.

Внутри remove_producer агенту consumer_controller отсылается запрос remove_producer_request.

Этот запрос прилетает к consumer_controller в виде обычного асинхронного сообщения. С которым можно делать что угодно. Например, сохранить до поры до времени в каком-то контейнере.

Что, собственно, consumer_controller и делает.

Когда к consumer_controller прилетает сообщение с запросом remove_producer_request, то consumer_controller инициирует дерегистрацию нужной кооперации (если она еще жива, конечно же), после чего сохраняет remove_producer_request у себя в отдельном ассоциативном контейнере.

А вот когда от нотификатора дерегистрации прилетает уведомление о том, что кооперация полностью дерегистрирована, тогда consumer_controller извлекает из своего ассоциативного контейнера все сообщения-запросы remove_producer_request, относящиеся к данной кооперации, и отвечает на эти ждущие завершения запросы.

Получив ответы от consumer_controller заснувшие вызовы remove_producer просыпаются и возвращают управление вызвавшему их коду.

Вот и все, мы получили нужный нам результат.

Почему запомнилось именно использование синхронного взаимодействия?

Потому что SObjectizer появился на свет без поддержки синхронности и добавление синхронности в SObjectizer происходило не быстро и не просто.

На протяжении многих лет моя принципиальная позиция состояла в том, что синхронности в SObjectizer не место.

Но, как говорится, время не щадит ни колес, ни дорог. Червь сомнения и многочисленные вопросы по поводу отсутствия синхронности подточили мои уверенность в том, что SObjectizer -- это асинхронность, только асинхронность и ничего кроме асинхронности.

Так что в конце-концов синхронное взаимодействие между агентами было добавлено в SO-5.3. Правда, выглядело оно страшно как сам черт. Несколько поправить это мы смогли лишь когда перестали оглядываться на компиляторы без полноценной поддержки variadic templates и накопили некоторый опыт работы с синхронными запросами.

В итоге, в SO-5.5 синхронное взаимодействие постепенно "причесали". Но остался принципиальный просчет в тогдашнем подходе к реализации синхронных запросов: получив синхронный запрос агент должен был ответить на него возвратом значения из обработчика события. Т.е. было что-то вроде:

reply_type some_agent::evt_handler(mhood_t<request_type> cmd) {
  ... // Какая-то обработка содержимого cmd.
  return { ... }; // Вот это значение и будет автоматически возвращено
                  // SObjectizer-ом как результат обработки запроса.
                  // Никаких дополнительных действий после возврата
                  // значения агент уже не может сделать.
}

Такой подход породил массу сложностей в реализации и развитии потрохов SObjectizer-а. Но, главное, он предполагал, что запрос будет полностью обработан в единственном event-handler-а агента получателя. Нельзя было ни отложить обработку запроса, ни делегировать эту обработку какому-то другому агенту.

Этот просчет был учтен в 2019-ом, когда мы решились нарушить совместимость и вместо продолжения развития ветки 5.5 открыли новую ветку 5.6. Начиная с SObjectizer-5.6 обработка синхронного запроса выглядит иначе:

void some_agent::evt_handler(
  so_5::extra::sync::request_mhood_t<request_type, reply_type> cmd)
{
  ... // Какая-то обработка содержимого cmd.
  // Отсылка ответа в виде асинхронного сообщения.
  cmd->make_reply(...);
  ... // Можем выполнить еще какие-то действия.
}

Теперь полученные синхронные запросы можно сохранять и/или делегировать их выполнение кому-то еще. Что и позволило воплотить в жизнь решение, которое было описано выше.

Вместо заключения или зачем была написана эта статья

Позволю себе быть откровенным. Главная цель данной статьи в том, чтобы напомнить о существовании такого проекта, как SObjectizer. Все-таки живых и все еще развивающихся оперсорсных акторных фреймворков для C++ не так уж и много. А SObjectizer как раз все еще жив и все еще развивается, не смотря ни на что. Как раз недавно удалось выкатить свежие версии SObjectizer-5.7.3 и so5extra-1.5.0.

После весьма непростых 2020 и 2021 годов нам таки удалось выкроить время, чтобы добавить в SObjectizer еще несколько новых фич. Что выглядит как хороший повод рассказать о нашей разработке еще раз.

Ну а если еще и информация в статье для кого-то оказалась полезной, то тогда вообще хорошо.

Посему большое спасибо всем, кто дочитал статью. И слова искренней признательности всем тем, кто решился на использование SObjectizer в своих проектах.

Источник: https://habr.com/ru/post/600173/


Интересные статьи

Интересные статьи

Я работаю инженером в Call-центре. В круг моих обязанностей входит повышение эффективности прозвона call-листов. В этой статье речь идет о некоторых функциональных возможностях специального программно...
ВведениеВ данной статье я бы хотел рассмотреть проблему обновления PHP в виртуальной машине BitrixVM, и действия, которые возможно применить если выполнение переезда на машину с обновленным ПО невозмо...
Или как добиться большего на следующем вашем хакатоне Некоторое время назад я принимал участие в ежегодном игровом мероприятии Itch.io Game Off 2020, участники которого за ноябрь делали ...
В 1С-Битрикс: Управление сайтом (как и в Битрикс24) десятки, если не сотни настраиваемых типов данных (или сущностей): инфоблоки, пользователи, заказы, склады, форумы, блоги и т.д. Стр...
Привет! Мы продолжаем наш митап-тур по Санкт-Петербургу. В следующий четверг, 5 сентября, будет митап для тестировщиков. Как всегда, бесплатный, и как всегда — сначала надо зарегистрироваться....