Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
ValueTask и IValueTaskSource
Обычно ValueTask
используют ради оптимизации.
Например, возврат закэшированного результата, или ValueTask.FromCancelled
с переданным CancellationToken
.
Но нет предела оптимизациям и одним ранним выходом теперь не обойтись. Поэтому был добавлен IValueTaskSource
.
В ValueTask
можно создать не только передав готовый результат или Task
, но
и упомянутый выше IValueTaskSource
.
// Конструкторы
public ValueTask(IValueTaskSource source, short token);
public ValueTask(Task task);
public ValueTask<T>(T result);
Что это такое мы с вами сейчас и узнаем
Устройство и алгоритм работы IValueTaskSource
Интерфейс IValueTaskSource
- набор из 3 методов:
public interface IValueTaskSource<out TResult>
{
// Получить статус выполнения текущей операции
ValueTaskSourceStatus GetStatus(short token);
// Запланировать продолжение на выполнение при завершении работы,
// если на момент проверки работа не завершена
void OnCompleted(
Action<object?> continuation,
object? state,
short token,
ValueTaskSourceOnCompletedFlags flags);
// Получить готовый результат операции
TResult GetResult(short token);
}
GetStatus
- получает статус выполнения.
Статус представляется перечислением ValueTaskSourceStatus
public enum ValueTaskSourceStatus
{
// В процессе
Pending,
// Успешно завершилась
Succeeded,
// Завершилась ошибкой (исключением)
Faulted,
// Отменена
Canceled,
}
Этот метод вызывается 1 раз, после создания ValueTask
.
Если операция уже завершилась, то вызывается GetResult
для получения результата.
Если операция в процессе, то вызывается OnCompleted
для регистирования продолжения на выполнение.
OnCompleted
- регистрирует переданное продолжение на выполнение по окончании операции.
Вызывается после GetStatus
.
На вход ему подаются:
Action<object> continuation
- само продолжениеobject state
- объект состояния, который передаетсяcontinuation
ValueTaskSourceOnCompletedFlags flags
- специальные флаги, указывающие поведение при вызове продолжения
Флаги представляются перечислением ValueTaskSourceOnCompletedFlags
:
[Flags]
public enum ValueTaskSourceOnCompletedFlags
{
// Без указаний
None = 0,
// Необходимо использовать текущий SynchronizationContext для продолжения
UseSchedulingContext = 1,
// В продолжении нужно использовать текущий ExecutionContext
FlowExecutionContext = 2,
}
Между вызовом GetStatus
и OnCompleted
может пройти какое-то время и операция завершится.
Поэтому во время выполнения OnCompleted
работа может быть уже закончена.
В таких случаях, продолжение обычно выполняется тут же.
GetResult
- получает результат операции.
Этот метод вызывается 1 раз при завершении работы для получения результата: возвращаемый объект или исключение.
Он должен быть вызван тогда, когда операция только завершилась.
Моей ошибкой во время первой реализации было то, что я использовать семафор для ожидания выполнения.
Но из-за неправильных вызовов случился дедлок:Фоновый поток завершил операцию и выставил результат
В этот момент вызвалось продолжение
Продолжение зашло в
GetResult
и остановилось на семафореФоновый поток не получил обратно управление, т.к. продолжение было вызвано, но семафор еще не выставлен
Также во всех методах присутствует token
. Это специальное значение для обнаружения множественных await
. Зачем они нужны поговорим далее.
Алгоритм работы зависит от GetStatus
:
Pending
- операция не завершилась, поэтому нужно запланировать дальнейшее выполнение:GetStatus
OnCompleted
GetResult
В остальных случаях выполнение уже завершилось, поэтому получаем результат сразу же:
GetStatus
GetResult
Здесь можно провести аналогию с тем, как работает магия async/await
и ее машины состояний. Грубо говоря, мы создали свой собственный Task
с блэкджеком, но пока не такой эффективный.
Реализуем своими руками
Теперь сделаем свою реализацию.
Представим, перед нами задача получения статистики ПК.
У нас есть класс PcMonitor
, отдающий эту статистику.
Он вызывается очень часто, поэтому для оптимизации мы решили:
Опрашивать ПК не на каждый вызов, а с определенным интервалом и хранить полученные значения в кэше
Если при вызове значение из кэша еще актуально, то вернуть его, иначе ждать до следующего сбора
Детали реализации
Статистика представляется структурой PcStatistics
.
Пока там только температура процессора, но вы придумайте, что туда еще можно добавить.
public readonly record struct PcStatistics(double CpuTemperature);
В реализации используется класс ValueTaskSourcePcMonitor
.
Сбор статистики реализован с помощью System.Threading.Timers.Timer
,
который с определенным интервалом кладет в кэш новое значение и обновляет время сбора.
Время сбора представляет TimeSpan
, получаемый с помощью Stopwatch
(не самая лучшая идея, но сойдет)
Наша реализация IValueTaskSource
представляется классом ManualValueTaskSource
.
Он хранит в себе необходимые для работы данные.
public class ManualValueTaskSource: IValueTaskSource<PcStatistics>
{
// Результат работы
private CancellationToken _cancellationToken;
private PcStatistics _cachedResult = new();
private Exception? _exception;
// Инфраструтура для работы IValueTaskSource
private object? _state;
private object? _scheduler;
private Action<object?>? _continuation;
private short _version;
private ExecutionContext? _ec;
// Инфраструктура бизнес-логики
private readonly Timer _timer;
private TimeSpan _lastMeasurementTime = TimeSpan.Zero;
private CustomPcMonitor? _monitor;
}
GetStatus
Пожалуй, его реализация самая простая
public ValueTaskSourceStatus GetStatus(short token)
{
CheckVersion(token);
if (_exception is not null)
{
return ValueTaskSourceStatus.Faulted;
}
if (_cachedResult != default)
{
// Предположим, что настоящий результат не должен быть default
return ValueTaskSourceStatus.Succeeded;
}
return ValueTaskSourceStatus.Pending;
}
Дальше OnCompleted
public void OnCompleted(Action<object?> continuation,
object? state,
short token,
ValueTaskSourceOnCompletedFlags flags)
{
CheckVersion(token);
if (UseExecutionContext())
{
_ec = ExecutionContext.Capture();
}
if (UseSchedulingContext() &&
GetScheduler() is {} scheduler)
{
_scheduler = scheduler;
}
// Здесь может быть состояние гонки, когда
// результат выставляется быстрее, чем заканчивается вызов OnCompleted.
// В нашем случае, такое может случиться, когда время ожидания таймера было очень мало
_state = state;
var prev = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prev is null)
{
return;
}
_state = null;
// Sentinel - маркер, выставляемый, когда операция завершилась,
// но колбэк еще не был выставлен
if (!ReferenceEquals(prev, Sentinel))
{
throw new InvalidOperationException("Обнаружено множественное ожидание");
}
// Вызываем продолжение синхронно, т.к. уже результат уже готов
InvokeContinuation(continuation, state, synchronously: true);
bool UseExecutionContext() => ( flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext ) is not ValueTaskSourceOnCompletedFlags.None;
bool UseSchedulingContext() =>
( flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext ) is not ValueTaskSourceOnCompletedFlags.None;
object GetScheduler() => ( object? ) SynchronizationContext.Current ?? TaskScheduler.Current;
}
Теперь доходим до реализации GetResult
public PcStatistics GetResult(short version)
{
CheckVersion(version);
if (_exception is not null)
{
throw _exception;
}
if (_cachedResult == default)
{
// Результат еще не готов
throw new InvalidOperationException("Работа еще не завершена");
}
return _cachedResult;
}
Все приведенные выше методы довольно просты в реализации, но на прод их не принесешь:
Нет поддержки отмены
Плохая работа с конкурентностью
ExecutionContext
не используется
Реализация InvokeContinuation
private void InvokeContinuation(Action<object?>? continuation, object? state, bool synchronously)
{
if (continuation is null)
{
return;
}
if (_scheduler is not null)
{
if (_scheduler is SynchronizationContext sc)
{
sc.Post(s =>
{
var t = ( Tuple<Action<object?>, object?> ) s!;
t.Item1(t.Item2);
}, Tuple.Create(continuation, state));
}
else
{
var ts = ( TaskScheduler ) _scheduler;
Task.Factory.StartNew(continuation,
state, CancellationToken.None,
TaskCreationOptions.DenyChildAttach, ts);
}
}
else if (synchronously)
{
continuation(state);
}
else
{
ThreadPool.QueueUserWorkItem(continuation, state, true);
}
}
Добавляем ManualResetValueTaskSourceCore
Реализацию написали. Мы молодцы. А теперь все выбрасываем, так как реализация за нас уже сделана - ManualResetValueTaskSourceCore
.
Она реализует все выше приведенные методы логики IValueTaskSource
.
Теперь перепишем старые методы с его использованием.
public PcStatistics GetResult(short token)
{
return _source.GetResult(token);
}
public ValueTaskSourceStatus GetStatus(short token)
{
return _source.GetStatus(token);
}
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
{
_source.OnCompleted(continuation, state, token, flags);
}
Благодаря этому, можно писать свои ValueTaskSource
и задумываться, только о бизнес-логике
Добавляем пулинг + почему нельзя заново await'ить
Теперь время для оптимизаций.
Заметим, что GetResult
вызывается только 1 раз, причем самым последним и наш IValueTaskSource
после этого трогать не должны.
Почему бы не переиспользовать создаваемые ValueTaskSource
и возвращать их обратно при вызове GetResult
.
Так и сделаем.
В PcMonitor
добавим пул этих объектов: при вызове GetStatisticsAsync
берем из, а в GetResult
будем возвращать обратно в.
// PcStatisticsManualResetValueTaskSource
private ObjectPool<PcStatisticsManualResetValueTaskSource>? _pool;
public PcStatistics GetResult(short token)
{
try
{
var result = _source.GetResult(token);
Reset();
return result;
}
catch (Exception e) when (e is not InvalidOperationException)
{
Reset();
throw;
}
void Reset()
{
// Возвращаем в пул
_pool!.Return(this);
// Перетираем состояние, чтобы переиспользовать
_source.Reset();
}
}
// PcMonitor
private ObjectPool<PcStatisticsManualResetValueTaskSource> _pool;
public ValueTask<PcStatistics> GetStatisticsAsync(CancellationToken token = default)
{
var source = _pool.Get();
return source.Start(this, _pool, token);
}
Теперь мы можем создать ограниченное количество ValueTaskSource
и постоянно их переиспользовать без лишних аллокаций памяти!
Но что, если кто-то попытается заawait
'ить ValueTask
несколько раз?
Тогда весь путь вызовов повторится и в лучшем случае ему вернется старый результат.
Но даже если эти вызовы были крайне близки во времени, никто не гарантирует, что за это время тот же самый IValueTaskSource
не будет передан в другой ValueTask
и будет хранить новое значение.
Вот тут и нужен short token
, передававшийся в любой метод.
Он призван проверять, что вызвавший код, обращается к ValueTask
, в которой находится актуальный IValueTaskSource
.
В ManualResetValueTaskSourceCore
он реализован в виде простого счетчика, поэтому там он называется Version
. Но в общем случае это не обязательно - сойдет любое неповторяющееся значение.
Этот токен задается в самом начале и не изменяется в процессе работы ValueTask
public ValueTask<PcStatistics> Start(ValueTaskSourcePcMonitor monitor, ObjectPool<PcStatisticsManualResetValueTaskSource> pool, CancellationToken token = default)
{
// ...
// ManualResetValueTaskSourceCore.Version - токен, который инкрементируется при вызове Reset()
return new ValueTask<PcStatistics>(this, _source.Version);
}
Использование в .NET
Когда кто-то представляет IValueTaskSource
, почти всегда в пример приводят сокет.
Я не буду исключением.
Читать или писать в сокет можно только одним потоком (только один или читает или пишет).
Растратно каждый раз создавать новые Task
'и на каждый чих (особенно учитывая что "Сеть надежна").
Поэтому внутри себя сокет содержит 2 буфера IValueTaskSource
- для чтения и записи
public partial class Socket
{
/// <summary>Cached instance for receive operations that return <see cref="ValueTask{Int32}"/>. Also used for ConnectAsync operations.</summary>
private AwaitableSocketAsyncEventArgs? _singleBufferReceiveEventArgs;
/// <summary>Cached instance for send operations that return <see cref="ValueTask{Int32}"/>. Also used for AcceptAsync operations.</summary>
private AwaitableSocketAsyncEventArgs? _singleBufferSendEventArgs;
// ...
internal sealed class AwaitableSocketAsyncEventArgs
: SocketAsyncEventArgs,
IValueTaskSource,
IValueTaskSource<int>,
IValueTaskSource<Socket>,
IValueTaskSource<SocketReceiveFromResult>,
IValueTaskSource<SocketReceiveMessageFromResult>
{
// ...
}
}
Например, при чтении из сокета буфер используется таким образом:
internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)
{
// Получаем закэшированный IValueTaskSource или создаем новый (потом положим обратно в кэш)
AwaitableSocketAsyncEventArgs saea =
Interlocked.Exchange(ref _singleBufferReceiveEventArgs, null) ??
new AwaitableSocketAsyncEventArgs(this, isReceiveForCaching: true);
// Обновляем состояние IValueTaskSource для новой работы
saea.SetBuffer(buffer);
saea.SocketFlags = socketFlags;
saea.WrapExceptionsForNetworkStream = fromNetworkStream;
// Запускаем асинхронную операцию
return saea.ReceiveAsync(this, cancellationToken);
}
internal sealed class AwaitableSocketAsyncEventArgs
{
public ValueTask<int> ReceiveAsync(Socket socket, CancellationToken cancellationToken)
{
if (socket.ReceiveAsync(this, cancellationToken))
{
// Операция не завершена синхронно - запускаем асинхронную операцию
_cancellationToken = cancellationToken;
return new ValueTask<int>(this, _token);
}
// ...
// Операция завершилась синхронно
return error == SocketError.Success ?
new ValueTask<int>(bytesTransferred) :
ValueTask.FromException<int>(CreateException(error));
}
}
IValueTaskSource
используется также в Channel
'ах.
Он используется как в Bounded
так и в Unbounded
, но пример сделаю на Bounded
.
В BoundedChannel
есть следующие поля
internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
{
/// <summary>Readers waiting to read from the channel</summary>
private readonly Deque<AsyncOperation<T>> _blockedReaders = new Deque<AsyncOperation<T>>();
/// <summary>Writers waiting to write to the channel.</summary>
private readonly Deque<VoidAsyncOperationWithData<T>> _blockedWriters = new Deque<VoidAsyncOperationWithData<T>>();
/// <summary>Linked list of WaitToReadAsync waiters.</summary>
private AsyncOperation<bool>? _waitingReadersTail;
/// <summary>Linked list of WaitToWriteAsync waiters.</summary>
private AsyncOperation<bool>? _waitingWritersTail;
// ...
}
Как можно заметить, здесь есть поля, использующие AsyncOperation
- тот самый IValueTaskSource
и в нем есть уже знакомые поля:
internal partial class AsyncOperation<TResult>
: AsyncOperation,
IValueTaskSource,
IValueTaskSource<TResult>
{
// Предназначен ли для пулинга
private readonly bool _pooled;
// Асинхронное продолжение
private readonly bool _runContinuationsAsynchronously;
// Результат операции
private TResult? _result;
// Исключение в процессе работы
private ExceptionDispatchInfo? _error;
// continuation из OnCompleted
private Action<object?>? _continuation;
// state из OnCompleted
private object? _continuationState;
// SynchronizationContext или TaskScheduler
private object? _schedulingContext;
private ExecutionContext? _executionContext;
// token
private short _currentId;
}
Для чтения из канала используется ValueTask<T> ReadAsync
:
public override ValueTask<T> ReadAsync(CancellationToken cancellationToken)
{
BoundedChannel<T> parent = _parent;
lock (parent.SyncObj)
{
// Если есть свободные элементы - вернуть их
if (!parent._items.IsEmpty)
{
return new ValueTask<T>(DequeueItemAndPostProcess());
}
// Используем закэшированный IValueTaskSource
if (!cancellationToken.CanBeCanceled)
{
AsyncOperation<T> singleton = _readerSingleton;
if (singleton.TryOwnAndReset())
{
parent._blockedReaders.EnqueueTail(singleton);
return singleton.ValueTaskOfT;
}
}
// Возвращаем новый IValueTaskSource
var reader = new AsyncOperation<T>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken);
parent._blockedReaders.EnqueueTail(reader);
return reader.ValueTaskOfT;
}
}
Для записи - ValueTask WriteAsync
:
public override ValueTask WriteAsync(T item, CancellationToken cancellationToken)
{
// Количество элемент в очереди
int count = parent._items.Count;
if (count == 0)
{
// Добавляем элемент в свободную очередь или заблокированного читателя
}
else if (count < parent._bufferedCapacity)
{
// Синхронно добавляем элемент в свободную очередь
}
else if (parent._mode == BoundedChannelFullMode.Wait)
{
// Очередь полна, создаем асинхронную операцию записи
// Используем закэшированный IValueTaskSource
if (!cancellationToken.CanBeCanceled)
{
VoidAsyncOperationWithData<T> singleton = _writerSingleton;
if (singleton.TryOwnAndReset())
{
singleton.Item = item;
parent._blockedWriters.EnqueueTail(singleton);
return singleton.ValueTask;
}
}
// Создаем новый IValueTaskSource
var writer = new VoidAsyncOperationWithData<T>(runContinuationsAsynchronously: true, cancellationToken);
writer.Item = item;
parent._blockedWriters.EnqueueTail(writer);
return writer.ValueTask;
}
else if (parent._mode == BoundedChannelFullMode.DropWrite)
{
// Отбрасываем элемент, т.к. очередь полна
}
else
{
// Удаляем последний/первый элемент в очереди и записываем новый
}
return default;
}
Полезные ссылки
Надеюсь, теперь стало понятно, как работают IValueTaskSource
и почему переawait
'ить ValueTask
плохая затея.
Если кому-то стала интересна эта тема, то прилагаю полезные ссылки:
Немного про
ValueTask
Проект из примера
ManualResetValueTaskSourceCore
AwaitableSocketAsyncEventArgs
AsyncOperation
Статья, с которой скопировал реализацию