Асинхронный python без головной боли (часть 2)

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Это продолжение цикла статей про asyncio. Начало здесь.

6. Aiohttp и другие жители асинхронного мира

Продолжаем готовить asyncio. Теперь мы уже знаем достаточно много, чтобы написать модный асинхронный микросервис. Реализуем известный архитектурный паттерн "API-шлюз". Это довольно простая штука. По запросу на свой API-интерфейс приложение собирает данные из других API, обрабатывает и возвращает результат пользователю. При этом пользователь знает только одну точку входа, а все внутренние подробности обработки от него скрыты.

В предыдущей главе мы научились запрашивать погоду у сервиса api.openweathermap.org. Давайте его слегка импортозаместим. Вернее сказать, русифицируем. Пусть пользователь нашего приложения посылает название города на русском языке в параметрах GET-запроса (если вы не знаете что это такое, бегом читать про протокол HTTP) и получает ответ в виде json опять-таки на великом и могучем.

Мы уже освоили http-клиента библиотеки aiohttp, с помощью которого можно обращаться к внешним API. Оказывается, в этой же библиотеке есть и все необходимое для создания полноценного http-сервера. Для начала напишем просто зеркальный прокси:

Пример 6.1

import json
from aiohttp import ClientSession, web


async def get_weather(city):
    async with ClientSession() as session:
        url = f'http://api.openweathermap.org/data/2.5/weather' \
              f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'

        async with session.get(url) as response:
            weather_json = await response.json()
            try:
                return weather_json["weather"][0]["main"]
            except KeyError:
                return 'Нет данных'


async def handle(request):
    city = request.rel_url.query['city']
    weather = await get_weather(city)
    result = {'city': city, 'weather': weather}

    return web.Response(text=json.dumps(result, ensure_ascii=False))


if __name__ == '__main__':
    app = web.Application()
    app.add_routes([web.get('/weather', handle)])
    web.run_app(app)

В асинхронной функции get_weather ничего нового нет, мы ее лишь слегка "облагородили", чтобы запрос погоды для несуществующего города не приводил к трагическим последствиям для всего нашего приложения. За обработку запроса отвечает функция handle ("ручка" на сленге бэкендеров). Из запроса извлекается параметр city и передается в get_weather. Далее формируется результирующий ответ в виде json. Адрес нашего сервиса и тип запроса задается следующим образом: app.add_routes([web.get('/weather', handle)]).

Стоп! А где же наш старый друг asyncio.run? Не переживайте. Когда мы имеем дело с асинхронными веб-фреймворками (а aiohttp — это именно фреймворк, хоть и супер-минималистический), вся работа по созданию и запуску задач asyncio происходит у фреймворка "под капотом". Где-то глубоко в недрах 'web.run_app' создаются задачи, организуется бесконечный цикл и запускается в asyncio.run. Нам, как разработчикам, теперь нет нужды беспокоится об этих низменных деталях. Приложение мирно спит в бесконечном цикле, пока не придет запрос GET на определенный URL. Как только это произойдет, отработает логика в ручке. И снова баю-бай до следующего запроса. Но если первый запрос еще не успел обработаться, как поступил следующий, фреймворк отработает его в отдельной задаче, не дожидаясь (по возможности) окончания обработки первого. В этом сама суть асинхронности.

Заходим браузером на адрес: localhost:8080/weather?city=Sochi и получаем симпатичный json:

{"city": "Sochi", "weather": "Clouds"}

Кстати, если вы всерьез решили заняться бэкэнд-разработкой, одним браузером вам никак не обойтись. Потребуется инструмент, позволяющий залезать вглубь HTTP. Стандарт де-факто здесь Postman, но в природе есть и альтернативные решения.

Скелет нашего приложения готов, теперь начинаем наращивать на него мышцы. Воспользуемся бесплатным API-переводчиком libretranslate.de:

Пример 6.2

import json
from aiohttp import ClientSession, web


async def get_weather(city):
    async with ClientSession() as session:
        url = f'http://api.openweathermap.org/data/2.5/weather' \
              f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'

        async with session.get(url) as response:
            weather_json = await response.json()
            try:
                return weather_json["weather"][0]["main"]
            except KeyError:
                return 'Нет данных'


