Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
img source
На прошлой неделе для Rust комьюнити случилось огромное событие — вышла версия компилятора 1.39, а в месте с ней и стабилизация async-await фичи. В этом посте я постараюсь резюмировать все релевантные изменения в компиляторе и экосистеме, а также предоставить инструкции по миграции на async-await парадигму. Детального разбора асинхронности в Rust я делать не буду, есть всё ещё актуальные статьи на хабре, которые помогут войти в тему:
- чат на mio: часть 1, часть 2;
- tokio + futures-rs 0.1;
- обзор tokio;
- генераторы (тут используется непринятый вариант синтаксиса с макросом
await!
, он уже работать не будет, но подноготная осталась такой же).
Помимо указанных статей можно также обратиться к документациям стандартной библиотеке и нужных крейтов, а также почитать async-book (на англ.).
Все примеры, рассматриваемые в статье работают на стабильном компиляторе 1.38 и должны работать на всех последующих версиях. Конечный код доступен на github.
Для реализации асинхронного кода использовалась библиотека futures-0.1. Она предоставляет базовые типажи futures::Future
и futures::Stream
для работы с отложенными вычислениями. Они оперируют с типами Result<..>
и предоставляют набор комбинаторов. Помимо этого, библиотека предоставляет каналы для общения между задачами (task), различные интерфейсы для работы с экзекьютором и его системой задач и прочее.
Рассмотрим пример, который генерирует числовой ряд из старших 32 бит чисел Фибоначчи и отправляет их в Sink
:
// futures 0.1.29
use futures::prelude::*;
use futures::{stream, futures};
fn sink_fibb_series(
sink: impl Sink<SinkItem = u32, SinkError = ()>,
) -> impl Future<Item = (), Error = ()> {
stream::unfold((1u32, 1), |(mut fact, n)| {
while fact.checked_mul(n).is_none() {
fact >>= 1;
}
fact *= n;
Some(future::ok((fact, (fact, n + 1))))
})
.forward(sink)
.map(|_v| ())
}
Зам.: считать CPU-bound задачи на корутинах не самое лучшее применение, зато пример самодостаточен и прост.
Как можно заметить, код выглядит достаточно громоздко: необходимо указывать возвращаемое значение, несмотря на то, что никакого полезного значения в нем нет. В futures 0.3 код становится немного проще:
// futures 0.3.1
use futures::prelude::*;
use futures::stream;
async fn sink_fibb_series(sink: impl Sink<u32>) {
stream::unfold((1u32, 1), |(mut fact, n)| {
async move {
while fact.checked_mul(n).is_none() {
fact >>= 1;
}
fact *= n;
Some((fact, (fact, n + 1)))
}
})
.map(Ok)
.forward(sink)
.map(|_v| ())
.await;
}
Здесь у функции добавляется ключевое слово async
, которое оборачивает возвращаемое значение функции в Future
. Поскольку в нашем случае это кортеж нулевого размера, то его можно попросту опустить, как и в обычных функциях.
Для ожидания выполнения в конце цепочки вызовов используется ключевое слово await
. Этот вызов приостанавливает выполнение в текущем async-контексте и передает управление планировщику до тех пор, пока ожидаемое Future
значение не будет готово. Затем выполнение возобновляется с последнего await
(в нашем примере завершая функцию), т.е. поток управления становится нелинейным по сравнению с аналогичным синхронным кодом.
Ещё одно значительное различие — наличие async-блока в теле замыкания внутри stream::unfold
. Эта обёртка является полным аналогом объявлением новой async-функции с таким же телом и вызовом вместо async-блока.
Возможно это замыкание в скором времени написать с помощью фичи async_closure
, но увы, она пока не реализована:
async |(mut fact, n)| {
while fact.checked_mul(n).is_none() {
fact >>= 1;
}
fact *= n;
Some((fact, (fact, n + 1)))
}
Как можно заметить, новый типаж Stream
работает не только с элементами типа Result<..>
, как это было ранее. Аналогичные изменения коснулись типажа Future
, определения по версиям следующие:
// futures 0.1
trait Future {
type Item;
type Error;
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}
enum Async<T> {
Ready(T),
NotReady
}
// futures 0.3
trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending
}
Помимо того, что тип возвращаемого значения может быть произвольным, также поменялись и входные параметры для Future::poll
. Появился новый параметр Context
, который предоставляет явный интерфейс для пробуждения текущей задачи. Ранее то же самое можно было достигнуть через глобальные переменные конкретного экзекьютора (например через вызов tokio::prelude::task::current().notify()
).
Более фундаментальное отличие интерфейса в том, что ссылку на себя требуется оборачивать в Pin
. Эта обертка над указателем гарантирует "неподвижность" данных в памяти (более подробное описание Pin
есть 1.33 релизе компилятора на хабре, либо на английском, в документации стандартной библиотеки std::pin).
Попробуем теперь запустить наш пример. В качестве Sink
возьмем половину канала из futures и на выходной стороне будем печатать результат с некоторой задержкой между итерациями. На futures-0.1 такой код можно написать следующим образом:
use std::time::{Duration, Instant};
// futures 0.1.29
use futures::prelude::*;
use futures::sync::mpsc;
// tokio 0.1.22
use tokio::runtime::Runtime;
use tokio::timer::Delay;
fn main() {
let mut rt = Runtime::new().unwrap();
let (tx, rx) = mpsc::channel(32);
rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ()))));
let fut = rx.take(100).for_each(|val| {
println!("{}", val);
Delay::new(Instant::now() + Duration::from_millis(50))
.map(|_| ())
.map_err(|_| ())
});
rt.spawn(Box::new(fut));
rt.shutdown_on_idle().wait().unwrap();
}
Аналогичный код с новым tokio (который на момент написания ещё alpha) и futures-0.3 может выглядеть так:
use std::time::Duration;
// futures 0.3.1
use futures::channel::mpsc;
use futures::prelude::*;
// tokio 0.2.0-alpha.5
use tokio::timer;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(32);
tokio::spawn(sink_fibb_series(tx));
rx.take(100)
.for_each(|val| {
println!("{}", val);
timer::delay_for(Duration::from_millis(50))
})
.await;
}
Как можно заметить, код с новыми футурами стал значительно короче. По опыту автора, количество строк всегда выходит ощутимо меньше (порой даже при переписывании синхронного кода). Но как мне кажется, куда более весомое отличие в читабельности и отсутствия мешанины вызовов map
/map_err
, которые были необходимы из-за вариативности ошибок у стандартных типов в Result<..>
.
Комбинаторы над элементами типа Result<..>
тем не менее остались и находятся в отдельных типажах, некоторые со слегка обновленным названием. Теперь они разбиты по двум разным типажам; те, которые реализованы для:
- всех элементов:
futures::FuturesExt
иfutures::StreamExt
; - элементов типа
Result<..>
:futures::FuturesExt
иfutures::StreamExt
.
Чуть более сложным оказывается реализация типажей Future
и Stream
. Для примера попробуем реализовать Stream
для уже рассмотренного числового ряда. Общий тип для обеих версий футур будет следующий:
struct FactStream {
fact: u32,
n: u32,
}
impl FactStream {
fn new() -> Self {
Self { fact: 1, n: 1 }
}
}
Для futures-0.1 реализация будет следующая:
impl Stream for FactStream {
type Item = u32;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
while self.fact.checked_mul(self.n).is_none() {
self.fact >>= 1;
}
self.fact *= self.n;
self.n += 1;
Ok(Async::Ready(Some(self.fact)))
}
}
В этом примере реализация Stream::poll
фактически является полной копией замыкания stream::unfold
. В случае с futures-0.3 реализация оказывается эквивалентной:
impl Stream for FactStream {
type Item = u32;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while self.fact.checked_mul(self.n).is_none() {
self.fact >>= 1;
}
self.fact *= self.n;
self.n += 1;
Poll::Ready(Some(self.fact))
}
}
Однако, если тип какого-нибудь поля структуры не реализует Unpin
, то std::ops::DerefMut
не будет реализовать на Pin<&mut T>
и тем самым не будет мутабельного доступа ко всем полям:
use std::marker::PhantomPinned;
struct Fact {
inner: u32,
// маркер убирает реализацию Unpin у структуры
_pin: PhantomPinned,
}
struct FactStream {
fact: Fact,
n: u32,
}
impl Stream for FactStream {
type Item = u32;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while self.fact.inner.checked_mul(self.n).is_none() {
self.fact.inner >>= 1; // <- ошибка компиляции
// trait `DerefMut` is required to modify
// through a dereference, but it is not
// implemented for `std::pin::Pin<&mut FactStream>`
}
self.fact.inner *= self.n; // <- тут аналогично
self.n += 1; // <-
Poll::Ready(Some(self.fact.inner))
}
}
В таком случае, в том или ином виде придется воспользоваться unsafe функциями Pin::get_unchecked_mut
и Pin::map_unchecked_mut
для того, чтобы получить "проекцию" !Unpin
поля (в документации есть более развернутое описание). К счастью, для таких случаев существует безопасная обёртка реализованная в крейте pin_project (детали реализации можно найти в документации библиотеки).
use pin_project::pin_project;
#[pin_project]
struct FactStream {
fact: Fact,
n: u32,
}
impl Stream for FactStream {
type Item = u32;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
while this.fact.inner.checked_mul(*this.n).is_none() {
this.fact.inner >>= 1;
}
this.fact.inner *= *this.n;
*this.n += 1;
Poll::Ready(Some(this.fact.inner))
}
}
Последний момент, который хотелось бы освятить это интеропреабельность между типажами разных версий. Для этого существует модуль futures::compat, который позволяет конвертировать из старых типов в новые и наоборот. К примеру можно проитерироваться по Stream
из futures-0.1 с помощью async-await:
use std::fmt::Display;
// futures 0.3
use new_futures::compat::Compat01As03 as Compat;
use new_futures::StreamExt as _;
// futures 0.1
use old_futures::Stream as OldStream;
async fn stream_iterate<E>(
old_stream: impl OldStream<Item = impl Display, Error = E>,
) -> Result<(), E> {
let stream = Compat::new(old_stream);
let mut stream = Box::pin(stream);
while let Some(item) = stream.as_mut().next().await.transpose()? {
println!("{}", item);
}
Ok(())
}
Примечание: в статье рассматривается только экзекьютор tokio, как наиболее долгоживущий и распространенный. Тем не менее на нём мир не заканчивается, например существует альтернативный async-std
, который помимо этого предоставляет футурные обертки для типов стандартной библиотеки, а также ThreadPool
и LocalPool
из рассмотренной библиотеки futures-0.3.