RESTinio — это асинхронный HTTP-сервер. Простой пример из практики: отдача большого объема данных в ответ

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.


Недавно мне довелось поработать над приложением, которое должно было контролировать скорость своих исходящих подключений. Например, подключаясь к одному URL приложение должно было ограничить себя, скажем, 200KiB/sec. А подключаясь к другому URL — всего 30KiB/sec.


Самым интересным моментом здесь оказалось тестирование этих самых ограничений. Мне потребовался HTTP-сервер, который бы отдавал трафик с какой-то заданной скоростью, например, 512KiB/sec. Тогда бы я мог видеть, действительно ли приложение выдерживает скорость 200KiB/sec или же оно срывается на более высокие скорости.


Но где взять такой HTTP-сервер?


Поскольку я имею некоторое отношение к встраиваемому в С++ приложения HTTP-серверу RESTinio, то не придумал ничего лучше, чем быстренько набросать на коленке простой тестовый HTTP-сервер, который способен отдавать клиенту длинный поток исходящих данных.


О том, насколько это было просто и хотелось бы рассказать в статье. Заодно узнать в комментариях, действительно ли это просто или же я сам себя обманываю. В принципе, данную статью можно рассматривать как продолжение предыдущей статьи про RESTinio под названием "RESTinio — это асинхронный HTTP-сервер. Асинхронный". Посему, если кому-то интересно прочитать о реальном, пусть и не очень серьезном применении RESTinio, то милости прошу под кат.


Общая идея


Общая идея упомянутого выше тестового сервера очень проста: когда клиент подключается к серверу и выполняет HTTP GET запрос, то взводится таймер, срабатывающий раз в секунду. Когда таймер срабатывает, то клиенту отсылается очередной блок данных заданного размера.


Но все несколько сложнее


Если клиент вычитывает данные с меньшим темпом, нежели отсылает сервер, то просто отсылать по N килобайт раз в секунду не есть хорошая идея. Поскольку данные начнут скапливаться в сокете и ни к чему хорошему это не приведет.


Поэтому при отсылке данных желательно на стороне HTTP-сервера контролировать готовность сокета к записи. Пока сокет готов (т.е. в нем не скопилось еще слишком много данных), то новую порцию отсылать можно. А вот если не готов, то нужно подождать пока сокет не перейдет в состояние готовности к записи.


Звучит разумно, но ведь операции ввода-вывода скрыты в потрохах RESTinio… Как тут узнать, можно ли записывать следующую порцию данных или нет?


Из данной ситуации можно выйти, если использовать after-write нотификаторы, которые есть в RESTinio. Например, мы можем написать так:


void request_handler(restinio::request_handle_t req) {
   req->create_response() // Начинаем формировать ответ.
      ... // Наполняем ответ содержимым.
      .done([](const auto & ec) {
          ... // Вот этот код будет вызван когда запись ответа закончится.
      });
}

Лямбда, переданная в метод done() будет вызвана когда RESTinio завершит запись исходящих данных. Соответственно, если сокет какое-то время был не готов к записи, то лямбда будет вызвана не сразу, а после того, как сокет придет в должное состояние и примет все исходящие данные.


За счет использования after-write нотификаторов логика работы тестового сервера будет такой:


  • отсылаем очередную порцию данных, вычисляем время, когда нам нужно было бы отослать следующую порцию при нормальном развитии событий;
  • вешаем after-write нотификатор на очередную порцию данных;
  • когда after-write нотификатор вызывается, мы проверяем, наступило ли время отсылки следующей порции. Если наступило, то сразу же инициируем отсылку следующей порции. Если не наступило, то взводим таймер.

В результате получится, что как только запись начнет притормаживать, отсылка новых данных приостановится. И возобновиться когда сокет будет готов принимать новые исходящие данные.


И еще немного сложного: chunked_output


RESTinio поддерживает три способа формирования ответа на HTTP-запрос. Самый простой способ, который применяется по умолчанию, в данном случае не подходит, т.к. мне требуется практически бесконечный поток исходящих данных. И такой поток, естественно, нельзя отдать в единственный вызов метода set_body.


