Введение в gRPC на Rust

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

В статье будет показано как создать gRPC сервер и клиент на Rust. Для большей наглядности клиент будет также Telegram ботом. В итоге будет получена следующая архитектура:



Статья является не всеобъемлющим руководством по gRPC в Rust, а скорее практическим гайдом, демонстрирующим основы и как создать приложение на основе gRPC.


Доменная модель включает данные о планетах Солнечной системы и их спутниках.


Имплементация


Существует несколько реализаций gRPC на Rust. В этом проекте был использован tonic.


Проект включает следующие модули:


  • gRPC сервер
  • gRPC клиент (также является Telegram ботом)
  • общий модуль rpc

Последний модуль содержит определение gRPC сервиса и отвечает за генерацию gRPC кода необходимого и для сервера, и для клиента.


Определение сервиса и генерация кода


Определение сервиса написано на версии proto3 Protocol Buffers и находится в .proto файле:


solar-system-info.proto


syntax = "proto3";

package solar_system_info;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

service SolarSystemInfo {
  rpc GetPlanetsList (google.protobuf.Empty) returns (PlanetsListResponse);
  rpc GetPlanet (PlanetRequest) returns (PlanetResponse);
  rpc GetPlanets (google.protobuf.Empty) returns (stream PlanetResponse);
}

message PlanetsListResponse {
  repeated string list = 1;
}

message PlanetRequest {
  string name = 1;
}

message PlanetResponse {
  Planet planet = 1;
}

message Planet {
  uint64 id = 1;
  string name = 2;
  Type type = 3;
  float meanRadius = 4;
  float mass = 5;
  repeated Satellite satellites = 6;
  bytes image = 7;
}

enum Type {
  TERRESTRIAL_PLANET = 0;
  GAS_GIANT = 1;
  ICE_GIANT = 2;
  DWARF_PLANET = 3;
}

message Satellite {
  uint64 id = 1;
  string name = 2;
  google.protobuf.Timestamp first_spacecraft_landing_date = 3;
}

Здесь определены простые (unary) RPC (GetPlanetsList и GetPlanet), server-side streaming RPC (GetPlanets) и структуры для передачи необходимых данных. Структуры содержат поля как некоторых обычных типов (uint64, string, etc.), так и:


  • перечисление (Planet.type)
  • список (Planet.satellites)
  • бинарные данные (Planet.image)
  • тип date/timestamp (Satellite.first_spacecraft_landing_date)

Для настройки генерации серверного и клиентского gRPC кода сначала добавим следующие зависимости:


Cargo.toml


[package]
name = "solar-system-info-rpc"
version = "0.1.0"
edition = "2018"

[dependencies]
tonic = "0.4.2" # Rust gRPC implementation
prost = "0.7.0" # Rust Protocol Buffers implementation
prost-types = "0.7.0" # Contains definitions of Protocol Buffers well-known types

[build-dependencies]
tonic-build = "0.4.2"

Библиотека prost-types позволяет использовать некоторые из well-known типов Protobuf, такие как Empty и Timestamp.


В корне модуля должно быть расположено следующее:


build.rs


fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/solar-system-info/solar-system-info.proto")?;
    Ok(())
}

Создадим модуль, который будет содержать сгенерированный код и будет использован серверным и клиентским приложениями:


lib.rs


pub mod solar_system_info {
    tonic::include_proto!("solar_system_info");
}

После запуска сервера или клиента вы можете найти сгенерированный код в файле /target/debug/build/solar-system-info-rpc/out/solar_system_info.rs. Например, для создания сервера вам нужно будет имплементировать сгенерированный трейт SolarSystemInfo:


Сгенерированный трейт SolarSystemInfo


#[doc = r" Generated server implementations."]
pub mod solar_system_info_server {
    #![allow(unused_variables, dead_code, missing_docs)]
    use tonic::codegen::*;
    #[doc = "Generated trait containing gRPC methods that should be implemented for use with SolarSystemInfoServer."]
    #[async_trait]
    pub trait SolarSystemInfo: Send + Sync + 'static {
        async fn get_planets_list(
            &self,
            request: tonic::Request<()>,
        ) -> Result<tonic::Response<super::PlanetsListResponse>, tonic::Status>;
        async fn get_planet(
            &self,
            request: tonic::Request<super::PlanetRequest>,
        ) -> Result<tonic::Response<super::PlanetResponse>, tonic::Status>;
        #[doc = "Server streaming response type for the GetPlanets method."]
        type GetPlanetsStream: futures_core::Stream<Item = Result<super::PlanetResponse, tonic::Status>>
            + Send
            + Sync
            + 'static;
        async fn get_planets(
            &self,
            request: tonic::Request<()>,
        ) -> Result<tonic::Response<Self::GetPlanetsStream>, tonic::Status>;
    }
    #[derive(Debug)]
    pub struct SolarSystemInfoServer<T: SolarSystemInfo> {
        inner: _Inner<T>,
    }
}

