Создание фоновых задач в .NET с запросом состояния запущенного таска

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Допустим, необходимо запустить множество фоновых задач в .Net с возможностью в дальнейшем обратиться к их состоянию. Обращение к членам класса запущенной задачи может быть полезно, если необходимо однозначно определить текущее состояние объекта в момент выполнения.

Одно из частных решений — модель акторов

Модель акторов позволяет запускать фоновые задачи и обращаться к их текущему состоянию.

Интерфейсы

Определим интерфейс, который опишет выполняемую работу — IJob, с обязательной передачей в метод DoAsync - CancellationToken token.

public interface IJob<in TIn, out TOut>
    where TIn : IJobInput
    where TOut : IJobResult  
{
    Task<bool> DoAsync(TIn input, CancellationToken token);

    /// <summary>
    /// Describes the state of the fields of the IJob class that are changed by the DoAsync method.
    /// </summary>
    TOut GetCurrentState(Guid jobId);
}

Интерфейс джоба - IJob определен, но вызов его имплементации напрямую не даст искомых результатов, поэтому необходимо определить контекст выполнения джобов IJobContext, который объединит группу выполняемых однотипных задач. Здесь необходимо предоставить возможность не только запускать фоновые таски CreateJobAsync, но и ожидать полного выполнения задачи DoJobAsync.

public interface IJobContext<in TIn, TOut>
    where TIn : IJobInput
    where TOut : IJobResult
{

    /// <summary>
    /// Create a background job
    /// </summary>
    /// <returns>Job Id</returns>
    Task<JobCreatedCommandResult> CreateJobAsync(TIn input,
        int? maxNrOfRetries = null,
        TimeSpan? minBackoff = null,
        TimeSpan? maxBackoff = null,  
        Guid? jobId = null,
        TimeSpan? timeout = null);

    /// <summary>
    /// Waiting for a response about the completion of the job
    /// </summary>
    Task<JobDoneCommandResult> DoJobAsync(TIn input,
        int? maxNrOfRetries = null, TimeSpan? minBackoff = null, TimeSpan? maxBackoff = null,  Guid? jobId = null, TimeSpan? timeout = null);

    Task<StopJobCommandResult> StopJobAsync(Guid jobId, TimeSpan? timeout = null);

    Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId, TimeSpan? timeout = null);
}

Реализация

Основной идеей модели акторов является изоляция состояния каждого объекта модели от внешнего вмешательства, что должно гарантировать предсказуемое изменение своего состояния. Общение между акторами происходит через очереди сообщений, в которые для каждого актора упорядочивают все входящие запросы, что дает последовательность выполнения команд и также позволяет избежать побочных эффектов влияния на члены класса актора.

Общая схема запросов будет выглядеть как MasterActor→ GroupActor → ManagerActor → WorkerActor

MasterActor

Определим точку входа в систему акторов — это будет MasterActor.

Его задачами является доставка сообщений до своих дочерних акторов и определение стратегии перезапуска подчиненных элементов в случае ошибки.

Для отправки сообщений из внешних источников, которые не находятся в системе акторов ActorSystem (здесь это JobContext) используется команда ожидания ответа - Ask.

await _masterActor.Ask<JobCreatedCommandResult>(command, currentTimeout);

Ask ожидает ответа от модели акторов JobCreatedCommandResult, для заданного промежутка времени currentTimeout. Где команда имеет тип DoJobCommand и принимается MasterActor методом Receive, заданном в конструкторе.

  • DoJobCommandHandler - метод, который передаст сообщение существующему групповому актору или создаст его новый экземпляр в случае отсутсвия с созранением ссылки IActorRef на него.

  • StopJobCommandHandler - метод вызываемый в момент запроса на остановку воркера.

  • RequestAllWorkersInfoQueryHandler - хендлер запросов состояния группы акторов определенного типа IJob

internal class MasterActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{

    private readonly Dictionary<string, IActorRef> _groupIdToActor = new();
    private readonly Dictionary<IActorRef, string> _actorToGroupId = new();