Поэтому в описываемом тестовом сервере используется т.н. chunked_output. Т.е. при создании ответа я указываю RESTinio, что ответ будет формироваться частями. После чего просто периодически вызываю методы append_chunk для добавления к ответу очередной части и flush для записи накопленных частей в сокет.


А давайте уже посмотрим в код!


Пожалуй, достаточно уже вступительных слов и пора перейти к рассмотрению самого кода, который можно найти в этом репозитории. Начнем с функции request_processor, которая вызывается для обработки каждого корректного HTTP-запроса. При этом углубимся в те функции, которые из request_processor вызываются. Ну а затем уже посмотрим, как именно request_processor ставится в соответствие тому или иному входящему HTTP-запросу.


Функция request_processor и её подручные


Функция request_processor вызывается для обработки нужных мне HTTP GET запросов. Ей в качестве аргументов передаются:


  • Asio-шный io_context, на котором ведется вся работа (он потребуется, например, для взведения таймеров);
  • размер одной части ответа. Т.е. если мне нужно отдавать исходящий поток с темпом в 512KiB/sec, то в качестве этого параметра будет передано значение 512KiB;
  • количество частей в ответе. На случай, если поток должен иметь какую-то ограниченную длину. Например, если нужно отдавать поток с темпом 512KiB/sec в течении 5 минут, то в качестве этого параметра будет передано значение 300 (60 блоков в минуту в течении 5 минут);
  • ну и сам входящий запрос для обработки.

Внутри request_processor создается объект с информацией о запросе и параметрах его обработки, после чего эта самая обработка и начинается:


void request_processor(
        asio_ns::io_context & ctx,
        std::size_t chunk_size,
        std::size_t count,
        restinio::request_handle_t req) {
    auto data = std::make_shared<response_data>(
            ctx,
            chunk_size,
            req->create_response<output_t>(),
            count);

    data->response_
        .append_header(restinio::http_field::server, "RESTinio")
        .append_header_date_field()
        .append_header(
                restinio::http_field::content_type,
                "text/plain; charset=utf-8")
        .flush();

    send_next_portion(data);
}

Тип response_data, содержащий все относящиеся к запросу параметры, выглядит следующим образом:


struct response_data {
    asio_ns::io_context & io_ctx_;
    std::size_t chunk_size_;
    response_t response_;
    std::size_t counter_;

    response_data(
        asio_ns::io_context & io_ctx,
        std::size_t chunk_size,
        response_t response,
        std::size_t counter)
        : io_ctx_{io_ctx}
        , chunk_size_{chunk_size}
        , response_{std::move(response)}
        , counter_{counter}
    {}
};

Тут нужно заметить, что одна из причин появления структуры response_data состоит в том, что объект типа restinio::response_builder_t<restinio::chunked_output_t> (а именно этот тип спрятан за коротким псевдонимом response_t) является moveable-, но не copyable-типом (по аналогии с std::unique_ptr). Поэтому этот объект нельзя просто так захватить в лямбда-функции, которая затем оборачивается в std::function. Но если объект-response поместить в динамически созданный экземпляр response_data, то умный указатель на экземпляр reponse_data уже можно без проблем захватывать в лямбда-функции с последующим сохранением этой лямбды в std::function.


Функция send_next_portion


Функция send_next_portion вызывается каждый раз, когда требуется отослать клиенту очередную часть ответа. Ничего сложного в ней не происходит, поэтому выглядит она достаточно просто и лаконично:


void send_next_portion(response_data_shptr data) {
    data->response_.append_chunk(make_buffer(data->chunk_size_));

    if(1u == data->counter_) {
        data->response_.flush();
        data->response_.done();
    }
    else {
        data->counter_ -= 1u;
        data->response_.flush(make_done_handler(data));
    }
}

Т.е. отсылаем очередную часть. И, если эта часть была последней, то завершаем обработку запроса. А если не последняя, то в метод flush передается after-write нотификатор, который создается, пожалуй, наиболее сложной функцией данного примера.


Функция make_done_handler


Функция make_done_handler отвечает за создание лямбды, которая будет передана в RESTinio в качестве after-write нотификатора. Этот нотификатор должен проверить, завершилась ли запись очередной части ответа успешно. Если да, то нужно разобраться, следует ли следующую часть отослать сразу же (т.е. были "тормоза" в сокете и темп отсылки выдерживать не получается), либо же после некоторой паузы. Если нужна пауза, то она обеспечивается через взведение таймера.


