Интеграция Axum с S3-хранилищем

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Недавно столкнулся с проблемой подружить веб-фреймворк Axum и библиотеку rust-s3. Собственно, задача сделать 2 эндпойнта:

  • Загрузка файла в хранилище и получение ссылки

  • Скачка файла из хранилища по ссылке

Разумеется, без временных файлов и без удержания целиком всех данных файла в памяти.

Так как для работы с S3 нужны некоторые служебные объекты (настройки доступа к конкретному bucket), вынесем непосредственную работу в структуру UploadService:

#[derive(Clone)]
pub struct UploadService {
	bucket: Arc<s3::Bucket>
}

Для внедрения зависимости (DI) в обработчик эндпойнта, нам необходимо, чтобы наша структура реализовывала трейт Clone. Так как сервис будет клонироваться на каждый запрос, обернём s3::Bucket в Arc, чтобы клонирование было максимально дешёвым.

Теперь реализуем конструктор экземпляра сервиса:

use s3::{Bucket, Region};
use s3::creds::Credentials;

...

impl UploadService {
	pub fn new() -> Self {
		let bucket_name = std::env::var("UPLOAD_BUCKET_NAME")
			.expect("Expected UPLOAD_BUCKET_NAME environment variable");
		let region = Region::Custom {
			region: std::env::var("UPLOAD_BUCKET_REGION")
				.expect("Expected UPLOAD_BUCKET_REGION environment variable"),
			endpoint: std::env::var("UPLOAD_BUCKET_ENDPOINT")
				.expect("Expected UPLOAD_BUCKET_ENDPOINT environment variable")
		};
		let credentials = Credentials::new(
			Some(
				&std::env::var("UPLOAD_BUCKET_ACCESS_KEY")
					.expect("Expected UPLOAD_BUCKET_ACCESS_KEY environment variable")
			),
			Some(
				&std::env::var("UPLOAD_BUCKET_SECRET_KEY")
					.expect("Expected UPLOAD_BUCKET_SECRET_KEY environment variable")
			),
			None,
			None,
			None
		).unwrap();

		let bucket = Bucket::new(&bucket_name, region, credentials).unwrap()
			.with_path_style();
		
		Self {
			bucket: Arc::new(bucket)
		}
	}

...

Сервис конфигурируется с помощью переменных окружения UPLOAD_BUCKET_NAMEUPLOAD_BUCKET_REGIONUPLOAD_BUCKET_ACCESS_KEYUPLOAD_BUCKET_SECRET_KEY и UPLOAD_BUCKET_ENDPOINT. Последний параметр необходим так как я использую не Amazon S3, а другого S3-совместимого провайдера (Scaleway). При использовании Amazon S3 можно явно задать нужный регион с помощью одного из значений из перечисления s3::Region (например, s3::Region::UsWest1), либо воспользоваться s3::Region::from_str для парсинга региона из строки типа us-west-1. Кстати, в наборе перечислений региона помимо стандартных регионов Amazon есть ещё Digital Ocean, Wasabi и Yandex.

Теперь самое сложное – функция загрузки файла в хранилище:

use std::sync::{Arc, Mutex};
use std::path::Path;
use std::ffi::OsStr;
use axum::http::StatusCode;
use axum::extract::multipart::Field;
use async_hash::{Sha256, Digest};
use async_compat::CompatExt;
use futures::TryStreamExt;
use uuid::Uuid;

...

	pub async fn upload<'a>(&self, field: Field<'a>) -> Result<String, StatusCode> {
		let orig_filename = field.file_name()
			.unwrap_or("file")
			.to_owned();
		let mimetype = field.content_type()
			.unwrap_or("application/octet-stream")
			.to_owned();
		let digest = Arc::new(Mutex::new(Sha256::new()));
		let mut reader = field
			.map_ok(|chunk| {
				if let Ok(mut digest) = digest.lock() {
					digest.update(&chunk);
				}
				chunk
			})
			.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
			.into_async_read()
			.compat();
		let tmp_filename = format!("tmp/{}.bin", Uuid::new_v4());
		self.bucket.put_object_stream_with_content_type(
			&mut reader,
			&tmp_filename,
			&mimetype
		)
			.await
			.map_err(|err| {
				log::error!("S3 upload error: {:?}", err);
				StatusCode::INTERNAL_SERVER_ERROR
			})?;
		drop(reader); // Release digest borrow
		let mut result = Err(StatusCode::INTERNAL_SERVER_ERROR);
		if let Some(digest) = Arc::into_inner(digest).and_then(|m| m.into_inner().ok()) {
			let digest = hex::encode(digest.finalize());
			let ext = Path::new(&orig_filename).extension().and_then(OsStr::to_str);
			let mut filename = if let Some(ext) = ext {
				format!("{}.{}", digest, ext)
			} else {
				digest
			};
			filename.make_ascii_lowercase();
			match self.bucket.copy_object_internal(&tmp_filename, &filename).await {
				Ok(_) => result = Ok(format!("/uploads/{}", &filename)),
				Err(err) => log::error!("S3 copy error: {:?}", err)
			}
		}
		if let Err(err) = self.bucket.delete_object(&tmp_filename).await {
			log::error!("S3 delete error: {:?}", err);
		}
		result
	}

