Миграция на инфраструктуру async-await в Rust

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

birds migration
img source


На прошлой неделе для Rust комьюнити случилось огромное событие — вышла версия компилятора 1.39, а в месте с ней и стабилизация async-await фичи. В этом посте я постараюсь резюмировать все релевантные изменения в компиляторе и экосистеме, а также предоставить инструкции по миграции на async-await парадигму. Детального разбора асинхронности в Rust я делать не буду, есть всё ещё актуальные статьи на хабре, которые помогут войти в тему:


  • чат на mio: часть 1, часть 2;
  • tokio + futures-rs 0.1;
  • обзор tokio;
  • генераторы (тут используется непринятый вариант синтаксиса с макросом await!, он уже работать не будет, но подноготная осталась такой же).

Помимо указанных статей можно также обратиться к документациям стандартной библиотеке и нужных крейтов, а также почитать async-book (на англ.).


Все примеры, рассматриваемые в статье работают на стабильном компиляторе 1.38 и должны работать на всех последующих версиях. Конечный код доступен на github.


Для реализации асинхронного кода использовалась библиотека futures-0.1. Она предоставляет базовые типажи futures::Future и futures::Stream для работы с отложенными вычислениями. Они оперируют с типами Result<..> и предоставляют набор комбинаторов. Помимо этого, библиотека предоставляет каналы для общения между задачами (task), различные интерфейсы для работы с экзекьютором и его системой задач и прочее.


Рассмотрим пример, который генерирует числовой ряд из старших 32 бит чисел Фибоначчи и отправляет их в Sink:


// futures 0.1.29
use futures::prelude::*;
use futures::{stream, futures};

fn sink_fibb_series(
    sink: impl Sink<SinkItem = u32, SinkError = ()>,
) -> impl Future<Item = (), Error = ()> {
    stream::unfold((1u32, 1), |(mut fact, n)| {
        while fact.checked_mul(n).is_none() {
            fact >>= 1;
        }
        fact *= n;
        Some(future::ok((fact, (fact, n + 1))))
    })
    .forward(sink)
    .map(|_v| ())
}

Зам.: считать CPU-bound задачи на корутинах не самое лучшее применение, зато пример самодостаточен и прост.


Как можно заметить, код выглядит достаточно громоздко: необходимо указывать возвращаемое значение, несмотря на то, что никакого полезного значения в нем нет. В futures 0.3 код становится немного проще:


// futures 0.3.1
use futures::prelude::*;
use futures::stream;

async fn sink_fibb_series(sink: impl Sink<u32>) {
    stream::unfold((1u32, 1), |(mut fact, n)| {
        async move {
            while fact.checked_mul(n).is_none() {
                fact >>= 1;
            }
            fact *= n;
            Some((fact, (fact, n + 1)))
        }
    })
    .map(Ok)
    .forward(sink)
    .map(|_v| ())
    .await;
}

Здесь у функции добавляется ключевое слово async, которое оборачивает возвращаемое значение функции в Future. Поскольку в нашем случае это кортеж нулевого размера, то его можно попросту опустить, как и в обычных функциях.


Для ожидания выполнения в конце цепочки вызовов используется ключевое слово await. Этот вызов приостанавливает выполнение в текущем async-контексте и передает управление планировщику до тех пор, пока ожидаемое Future значение не будет готово. Затем выполнение возобновляется с последнего await (в нашем примере завершая функцию), т.е. поток управления становится нелинейным по сравнению с аналогичным синхронным кодом.


Ещё одно значительное различие — наличие async-блока в теле замыкания внутри stream::unfold. Эта обёртка является полным аналогом объявлением новой async-функции с таким же телом и вызовом вместо async-блока.


#[feature(async_closure)

Возможно это замыкание в скором времени написать с помощью фичи async_closure, но увы, она пока не реализована:


async |(mut fact, n)| {
    while fact.checked_mul(n).is_none() {
        fact >>= 1;
    }
    fact *= n;
    Some((fact, (fact, n + 1)))
}