    public MasterActor()
    {
        //Commands
        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries
        Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);

    }


    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {

        if (_groupIdToActor.TryGetValue(doJobCommand.GroupName, out var actorRef))
        {
            actorRef.Forward(doJobCommand);
            return;
        }

        var groupActorProps = DependencyResolver
            .For(Context.System)
            .Props<GroupActor<TIn,TOut>>();

        var groupActor = Context
            .ActorOf(groupActorProps, $"group-{doJobCommand.GroupName}");

        Context.Watch(groupActor);

        groupActor.Forward(doJobCommand);

        _groupIdToActor.Add(doJobCommand.GroupName, groupActor);
        _actorToGroupId.Add(groupActor, doJobCommand.GroupName);

    }

    private void StopJobCommandHandler(StopJobCommand command)
    {
        if (!_groupIdToActor.ContainsKey(command.GroupName))
        {
            Sender.Tell(new StopJobCommandResult(false, $"Group Actor list does not contain {command.GroupName}"));
            return;
        }

        _groupIdToActor[command.GroupName].Forward(command);
    }
...

GroupActor

DoJobCommandHandler должен создать дочерний актор GroupActor, который опишет группу выполняемых задач. Его функция проста, он должен передавать сообщения управленцам рабочих и сохранять ссылки на дочерние акторы для предоставления в дальнейшем возможности обратиться к их состоянию.

  • TrySaveWorkerActorRefCommand - сообщение которое передается от workerActor для сохранения ссылки IActorRef в групповом акторе.

  • ManagerActorTerminatedHandler - вызывается после остановки ManagerActor и удаляет ссылки на объекты IActorRef для ManagerActor и WorkerActor.

internal class GroupActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{

    private string? _groupId;

    private readonly Dictionary<Guid, IActorRef> _idToManagerActor = new();
    private readonly Dictionary<IActorRef, Guid> _managerActorToId = new();

    private readonly Dictionary<IActorRef, Guid> _workerActorToId = new();
    private readonly Dictionary<Guid, IActorRef> _idToWorkerActor = new();

 
    public GroupActor()
    {
        //Commands
        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries
        Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);

