Пример микросервисной архитектуры с Saga на MassTransit

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Привет, Хабр! В общем работаю я значит Архитектором Програмных Решений. Мы тут монолиты на микросервисы переводим поэтому я решил для наших разработчиков написать пример того как работают саги и за одно может оно и вам понадобиться поэтому выложил сюда. Статья будет дополняться по мере поступления вопросов от вас и от наших разработчиков.

Саги используются для описания процесса изменения состояния системы для исполнения, которого нужно сделать запрос более чем в один сетевой сервис. Они используются:

1)      Для обеспечения порядка исполнения. Для систем, работающих на событиях это не гарантировано. Чтобы действие Б выполнилось строго после действия А нужно посылать ответные события на событие А и строго после того как получено это ответное событие запускать Б.

2)      Для компенсации действий. Если действие Б провалилось по какой-то причине, то сага может выполнить компенсирующее запросы для действия А.

3)      Для описания процесса. Если процесс большой и сложный, то его удобно расписать в виде Саги чтобы в одном месте видеть, что происходит. Тут Сага служит как схема BPM описывающая что мы Делам А потом делаем Б если Б успешно, то делаем С иначе делаем D и т. д.

4)      Для гарантированного выполнения каково-то набора действий. Если мы сделали А, то Сага может повторять пытаться выполнить действие Б до фиксированного количества ретраев.

5)      Для обеспечения консистентной системы. Например, когда нужно изменить состояние микросервиса А и микросервиса Б строго в соответствии друг с другом.

Тут небольшое отступление. Обычно в внутри системы между микросервисами используют один формат  (например AMPQ или gRPC) а снаружи могут быть системы использующие SOAP, HTTP и так далее. Для того чтобы изолировать внутренние сервисы от особенностей внешней среды обычно использую два типа сервисов переводчиков. Gateway и Proxy. Их основная задач переводит из одного протокола в другой. Например из HTTP в AMPQ. Отличия в том что Gateway используется для того чтобы переводить и изолировать запросы снаружи внутрь. Например SpaGateway используется для перевода запросов с фронта. Proxy используется для переводов изнутри наружу. Например SmtpServerProxy для перевода запросов от нас к почтовому серверу.

Если каждый микросервис рассматривать как какой-то объект или библиотеку, то для их архитектуры принципы SOLID и другие в этом роде очень даже применимы. В целом Saga исполняет роль «Мозга», у которого есть «Органы» это микросервисы. Ну или можно рассматривать Сагу как начальника у которого есть подчиненные микросервисы. В сложных процессах выделяют главную сагу. Главный процесс и дочерние саги – под процессы. Родительская сага отдает команды дочерним Сагам (под процессы) которые уже в свою очередь отдают команды микросервисам.

Наша сага будет максимально простой. Без компенсирующих действий. Суть ее логики: У нас есть отдельно микросервис предметов из которого можно взять определенное количество предметов или добавить. Есть микросервис денег откуда можно либо взять, либо добавить определенное количество денег. Мы проработаем простой сценарий где сага сначала берет из микросервиса денег определенную сумму (холдирует ее) и потом забирает из микросервиса предметов определенное количество (уменьшает количество на складе) предметов.

Диаграмма размещения:

Диаграмма последовательности:

Схема запросов:

MoneyMicroservice

Для начала необходимые зависимости можно поставить командами:

Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
Install-Package MassTransit.EntityFramework

Так же необходимо поставить на RabbitMq Delayed Message Exchange

Создаем сообщения запрос и ответ для работы с шиной

//Рекомендуется для контрактов использовать интерфейсы чтобы не было возможности 
//в них прикрутить какую-то логику.
//Хотя уже с новой версией C# где можно реализации и интерфейсам прикреплять 
//подход больше не актуален
public interface IGetMoneyRequest
{
    public Guid OrderId { get; }
}

public interface IGetMoneyResponse
{
    public Guid OrderId { get; }
}

