Использование Ariadne и его интеграция c FastAPI и Starlette

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

С недавнего времени в Starlette прекращена поддержка GraphQL. Так что если вы, как и мы, занимались разработкой сервиса на FastAPI, то обновления до последней версии Starlette вас неприятно удивили.

Причины, по которым это случилось, не столь важны, остается просто принять произошедшее как данность. Но переходить с GraphQL обратно на REST нам не хотелось, стандарт подходил под наши задачи, а поэтому надо было найти альтернативу. Всем привет, это Данил Максимов, программист ZeBrains, в этой статье я расскажу, почему после обновления «жить стало лучше, жить стало веселее»(с), и на что надо обратить внимание при миграции на альтернативное решение.

Выбор библиотеки: почему мы остановились на Ariadne

Самый простой вариант выглядел очевидным: поддержка GraphQL в Starlette изначально была построена на базе Graphene, а в качестве одной из альтернатив предлагалась библиотека starlette-graphene3. Но мы уже успели «оценить» и слишком лаконичную документацию, и отсутствие стандартов для написания кода, и проблемы с расширяемостью. 

А потому свой выбор мы остановили на Ariadne: 

  • У него достаточно объемная и понятная документация.

  • Он построен на базе Apollo Federation, что позволяет пользоваться всеми плюшками от него.

  • Активно развивается и не является форками или доработками чужих решений — это самостоятельный продукт.

Главное, что привлекло наше внимание — в отличие от Graphene Ariadne идет от «обратного». Основа — graphql-схема, а по ней строятся все запросы, мутации или подписки.

Это дает гораздо больше гибкости для работы с типами, а также позволяет выстроить систему ошибок и описать ее в схеме. В варианте с использованием решения от Graphene набор ошибок, которые может вернуть мутация, никак не отражен в схеме (только если вы не патчите вручную методы ее построения), а значит — непредсказуем, если у вас нет исходного кода или документации. 

Кроме того, Ariadne прекрасно поддерживает на уровне библиотеки механизм подписок, и для этого не придется тянуть лишние либы (в отличие от того же Graphene). Плюс поддержка передачи файлов при работе с GraphQL изначально была не самой простой задачей, а Ariadne предоставляет ее «из коробки».

Реализация мутаций, запросов и подписок

Писать очередную статью «как переехать с библиотеки ХХХ на библиотеку YYY» — неинтересно. Если вы дочитали до этого момента, значит — в состоянии самостоятельно установить нужные зависимости. Мы же поговорим о том, на что стоит обратить внимание после переезда, что изменится непосредственно в коде. Рассматривать будем, как водится, на примере классического todo-приложения, демо-версия доступна по ссылке.

Запросы и мутации

В GraphQL запросы обрабатываются с помощью резолверов (преобразователей), каждый из которых принимает в себя два позиционных аргумента: obj и info. Пример из документации Ariadne:

def example_resolver(obj: Any, info: GraphQLResolveInfo): 
  return obj.do_something() 

class FormResolver: 
  def __call__(self, obj: Any, info: GraphQLResolveInfo, **data):
    . . .

Из кода выше мы видим, что нам доступны как функциональный, так и ООП подход. Первым делом — определим тип запросов Query в .graphql:

queries/schema.graphql

…

type Query {
    getTasks(userId: ID!): [TaskType]!
    getTask(userId: ID!, taskId: ID!): TaskType!
}

…

Привяжем резолвер к допустимому типу поля схемы с помощью ObjectType, для которого необходимо будет указать метод .set_field(). Он принимает в себя два параметра: name, которое связывает его с одноименным полем схемы GraphQL и, собственно, нужный нам резолвер.

queries/__init__.py

from ariadne import ObjectType

from ariadne_example.app.api.queries import task

queries = ObjectType("Query")

queries.set_field("getTasks", task.resolve_get_user_tasks)
queries.set_field("getTask", task.resolve_get_user_task_by_id)

Сами резолверы импортируются из отдельного файла, давайте их напишем:

queries/task.py

import json
from typing import Any, List

from ariadne import convert_kwargs_to_snake_case
from graphql import GraphQLResolveInfo
from graphql_relay.node.node import from_global_id
from sqlmodel import select

from ariadne_example.app.db.session import Session, engine
from ariadne_example.app.models import Task