Как можно заметить, новый типаж Stream работает не только с элементами типа Result<..>, как это было ранее. Аналогичные изменения коснулись типажа Future, определения по версиям следующие:


// futures 0.1
trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

enum Async<T> {
    Ready(T),
    NotReady
}

// futures 0.3
trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending
}

Помимо того, что тип возвращаемого значения может быть произвольным, также поменялись и входные параметры для Future::poll. Появился новый параметр Context, который предоставляет явный интерфейс для пробуждения текущей задачи. Ранее то же самое можно было достигнуть через глобальные переменные конкретного экзекьютора (например через вызов tokio::prelude::task::current().notify()).


Более фундаментальное отличие интерфейса в том, что ссылку на себя требуется оборачивать в Pin. Эта обертка над указателем гарантирует "неподвижность" данных в памяти (более подробное описание Pin есть 1.33 релизе компилятора на хабре, либо на английском, в документации стандартной библиотеки std::pin).


Попробуем теперь запустить наш пример. В качестве Sink возьмем половину канала из futures и на выходной стороне будем печатать результат с некоторой задержкой между итерациями. На futures-0.1 такой код можно написать следующим образом:


use std::time::{Duration, Instant};

// futures 0.1.29
use futures::prelude::*;
use futures::sync::mpsc;
// tokio 0.1.22
use tokio::runtime::Runtime;
use tokio::timer::Delay;

fn main() {
    let mut rt = Runtime::new().unwrap();

    let (tx, rx) = mpsc::channel(32);
    rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ()))));

    let fut = rx.take(100).for_each(|val| {
        println!("{}", val);
        Delay::new(Instant::now() + Duration::from_millis(50))
            .map(|_| ())
            .map_err(|_| ())
    });
    rt.spawn(Box::new(fut));

    rt.shutdown_on_idle().wait().unwrap();
}

Аналогичный код с новым tokio (который на момент написания ещё alpha) и futures-0.3 может выглядеть так:


use std::time::Duration;

// futures 0.3.1
use futures::channel::mpsc;
use futures::prelude::*;
// tokio 0.2.0-alpha.5
use tokio::timer;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(32);

    tokio::spawn(sink_fibb_series(tx));

    rx.take(100)
        .for_each(|val| {
            println!("{}", val);
            timer::delay_for(Duration::from_millis(50))
        })
        .await;
}

Как можно заметить, код с новыми футурами стал значительно короче. По опыту автора, количество строк всегда выходит ощутимо меньше (порой даже при переписывании синхронного кода). Но как мне кажется, куда более весомое отличие в читабельности и отсутствия мешанины вызовов map/map_err, которые были необходимы из-за вариативности ошибок у стандартных типов в Result<..>.


Комбинаторы над элементами типа Result<..> тем не менее остались и находятся в отдельных типажах, некоторые со слегка обновленным названием. Теперь они разбиты по двум разным типажам; те, которые реализованы для:


  • всех элементов: futures::FuturesExt и futures::StreamExt;
  • элементов типа Result<..>: futures::FuturesExt и futures::StreamExt.

Чуть более сложным оказывается реализация типажей Future и Stream. Для примера попробуем реализовать Stream для уже рассмотренного числового ряда. Общий тип для обеих версий футур будет следующий:


struct FactStream {
    fact: u32,
    n: u32,
}

impl FactStream {
    fn new() -> Self {
        Self { fact: 1, n: 1 }
    }
}

Для futures-0.1 реализация будет следующая:


impl Stream for FactStream {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        while self.fact.checked_mul(self.n).is_none() {
            self.fact >>= 1;
        }
        self.fact *= self.n;
        self.n += 1;
        Ok(Async::Ready(Some(self.fact)))
    }
}

В этом примере реализация Stream::poll фактически является полной копией замыкания stream::unfold. В случае с futures-0.3 реализация оказывается эквивалентной:


impl Stream for FactStream {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        while self.fact.checked_mul(self.n).is_none() {
            self.fact >>= 1;
        }
        self.fact *= self.n;
        self.n += 1;