Создаем обработчик этого сообщения

public class GetMoneyConsumer : IConsumer<IGetMoneyRequest>
{
    public Task Consume(ConsumeContext<IGetMoneyRequest> context)
    {
        return context.RespondAsync<IGetMoneyResponse>(new { context.Message.OrderId });
    }
}

Дальше добавляем в стартапе наш обработчик в список обработчиков

builder.Services.AddMassTransit(cfg =>
{
  //Для сообщения AddMoneyRequest будет установлен адрес  add-money-request
    cfg.SetKebabCaseEndpointNameFormatter();
  //Чтобы работала обработка запросов надо поставить расширение на RabbitMq rabbitmq_delayed_message_exchange
    cfg.AddDelayedMessageScheduler();
    //Тут регистрируем наши обработчики сообщений
    cfg.AddConsumer<AddMoneyConsumer>();
    cfg.AddConsumer<GetMoneyConsumer>();
  //Настройка подлючения к RabbitMq
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
      //Использовать паттерн OutBox - либо все сообщений одной пачкой сразу отправляются 
      //либо не будет отправлено ни одно из сообщений. 
      //Это нужно когда вам например нужно послать две команды сразу CreateOrder 
      // и SendEmail только при условии что отправяться оба либо ни одно из них.
        rbfc.UseInMemoryOutbox();
      //Повторные попытки обработать запрос.
        rbfc.UseMessageRetry(r =>
        {
          //Инкрементально повторять 3 раза каждый раз увеличивая между поторами 
          //интервал на 1 секунду. Начать с интервала в 1 секунду.
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
      //Использовать отложенные сообщения в том числе с помошью них можно 
      //делать таймауты
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
      //Записываем какие сообщения мы слушаем. Вызывать этот метод обязательно
      //иначе консумеры не будут реагировать на сообщения.
        rbfc.ConfigureEndpoints(brc);
    });
}) 
    .AddMassTransitHostedService();

ItemsMicroservice

По сути точно такой же как и предыдущий

public interface IGetItemsRequest
{
    public Guid OrderId { get; }
}

public interface IGetItemsResponse
{
    public Guid OrderId { get; }
}

public class GetItemsConsumer : IConsumer<IGetItemsRequest>
{
    public Task Consume(ConsumeContext<IGetItemsRequest> context)
    {
        return context.RespondAsync<IGetItemsResponse>(new { OrderId = context.Message.OrderId });
    }
}
    
builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.AddDelayedMessageScheduler();
    cfg.AddConsumer<AddItemsConsumer>();
    cfg.AddConsumer<GetItemsConsumer>();
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
        rbfc.UseInMemoryOutbox();
        rbfc.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        rbfc.ConfigureEndpoints(brc);
    });
})
    .AddMassTransitHostedService();

SagasMicroservice

Тут наши саги находятся и они оркестрируют процес покупки придметов.

Сообщений запрос и ответ от саги:

public class BuyItemsRequest
{
    public Guid OrderId { get; set; }
}

public class BuyItemsResponse
{
    public Guid OrderId { get; set; }
    public string ErrorMessage { get; set; }
}

Состояние саги:

public sealed class BuyItemsSagaState : SagaStateMachineInstance
{
		//Идентификатор по которому мы отличаем один процес от другого.
    public Guid CorrelationId { get; set; }
    //Текущее состояние саги ака Failed, GetItemsPending и т.д.
    public string? CurrentState { get; set; }
    //Тут мы сохраняем идентификатор запроса что запустил нашу сагу
    //чтобы ответить на него
    public Guid? RequestId { get; set; }
    //Тут мы сохраняем адресс откуда пришел запрос который запустил нашу сагу
    //чтобы ответить на него
    public Uri? ResponseAddress { get; set; }
}

Сага для процесса покупки предметов:

public sealed class BuyItemsSaga : MassTransitStateMachine<BuyItemsSagaState>
{
    private readonly ILogger<BuyItemsSaga> _logger;