Сгенерированные структуры, используемые функцией get_planet, выглядят так:


Сгенерированные структуры для функции get_planet


#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PlanetRequest {
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PlanetResponse {
    #[prost(message, optional, tag = "1")]
    pub planet: ::core::option::Option<Planet>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Planet {
    #[prost(uint64, tag = "1")]
    pub id: u64,
    #[prost(string, tag = "2")]
    pub name: ::prost::alloc::string::String,
    #[prost(enumeration = "Type", tag = "3")]
    pub r#type: i32,
    #[prost(float, tag = "4")]
    pub mean_radius: f32,
    #[prost(float, tag = "5")]
    pub mass: f32,
    #[prost(message, repeated, tag = "6")]
    pub satellites: ::prost::alloc::vec::Vec<Satellite>,
    #[prost(bytes = "vec", tag = "7")]
    pub image: ::prost::alloc::vec::Vec<u8>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Satellite {
    #[prost(uint64, tag = "1")]
    pub id: u64,
    #[prost(string, tag = "2")]
    pub name: ::prost::alloc::string::String,
    #[prost(message, optional, tag = "3")]
    pub first_spacecraft_landing_date: ::core::option::Option<::prost_types::Timestamp>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Type {
    TerrestrialPlanet = 0,
    GasGiant = 1,
    IceGiant = 2,
    DwarfPlanet = 3,
}

gRPC сервер


Функция main сервера представлена ниже:


Функция main


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    dotenv().ok();
    env_logger::init();

    info!("Starting Solar System info server");

    let addr = std::env::var("GRPC_SERVER_ADDRESS")?.parse()?;

    let pool = create_connection_pool();
    run_migrations(&pool);

    let solar_system_info = SolarSystemInfoService { pool };
    let svc = SolarSystemInfoServer::new(solar_system_info);

    Server::builder().add_service(svc).serve(addr).await?;

    Ok(())
}

Имплементация трейта SolarSystemInfo (был показан в предыдущем разделе) выглядит так:


Имплементация gRPC сервера


struct SolarSystemInfoService {
    pool: PgPool,
}

#[tonic::async_trait]
impl SolarSystemInfo for SolarSystemInfoService {
    type GetPlanetsStream =
        Pin<Box<dyn Stream<Item = Result<PlanetResponse, Status>> + Send + Sync + 'static>>;

    async fn get_planets_list(
        &self,
        request: Request<()>,
    ) -> Result<Response<PlanetsListResponse>, Status> {
        debug!("Got a request: {:?}", request);

        let names_of_planets = persistence::repository::get_names(&get_connection(&self.pool))
            .expect("Can't get names of the planets");

        let reply = PlanetsListResponse {
            list: names_of_planets,
        };

        Ok(Response::new(reply))
    }

    async fn get_planets(
        &self,
        request: Request<()>,
    ) -> Result<Response<Self::GetPlanetsStream>, Status> {
        debug!("Got a request: {:?}", request);

        let (tx, rx) = mpsc::channel(4);

        let planets: Vec<Planet> = persistence::repository::get_all(&get_connection(&self.pool))
            .expect("Can't load planets")
            .into_iter()
            .map(|p| {
                PlanetWrapper {
                    planet: p.0,
                    satellites: p.1,
                }
                .into()
            })
            .collect();

        tokio::spawn(async move {
            let mut stream = tokio_stream::iter(&planets);

            while let Some(planet) = stream.next().await {
                tx.send(Ok(PlanetResponse {
                    planet: Some(planet.clone()),
                }))
                .await
                .unwrap();
            }
        });

        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        )))
    }

    async fn get_planet(
        &self,
        request: Request<PlanetRequest>,
    ) -> Result<Response<PlanetResponse>, Status> {
        debug!("Got a request: {:?}", request);

        let planet_name = request.into_inner().name;

        let planet =
            persistence::repository::get_by_name(&planet_name, &get_connection(&self.pool));

        match planet {
            Ok(planet) => {
                let planet = PlanetWrapper {
                    planet: planet.0,
                    satellites: planet.1,
                }
                .into();

                let reply = PlanetResponse {
                    planet: Some(planet),
                };

                Ok(Response::new(reply))
            }
            Err(e) => {
                error!(
                    "There was an error while getting a planet {}: {}",
                    &planet_name, e
                );
                match e {
                    Error::NotFound => Err(Status::not_found(format!(
                        "Planet with name {} not found",
                        &planet_name
                    ))),
                    _ => Err(Status::unknown(format!(
                        "There was an error while getting a planet {}: {}",
                        &planet_name, e
                    ))),
                }
            }
        }
    }
}

