Это продолжение цикла статей про asyncio
. Начало здесь.
6. Aiohttp и другие жители асинхронного мира
Продолжаем готовить asyncio. Теперь мы уже знаем достаточно много, чтобы написать модный асинхронный микросервис. Реализуем известный архитектурный паттерн "API-шлюз". Это довольно простая штука. По запросу на свой API-интерфейс приложение собирает данные из других API, обрабатывает и возвращает результат пользователю. При этом пользователь знает только одну точку входа, а все внутренние подробности обработки от него скрыты.
В предыдущей главе мы научились запрашивать погоду у сервиса api.openweathermap.org
. Давайте его слегка импортозаместим. Вернее сказать, русифицируем. Пусть пользователь нашего приложения посылает название города на русском языке в параметрах GET-запроса (если вы не знаете что это такое, бегом читать про протокол HTTP) и получает ответ в виде json опять-таки на великом и могучем.
Мы уже освоили http-клиента библиотеки aiohttp
, с помощью которого можно обращаться к внешним API. Оказывается, в этой же библиотеке есть и все необходимое для создания полноценного http-сервера. Для начала напишем просто зеркальный прокси:
Пример 6.1
import json
from aiohttp import ClientSession, web
async def get_weather(city):
async with ClientSession() as session:
url = f'http://api.openweathermap.org/data/2.5/weather' \
f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'
async with session.get(url) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
return 'Нет данных'
async def handle(request):
city = request.rel_url.query['city']
weather = await get_weather(city)
result = {'city': city, 'weather': weather}
return web.Response(text=json.dumps(result, ensure_ascii=False))
if __name__ == '__main__':
app = web.Application()
app.add_routes([web.get('/weather', handle)])
web.run_app(app)
В асинхронной функции get_weather
ничего нового нет, мы ее лишь слегка "облагородили", чтобы запрос погоды для несуществующего города не приводил к трагическим последствиям для всего нашего приложения. За обработку запроса отвечает функция handle
("ручка" на сленге бэкендеров). Из запроса извлекается параметр city и передается в get_weather
. Далее формируется результирующий ответ в виде json. Адрес нашего сервиса и тип запроса задается следующим образом: app.add_routes([web.get('/weather', handle)])
.
Стоп! А где же наш старый друг asyncio.run
? Не переживайте. Когда мы имеем дело с асинхронными веб-фреймворками (а aiohttp
— это именно фреймворк, хоть и супер-минималистический), вся работа по созданию и запуску задач asyncio
происходит у фреймворка "под капотом". Где-то глубоко в недрах 'web.run_app' создаются задачи, организуется бесконечный цикл и запускается в asyncio.run
. Нам, как разработчикам, теперь нет нужды беспокоится об этих низменных деталях. Приложение мирно спит в бесконечном цикле, пока не придет запрос GET на определенный URL. Как только это произойдет, отработает логика в ручке. И снова баю-бай до следующего запроса. Но если первый запрос еще не успел обработаться, как поступил следующий, фреймворк отработает его в отдельной задаче, не дожидаясь (по возможности) окончания обработки первого. В этом сама суть асинхронности.
Заходим браузером на адрес: localhost:8080/weather?city=Sochi
и получаем симпатичный json:
{"city": "Sochi", "weather": "Clouds"}
Кстати, если вы всерьез решили заняться бэкэнд-разработкой, одним браузером вам никак не обойтись. Потребуется инструмент, позволяющий залезать вглубь HTTP. Стандарт де-факто здесь Postman, но в природе есть и альтернативные решения.
Скелет нашего приложения готов, теперь начинаем наращивать на него мышцы. Воспользуемся бесплатным API-переводчиком libretranslate.de
:
Пример 6.2
import json
from aiohttp import ClientSession, web
async def get_weather(city):
async with ClientSession() as session:
url = f'http://api.openweathermap.org/data/2.5/weather' \
f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'
async with session.get(url) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
return 'Нет данных'
async def get_translation(text, source, target):
async with ClientSession() as session:
url = 'https://libretranslate.de/translate'
data = {'q': text, 'source': source, 'target': target, 'format': 'text'}
async with session.post(url, json=data) as response:
translate_json = await response.json()
try:
return translate_json['translatedText']
except KeyError:
return text
async def handle(request):
city_ru = request.rel_url.query['city']
city_en = await get_translation(city_ru, 'ru', 'en')
weather_en = await get_weather(city_en)
weather_ru = await get_translation(weather_en, 'en', 'ru')
result = {'city': city_ru, 'weather': weather_ru}
return web.Response(text=json.dumps(result, ensure_ascii=False))
if __name__ == '__main__':
app = web.Application()
app.add_routes([web.get('/weather', handle)])
web.run_app(app)
Теперь в ручке дважды вызывается асинхронная функция get_translation
и, вуаля:
localhost:8080/weather?city=Сочи
{"city": "Сочи", "weather": "Облака"}
Что это за микросервис без логгера? Однако использовать в насквозь асинхронном приложении привычную синхронную (а значит блокирующую) библиотеку logging
— это непрофессионально. Воспользуемся асинхронной библиотекой логгирования aiologger
:
Пример 6.3
import json
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger
logger = JsonLogger.with_default_handlers(
level='DEBUG',
serializer_kwargs={'ensure_ascii': False},
)
async def get_weather(city):
async with ClientSession() as session:
url = f'http://api.openweathermap.org/data/2.5/weather' \
f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'
async with session.get(url) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
logger.error(f'Невозможно получить погоду для города: {city}')
return 'Нет данных'
async def get_translation(text, source, target):
await logger.info(f'Поступил запрос на на перевод слова: {text}')
async with ClientSession() as session:
url = 'https://libretranslate.de/translate'
data = {'q': text, 'source': source, 'target': target, 'format': 'text'}
async with session.post(url, json=data) as response:
translate_json = await response.json()
try:
return translate_json['translatedText']
except KeyError:
logger.error(f'Невозможно получить перевод для слова: {text}')
return text
async def handle(request):
city_ru = request.rel_url.query['city']
await logger.info(f'Поступил запрос на город: {city_ru}')
city_en = await get_translation(city_ru, 'ru', 'en')
weather_en = await get_weather(city_en)
weather_ru = await get_translation(weather_en, 'en', 'ru')
result = {'city': city_ru, 'weather': weather_ru}
return web.Response(text=json.dumps(result, ensure_ascii=False))
if __name__ == '__main__':
app = web.Application()
app.add_routes([web.get('/weather', handle)])
web.run_app(app)
Обратите внимание, асинхронный логгер ни на миллисекунду не задержит работу нашего приложения в целом. Он будет использовать паузы, пока мы ожидаем запроса от пользователя или ответа от внешнего сервиса.
Ну а теперь вишенка на торте — асинхронный доступ к базе данных. Предположим, в процессе работы нашего приложения нам надо что-то писать в БД, ну, например, сохранять поступившие запросы (ничего более умного мне в голову не пришло). Самая простая БД в мире, как известно, — это SQLite. И для нее, к счастью, есть асинхронный драйвер aiosqlite
. Пробуем:
Пример 6.4
import json
import aiosqlite
import asyncio
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger
from datetime import datetime
logger = JsonLogger.with_default_handlers(
level='DEBUG',
serializer_kwargs={'ensure_ascii': False},
)
async def create_table():
async with aiosqlite.connect('weather.db') as db:
await db.execute('CREATE TABLE IF NOT EXISTS requests '
'(date text, city text, weather text)')
await db.commit()
async def save_to_db(city, weather):
async with aiosqlite.connect('weather.db') as db:
sql = f'INSERT INTO requests VALUES ("{datetime.now()}", "{city}", "{weather}")'
await db.execute(sql)
await db.commit()
async def get_weather(city):
async with ClientSession() as session:
url = f'http://api.openweathermap.org/data/2.5/weather' \
f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'
async with session.get(url) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
logger.error(f'Невозможно получить погоду для города: {city}')
return 'Нет данных'
async def get_translation(text, source, target):
await logger.info(f'Поступил запрос на на перевод слова: {text}')
async with ClientSession() as session:
url = 'https://libretranslate.de/translate'
data = {'q': text, 'source': source, 'target': target, 'format': 'text'}
async with session.post(url, json=data) as response:
translate_json = await response.json()
try:
return translate_json['translatedText']
except KeyError:
logger.error(f'Невозможно получить перевод для слова: {text}')
return text
async def handle(request):
city_ru = request.rel_url.query['city']
await logger.info(f'Поступил запрос на город: {city_ru}')
city_en = await get_translation(city_ru, 'ru', 'en')
weather_en = await get_weather(city_en)
weather_ru = await get_translation(weather_en, 'en', 'ru')
result = {'city': city_ru, 'weather': weather_ru}
await save_to_db(city_ru, weather_ru)
return web.Response(text=json.dumps(result, ensure_ascii=False))
if __name__ == '__main__':
asyncio.run(create_table())
app = web.Application()
app.add_routes([web.get('/weather', handle)])
web.run_app(app)
Можете заглянуть внутрь созданного на лету файла weather.db
(только используйте не текстовый просмотрщик, а какую-нибудь приблуду для работы с БД, например, DBeaver). Для каждого запроса создается соответствующая запись в таблице requests
. И снова никаких блокировок, мы ведь живем в асинхронном мире.
Смотрите-ка! Неожиданно появился наш забытый друг asyncio.run
, к чему бы это? А это к тому, что операция создания таблицы requests
(если таковой еще нет) должна выполняться лишь однажды при старте приложения, еще до запуска бесконечного цикла внутри web.run_app
. Но функция-то create_table
у нас асинхронная, так просто ее не запустишь (попробуйте сами, если не верите). Вот и приходится единожды запускать ее "вручную" при помощи asyncio.run
и лишь затем передавать бразды правления web.run_app
.
В заключение этого раздела хочу вас поздравить. Теперь вы имеете в руках все необходимое для создания собственных асинхронных веб-приложений. Неважно какой фреймфорк вы будете использовать: FastAPI
, Tornado
, Falcon
или какой-нибудь еще. Принцип останется тот же самый как в старом добром aiohttp
: создаем ручку и в ней нанизываем "шашлык" из вызовов асинхронных функций. Главное за чем необходимо следить — это чтобы в эти функции не затесалось что-нибудь блокирующее из скучного пыльного синхронного мира.
На этом мы временно прощаемся.
Продолжение следует...