    public BuyItemsSaga(ILogger<BuyItemsSaga> logger)
    {
        _logger = logger;
        //Указываем куда будем записывать текущее состояние саги (Pending,Faulted)
        InstanceState(x => x.CurrentState);
        //Указываем что слушаем событие OrderId у которого равен нашему CorrelationId у саги
        //Либо если нет саги с таким CorrelationId то создаем его с ним.
        Event<BuyItemsRequest>(() => BuyItems, x => x.CorrelateById(y => y.Message.OrderId));
       //Узказываем какие запросы будем делать из саги
       Request(
            () => GetMoney
            );
        Request(
         () => GetItems
         );
        //Узказываем как будем реагировать на сообщения в стартовом стостоянии
        Initially(

            When(BuyItems)
            .Then(x =>
            {
            //Сохраняем идентификатор запроса и его адрес при старте саги чтобы потом на него ответить
                if (!x.TryGetPayload(out SagaConsumeContext<BuyItemsSagaState, BuyItemsRequest> payload))
                    throw new Exception("Unable to retrieve required payload for callback data.");
                x.Saga.RequestId = payload.RequestId;
                x.Saga.ResponseAddress = payload.ResponseAddress;
            })
            //Соверщаем запрос к микросевису MoneyMicroservice
            .Request(GetMoney, x => x.Init<IGetMoneyRequest>(new { OrderId = x.Data.OrderId }))
           //Переводим сагу в состояние GetMoney.Pending
           .TransitionTo(GetMoney.Pending)

            );

        //Описываем то как наша сага будет реагировать на сообщения находясь в 
        //состоянии GetMoney.Pending
        During(GetMoney.Pending,
            //Когда приходи сообщение что запрос прошел успешно делаем новый запрос
            //теперь уже в микросервис ItemsMicroservice
            When(GetMoney.Completed)
            .Request(GetItems, x => x.Init<IGetItemsRequest>(new { OrderId = x.Data.OrderId }))
            .TransitionTo(GetItems.Pending),
            //При ошибке отвечем тому кто иницировал запрос сообщением с текстом ошибки
            When(GetMoney.Faulted)
              .ThenAsync(async context =>
              { 
                //Тут можно сделать какие-то компенсирующие действия. 
               //Например вернуть деньги куда-то на счет.
                  await RespondFromSaga(context, "Faulted On Get Money " + string.Join("; ", context.Data.Exceptions.Select(x => x.Message)));
              })
            .TransitionTo(Failed),
            //При таймате отвечаем с сообщением что произошел таймаут
            When(GetMoney.TimeoutExpired)
               .ThenAsync(async context =>
               {
                   await RespondFromSaga(context, "Timeout Expired On Get Money");
               })
            .TransitionTo(Failed)

             );

        During(GetItems.Pending,
            //При успешном ответе от микросервиса предметов 
            //отвечаем без ошибки и переводим сагу в финальное состояние.
            When(GetItems.Completed)
              .ThenAsync(async context =>
              {
                  await RespondFromSaga(context, null);
              })
            .Finalize(),

            When(GetItems.Faulted)
              .ThenAsync(async context =>
              {
                   //Тут можно сделать какие-то компенсирующие действия. 
                  //Например вернуть деньги куда-то на счет.
                  await RespondFromSaga(context, "Faulted On Get Items " + string.Join("; ", context.Data.Exceptions.Select(x => x.Message)));
              })
            .TransitionTo(Failed),

            When(GetItems.TimeoutExpired)
               .ThenAsync(async context =>
               {
                   await RespondFromSaga(context, "Timeout Expired On Get Items");
               })
            .TransitionTo(Failed)

            );
    }
    //Запрос на получение денег
    public Request<BuyItemsSagaState, IGetMoneyRequest, IGetMoneyResponse> GetMoney { get; set; }
    //Запрос на получение предметов
    public Request<BuyItemsSagaState, IGetItemsRequest, IGetItemsResponse> GetItems { get; set; }
   //Событие стартующее нашу сагу.
   public Event<BuyItemsRequest> BuyItems { get; set; }
   //Одно из наших кастомных состояний в которое может перейти сага
    public State Failed { get; set; }
    //Метод для ответного сообщения
    //Тут нужно явно использовать ResponseAddress и RequestId 
    //сохраненные ранее чтобы ответить ровно тому кто сделал запрос
    private static async Task RespondFromSaga<T>(BehaviorContext<BuyItemsSagaState, T> context, string error) where T : class
    {
        var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
        await endpoint.Send(new BuyItemsResponse
        {
            OrderId = context.Saga.CorrelationId,
            ErrorMessage = error
        }, r => r.RequestId = context.Saga.RequestId);
    }
}