@convert_kwargs_to_snake_case
def resolve_get_user_tasks(
        obj: Any,
        info: GraphQLResolveInfo,
        user_id: str,
) -> List[dict]:
    """Get user tasks"""
    with Session(engine) as session:
        local_user_id, _ = from_global_id(user_id)
        statement = select(Task).where(Task.user_id == int(local_user_id))
        tasks = session.execute(statement).scalars().all()
        return [
            Task(
                id=task.id,
                created_at=task.created_at,
                title=task.title,
                status=task.status,
                user_id=task.user_id
            ).dict()
            for task in tasks
        ]

@convert_kwargs_to_snake_case
def resolve_get_user_task_by_id(
        obj: Any,
        info: GraphQLResolveInfo,
        task_id: str,
        user_id: str,
) -> dict:
    """Get user task by task ID."""
    with Session(engine) as session:
        local_task_id, _ = from_global_id(task_id)
        local_user_id, _ = from_global_id(user_id)
        statement = select(Task).where(Task.user_id == local_user_id, Task.id == local_task_id)
        task = session.execute(statement).scalar_one()
        return Task(
            id=task.id,
            created_at=task.created_at,
            title=task.title,
            status=task.status,
            user_id=task.user_id,
        ).dict()

Стандартный для todo-приложения набор резолверов, с помощью которого мы получаем список всех задач пользователя или какую-то конкретную задачу. 

Чуть сложнее ситуация обстоит с мутациями. Поскольку в Ariadne основой всего является схема .graphql, добавим в нее тип, соответствующий нашим мутациям:

schema.graphql

. . .

type Mutations {
    createTask(userId: ID!, taskInput: TaskInput): Response
    changeTaskStatus(taskId: ID!, newSatus: TaskStatusEnum): Response
}

. . .

Для обработки схемы нам потребуется резолвер, который мы сопоставим с мутацией.

mutations/__init__.py

from ariadne import ObjectType

from .task import resolve_create_task

mutations = ObjectType('Mutation')

mutations.set_field('createTask', resolve_create_task)

В коде выше мы импортировали резолвер из файла task.py, давайте его напишем. 

Резолверы мутаций в Ariadne — функции, которые принимают в себя аргументы parent и info, а также произвольный набор аргументов, относящихся к мутации, и возвращают данные, которые отправляются пользователю как результат запроса. В нашем примере описаны два резолвера, отвечающие за создание новой задачи и изменение статуса уже существующей:

mutations/task.py

from typing import Any

import sqlalchemy.exc
from ariadne import convert_kwargs_to_snake_case
from graphql.type.definition import GraphQLResolveInfo
from graphql_relay.node.node import from_global_id
from sqlmodel import select

from ariadne_example.app.db.session import Session, engine
from ariadne_example.app.core.struсtures import TaskStatusEnum, TASK_QUEUES
from ariadne_example.app.models import Task
from ariadne_example.app.core.exceptions import NotFoundError

@convert_kwargs_to_snake_case
def resolve_create_task(
        obj: Any,
        info: GraphQLResolveInfo,
        user_id: str,
        task_input: dict,
) -> int:
    with Session(engine) as session:
        local_user_id, _ = from_global_id(user_id)
        try:
            task = Task(
                title=task_input.get("title"),
                created_at=task_input.get("created_at"),
                status=task_input.get("status"),
                user_id=local_user_id
            )
            session.add(task)
            session.commit()
            session.refresh(task)
        except sqlalchemy.exc.IntegrityError:
            raise NotFoundError(msg='Не найден пользователь с таким user_id')
        return task.id


@convert_kwargs_to_snake_case
async def resolve_change_task_status(
        obj: Any,
        info: GraphQLResolveInfo,
        new_status: TaskStatusEnum,
        task_id: str,
) -> None:
    with Session(engine) as session:
        local_task_id, _ = from_global_id(task_id)
        try:
            statement = select(Task).where(Task.id == local_task_id)
            task = session.execute(statement)
            task.status = new_status
            session.add(task)
            session.commit()
            session.refresh(task)
        except sqlalchemy.exc.IntegrityError:
            raise NotFoundError(msg='Не найдена задача с таким task_id')
        for queue in TASK_QUEUES:
            queue.put(task)

Тут важно обратить внимание, что полезная нагрузка, которую возвращает мутация, представлена в виде простого dict. У нас нет возможности реализовать, как в Graphene класс, и указать в нем ожидаемые поля:

(вариант graphene)task.py

. . .

class CreateTask(graphene.Mutation):
    task = graphene.Field(Task)

    class Arguments:
        user_id = graphene.ID()
        input_data = graphene.Field()

    def mutate(self, parent, info, user_id: str, input_data: Task):
        local_user_id, _ = from_global_id(user_id)
        session = get_session()
        task = Task(title=input_data.get("title"), user_id=local_user_id)
        session.add(task)
        session.commit()
        session.refresh()
        return CreateTask(task=task)


. . .

Но прежде чем приступить к решению этой проблемы, давайте разберемся с третьим типом операции — с подписками.

Подписки в Ariadne

По устоявшейся традиции, первым делом определим тип в схеме:

schema.graphql

. . .

type Subscription {
    taskStatusChanged: TaskType!
}

. . .

Подписки сложнее запросов, поскольку работают не для одиночного обращения к серверу, а должны позволять уведомлять клиента при каждом изменении данных. 

Реализовывать это мы будем «по классике», с использованием WebSockets. Но просто открыть сокет — мало, нам понадобится генератор, который будет передавать данные при их изменении. Кроме того, не помешает иметь и «приемник», в который эти данные будут поступать.

subscriptions.py

import asyncio
from typing import Any

from ariadne import convert_kwargs_to_snake_case, SubscriptionType
from graphql import GraphQLResolveInfo

from ariadne_example.app.core.struсtures import TASK_QUEUES
from ariadne_example.app.models import Task

subscription = SubscriptionType()


@subscription.source("taskStatusChanged")
@convert_kwargs_to_snake_case
async def task_source(obj: Any, info: GraphQLResolveInfo):
    queue = asyncio.Queue()
    TASK_QUEUES.append(queue)
    try:
        while True:
            change_task = await queue.get()
            queue.task_done()
            yield change_task
    except asyncio.CancelledError:
        TASK_QUEUES.remove(queue)
        raise


@subscription.field("taskStatusChanged")
@convert_kwargs_to_snake_case
def task_resolver(task: Task, info: Any):
    return task

Источник подписки мы указываем в subscription.source("taskStatusChanged"), генератор открывает сокет и транслирует нужные нам данные, а резолвер принимает их и передает пользователю.

Ошибки, эксепшены и мидлвары

Все, изложенное выше — сродни обычному тестовому заданию «напишите todo с использованием следующих технологий…». А теперь — поговорим серьезно :-)

Пункт первый — мы отложили «на сладкое» вопрос формализации полезной нагрузки в мутациях. Пункт второй — классический слой ошибок GraphQL в целом позволяет прокинуть код ошибки в extensions и потом его оттуда получать, но для фронта было проблемой определить,  какой именно запрос или мутация завершился с ошибкой и как на эти ошибки реагировать.

Вспомним, что Ariadne пропагандирует подход «от схемы к коду», и добавим в .graphql нужные нам типы ошибок и статусов задач:

schema.graphql

. . .

enum ErrorTypeEnum {
    SERVER_ERROR
    NOT_FOUND_ERROR
    VALIDATION_ERROR
}

type ErrorType {
    message: String
    code: ErrorTypeEnum!
    text: String
}

enum TaskStatusEnum {
    draft
    in_process
    delete
    done
}

. . .

Вынесем логику обработки в core и зададим структуру:

core/structures.py

import enum
from typing import Optional
from dataclasses import dataclass

from ariadne import EnumType, ScalarType


class ErrorTypes(enum.Enum):
    SERVER_ERROR = enum.auto()
    NOT_FOUND_ERROR = enum.auto()
    VALIDATION_ERROR = enum.auto()


@dataclass
class ErrorScalar:
    message: Optional[str]
    code: ErrorTypes
    text: Optional[str]


class TaskStatusEnum(enum.Enum):
    draft = "draft"
    in_process = "in_process"
    delete = "delete"
    done = "done"

task_type_enum = EnumType("TaskStatusEnum", TaskStatusEnum)
datetime_scalar = ScalarType("DateTime")

@datetime_scalar.serializer
def serialize_datetime(value):
    return value.isoformat()

TASK_QUEUES = []

Теперь у нас есть класс, возвращающий осмысленный код ошибки, сообщение и опциональный текст, список вариантов ошибок и статусов задачи. Импортируем их в файл эксепшенов:

core/exceptions.py

from typing import Optional, Dict, Any

from graphql import GraphQLError

from ariadne_example.app.core.struсtures import ErrorTypes, ErrorScalar


class BaseGraphQLError(GraphQLError):
    def __init__(self, msg: str = "Server Error", extensions: Optional[Dict[str, Any]] = None):
        if not hasattr(self, "_extensions"):
            self._extensions = {"code": ErrorTypes.SERVER_ERROR.name}
        if extensions is not None:
            self._extensions = {**self._extensions, **extensions}
        super().__init__(msg, extensions=self._extensions)

    def parse(self) -> ErrorScalar:
        parsed_exception = ErrorScalar(
            message=self.extensions.get("user_message"),
            code=self.extensions.get("code"),
            text=self.message,
        )
        return parsed_exception


class ValidationError(BaseGraphQLError):
    def __init__(self, msg: str, extensions: Optional[Dict[str, Any]] = None):
        self._extensions = {"code": ErrorTypes.VALIDATION_ERROR.name}
        super().__init__(msg, extensions=extensions)


class NotFoundError(BaseGraphQLError):
    def __init__(self, msg: str, extensions: Optional[Dict[str, Any]] = None):
        self._extensions = {"code": ErrorTypes.NOT_FOUND_ERROR.name}
        super().__init__(msg, extensions=extensions)

Теперь мы можем вернуть не просто ошибку GraphQL или авторизации, а конкретный, причем формализованный тип нашей ошибки и ее код. Но зачем останавливаться на достигнутом? Вспомним о middlewares, которые позволят нам обработать написанные шагом ранее эксепшены:

core/middlewares.py

from ariadne.contrib.tracing.utils import is_introspection_field

from ariadne_example.app.core.exceptions import BaseGraphQLError


async def handle_error_middleware(resolver, obj, info, **args):
    """
    Если на этапе выполнения мутации или запроса будет выброшено исключение,
    перехватить и вывести в качестве ошибки.
    """
    errors = []
    value = {}

    if is_introspection_field(info):
        return resolver(obj, info, **args)

    try:
        value = await resolver(obj, info, **args)
    except BaseGraphQLError as exc:
        errors.append(exc.parse())
        value = {**value, **{'errors': errors}}
    except TypeError:
        value = resolver(obj, info, **args)
    return value

Краткий итог

Узнать о том, что разработчик библиотеки отказался от поддержки нужного вам функционала — конечно, неприятно. Но это шанс сделать все так, как хочется именно вам. 

Нам не хватало системы ошибок, чтобы фронт мог связать каждую с конкретным запросом или мутацией. Благодаря (пусть и вынужденному) переходу на Ariadne — мы получили возможность гибко настраивать схему GraphQL под свои задачи, через мидлвары автоматически перехватывать нужные исключения и форматировать их для работы с ошибками на фронте. А самое главное — у нас заработали подписки!

По ссылке — оба варианта реализации тестового приложения todo: на базе  Starlette, и на базе Ariadne. Вопросы, пожелания и проклятия — можно постить в комментариях или ко мне в телеграм @maximovd

Источник: https://habr.com/ru/post/595777/


Интересные статьи

Интересные статьи

Компания OmniLine — Платиновый партнер 3СХ — разработала интеграцию 3СХ и Creatio, доступную на маркетплейсе Terrasoft. 3CX CRM Template for Creatio — базовая интеграция популярной СRM-системы Cr...
Всем привет, я Android разработчик в компании Enaza подразделения Games, которая занимается дистрибуцией ключей для игр. В данном посте опишу опыт интеграции Jetpack Compose в существующий проект. В д...
ВведениеВ данной статье я бы хотел рассмотреть проблему обновления PHP в виртуальной машине BitrixVM, и действия, которые возможно применить если выполнение переезда на машину с обновленным ПО невозмо...
Ваш сайт работает на 1С-Битрикс? Каждому клиенту вы даёте собственную скидку или назначаете персональную цену на товар? Со временем в вашей 1С сложилась непростая логика ценообразования и формирования...
Постановка задачи Для отображения иконок в приложении Xamarin.Forms можно использовать изображения в различных форматах, например png, svg или шрифты ttf. Чаще всего для добавления стандартных...