...

Функция принимает одно поле из multipart/form-data запроса (обработка запроса будет рассмотрена ниже), определяет исходное имя файла и mime-тип (если эти данные отсутствуют, используется “file” и “application/octet-stream” в качестве значений по умолчанию). Затем данные поля превращаются в AsyncRead с помощью библиотеки async-compat. При этом наш читатель потока по мере чтения потока вычисляет его SHA256 хеш (пригодится в будущем).

Теперь мы можем загрузить файл в S3-хранилище под временным именем “tmp/<UUID>.bin” (UUID генерируется случайным образом). Если в этот момент возникает ошибка, функция возвращает код Internal Server Error.

Мы имеем файл в S3-хранилище и посчитанный SHA256 от его данных. Теперь можно переименовать файл в его окончательное имя (я хочу использовать SHA256 в качестве имени файла, чтобы одинаковые файлы не дублировались в хранилище). Для этого я беру HEX-представление SHA256 и приписываю расширение файла взятое из оригинального имени (если оно там было). Результат приводится к нижнему регистру (на случай если расширение файла было не в нижнем регистре) и далее мы выполняем копирование S3-объекта (так как API S3 не имеет функции переименования). Если копирование успешно, то у нас получается результирующий URL-файла.

Наконец, можно удалить временный объект из S3. Это происходит в любом случае – и если копирование было успешным, и если нет.

Последняя функция нашего сервиса – отдача файла по ссылке (теоретически это можно делегировать веб-серверу, но как минимум удобно иметь эту функцию при локальной разработке, как максимум нам может требоваться реализовать какую-нибудь дополнительную бизнес-логику вроде проверки прав доступа к файлу):

use axum::response::IntoResponse;
use axum::body::StreamBody;
use s3::error::S3Error;

...

	pub async fn download(
		&self,
		filename: &str
	) -> Result<impl IntoResponse, StatusCode> {
		let stream = self.bucket.get_object_stream(filename)
			.await
			.map_err(|err| match err {
				S3Error::HttpFailWithBody(status_code, body) => match status_code {
					404 => StatusCode::NOT_FOUND,
					_ => {
						log::error!(
							"S3 download HTTP error with code={} and body={:?}",
							status_code,
							body
						);
						StatusCode::INTERNAL_SERVER_ERROR
					}
				}
				err => {
					log::error!("S3 download error: {:?}", err);
					StatusCode::INTERNAL_SERVER_ERROR
				}
			})?;
		Ok(StreamBody::from(stream.bytes))
	}
}

Здесь всё просто – получаем стрим S3-объекта, маппим ошибку отсутствия файла на 404-ую ошибку Axum, а остальные ошибки на 500-ую, возвращаем StreamBody.

Остаётся реализовать обработчики самих эндпойнтов:

use axum::{Extension, Json};
use axum::extract::{Multipart, Path};
use axum::http::{HeaderMap, HeaderValue, StatusCode};
use axum::http::header::CACHE_CONTROL;
use axum::response::IntoResponse;

#[derive(Debug, serde::Serialize)]
pub struct UploadResponse {
	pub url: String
}

pub async fn upload_file(
	Extension(upload_service): Extension<UploadService>,
	mut multipart: Multipart
) -> Result<impl IntoResponse, StatusCode> {
	while let Some(field) = multipart.next_field().await.map_err(|_|
		StatusCode::INTERNAL_SERVER_ERROR
	)? {
		if let Some("upload") = field.name() {
			let url = upload_service.upload(field).await?;
			return Ok(Json(UploadResponse { url }));
		}
	}
	Err(StatusCode::BAD_REQUEST)
}