        Poll::Ready(Some(self.fact))
    }
}

Однако, если тип какого-нибудь поля структуры не реализует Unpin, то std::ops::DerefMut не будет реализовать на Pin<&mut T> и тем самым не будет мутабельного доступа ко всем полям:


use std::marker::PhantomPinned;

struct Fact {
    inner: u32,
    // маркер убирает реализацию Unpin у структуры
    _pin: PhantomPinned,
}

struct FactStream {
    fact: Fact,
    n: u32,
}

impl Stream for FactStream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        while self.fact.inner.checked_mul(self.n).is_none() {
            self.fact.inner >>= 1; // <- ошибка компиляции
                                   // trait `DerefMut` is required to modify
                                   // through a dereference, but it is not
                                   // implemented for `std::pin::Pin<&mut FactStream>`
        }
        self.fact.inner *= self.n;  // <- тут аналогично
        self.n += 1;                // <-

        Poll::Ready(Some(self.fact.inner))
    }
}

В таком случае, в том или ином виде придется воспользоваться unsafe функциями Pin::get_unchecked_mut и Pin::map_unchecked_mut для того, чтобы получить "проекцию" !Unpin поля (в документации есть более развернутое описание). К счастью, для таких случаев существует безопасная обёртка реализованная в крейте pin_project (детали реализации можно найти в документации библиотеки).


use pin_project::pin_project;

#[pin_project]
struct FactStream {
    fact: Fact,
    n: u32,
}

impl Stream for FactStream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        while this.fact.inner.checked_mul(*this.n).is_none() {
            this.fact.inner >>= 1;
        }
        this.fact.inner *= *this.n;
        *this.n += 1;

        Poll::Ready(Some(this.fact.inner))
    }
}

Последний момент, который хотелось бы освятить это интеропреабельность между типажами разных версий. Для этого существует модуль futures::compat, который позволяет конвертировать из старых типов в новые и наоборот. К примеру можно проитерироваться по Stream из futures-0.1 с помощью async-await:


use std::fmt::Display;

// futures 0.3
use new_futures::compat::Compat01As03 as Compat;
use new_futures::StreamExt as _;
// futures 0.1
use old_futures::Stream as OldStream;

async fn stream_iterate<E>(
    old_stream: impl OldStream<Item = impl Display, Error = E>,
) -> Result<(), E> {
    let stream = Compat::new(old_stream);
    let mut stream = Box::pin(stream);

    while let Some(item) = stream.as_mut().next().await.transpose()? {
        println!("{}", item);
    }

    Ok(())
}

Примечание: в статье рассматривается только экзекьютор tokio, как наиболее долгоживущий и распространенный. Тем не менее на нём мир не заканчивается, например существует альтернативный async-std, который помимо этого предоставляет футурные обертки для типов стандартной библиотеки, а также ThreadPool и LocalPool из рассмотренной библиотеки futures-0.3.

Источник: https://habr.com/ru/post/475272/


Интересные статьи

Интересные статьи

Настройка среды. "Голый" бинарник, или Исполняемый файл без main() Первый шаг в написании своей ОСи — создание бинарника, не зависящего от стандартных библиотек, это делает воз...
Эта публикация написана после неоднократных обращений как клиентов, так и (к горести моей) партнеров. Темы обращений были разные, но причиной в итоге оказывался один и тот же сценарий, реализу...
С каждым годом растет количество атак в корпоративном секторе: например в 2017 году зафиксировали на 13% больше уникальных инцидентов чем в 2016 г., а по итогам 2018 — на 27% больше инциденто...
Одной из «киллер-фич» 12й версии Битрикса была объявлена возможность отдавать статические файлы из CDN, тем самым увеличивая скорость работы сайта. Попробуем оценить практический выигрыш от использова...
Один из самых острых вопросов при разработке на Битрикс - это миграции базы данных. Какие же способы облегчить эту задачу есть на данный момент?