В общем-то, несложные действия, но в коде получается лямбда внутри лямбды, что может смутить людей, не привыкших к "современному" С++. Которому не так уж и мало лет чтобы называться современным ;)


auto make_done_handler(response_data_shptr data) {
    const auto next_timepoint = steady_clock::now() + 1s;
    return [=](const auto & ec) {
        if(!ec) {
            const auto now = steady_clock::now();
            if(now < next_timepoint) {
                auto timer = std::make_shared<asio_ns::steady_timer>(data->io_ctx_);
                timer->expires_after(next_timepoint - now);
                timer->async_wait([timer, data](const auto & ec) {
                        if(!ec)
                            send_next_portion(data);
                    });
            }
            else
                data->io_ctx_.post([data] { send_next_portion(data); });
        }
    };
}

На мой взгляд, основная сложность в этом коде проистекает из-за особенностей создания и "взвода" таймеров в Asio. По-моему, получается как-то слишком уж многословно. Но тут уж что есть, то есть. Зато не нужно никаких дополнительных библиотек привлекать.


Подключение express-like роутера


Показанные выше request_processor, send_next_portion и make_done_handler в общем-то и составляли самую первую версию моего тестового сервера, написанного буквально за 15 или 20 минут.


Но через пару дней использования этого тестового сервера оказалось, что в нем есть серьезный недостаток: он всегда отдает ответный поток с одинаковой скоростью. Скомпилировал со скоростью 512KiB/sec — отдает всем 512KiB/sec. Перекомпилировал со скоростью 20KiB/sec — будет отдавать всем 20KiB/sec и никак иначе. Что было неудобно, т.к. стало нужно иметь возможность получать ответы разной "толщины".


Тогда и появилась идея: а что, если скорость отдачи будет запрашиваться прямо в URL? Например, сделали запрос на localhost:8080/ и получили ответ с заранее заданной скоростью. А если сделали запрос на localhost:8080/128K, то стали получать ответ со скоростью 128KiB/sec.


Потом мысль пошла еще дальше: в URL также можно задавать и количество отдельных частей в ответе. Т.е. запрос localhost:8080/128K/3000 приведет к выдаче потока из 3000 частей со скоростью 128KiB/sec.


Нет проблем. В RESTinio есть возможность использовать маршрутизатор запросов, сделанный под влиянием ExpressJS. В итоге появилась вот такая функция описания обработчиков входящих HTTP-запросов:


auto make_router(asio_ns::io_context & ctx) {
    auto router = std::make_unique<router_t>();

    router->http_get("/", [&ctx](auto req, auto) {
            request_processor(ctx, 100u*1024u, 10000u, std::move(req));
            return restinio::request_accepted();
        });

    router->http_get(
                R"(/:value(\d+):multiplier([MmKkBb]?))",
                [&ctx](auto req, auto params) {

            const auto chunk_size = extract_chunk_size(params);

            if(0u != chunk_size) {
                request_processor(ctx, chunk_size, 10000u, std::move(req));
                return restinio::request_accepted();
            }
            else
                return restinio::request_rejected();
        });

    router->http_get(
                R"(/:value(\d+):multiplier([MmKkBb]?)/:count(\d+))",
                [&ctx](auto req, auto params) {

            const auto chunk_size = extract_chunk_size(params);
            const auto count = restinio::cast_to<std::size_t>(params["count"]);

            if(0u != chunk_size && 0u != count) {
                request_processor(ctx, chunk_size, count, std::move(req));
                return restinio::request_accepted();
            }
            else
                return restinio::request_rejected();
        });

    return router;
}

Здесь формируются обработчики HTTP GET запросов для URL трех типов:


  • вида http://localhost/;
  • вида http://localhost/<speed>[<U>]/;
  • вида http://localhost/<speed>[<U>]/<count>/

Где speed — это число, определяющее скорость, а U — это опциональный мультипликатор, который указывает, в каких единицах задана скорость. Так 128 или 128b означает скорость в 128 байт в секунду. А 128k — 128 килобайт в секунду.


На каждый URL вешается своя лямбда функция, которая разбирается с полученными параметрами, если все нормально, вызывает уже показанную выше функцию request_processor.