pub async fn download_file(
	Path(path): Path<String>,
	Extension(upload_service): Extension<UploadService>
) -> Result<impl IntoResponse, StatusCode> {
	let body = upload_service.download(&path).await?;
	let headers = HeaderMap::from_iter([
		(CACHE_CONTROL, HeaderValue::from_str("max-age=31536000").unwrap()) // One year
	]);
	Ok((headers, body))
}

Обработчик загрузки загружает по одному файлу за раз, при этом имя поля файла в отправленной форме ожидается “upload”. Обработчик скачки файла выставляет срок жизни файла в кеше один год, потому что изменения файла не предполагаются (если файл изменится, он будет иметь другой SHA256 и другое имя).

Последнее, что нам остаётся – создать роутер и запустить сервер:

use std::str::FromStr;
use axum::extract::DefaultBodyLimit;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
	env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
	let upload_service = UploadService::new();
	let router = axum::Router::new()
			.route("/uploads", axum::routing::post(upload_file))
			.route("/uploads/*path", axum::routing::get(download_file))
			.layer(Extension(upload_service))
			.layer(DefaultBodyLimit::max(8 * 1024 * 1024));
	let address = std::env::var("HOST").expect("Expected HOST environment variable");
	let port = std::env::var("PORT").expect("Expected PORT environment variable")
		.parse::<u16>().expect("PORT environment variable must be an integer");
	log::info!("Listening on http://{}:{}/", address, port);
	axum::Server::bind(
		&std::net::SocketAddr::new(
			std::net::IpAddr::from_str(&address).unwrap(),
			port
		)
	).serve(router.into_make_service()).await?;
	Ok(())
}

Экземляр UploadService передаётся через Extension (механизм DI в Axum), также может быть полезно задать DefaultBodyLimit, потому что стандартное значение 1 МБ может подходить не для всех ситуаций. Хост и порт для прослушивания получаются из соответствующих переменных окружения.

Вероятно, нам также может требоваться добавить какую-нибудь проверку авторизации в эндпойнт загрузки (а, возможно, и скачки), но это зависит от конкретной функции конкретного сервиса.

Зависимости в Cargo.toml:

[package]
name = "uploader"
version = "0.1.0"
edition = "2021"

[dependencies]
log = "0.4.20"
env_logger = "0.10.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
axum = { version = "0.6.20", features = ["multipart"] }
serde = "1.0.188"
uuid = { version = "1.4.1", features = ["v4"] }
rust-s3 = "0.34.0-rc1"
futures = "0.3.28"
async-compat = "0.2.2"
async-hash = "0.5.1"
hex = "0.4.3"

В качестве бонуса пример браузерного кода на TypeScript выполняющего загрузку файла:

interface UploadResponse {
	url: string;
}

async function uploadFile(file: Blob, filename?: string): Promise<UploadResponse | "error"> {
	const data = new FormData();
	data.append("upload", file, filename);
	const response = await fetch("/uploads", {
		method: "post",
		body: data
	});
	if (response.status >= 200 && response.status <= 299) {
		return await response.json();
	} else {
		return "error";
	}
}

Наш сервис готов.

Полный исходный код проекта на GitHub

Обсудить в моём персональном блоге в Telegram

Источник: https://habr.com/ru/articles/770332/


Интересные статьи

Интересные статьи

Описывается пример интеграции библиотеки компонентов пользовательского интерфейса Primefaces, построенной на основе фреймворка JavaServer Faces (JSF), в MVC приложение на Spring Boot.Первая частьВтора...
Состоялся релиз 2.16: мы внедрили поддержку DirectX 12 и Vulkan API, открыли Asset Store, добавили даблы и демо Марса в Community SDK, улучшили рендеринг (отражения, свечение, размытие при движении, э...
Как сделали 9 NLU ботов за 5 дней с интеграциями на чистом Low-code В данном проекте перед нами стояла задача – с помощью чат-бота разгрузить КЦ, а также оперативно и качественно отвечать на вопросы ...
Существует множество публикаций о разработке чат-ботов с использованием возможностей платформы Dialogflow. Но для выхода на решение, которое применимо на практике, просто создать бота нед...
Всем привет. В преддверии старта курсов  "iOS Developer. Basic" и "iOS Developer. Professional", публикуем заключительную часть статьи про интеграцию CI/CD для неско...