async def get_translation(text, source, target):
    async with ClientSession() as session:
        url = 'https://libretranslate.de/translate'

        data = {'q': text, 'source': source, 'target': target, 'format': 'text'}

        async with session.post(url, json=data) as response:
            translate_json = await response.json()

            try:
                return translate_json['translatedText']
            except KeyError:
                return text


async def handle(request):
    city_ru = request.rel_url.query['city']
    city_en = await get_translation(city_ru, 'ru', 'en')

    weather_en = await get_weather(city_en)
    weather_ru = await get_translation(weather_en, 'en', 'ru')

    result = {'city': city_ru, 'weather': weather_ru}

    return web.Response(text=json.dumps(result, ensure_ascii=False))


if __name__ == '__main__':
    app = web.Application()
    app.add_routes([web.get('/weather', handle)])
    web.run_app(app)

Теперь в ручке дважды вызывается асинхронная функция get_translation и, вуаля:

localhost:8080/weather?city=Сочи

{"city": "Сочи", "weather": "Облака"}

Что это за микросервис без логгера? Однако использовать в насквозь асинхронном приложении привычную синхронную (а значит блокирующую) библиотеку logging — это непрофессионально. Воспользуемся асинхронной библиотекой логгирования aiologger:

Пример 6.3

import json
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger


logger = JsonLogger.with_default_handlers(
            level='DEBUG',
            serializer_kwargs={'ensure_ascii': False},
        )


async def get_weather(city):
    async with ClientSession() as session:
        url = f'http://api.openweathermap.org/data/2.5/weather' \
              f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'

        async with session.get(url) as response:
            weather_json = await response.json()
            try:
                return weather_json["weather"][0]["main"]
            except KeyError:
                logger.error(f'Невозможно получить погоду для города: {city}')
                return 'Нет данных'


async def get_translation(text, source, target):
    await logger.info(f'Поступил запрос на на перевод слова: {text}')

    async with ClientSession() as session:
        url = 'https://libretranslate.de/translate'

        data = {'q': text, 'source': source, 'target': target, 'format': 'text'}

        async with session.post(url, json=data) as response:
            translate_json = await response.json()

            try:
                return translate_json['translatedText']
            except KeyError:
                logger.error(f'Невозможно получить перевод для слова: {text}')
                return text


async def handle(request):
    city_ru = request.rel_url.query['city']

    await logger.info(f'Поступил запрос на город: {city_ru}')

    city_en = await get_translation(city_ru, 'ru', 'en')
    weather_en = await get_weather(city_en)
    weather_ru = await get_translation(weather_en, 'en', 'ru')

    result = {'city': city_ru, 'weather': weather_ru}

    return web.Response(text=json.dumps(result, ensure_ascii=False))


if __name__ == '__main__':
    app = web.Application()
    app.add_routes([web.get('/weather', handle)])
    web.run_app(app)

Обратите внимание, асинхронный логгер ни на миллисекунду не задержит работу нашего приложения в целом. Он будет использовать паузы, пока мы ожидаем запроса от пользователя или ответа от внешнего сервиса.

Ну а теперь вишенка на торте — асинхронный доступ к базе данных. Предположим, в процессе работы нашего приложения нам надо что-то писать в БД, ну, например, сохранять поступившие запросы (ничего более умного мне в голову не пришло). Самая простая БД в мире, как известно, — это SQLite. И для нее, к счастью, есть асинхронный драйвер aiosqlite. Пробуем:

Пример 6.4

import json
import aiosqlite
import asyncio
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger
from datetime import datetime


logger = JsonLogger.with_default_handlers(
            level='DEBUG',
            serializer_kwargs={'ensure_ascii': False},
        )


async def create_table():
    async with aiosqlite.connect('weather.db') as db:
        await db.execute('CREATE TABLE IF NOT EXISTS requests '
                         '(date text, city text, weather text)')
        await db.commit()


async def save_to_db(city, weather):
    async with aiosqlite.connect('weather.db') as db:
        sql = f'INSERT INTO requests VALUES ("{datetime.now()}", "{city}", "{weather}")'

        await db.execute(sql)
        await db.commit()