Вспомогательная функция extract_chunk_size выглядит следующим образом:


std::size_t extract_chunk_size(const restinio::router::route_params_t & params) {
    const auto multiplier = [](const auto sv) noexcept -> std::size_t {
        if(sv.empty() || "B" == sv || "b" == sv) return 1u;
        else if("K" == sv || "k" == sv) return 1024u;
        else return 1024u*1024u;
    };

    return restinio::cast_to<std::size_t>(params["value"]) *
            multiplier(params["multiplier"]);
}

Здесь C++ная лямбда используется для эмуляции локальных функций из других языков программирования.


Функция main


Осталось посмотреть, как все это запускается в функции main:


using router_t = restinio::router::express_router_t<>;
...
int main() {
    struct traits_t : public restinio::default_single_thread_traits_t {
        using logger_t = restinio::single_threaded_ostream_logger_t;
        using request_handler_t = router_t;
    };

    asio_ns::io_context io_ctx;

    restinio::run(
        io_ctx,
        restinio::on_this_thread<traits_t>()
            .port(8080)
            .address("localhost")
            .write_http_response_timelimit(60s)
            .request_handler(make_router(io_ctx)));

    return 0;
}

Что здесь происходит:


  1. Поскольку мне нужен не обычный штатный роутер запросов (который вообще ничего делать сам не может и перекладывает всю работу на плечи программиста), то я определяю новые свойства для своего HTTP-сервера. Для этого беру штатные свойства однопоточного HTTP-сервера (тип restinio::default_single_thread_traits_t) и указываю, что в качестве обработчика запросов будет использоваться экземпляр express-like роутера. Заодно, чтобы контролировать, что происходит внутри, указываю, чтобы HTTP-сервер использовал настоящий логгер (по умолчанию используется null_logger_t который вообще ничего не логирует).
  2. Поскольку мне нужно взводить таймеры внутри after-write нотификаторов, то мне нужен экземпляр io_context, с которым я смог бы работать. Поэтому я его создаю сам. Это дает мне возможность передать ссылку на мой io_context в функцию make_router.
  3. Остается только запустить HTTP-сервер в однопоточном варианте на ранее созданном мной io_context-е. Функция restinio::run вернет управление только когда HTTP-сервер завершит свою работу.

Заключение


В статье не был показан полный код моего тестового сервера, только его основные моменты. Полный код, которого чуть-чуть больше из-за дополнительных typedef-ов и вспомогательных функций, несколько подлиннее. Увидеть его можно здесь. На момент написания статьи это 185 строк, включая пустые строки и комментарии. Ну и написаны эти 185 строк за пару-тройку подходов суммарной длительностью вряд ли более часа.


Мне такой результат понравился и задача оказалась интересной. В практическом плане быстро был получен нужный мне вспомогательный инструмент. И в плане дальнейшего развития RESTinio появились кое-какие мысли.


В общем, если кто-то еще не пробовал RESTinio, то я приглашаю попробовать. Сам проект живет на BitBucket, есть зеркало на GitHub. Задать вопрос или высказать свои предложения можно в Google-группе или прямо здесь, в комментариях.

Источник: https://habr.com/ru/post/462349/


Интересные статьи

Интересные статьи

Привет! В этой статье я расскажу, как сделать страницу в Atlassian Confluence с таблицей, данные в которую будут приходить из REST запроса. Мы сделаем страницу в Confluence с та...
С утра ни руку поднять, ни строчку кода написать. Нет ни аппетита, ни настроения, ни возможности получать удовольствие от того, что раньше радовало. Да, ангедония, сэр, она самая. Плюс самооценка...
Введение Очень часто, как и в точных науках (физика, химия), так и в прочих областях (экономика, социология, маркетинг и пр.) при работе с разного рода экспериментально полученными зависимостями...
На «Пикабу» появилась новая функция: посты-ответы — фича для тех, кому «постом навеяло» написать собственную статью. Это отличная функция, важность которой выходит далеко за пределы «Пикабу» и ка...
Предыстория Год назад решил я создать телеграм бот для того, чтобы поиграть в достаточно популярную новогоднюю игру «Тайный Санта». Вдохновился я тем, что пару лет назад мы на работе компани...