Начало работы с MongoDB и Redis на Rust

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

В этой статье будет показано как создать Rust бэкэнд, который использует MongoDB, документо-ориентированную БД, для хранения данных и Redis для кэширования, ограничения количества HTTP запросов и нотификаций пользователя. Для большей наглядности созданное приложение также будет предоставлять REST API. В итоге будет получена следующая архитектура:


architecture


MongoDB является хранилищем, в то время как Redis используется для следующего:


  • кэш (включая изображения)
  • ограничение количества HTTP запросов
  • нотификации с использованием паттерна publish-subscribe

Обратите внимание, что указанные сценарии использования не означают, что для похожего сценария вам нужно использовать подход, описанный в статье. Примеры в первую очередь имеют целью познакомить вас с MongoDB и Redis.


Проект реализован с помощью MongoDB Rust driver и крейта redis-rs.


Вы сможете протестировать REST API приложения, поскольку оно развёрнуто на Google Cloud Platform.


Доменная модель включает данные о планетах Солнечной системы и их спутниках.


Запуск MongoDB и Redis


Этот раздел не требует навыков программирования на Rust и может быть использован вне зависимости от языка программирования приложения.


Обе тулы могут быть запущены как Docker контейнер:


docker-compose.yml


version: '3.8'

services:

  ...

  mongodb:
    image: mongo
    container_name: mongodb
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: $MONGODB_USERNAME
      MONGO_INITDB_ROOT_PASSWORD: $MONGODB_PASSWORD
      MONGO_INITDB_DATABASE: solar_system_info
    ports:
      - 27017:27017

  mongodb-seed:
    image: mongo
    container_name: mongodb-seed
    depends_on:
      - mongodb
    volumes:
      - ./mongodb-init:/mongodb-init
    links:
      - mongodb
    command:
      mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json

  redis:
    image: redis:alpine
    container_name: redis
    ports:
      - 6379:6379

Назначение контейнера mongodb_seed будет показано далее.


Вы можете получить доступ к mongo shell с помощью следующей команды:


docker exec -it mongodb mongo --username admin --password password


(где mongodb — название Docker контейнера, mongo — shell)


Теперь вы можете выполнять команды, например:


  • получить список баз данных с помощью show dbs
  • получить все данные в определённой базе данных:
    • use solar_system_info
    • show collections
    • db.planets.find()

Доступ к Redis CLI может быть получен с помощью следующей команды:


docker exec -it redis redis-cli


Простейший пример команды выглядит так:


Пример команды Redis


> set mykey somevalue
OK
> get mykey
"somevalue"

Для получения списка ключей используйте команду keys *.


Вы можете найти больше примеров команд для Redis CLI в этом гайде.


Инициализация данных


MongoDB инициализируется данными в формате JSON с использованием контейнера mongodb_seed и команды mongoimport:


docker-compose.yml


mongodb-seed:
  image: mongo
  container_name: mongodb-seed
  depends_on:
    - mongodb
  volumes:
    - ./mongodb-init:/mongodb-init
  links:
    - mongodb
  command:
    mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json

Также инициализация БД может быть выполнена с использованием JavaScript файла.


Приложение также работает с изображениями планет. Первоначально я собирался хранить их в MongoDB; это может быть сделано с помощью следующего скрипта:


mongofiles --host mongodb --db solar_system_info --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD put /mongodb-init/images/*.jpg


Однако вскоре обнаружилось, что изображения не могут быть получены из БД из-за отсутствия поддержки GridFS в MongoDB Rust Driver (открытая задача). Поэтому для простоты используется крейт rust_embed, который позволяет включить изображения в бинарный исполняемый файл приложения во время компиляции (при разработке изображения загружаются из файловой системы). (Изображения также возможно хранить отдельно от приложения; папка images должна быть смонтирована как volume в определении Docker Compose сервиса)


Далее будет показано как использовать MongoDB и Redis в Rust приложении.


Имплементация приложения


Зависимости


Приложение имплементировано с помощью:


  • MongoDB Rust driver
  • redis-rs
  • Actix Web

Cargo.toml


[package]
name = "mongodb-redis"
version = "0.1.0"
edition = "2018"

[dependencies]
mongodb = "2.0.0-beta.1"
redis = { version = "0.20.2", features = ["tokio-comp", "connection-manager"] }
actix-web = "4.0.0-beta.7"
tokio = "1.7.1"
tokio-stream = "0.1.6"
chrono = { version = "0.4.19", features = ["serde"] }
serde = "1.0.126"
serde_json = "1.0.64"
dotenv = "0.15.0"
derive_more = "0.99.14"
log = "0.4.14"
env_logger = "0.8.4"
rust-embed = "5.9.0"
mime = "0.3.16"

Структура проекта


Структура проекта


├───images
│
├───mongodb-init
│       init.json
│
└───src
        db.rs
        dto.rs
        errors.rs
        handlers.rs
        index.html
        main.rs
        model.rs
        redis.rs
        services.rs

Функция main


Функция main


#[actix_web::main]
async fn main() -> std::io::Result<()> {
    dotenv::from_filename(".env.local").ok();
    env_logger::init();

    info!("Starting MongoDB & Redis demo server");

    let mongodb_uri = env::var("MONGODB_URI").expect("MONGODB_URI env var should be specified");
    let mongodb_client = MongoDbClient::new(mongodb_uri).await;

    let redis_uri = env::var("REDIS_URI").expect("REDIS_URI env var should be specified");
    let redis_client = redis::create_client(redis_uri)
        .await
        .expect("Can't create Redis client");
    let redis_connection_manager = redis_client
        .get_tokio_connection_manager()
        .await
        .expect("Can't create Redis connection manager");

    let planet_service = Arc::new(PlanetService::new(
        mongodb_client,
        redis_client,
        redis_connection_manager.clone(),
    ));
    let rate_limiting_service = Arc::new(RateLimitingService::new(redis_connection_manager));

    ...
}

Здесь определён кастомный MongoDbClient, клиент Redis и менеджер соединений Redis.


Работа с MongoDB


Начнём с функции, возвращающей список планет, хранящихся в БД, и использующей асинхронный API:


Функция, возвращающая список планет


const DB_NAME: &str = "solar_system_info";
const COLLECTION_NAME: &str = "planets";

pub async fn get_planets(
    &self,
    planet_type: Option<PlanetType>,
) -> Result<Vec<Planet>, CustomError> {
    let filter = planet_type.map(|pt| {
        doc! { "type": pt.to_string() }
    });

    let mut planets = self.get_planets_collection().find(filter, None).await?;

    let mut result: Vec<Planet> = Vec::new();
    while let Some(planet) = planets.next().await {
        result.push(planet?);
    }

    Ok(result)
}

fn get_planets_collection(&self) -> Collection<Planet> {
    self.client
        .database(DB_NAME)
        .collection::<Planet>(COLLECTION_NAME)
}

get_planets также включает пример фильтрации документов MongoDB по определённому критерию.


Модель данных выглядит так:


Модель данных


#[derive(Serialize, Deserialize, Debug)]
pub struct Planet {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub id: Option<ObjectId>,
    pub name: String,
    pub r#type: PlanetType,
    pub mean_radius: f32,
    pub satellites: Option<Vec<Satellite>>,
}

#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub enum PlanetType {
    TerrestrialPlanet,
    GasGiant,
    IceGiant,
    DwarfPlanet,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Satellite {
    pub name: String,
    pub first_spacecraft_landing_date: Option<mongodb::bson::DateTime>,
}

Структуры содержат поля "обычных" типов (string, f32), а также:


  • ObjectId (Planet.id)
  • список (Planet.satellites)
  • дата/timestamp (Satellite.first_spacecraft_landing_date)
  • перечисление (Planet.type)
  • nullable поля (Planet.id, Planet.satellites)

Проект также включает примеры получения, создания, обновления и удаления MongoDB документов. Я не буду подробно останавливаться на этих функциях ввиду очевидности кода их имплементации. Вы можете протестировать эти функции используя REST API:


  • получение всего списка


    GET http://localhost:9000/planets


    Пример с фильтрацией:


    GET http://localhost:9000/planets?type=IceGiant


  • создание


    POST http://localhost:9000/planets


    Body:


    {
        "name": "Pluto",
        "type": "DwarfPlanet",
        "mean_radius": 1188,
        "satellites": null
    }

  • получение по id


    GET http://localhost:9000/{planet_id}


  • обновление


    PUT http://localhost:9000/{planet_id}


    Body:


    {
        "name": "Mercury",
        "type": "TerrestrialPlanet",
        "mean_radius": 2439.7,
        "satellites": null
    }

  • удаление


    DELETE http://localhost:9000/{planet_id}


  • получение изображения планеты


    GET http://localhost:9000/planets/{planet_id}/image


    Используйте этот метод для тестирования кэширования с помощью Redis



MongoDB документы хранятся в формате BSON.


Работа с Redis


Redis клиент создаётся следующим образом:


Создание Redis клиента


pub async fn create_client(redis_uri: String) -> Result<Client, RedisError> {
    Ok(Client::open(redis_uri)?)
}

Менеджер соединений Redis может быть создан так:


Получение менеджера соединений Redis


let redis_client = redis::create_client(redis_uri)
    .await
    .expect("Can't create Redis client");
let redis_connection_manager = redis_client
    .get_tokio_connection_manager()
    .await
    .expect("Can't create Redis connection manager");

Кэширование


Рассмотрим функцию сервисного слоя, использующуюся для получения планеты по id:


Получение планеты по id


pub async fn get_planet(&self, planet_id: &str) -> Result<Planet, CustomError> {
    let cache_key = self.get_planet_cache_key(planet_id);
    let mut con = self.redis_client.get_async_connection().await?;
    let cache_is_empty: bool = !con.exists(&cache_key).await?;

    let planet = if cache_is_empty {
        debug!("Use database to retrieve a planet by id: {}", &planet_id);
        let result: Planet = self
            .mongodb_client
            .get_planet(ObjectId::from_str(planet_id)?)
            .await?;
        let _: () = con.set(&cache_key, &result).await?;
        result
    } else {
        debug!("Use cache to retrieve a planet by id: {}", planet_id);
        let cached_value = con.get(&cache_key).await?;
        let planet_string: String = FromRedisValue::from_redis_value(&cached_value)?;
        serde_json::from_str(&planet_string)?
    };

    Ok(planet)
}

В первой ветви if-else вы видите пример как поместить пару ключ-значение в Redis используя функцию set; во второй ветви показано как получить значение из кэша по ключу. Для помещения значения в кэш вам нужно имплементировать трейт ToRedisArgs для структуры:


Имплементация трейта ToRedisArgs


impl ToRedisArgs for &Planet {
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + RedisWrite,
    {
        out.write_arg_fmt(serde_json::to_string(self).expect("Can't serialize Planet as string"))
    }
}

В функции get_planet используется асинхронное соединение Redis. Следующий пример демонстрирует другой подход, ConnectionManager, на примере очистки кэша с использованием функции del:


Пример очистки кэша


pub async fn update_planet(
    &self,
    planet_id: &str,
    planet: Planet,
) -> Result<Planet, CustomError> {
    let updated_planet = self
        .mongodb_client
        .update_planet(ObjectId::from_str(planet_id)?, planet)
        .await?;

    let cache_key = self.get_planet_cache_key(planet_id);
    self.redis_connection_manager.clone().del(cache_key).await?;

    Ok(updated_planet)
}

ConnectionManager может быть клонирован. Он также используется во всех оставшихся примерах использования Redis вместо Redis клиента.


Кэш изображений может быть имплементирован так же, как и кэши других типов данных (используя функции set/get):


Кэширование изображений


pub async fn get_image_of_planet(&self, planet_id: &str) -> Result<Vec<u8>, CustomError> {
    let cache_key = self.get_image_cache_key(planet_id);
    let mut redis_connection_manager = self.redis_connection_manager.clone();
    let cache_is_empty: bool = !redis_connection_manager.exists(&cache_key).await?;

    let image: Vec<u8> = if cache_is_empty {
        debug!(
            "Use database to retrieve an image of a planet by id: {}",
            &planet_id
        );
        let planet = self
            .mongodb_client
            .get_planet(ObjectId::from_str(planet_id)?)
            .await?;
        let result = crate::db::get_image_of_planet(&planet.name).await;
        let _: () = redis_connection_manager
            .set(&cache_key, result.clone())
            .await?;
        result
    } else {
        debug!(
            "Use cache to retrieve an image of a planet by id: {}",
            &planet_id
        );
        redis_connection_manager.get(&cache_key).await?
    };

    Ok(image)
}

Кэширование может быть протестировано с использованием REST API, описанного выше.


Ограничение количества HTTP запросов


Эта фича реализована в соответствии с официальным гайдом следующим образом:


Имплементация rate limiter


#[derive(Clone)]
pub struct RateLimitingService {
    redis_connection_manager: ConnectionManager,
}

impl RateLimitingService {
    pub fn new(redis_connection_manager: ConnectionManager) -> Self {
        RateLimitingService {
            redis_connection_manager,
        }
    }

    pub async fn assert_rate_limit_not_exceeded(&self, ip_addr: String) -> Result<(), CustomError> {
        let current_minute = Utc::now().minute();
        let rate_limit_key = format!("{}:{}:{}", RATE_LIMIT_KEY_PREFIX, ip_addr, current_minute);

        let (count,): (u64,) = redis::pipe()
            .atomic()
            .incr(&rate_limit_key, 1)
            .expire(&rate_limit_key, 60)
            .ignore()
            .query_async(&mut self.redis_connection_manager.clone())
            .await?;

        if count > MAX_REQUESTS_PER_MINUTE {
            Err(TooManyRequests {
                actual_count: count,
                permitted_count: MAX_REQUESTS_PER_MINUTE,
            })
        } else {
            Ok(())
        }
    }
}

Redis ключ создаётся на каждую минуту + IP адрес клиента. После каждого вызова функции assert_rate_limit_not_exceeded значение, соответствующее ключу, инкрементируется на 1. Чтобы хранилище не переполнилось из-за большого количества ранее созданных пар ключ-значение, ключ "экспайрится" через минуту.


Rate limiter может быть использован в Actix обработчике следующим образом:


Использование rate limiter


pub async fn get_planets(
    req: HttpRequest,
    web::Query(query_params): web::Query<GetPlanetsQueryParams>,
    rate_limit_service: web::Data<Arc<RateLimitingService>>,
    planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
    rate_limit_service
        .assert_rate_limit_not_exceeded(get_ip_addr(&req)?)
        .await?;

    let planets = planet_service.get_planets(query_params.r#type).await?;
    Ok(HttpResponse::Ok().json(planets.into_iter().map(PlanetDto::from).collect::<Vec<_>>()))
}

Если вызывать метод получения списка планет слишком часто, то будет получена следующая ошибка:


rate limiting


Нотификации


В этом проекте нотификации реализованы с помощью Redis Pub/Sub и Server-Sent Events для доставки сообщений пользователю.


При создании сущности публикуется событие:


Публикация события в Redis


pub async fn create_planet(&self, planet: Planet) -> Result<Planet, CustomError> {
    let planet = self.mongodb_client.create_planet(planet).await?;
    self.redis_connection_manager
        .clone()
        .publish(
            NEW_PLANETS_CHANNEL_NAME,
            serde_json::to_string(&PlanetMessage::from(&planet))?,
        )
        .await?;
    Ok(planet)
}

Подписка реализуется так:


Пример подписки в Redis


pub async fn get_new_planets_stream(
    &self,
) -> Result<Receiver<Result<Bytes, CustomError>>, CustomError> {
    let (tx, rx) = mpsc::channel::<Result<Bytes, CustomError>>(100);

    tx.send(Ok(Bytes::from("data: Connected\n\n")))
        .await
        .expect("Can't send a message to the stream");

    let mut pubsub_con = self
        .redis_client
        .get_async_connection()
        .await?
        .into_pubsub();
    pubsub_con.subscribe(NEW_PLANETS_CHANNEL_NAME).await?;

    tokio::spawn(async move {
        while let Some(msg) = pubsub_con.on_message().next().await {
            let payload = msg.get_payload().expect("Can't get payload of message");
            let payload: String = FromRedisValue::from_redis_value(&payload)
                .expect("Can't convert from Redis value");
            let msg = Bytes::from(format!("data: Planet created: {:?}\n\n", payload));
            tx.send(Ok(msg))
                .await
                .expect("Can't send a message to the stream");
        }
    });

    Ok(rx)
}

Подписка используется в Actix обработчике так:


Пример SSE handler


pub async fn sse(
    planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
    let new_planets_stream = planet_service.get_new_planets_stream().await?;
    let response_stream = tokio_stream::wrappers::ReceiverStream::new(new_planets_stream);

    Ok(HttpResponse::build(StatusCode::OK)
        .insert_header(header::ContentType(mime::TEXT_EVENT_STREAM))
        .streaming(response_stream))
}

Чтобы протестировать нотификации, вам нужно подписаться на события и сгенерировать событие. Далее приведены два подхода для этого; в обоих событие генерируется с использованием cURL:


  • подписка из браузера


    Перейдите на http://localhost:9000/, где находится HTML страница:


    sse notifications browser


  • подписка из командной строки с использованием cURL


    Используйте curl -X GET localhost:9000/events:


    sse notifications curl



Для генерации события используется следующий cURL запрос:


Запрос для тестирования нотификаций


curl -X POST -H 'Content-Type: application/json' -d '{
    \"name\": \"Pluto\",
    \"type\": \"DwarfPlanet\",
    \"mean_radius\": 1188,
    \"satellites\": null
}' localhost:9000/planets

Веб приложение


Некоторые аспекты этой темы были включены в предыдущие разделы, поэтому здесь будут освещены некоторые из оставшихся тем.


REST API обработчики


REST API обработчики определены так:


Определение REST API обработчиков


#[actix_web::main]
async fn main() -> std::io::Result<()> {
    ...

    let enable_write_handlers = env::var("ENABLE_WRITE_HANDLERS")
        .expect("ENABLE_WRITE_HANDLERS env var should be specified")
        .parse::<bool>()
        .expect("Can't parse ENABLE_WRITE_HANDLERS");

    HttpServer::new(move || {
        let mut app = App::new()
            .route("/planets", web::get().to(handlers::get_planets))
            .route("/planets/{planet_id}", web::get().to(handlers::get_planet))
            .route(
                "/planets/{planet_id}/image",
                web::get().to(handlers::get_image_of_planet),
            )
            .route("/events", web::get().to(handlers::sse))
            .route("/", web::get().to(handlers::index))
            .data(Arc::clone(&planet_service))
            .data(Arc::clone(&rate_limiting_service));

        if enable_write_handlers {
            app = app
                .route("/planets", web::post().to(handlers::create_planet))
                .route(
                    "/planets/{planet_id}",
                    web::put().to(handlers::update_planet),
                )
                .route(
                    "/planets/{planet_id}",
                    web::delete().to(handlers::delete_planet),
                );
        }

        app
    })
    .bind("0.0.0.0:9000")?
    .run()
    .await
}

Обработка ошибок


Обработка ошибок имплементирована в соответствии с документацией.


Запуск и тестирование


Локально проект может быть запущен двумя способами:


  • с использованием Docker Compose (docker-compose.yml):


    docker compose up (или docker-compose up в более старых версиях Docker)


  • без использования Docker


    Запустите приложение с помощью cargo run (в этом случае сервис mongodb-redis в docker-compose.yml должен быть отключён)



CI/CD


CI/CD сконфигурировано с помощью GitHub Actions workflow, который собирает Docker образ приложения и разворачивает его на Google Cloud Platform.


Для доступа к REST API развёрнутого приложения вы можете использовать один из доступных GET эндпоинтов, например:


GET http://demo.romankudryashov.com:9000/planets


Пишущие методы REST API недоступны на production среде.


Заключение


В этой статье я показал как начать работу с MongoDB и Redis и примеры их использования в Rust приложении. Не стесняйтесь написать мне, если нашли какие-либо ошибки в статье или исходном коде. Спасибо за внимание!


Полезные ссылки


  • Quick Start: Up and Running with Rust and MongoDB
  • https://redis.io/topics/quickstart
  • https://redis.io/topics/data-types-intro
Источник: https://habr.com/ru/post/568856/

Интересные статьи

Интересные статьи

Чехия кажется центром Европы — развитым, красивым и с достойными зарплатами. Но насколько хорошо здесь жить IT-иммигранту? И не лучше ли выбрать соседнюю Германию? Мы поговорил...
MongoDB — одна из самых популярных NoSQL/документоориентированных баз данных в мире веб-разработки, поэтому многие наши клиенты используют её в своих продуктах, в том чис...
Прим. перев.: месяц назад компания Kinvolk выпустила свой интерфейс для управления Kubernetes-кластерами. Новый Open Source-проект, пополнив уже немалочисленные ряды подобных решений, соч...
“Иногда мы платим больше всего за то, что получаем бесплатно.” — А.Эйнштейн Не так давно в MongoDB версии 4+ появилась поддержка мульти-документных транзакций. А поскольку наш про...
Я решил написать статью, а если получится — то и серию статей, чтобы поделиться своим опытом самостоятельного исследования как устройства Bare Bone x86, так и организации операционных систем. На ...