async def get_weather(city):
    async with ClientSession() as session:
        url = f'http://api.openweathermap.org/data/2.5/weather' \
              f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'

        async with session.get(url) as response:
            weather_json = await response.json()
            try:
                return weather_json["weather"][0]["main"]
            except KeyError:
                logger.error(f'Невозможно получить погоду для города: {city}')
                return 'Нет данных'


async def get_translation(text, source, target):
    await logger.info(f'Поступил запрос на на перевод слова: {text}')

    async with ClientSession() as session:
        url = 'https://libretranslate.de/translate'

        data = {'q': text, 'source': source, 'target': target, 'format': 'text'}

        async with session.post(url, json=data) as response:
            translate_json = await response.json()

            try:
                return translate_json['translatedText']
            except KeyError:
                logger.error(f'Невозможно получить перевод для слова: {text}')
                return text


async def handle(request):
    city_ru = request.rel_url.query['city']

    await logger.info(f'Поступил запрос на город: {city_ru}')

    city_en = await get_translation(city_ru, 'ru', 'en')
    weather_en = await get_weather(city_en)
    weather_ru = await get_translation(weather_en, 'en', 'ru')

    result = {'city': city_ru, 'weather': weather_ru}

    await save_to_db(city_ru, weather_ru)

    return web.Response(text=json.dumps(result, ensure_ascii=False))


if __name__ == '__main__':
    asyncio.run(create_table())

    app = web.Application()
    app.add_routes([web.get('/weather', handle)])
    web.run_app(app)

Можете заглянуть внутрь созданного на лету файла weather.db (только используйте не текстовый просмотрщик, а какую-нибудь приблуду для работы с БД, например, DBeaver). Для каждого запроса создается соответствующая запись в таблице requests. И снова никаких блокировок, мы ведь живем в асинхронном мире.

Смотрите-ка! Неожиданно появился наш забытый друг asyncio.run, к чему бы это? А это к тому, что операция создания таблицы requests (если таковой еще нет) должна выполняться лишь однажды при старте приложения, еще до запуска бесконечного цикла внутри web.run_app. Но функция-то create_table у нас асинхронная, так просто ее не запустишь (попробуйте сами, если не верите). Вот и приходится единожды запускать ее "вручную" при помощи asyncio.run и лишь затем передавать бразды правления web.run_app.

В заключение этого раздела хочу вас поздравить. Теперь вы имеете в руках все необходимое для создания собственных асинхронных веб-приложений. Неважно какой фреймфорк вы будете использовать: FastAPI, Tornado, Falcon или какой-нибудь еще. Принцип останется тот же самый как в старом добром aiohttp: создаем ручку и в ней нанизываем "шашлык" из вызовов асинхронных функций. Главное за чем необходимо следить — это чтобы в эти функции не затесалось что-нибудь блокирующее из скучного пыльного синхронного мира.

На этом мы временно прощаемся.

Продолжение следует...

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
О чем бы вы хотели узнать в следующей статье цикла?
0% О работе с популярными асинхронными фреймворками и библиотеками 0
0% Лучше будем углубляться в низкоуровневые детали asyncio и научимся сами писать фреймворки 0
Никто еще не голосовал. Воздержавшихся нет.
Источник: https://habr.com/ru/post/671798/


Интересные статьи

Интересные статьи

Согласно заявлению Python Software Foundation (PSF), Python 3.11 сейчас находится в альфа-ревизии и планируется к релизу в Октябре 2022.Какие изменения ожидают нас в следующей версии?
В октябре Минимущества Московской области отчиталось, что за 2021 год был начислено 410 млн рублей штрафов за борщевик Сосновского в Подмосковье. В этой заметке расскажу, как сейчас собирают штрафы в ...
Неделя аналитиков на Хабр Карьере завершилась, но некоторые интересные вопросы участников остались без ответа. Поэтому мы собрали их и адресовали ребятам из Usetech, Хоум Кредит, Леруа Мерлен и EPAM, ...
Введение В данной статье я бы хотел продемонстрировать то, как можно реализовать собственную программу ARP-спуфинга на Python. Реализаций уже тысячи, но почти все они с использование...
Сегодня поговорим о конкретной работе в области sizecoding. Дело в том, что некоторые релизы не только имеют культовый статус в узких кругах — они прямо и явно воздействовали на умы людей, застав...