Регистрируем сагу в стартапе

builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.AddDelayedMessageScheduler();
    //Тут добляем сагу с указанием что будем сохранять ее в БД 
    //с помошью EF и будем использовать пессеместичный режим конкуренции за ресурсы
    cfg.AddSagaStateMachine<BuyItemsSaga, BuyItemsSagaState>()
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
        r.ExistingDbContext<SagasDbContext>();
        r.LockStatementProvider = new PostgresLockStatementProvider();
    });
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
        rbfc.UseInMemoryOutbox();
        rbfc.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        rbfc.ConfigureEndpoints(brc);
    });
});

ApiGateway

Его работа это просто перевести http запрос в ampq запрос в шину. Дождаться ответа и вернуть ответ ползователю (фронтенду)

public class BuyItemsRequstModel
{
    public Guid OrderId { get; set; }
}

[ApiController]
[Route("api/v1/items")]
public class ItemsController : ControllerBase
{
    //Интерфейс MassTransit через который идет работа с сообщениями
    private readonly IBus _bus;
    private readonly ILogger<ItemsController> logger;

    public ItemsController(IBus bus, ILogger<ItemsController> logger)
    {
        _bus = bus;
        this.logger = logger;
    }

    [HttpPost("buy")]
    public async Task<BuyItemsResponse> BuyAsync(BuyItemsRequstModel model)
    {
       //Делаем запрос в шину и ждем ответа от саги. 
       //Ответ прийдет из RabbitMq или словим ошибку таймаута запроса
        logger.LogInformation("Start!");
        var response = await _bus.Request<BuyItemsRequest, BuyItemsResponse>(model);
        logger.LogInformation("End!");
        //Возвращаем сообщение что было в ответе
        return response.Message;
    }
}

Так же можно все делать просто на событиях без Request/Response только это потребует больше кода и можно их через SignalR гонять на фронт.

Исходники

Microservices With Sagas

Источник: https://habr.com/ru/post/664962/


Интересные статьи

Интересные статьи

«За всю историю человечества было выдано 50 млн. патентов. Задача — сделать 1 млрд. новых изобретений». @MagisterLudiКаждый землянин, даже совсем не научно-технический революционер или патентный военн...
 В JavaScript есть два основных способа обработки асинхронного кода: Promise (ES6) и async / await (ES7). Эти синтаксисы дают нам равные базовые функции, но по-разному влияют на читаемость и обла...
«Скажи мне, Рождённый Женщиной, — вопросил Кришна, Куда движутся эти миры, Зачем злой Парвана по ночам охотится за своей второй сущностью, И почему у ласточки Бшакти две ноги, а у Меня дв...
В этой статье мы рассмотрим, как система управления 1С-Битрикс справляется с большими нагрузками. Данный вопрос особенно актуален сегодня, когда электронная торговля начинает конкурировать по обороту ...
Теория архитектуры включает в себя множество аспектов и по своей сути является сплавом многочисленных художественных, социальных и психологических техник. Однако, вне зависимости от архитекту...