Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Допустим, необходимо запустить множество фоновых задач в .Net с возможностью в дальнейшем обратиться к их состоянию. Обращение к членам класса запущенной задачи может быть полезно, если необходимо однозначно определить текущее состояние объекта в момент выполнения.
Одно из частных решений — модель акторов
Модель акторов позволяет запускать фоновые задачи и обращаться к их текущему состоянию.
Интерфейсы
Определим интерфейс, который опишет выполняемую работу — IJob, с обязательной передачей в метод DoAsync
- CancellationToken token
.
public interface IJob<in TIn, out TOut>
where TIn : IJobInput
where TOut : IJobResult
{
Task<bool> DoAsync(TIn input, CancellationToken token);
/// <summary>
/// Describes the state of the fields of the IJob class that are changed by the DoAsync method.
/// </summary>
TOut GetCurrentState(Guid jobId);
}
Интерфейс джоба - IJob определен, но вызов его имплементации напрямую не даст искомых результатов, поэтому необходимо определить контекст выполнения джобов IJobContext, который объединит группу выполняемых однотипных задач. Здесь необходимо предоставить возможность не только запускать фоновые таски CreateJobAsync
, но и ожидать полного выполнения задачи DoJobAsync
.
public interface IJobContext<in TIn, TOut>
where TIn : IJobInput
where TOut : IJobResult
{
/// <summary>
/// Create a background job
/// </summary>
/// <returns>Job Id</returns>
Task<JobCreatedCommandResult> CreateJobAsync(TIn input,
int? maxNrOfRetries = null,
TimeSpan? minBackoff = null,
TimeSpan? maxBackoff = null,
Guid? jobId = null,
TimeSpan? timeout = null);
/// <summary>
/// Waiting for a response about the completion of the job
/// </summary>
Task<JobDoneCommandResult> DoJobAsync(TIn input,
int? maxNrOfRetries = null, TimeSpan? minBackoff = null, TimeSpan? maxBackoff = null, Guid? jobId = null, TimeSpan? timeout = null);
Task<StopJobCommandResult> StopJobAsync(Guid jobId, TimeSpan? timeout = null);
Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId, TimeSpan? timeout = null);
}
Реализация
Основной идеей модели акторов является изоляция состояния каждого объекта модели от внешнего вмешательства, что должно гарантировать предсказуемое изменение своего состояния. Общение между акторами происходит через очереди сообщений, в которые для каждого актора упорядочивают все входящие запросы, что дает последовательность выполнения команд и также позволяет избежать побочных эффектов влияния на члены класса актора.
Общая схема запросов будет выглядеть как MasterActor→ GroupActor → ManagerActor → WorkerActor
MasterActor
Определим точку входа в систему акторов — это будет MasterActor.
Его задачами является доставка сообщений до своих дочерних акторов и определение стратегии перезапуска подчиненных элементов в случае ошибки.
Для отправки сообщений из внешних источников, которые не находятся в системе акторов ActorSystem (здесь это JobContext) используется команда ожидания ответа - Ask.
await _masterActor.Ask<JobCreatedCommandResult>(command, currentTimeout);
Ask ожидает ответа от модели акторов JobCreatedCommandResult
, для заданного промежутка времени currentTimeout. Где команда имеет тип DoJobCommand
и принимается MasterActor методом Receive
, заданном в конструкторе.
DoJobCommandHandler
- метод, который передаст сообщение существующему групповому актору или создаст его новый экземпляр в случае отсутсвия с созранением ссылки IActorRef на него.
StopJobCommandHandler
- метод вызываемый в момент запроса на остановку воркера.RequestAllWorkersInfoQueryHandler
- хендлер запросов состояния группы акторов определенного типа IJob
internal class MasterActor<TIn, TOut> : ReceiveActor
where TIn : IJobInput
where TOut : IJobResult
{
private readonly Dictionary<string, IActorRef> _groupIdToActor = new();
private readonly Dictionary<IActorRef, string> _actorToGroupId = new();
public MasterActor()
{
//Commands
Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
Receive<StopJobCommand>(StopJobCommandHandler);
//Queries
Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);
}
private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
{
if (_groupIdToActor.TryGetValue(doJobCommand.GroupName, out var actorRef))
{
actorRef.Forward(doJobCommand);
return;
}
var groupActorProps = DependencyResolver
.For(Context.System)
.Props<GroupActor<TIn,TOut>>();
var groupActor = Context
.ActorOf(groupActorProps, $"group-{doJobCommand.GroupName}");
Context.Watch(groupActor);
groupActor.Forward(doJobCommand);
_groupIdToActor.Add(doJobCommand.GroupName, groupActor);
_actorToGroupId.Add(groupActor, doJobCommand.GroupName);
}
private void StopJobCommandHandler(StopJobCommand command)
{
if (!_groupIdToActor.ContainsKey(command.GroupName))
{
Sender.Tell(new StopJobCommandResult(false, $"Group Actor list does not contain {command.GroupName}"));
return;
}
_groupIdToActor[command.GroupName].Forward(command);
}
...
GroupActor
DoJobCommandHandler должен создать дочерний актор GroupActor, который опишет группу выполняемых задач. Его функция проста, он должен передавать сообщения управленцам рабочих и сохранять ссылки на дочерние акторы для предоставления в дальнейшем возможности обратиться к их состоянию.
TrySaveWorkerActorRefCommand
- сообщение которое передается от workerActor для сохранения ссылки IActorRef в групповом акторе.ManagerActorTerminatedHandler
- вызывается после остановкиManagerActor
и удаляет ссылки на объектыIActorRef
дляManagerActor
иWorkerActor
.
internal class GroupActor<TIn, TOut> : ReceiveActor
where TIn : IJobInput
where TOut : IJobResult
{
private string? _groupId;
private readonly Dictionary<Guid, IActorRef> _idToManagerActor = new();
private readonly Dictionary<IActorRef, Guid> _managerActorToId = new();
private readonly Dictionary<IActorRef, Guid> _workerActorToId = new();
private readonly Dictionary<Guid, IActorRef> _idToWorkerActor = new();
public GroupActor()
{
//Commands
Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
Receive<StopJobCommand>(StopJobCommandHandler);
//Queries
Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);
//Internal
Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
Receive<Terminated>(ManagerActorTerminatedHandler);
}
private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
{
if (_groupId != null && doJobCommand.GroupName != _groupId)
{
var message = "Ignoring Create Worker Actor";
Sender.Tell(doJobCommand.IsCreateCommand
? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
: new JobDoneCommandResult(false, message, doJobCommand.JobId));
return;
}
if (_idToManagerActor.ContainsKey(doJobCommand.JobId))
{
var message = $"{doJobCommand.JobId} Actor Exists.";
Sender.Tell(doJobCommand.IsCreateCommand
? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
: new JobDoneCommandResult(false, message, doJobCommand.JobId));
return;
}
_groupId ??= doJobCommand.GroupName;
var managerActorProps = DependencyResolver
.For(Context.System)
.Props<ManagerActor<TIn,TOut>>();
var managerActor = Context.ActorOf(managerActorProps,
$"manager-{doJobCommand.JobId}");
Context.Watch(managerActor);
_idToManagerActor.Add(doJobCommand.JobId, managerActor);
_managerActorToId.Add(managerActor, doJobCommand.JobId);
managerActor.Forward(doJobCommand);
}
private void ManagerActorTerminatedHandler(Terminated t)
{
var workerId = _managerActorToId[t.ActorRef];
_managerActorToId.Remove(t.ActorRef);
_idToManagerActor.Remove(workerId);
if (!_idToWorkerActor.TryGetValue(workerId, out var workerActorRef))
return;
_workerActorToId.Remove(workerActorRef);
_idToWorkerActor.Remove(workerId);
}
...
ManagerActor
С поведением ManagerActor дела обстоят чуть сложнее, ему необходимо определить стратегию поведения при возникновении ошибок у рабочего WorkerActor, и сохранить начальный запрос «WorkerDoJobCommand» для повторных отправок этого запроса в моменты перезапуска.
internal class ManagerActor<TIn, TOut> : ReceiveActor
where TIn : IJobInput
where TOut : IJobResult
{
//Параметры стратегии перезапуска
private int _currentNrOfRetries;
private int _maxNrOfRetries;
private TimeSpan _minBackoff;
private TimeSpan _maxBackoff;
//Ссылка на отправитель команды на выполнение работы
private IActorRef? _doJobCommandSender;
//BackoffSupervisor
private IActorRef? _workerSupervisorActor;
//Команда для дочернего актора для повторной отправки в момент рестартов
private WorkerDoJobCommand<TIn>? _doJobCommand;
private Guid _jobId;
private bool _startedFlag;
//StopJobCommandHandler переданный от JobContext вызовет отмену задачи
private readonly CancellationTokenSource _cancellationTokenSource = new ();
private readonly ILogger<ManagerActor<TIn, TOut>> _logger;
public ManagerActor(ILogger<ManagerActor<TIn, TOut>> logger)
{
_logger = logger;
//Commands
Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
Receive<StopJobCommand>(StopJobCommandHandler);
//Queries
Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);
//Internal
Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
Receive<GiveMeWorkerDoJobCommand>(GiveMeWorkerDoJobCommandHandler);
Receive<Terminated>(WorkerActorTerminatedHandler);
}
...
DoJobCommandHandler
- только создает воркера без отправки сообщения на выполнение команды, и задает ему стратегию перезапуска, которая определена «BackoffSupervisor
» в момент создания WorkerActor. Перезапуск воркера будет произведен если возникнет ошибка в самом воркере и будет выполнен количество равноеmaxNrOfRetries
раз в интервале отminBackoff
доmaxBackoff
. Интервал необходим для того, чтобы повторные попытки выполнить работу в случае подключения к внешним ресурсам равномерно распределили запросы в заданный промежуток времени.GiveMeWorkerDoJobCommandHandler
- обработчик сообщенияGiveMeWorkerDoJobCommand
, которое присылает дочеркий воркер актор в момент инициализации своего состояния - это необходимо для перезапусков, так как если отправить сообщение в самомDoJobCommandHandler
, который выполняется один раз, то в момент падения воркер актора его работа не будет начата заново.
private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
{
if (_workerSupervisorActor != null)
{
var message = "Ignoring Create Worker Actor";
Sender.Tell(doJobCommand.IsCreateCommand
? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
: new JobDoneCommandResult(false, message, doJobCommand.JobId));
return;
}
_jobId = doJobCommand.JobId;
_maxNrOfRetries = doJobCommand.MaxNrOfRetries;
_minBackoff = doJobCommand.MinBackoff;
_maxBackoff = doJobCommand.MaxBackoff;
_doJobCommandSender = Sender;
var workerActorProps = DependencyResolver
.For(Context.System)
.Props<WorkerActor<TIn,TOut>>();
var supervisorOfWorkerActorProps = BackoffSupervisor.Props(
Backoff.OnFailure(
workerActorProps,
childName: $"worker-{doJobCommand.JobId}",
minBackoff: _minBackoff,
maxBackoff: _maxBackoff,
randomFactor: 0.2,
maxNrOfRetries: _maxNrOfRetries)
.WithSupervisorStrategy(new OneForOneStrategy(exception =>
{
if (exception is TaskCanceledException
|| exception.InnerException is TaskCanceledException
|| _currentNrOfRetries >= _maxNrOfRetries)
{
var text = $"BackoffSupervisor: jobId: {_jobId}" +
$" {exception?.Message}" +
$" InnerException: {exception?.InnerException?.Message}";
_logger.LogError(text);
_doJobCommandSender.Tell(new JobDoneCommandResult(false, text, _jobId));
return Directive.Stop;
}
_currentNrOfRetries += 1;
return Directive.Restart;
})));
_workerSupervisorActor = Context
.ActorOf(supervisorOfWorkerActorProps, $"supervisor-of-worker-{doJobCommand.JobId}");
Context.Watch(_workerSupervisorActor);
_doJobCommand = new WorkerDoJobCommand<TIn>(
doJobCommand.JobInput,
_doJobCommandSender,
doJobCommand.JobId,
_cancellationTokenSource,
doJobCommand.IsCreateCommand);
}
...
WorkerActor
Далее команда выполнения попадает к рабочему WorkerActor, который отправит сообщения о создании работы JobCreatedCommandResult
или о её завершении JobDoneCommandResult
в зависимости от изначально выбранного метода в IJobContext
CreateJobAsync
или DoJobAsync
соответственно.
По завершении работы WorkerActor положет себе за щеку смертельную пилюлю _self.Tell(PoisonPill.Instance)
, что приведет к его остановке и очистке ресурсов.
internal class WorkerActor<TIn, TOut> : ReceiveActor
where TIn : IJobInput
where TOut : IJobResult
{
private Guid _jobId;
private readonly IServiceScope _scope;
private readonly IActorRef _self;
private IJob<TIn, TOut> _job;
public WorkerActor(IServiceProvider serviceProvider)
{
_self = Self;
_scope = serviceProvider.CreateScope();
//Commands
Receive<WorkerDoJobCommand<TIn>>((msg) =>
{
WorkerDoJobCommandHandlerAsync(msg).PipeTo(_self);
});
//Queries
Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);
//Internal
Receive<Status.Failure>(Failed);
Context.Parent.Tell(new GiveMeWorkerDoJobCommand());
}
private async Task WorkerDoJobCommandHandlerAsync(WorkerDoJobCommand<TIn> command)
{
_jobId = command.JobId;
Context.Parent.Tell(new TrySaveWorkerActorRefCommand(_self, _jobId, command.DoJobCommandSender));
if(command.IsCreateCommand)
command.DoJobCommandSender.Tell(new JobCreatedCommandResult(true, "", _jobId));
var token = command.CancellationTokenSource.Token;
var jobResult = await _job.DoAsync(command.JobInput, token);
if(token.IsCancellationRequested)
{
if(!command.IsCreateCommand)
command.DoJobCommandSender.Tell(new JobDoneCommandResult(false,
"Job was cancelled.",
command.JobId));
return;
}
if(!command.IsCreateCommand)
command.DoJobCommandSender.Tell(new JobDoneCommandResult(jobResult, "Ok", command.JobId));
_self.Tell(PoisonPill.Instance);
}
Возможность отмены задач
Предусмотрена возможность отмены задачи. ManagerActor в момент получения команды StopJobCommand на остановку, вызывает отмену своего _cancellationTokenSource.Cancel(),
кансел токен которого был прокинут в дочерний WorkerActor, и отправку ему смертельной пилюли _workerSupervisorActor.Tell(PoisonPill.Instance).
private void StopJobCommandHandler(StopJobCommand _)
{
if (!_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Cancel();
_workerSupervisorActor.Tell(PoisonPill.Instance);
Sender.Tell(new StopJobCommandResult(true, "Ok"));
return;
}
Sender.Tell(new StopJobCommandResult(false, "Cancellation Requested Already."));
}
DI - Microsoft.Extensions.DependencyInjection
В WorkerActor инъектирован интерфейс IServiceProvider serviceProvider
, что позволяет управлять временем жизни скоупа _scope
от момента инициализации воркера до момента остановки актора и очистки ресурсов. Для джоба пользователя скоуп - это все, что находится внутри метода IJob.DoAsync
. Вызов _scope.Dispose(
) также вызовет методы Dispose
, если в имплементации IJob
будет также унаследован IDispoosable
интерфейс.
protected override void PreStart()
{
_job = _scope.ServiceProvider.GetService<IJob<TIn, TOut>>();
}
protected override void PostStop()
{
_scope.Dispose();
}
Запросы
JobContext позволяет запросить состояние акторов, которые были созданы для определенного типа IJob
.
public async Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId,
TimeSpan? timeout = null)
{
var currentTimeout = timeout ?? _defaultTimeout;
var query = new RequestAllWorkersInfo(requestId, GetGroupName(), currentTimeout);
RespondAllWorkersInfo<TOut> info = await _masterActor
.Ask<RespondAllWorkersInfo<TOut>>(query, currentTimeout);
return info.WorkersData;
}
Использование
Для регистрации необходимо при старте проекта вызвать методы
//Job library registration
builder.Services.AddScoped<IJob<ForEachJobInput, ForEachJobResult>, ForEachJob>();
builder.Services.AddJobContext();
Где ForEachJob
— это простая тестовая задача, которая перебирает значения от 0 до Count
c ожидаеним секунды между шагами. В данном тестовом джобе ForEachJob
отдельно унаследован интерфейс IDisposable
, что позволяет дополнительно определить механизм очистки ресурсов в момент остановки задачи или после её штатного завершения работы.
public class ForEachJob : IJob<ForEachJobInput, ForEachJobResult>, IDisposable
{
private int _currentState;
private readonly ILogger<ForEachJob> _logger;
//Здесь возможна инъекция любого зарегестрированного в приложении интерфейса
public ForEachJob(ILogger<ForEachJob> logger)
{
_logger = logger;
}
public async Task<bool> DoAsync(ForEachJobInput input, CancellationToken token)
{
foreach (var item in Enumerable.Range(0, input.Count))
{
if (token.IsCancellationRequested)
return false;
_currentState = item;
_logger.LogInformation(item.ToString());
await Task.Delay(1000, token);
}
return true;
}
public ForEachJobResult GetCurrentState(Guid jobId)
{
return new ForEachJobResult
{
Id = jobId,
Data = _currentState
};
}
public void Dispose()
{
_logger.LogInformation("Dispose.");
}
}
Вызов, например, из контроллера будет выглядить так:
public class ForEachJobController : ControllerBase
{
private readonly IJobContext<ForEachJobInput, ForEachJobResult> _jobContext;
public ForEachJobController(
IJobContext<ForEachJobInput, ForEachJobResult> jobContext)
{
_jobContext = jobContext;
}
[HttpPost]
[Route(nameof(CreateJob))]
public async Task<JobCreatedCommandResult> CreateJob([FromBody] ForEachJobInput input)
{
return await _jobContext.CreateJobAsync(input);
}
[HttpPost]
[Route(nameof(DoJob))]
public async Task<JobDoneCommandResult> DoJob([FromBody] ForEachJobInput input)
{
return await _jobContext.DoJobAsync(input);
}
[HttpPost]
[Route(nameof(StopJob))]
public async Task<StopJobCommandResult> StopJob([FromBody] Guid jobId)
{
return await _jobContext.StopJobAsync(jobId);
}
[HttpGet]
[Route(nameof(GetAllJobs))]
public async Task<ICollection<ForEachJobResult?>> GetAllJobs(
[FromQuery] int requestId)
{
var result = await _jobContext
.GetAllJobsCurrentStatesAsync(requestId);
return result.Values.Select(x => x.Result).ToList();
}
}
Итог
Akka один из возможных способов запуска отложенных задач с потенциальной возможностью кластеризации решения.