        //Internal
        Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
        Receive<Terminated>(ManagerActorTerminatedHandler);

    }

    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {
        if (_groupId != null && doJobCommand.GroupName != _groupId)
        {
            var message = "Ignoring Create Worker Actor";
            Sender.Tell(doJobCommand.IsCreateCommand
                    ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                    : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }

        if (_idToManagerActor.ContainsKey(doJobCommand.JobId))
        {
            var message = $"{doJobCommand.JobId} Actor Exists.";
            Sender.Tell(doJobCommand.IsCreateCommand
                ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }

        _groupId ??= doJobCommand.GroupName;

        var managerActorProps = DependencyResolver
            .For(Context.System)
            .Props<ManagerActor<TIn,TOut>>();
        
        var managerActor = Context.ActorOf(managerActorProps,
            $"manager-{doJobCommand.JobId}");

        Context.Watch(managerActor);

        _idToManagerActor.Add(doJobCommand.JobId, managerActor);
        _managerActorToId.Add(managerActor, doJobCommand.JobId);
        managerActor.Forward(doJobCommand);
    }

    private void ManagerActorTerminatedHandler(Terminated t)
    {
        var workerId = _managerActorToId[t.ActorRef];
        _managerActorToId.Remove(t.ActorRef);
        _idToManagerActor.Remove(workerId);

        if (!_idToWorkerActor.TryGetValue(workerId, out var workerActorRef))
            return;
        _workerActorToId.Remove(workerActorRef);
        _idToWorkerActor.Remove(workerId);
    }
...

ManagerActor

С поведением ManagerActor дела обстоят чуть сложнее, ему необходимо определить стратегию поведения при возникновении ошибок у рабочего WorkerActor, и сохранить начальный запрос «WorkerDoJobCommand» для повторных отправок этого запроса в моменты перезапуска.

internal class ManagerActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult

{
    //Параметры стратегии перезапуска
    private int _currentNrOfRetries;
    private int _maxNrOfRetries;
    private TimeSpan _minBackoff;
    private TimeSpan _maxBackoff;

    //Ссылка на отправитель команды на выполнение работы
    private IActorRef? _doJobCommandSender;
    //BackoffSupervisor
    private IActorRef? _workerSupervisorActor;
    //Команда для дочернего актора для повторной отправки в момент рестартов
    private WorkerDoJobCommand<TIn>? _doJobCommand;

    private Guid _jobId;
    private bool _startedFlag;

    //StopJobCommandHandler переданный от JobContext вызовет отмену задачи
    private readonly CancellationTokenSource _cancellationTokenSource = new ();

    private readonly ILogger<ManagerActor<TIn, TOut>> _logger;

    public ManagerActor(ILogger<ManagerActor<TIn, TOut>> logger)
    {
        _logger = logger;

        //Commands

        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries

        Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);

        //Internal
        Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
        Receive<GiveMeWorkerDoJobCommand>(GiveMeWorkerDoJobCommandHandler);
        Receive<Terminated>(WorkerActorTerminatedHandler);

    }

 ...
  • DoJobCommandHandler - только создает воркера без отправки сообщения на выполнение команды, и задает ему стратегию перезапуска, которая определена «BackoffSupervisor» в момент создания WorkerActor. Перезапуск воркера будет произведен если возникнет ошибка в самом воркере и будет выполнен количество равное maxNrOfRetries раз в интервале от minBackoff до maxBackoff. Интервал необходим для того, чтобы повторные попытки выполнить работу в случае подключения к внешним ресурсам равномерно распределили запросы в заданный промежуток времени.

  • GiveMeWorkerDoJobCommandHandler - обработчик сообщения GiveMeWorkerDoJobCommand, которое присылает дочеркий воркер актор в момент инициализации своего состояния - это необходимо для перезапусков, так как если отправить сообщение в самом DoJobCommandHandler, который выполняется один раз, то в момент падения воркер актора его работа не будет начата заново.

    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {
        if (_workerSupervisorActor != null)
        {
            var message = "Ignoring Create Worker Actor";
            Sender.Tell(doJobCommand.IsCreateCommand
                ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }


        _jobId = doJobCommand.JobId;
        _maxNrOfRetries = doJobCommand.MaxNrOfRetries;
        _minBackoff = doJobCommand.MinBackoff;
        _maxBackoff = doJobCommand.MaxBackoff;
        _doJobCommandSender = Sender;

        var workerActorProps = DependencyResolver
            .For(Context.System)
            .Props<WorkerActor<TIn,TOut>>();

        var supervisorOfWorkerActorProps = BackoffSupervisor.Props(
            Backoff.OnFailure(
                    workerActorProps,
                    childName: $"worker-{doJobCommand.JobId}",
                    minBackoff: _minBackoff,
                    maxBackoff: _maxBackoff,
                    randomFactor: 0.2,
                    maxNrOfRetries: _maxNrOfRetries)
                .WithSupervisorStrategy(new OneForOneStrategy(exception =>
                {
                    if (exception is TaskCanceledException  
                        || exception.InnerException is TaskCanceledException
                        || _currentNrOfRetries >= _maxNrOfRetries)
                    {
                        var text = $"BackoffSupervisor: jobId: {_jobId}" +
                                   $" {exception?.Message}" +
                                   $" InnerException: {exception?.InnerException?.Message}";
                        _logger.LogError(text);
                        _doJobCommandSender.Tell(new JobDoneCommandResult(false, text, _jobId));
                        return Directive.Stop;
                    }
                  
                    _currentNrOfRetries += 1;
                    return Directive.Restart;
                })));

        _workerSupervisorActor = Context
            .ActorOf(supervisorOfWorkerActorProps, $"supervisor-of-worker-{doJobCommand.JobId}");

        Context.Watch(_workerSupervisorActor);
      
        _doJobCommand = new WorkerDoJobCommand<TIn>(
            doJobCommand.JobInput,
            _doJobCommandSender,  
            doJobCommand.JobId,
            _cancellationTokenSource,
            doJobCommand.IsCreateCommand);
    }
...

WorkerActor

Далее команда выполнения попадает к рабочему WorkerActor, который отправит сообщения о создании работы JobCreatedCommandResult или о её завершении JobDoneCommandResult в зависимости от изначально выбранного метода в IJobContext CreateJobAsync или DoJobAsync соответственно.

По завершении работы WorkerActor положет себе за щеку смертельную пилюлю _self.Tell(PoisonPill.Instance), что приведет к его остановке и очистке ресурсов.

internal class WorkerActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{
    private Guid _jobId;

    private readonly IServiceScope _scope;
    private readonly IActorRef _self;
    private IJob<TIn, TOut> _job;

 
    public WorkerActor(IServiceProvider serviceProvider)
    {
        _self = Self;
        _scope = serviceProvider.CreateScope();

        //Commands
        Receive<WorkerDoJobCommand<TIn>>((msg) =>
        {
            WorkerDoJobCommandHandlerAsync(msg).PipeTo(_self);
        });

        //Queries
        Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);

        //Internal
        Receive<Status.Failure>(Failed);
        Context.Parent.Tell(new GiveMeWorkerDoJobCommand());

    }

    private async Task WorkerDoJobCommandHandlerAsync(WorkerDoJobCommand<TIn> command)
    {
        _jobId = command.JobId;

        Context.Parent.Tell(new TrySaveWorkerActorRefCommand(_self, _jobId, command.DoJobCommandSender));

        if(command.IsCreateCommand)
            command.DoJobCommandSender.Tell(new JobCreatedCommandResult(true, "", _jobId));

        var token = command.CancellationTokenSource.Token;
        var jobResult = await _job.DoAsync(command.JobInput, token);

        if(token.IsCancellationRequested)
        {
            if(!command.IsCreateCommand)
                command.DoJobCommandSender.Tell(new JobDoneCommandResult(false,  
                    "Job was cancelled.",  
                    command.JobId));
            return;
        }

        if(!command.IsCreateCommand)
            command.DoJobCommandSender.Tell(new JobDoneCommandResult(jobResult, "Ok", command.JobId));

        _self.Tell(PoisonPill.Instance);
    }

Возможность отмены задач

Предусмотрена возможность отмены задачи. ManagerActor в момент получения команды StopJobCommand на остановку, вызывает отмену своего _cancellationTokenSource.Cancel(), кансел токен которого был прокинут в дочерний WorkerActor, и отправку ему смертельной пилюли _workerSupervisorActor.Tell(PoisonPill.Instance).

   private void StopJobCommandHandler(StopJobCommand _)
    {
        if (!_cancellationTokenSource.IsCancellationRequested)
        {
            _cancellationTokenSource.Cancel();
            _workerSupervisorActor.Tell(PoisonPill.Instance);
            Sender.Tell(new StopJobCommandResult(true, "Ok"));
            return;
        }
        Sender.Tell(new StopJobCommandResult(false, "Cancellation Requested Already."));
    }

DI - Microsoft.Extensions.DependencyInjection

В WorkerActor инъектирован интерфейс IServiceProvider serviceProvider, что позволяет управлять временем жизни скоупа _scope от момента инициализации воркера до момента остановки актора и очистки ресурсов. Для джоба пользователя скоуп - это все, что находится внутри метода IJob.DoAsync. Вызов _scope.Dispose() также вызовет методы Dispose, если в имплементации IJob будет также унаследован IDispoosable интерфейс.

    protected override void PreStart()
    {
        _job = _scope.ServiceProvider.GetService<IJob<TIn, TOut>>();
    }
    protected override void PostStop()
    {
        _scope.Dispose();
    }

Запросы

JobContext позволяет запросить состояние акторов, которые были созданы для определенного типа IJob.


    public async Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId,
        TimeSpan? timeout = null)
    {
        var currentTimeout = timeout ?? _defaultTimeout;
        var query = new RequestAllWorkersInfo(requestId, GetGroupName(), currentTimeout);
        RespondAllWorkersInfo<TOut> info = await _masterActor
            .Ask<RespondAllWorkersInfo<TOut>>(query, currentTimeout);
        return info.WorkersData;
    }

Использование

Для регистрации необходимо при старте проекта вызвать методы

//Job library registration
builder.Services.AddScoped<IJob<ForEachJobInput, ForEachJobResult>, ForEachJob>();
builder.Services.AddJobContext();

Где ForEachJob — это простая тестовая задача, которая перебирает значения от 0 до Count c ожидаеним секунды между шагами. В данном тестовом джобе ForEachJob отдельно унаследован интерфейс IDisposable, что позволяет дополнительно определить механизм очистки ресурсов в момент остановки задачи или после её штатного завершения работы.

public class ForEachJob : IJob<ForEachJobInput, ForEachJobResult>, IDisposable
{
    private int _currentState;

    private readonly ILogger<ForEachJob> _logger;
    //Здесь возможна инъекция любого зарегестрированного в приложении интерфейса 
    public ForEachJob(ILogger<ForEachJob> logger)
    {
        _logger = logger;
    }

    public async Task<bool> DoAsync(ForEachJobInput input, CancellationToken token)
    {
        foreach (var item in Enumerable.Range(0, input.Count))
        {
            if (token.IsCancellationRequested)
                return false;

            _currentState = item;
            _logger.LogInformation(item.ToString());
            await Task.Delay(1000, token);
        }

        return true;
    }

    public ForEachJobResult GetCurrentState(Guid jobId)
    {
        return new ForEachJobResult
        {
            Id = jobId,
            Data = _currentState
        };
    }

    public void Dispose()
    {
        _logger.LogInformation("Dispose.");
    }

}

Вызов, например, из контроллера будет выглядить так:

public class ForEachJobController : ControllerBase
{
    private readonly IJobContext<ForEachJobInput, ForEachJobResult> _jobContext;
    
    public ForEachJobController(
      IJobContext<ForEachJobInput, ForEachJobResult> jobContext)
    {
        _jobContext = jobContext;
    }
    
    [HttpPost]
    [Route(nameof(CreateJob))]
    public async Task<JobCreatedCommandResult> CreateJob([FromBody] ForEachJobInput input)
    {
        return await _jobContext.CreateJobAsync(input);
    }
    
    [HttpPost]
    [Route(nameof(DoJob))]
    public async Task<JobDoneCommandResult> DoJob([FromBody] ForEachJobInput input)
    {
        return await _jobContext.DoJobAsync(input);
    }
    
    [HttpPost]
    [Route(nameof(StopJob))]
    public async Task<StopJobCommandResult> StopJob([FromBody] Guid jobId)
    {
        return await _jobContext.StopJobAsync(jobId);
    }
    
    [HttpGet]
    [Route(nameof(GetAllJobs))]
    public async Task<ICollection<ForEachJobResult?>> GetAllJobs(
      [FromQuery] int requestId)
    {
        var result = await _jobContext
            .GetAllJobsCurrentStatesAsync(requestId);
        return result.Values.Select(x => x.Result).ToList();
    }
}

Итог

Akka один из возможных способов запуска отложенных задач с потенциальной возможностью кластеризации решения.

Источник: https://habr.com/ru/articles/741044/


Интересные статьи

Интересные статьи

В последние годы команда .NET усиленно рекламирует ASP.NET Core как один из самых быстрых веб-фреймворков на рынке. Источником этих утверждений всегда были бенчмарки TechEmpower Framework Benchmarks.С...
Предлагаю поразмять мозги и как в прошлом году, порешать задачки с математической олимпиады в комментариях к этой статье. Задачек 6 штук, и на них отводилось 2 дня по 4,5 часа. (Чур, в ответы не п...
Снова привет, Хабр! В первой части статьи мы разбирали, что такое изоляция микросервисов, как в этом помогают переключатели функциональности, и как создать простое ASP.NET приложение в с поддержк...
В этом посте мы на несколько минут заглянем в Параллельную вселенную и посмотрим, как у них организованы рабочие процессы, задана мотивация и как они ищут новые продукты....
Недавно мне повезло пообщаться с Крисом Бэйконом, который написал DotNetAnywhere (альтернативный вариант .NET Runtime), и я остроумно заметил: … ты, наверное, один из тех немногих, кто создал со...