Здесь определена кастомная SolarSystemInfoService структура, которая имеет доступ к БД с помощью Diesel ORM.


Напомню, что get_planets_list и get_planet являются примерами простых RPC, а get_planets — server-side streaming RPC.


Изображения планет включаются в бинарник приложения во время компиляции с помощью библиотеки rust_embed (при разработке они загружаются из файловой системы).


gRPC клиент


gRPC клиент в модуле bot создаётся так:


Создание gRPC клиента


async fn create_grpc_client() -> SolarSystemInfoClient<tonic::transport::Channel> {
    let channel = tonic::transport::Channel::from_static(&GRPC_SERVER_ADDRESS)
        .connect()
        .await
        .expect("Can't create a channel");

    SolarSystemInfoClient::new(channel)
}

Далее он может быть использован так:


Использование gRPC клиента


let response = get_planets_list(grpc_client).await?;

Telegram бот


Как было отмечено ранее, для большей наглядности gRPC клиент является также и Telegram ботом. Для имплементации бота использована библиотека teloxide.


Перейдём сразу к main.rs:


main.rs


#[tokio::main]
async fn main() {
    dotenv().ok();
    teloxide::enable_logging!();
    log::info!("Starting Solar System info bot");

    let api_url = std::env::var("TELEGRAM_API_URL").expect("Can't get Telegram API URL");
    let api_url = Url::parse(&api_url).expect("Can't parse Telegram API URL");

    let bot = Bot::from_env()
        .set_api_url(api_url)
        .parse_mode(Html)
        .auto_send();

    let bot = Arc::new(bot);

    let grpc_client = create_grpc_client().await;

    teloxide::commands_repl(bot, "solar-system-info-bot", move |cx, command| {
        answer(cx, command, grpc_client.clone())
    })
    .await;
}

Для упрощения настройки SSL/TLS в проект включён модуль nginx. Он действует как forward proxy, который получает HTTP запросы от бота и перенаправляет их на серверы Telegram API.


Запуск и тестирование


Вы можете запустить проект локально двумя способами:


  • используя Docker Compose (docker-compose.yml):
    docker-compose up
  • без Docker
    Запустите gRPC сервер и клиент с помощью cargo run

Запросы к серверу можно выполнять используя какой-либо gRPC клиент (например, BloomRPC):



или делать это косвенно с помощью Telegram бота:



Соответствие между командами бота и RPC следующее:


  • /listGetPlanetsList
  • /planetsGetPlanets
  • /planetGetPlanet

Для тестирования приложения с помощью бота вам нужен Telegram аккаунт и собственный бот (введение в эту тему здесь). В зависимости от выбранного способа запуска, токен бота должен быть указан здесь или здесь.


CI/CD


CI/CD сконфигурировано с использованием GitHub Actions (workflow), который собирает Docker образы gRPC сервера и клиента (то есть Telegram бота) и разворачивает их на Google Cloud Platform.


Бота можно протестировать здесь.


Заключение


В статье я показал как создать gRPC сервер и клиент на Rust и привёл пример использования клиента как источника данных для Telegram бота. Не стесняйтесь написать мне, если нашли какие-либо ошибки в статье или исходном коде. Спасибо за внимание!


Полезные ссылки


  • Гайд по Protocol Buffers
  • Введение в gRPC
  • Tonic
Источник: https://habr.com/ru/post/557600/


Интересные статьи

Интересные статьи

Команда Rust рада сообщить о выпуске новой версии, 1.49.0. Rust — это язык программирования, позволяющий каждому создавать надёжное и эффективное программное обеспечение. Если вы установ...
Каждый раз рассказывая свой доклад про архитекторов качества я слышу один, самый популярный вопрос: "С чего начать?" - Хороший вопрос! Но прежде чем на него ответить, я п...
Целевая аудитория Вы разработчик, который хочет повернуть свою карьеру в сторону более совершенной модели DevOps? Вы являетесь классическим Ops-инженером и хотели бы получить представление о то...
Вообще-то смотреть какого цвета потроха у Rust я не собирался. Ковырнул хобби-проект на Go, пошел на GitHub посмотреть состояние fasthttp: развивается ли? Ну хотя бы поддерживается? Вспрокрастину...
В прошлой части цикла «Введение в SSD» мы рассказали про историю появления дисков. Вторая часть расскажет про интерфейсы взаимодействия с накопителями. Общение между процессором и перифери...