CQRS на golang

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Сегодня рассмотрю архитектурный паттерн CQRS и его возможное место в вашей архитектуре. Также осуществим его реализацию на языке golang.

Проблематика

В некоторых проектах Каруны мы стремимся к микросервисной архитектуре. У этой концепции много плюсов, но она создаёт некоторые трудности. Одну из таких трудностей и метод её преодоления я хочу рассмотреть в данной статье. 

Для простоты возьмём универсальный пример в виде приложения интернет-магазина. Предположим, он имеет микросервисную архитектуру и следует доменной модели. Одной из главных частей нашего приложения является функционал, связанный с заказами пользователей и с товарами в этих заказах. В нашей архитектуре есть два соответствующих сервиса: order и goods. Сервис order отвечает за создание, обновление, удаление и чтение сущностей заказа (order), а сервис goods реализует тот же CRUD с товарами заказа. Наши клиенты (мобильные приложения, браузерное приложение, и т.д.) взаимодействуют с этими сервисами, и для удобства у нас реализован паттерн объединение API. Т.е разработан сервис, выполняющий роль API-композитора, работающий с данными наших сервис-провайдеров order и goods. Общую архитектуру можно представить следующим образом:

Что касается API-композитора, то его роль может выполнять веб-приложение, API-шлюз или отдельный сервис. Но выбор варианта в нашем случае выходит за рамки темы данной статьи.

Вроде бы всё неплохо: есть одна точка входа в приложение, логика разнесена по доменным областям. Но что делать в случае, когда API-композитору нужно выполнить нетривиальные выборки и объединять большие наборы данных?

Бизнес просит, чтобы были реализованы сложные фильтры и пагинация. Например, нужно выбрать заказы, в которых количество товаров больше заданного N. В этом случае нам нужно делать полную выборку из сервиса goods, и на композитор будет ложиться задача объединения и фильтровки большого количества данных. Это неэффективно и задействует большое количество памяти. Как раз в этом случае на помощь приходит шаблон CQRS, который и решает проблему.

Проектирование

Суть шаблона CQRS (command query respon­sibility segregation — разделение ответственности командных запросов) заключается в разделении модулей и данных на две отдельные части: команды и запросы. Командные модули реализуют операции: create, update, delete. Модуль запросов реализует получение данных (get). Помимо улучшенного разделения ответственностей, преимущество данного шаблона заключается в том, что сервисы могут хранить данные в таком виде, в каком это удобно для более эффективных запросов. В нашем случае сервисы order, goods будут выполнять только команды, а новый сервис order-history возьмёт на себя ответственность в реализации запросов. Посмотрим, как меняется архитектура с внедрением шаблона CQRS:

Теперь сервисы order и goods отвечают только за изменение соответствующих им сущностей: заказов и товаров. Все эти изменения, помимо записи в базы данных сервисов, публикуются в виде событий. Новый сервис order-history отвечает только за запросы на чтение данных. Он подписывается на события из order, goods и заполняет/изменяет свою базу данных.

Реализация

Попробуем реализовать с помощью следующего стэка: golang как язык сервисов, postgresql для баз данных и kafka для обмена сообщениями между сервисами.

Создадим локальное рабочее окружение, а именно — сервисы order, goods, order-history, БД для каждого сервиса и брокер kafka:

docker-compose.yml
version: '3.9'

services:
  order:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: order
    environment:
      - HTTP_BIND=8080
      - POSTGRES_DB=orders
      - POSTGRES_USER=orders_user
      - POSTGRES_PASSWORD=orders_password
      - HOST_DB=db-order
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - ORDER_CREATED_TOPIC=order_created_v1
    depends_on:
      - db-order
      - kafka
    volumes:
      - ./order:/app/order:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    ports:
      - "8080:8080"
    networks:
      - cqrs

  db-order:
    image: postgres:14
    environment:
      - POSTGRES_DB=orders
      - POSTGRES_USER=orders_user
      - POSTGRES_PASSWORD=orders_password
    ports:
      - "5441:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - cqrs

  goods:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: goods
    environment:
      - HTTP_BIND=8081
      - POSTGRES_DB=goods
      - POSTGRES_USER=goods_user
      - POSTGRES_PASSWORD=goods_password
      - HOST_DB=db-goods
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - GOODS_CREATED_TOPIC=goods_created_v1
    volumes:
      - ./goods:/app/goods:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    depends_on:
      - db-goods
      - kafka
    ports:
      - "8081:8081"
    networks:
      - cqrs

  db-goods:
    image: postgres:14
    environment:
      - POSTGRES_DB=goods
      - POSTGRES_USER=goods_user
      - POSTGRES_PASSWORD=goods_password
    ports:
      - "5442:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - cqrs

  order-history:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: order-history
    environment:
      - HTTP_BIND=8082
      - POSTGRES_DB=orders_history
      - POSTGRES_USER=orders_history_user
      - POSTGRES_PASSWORD=orders_history_password
      - HOST_DB=db-order-history
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - GOODS_CREATED_TOPIC=goods_created_v1
      - ORDER_CREATED_TOPIC=order_created_v1
    depends_on:
      - db-order-history
      - kafka
    volumes:
      - ./order-history:/app/order-history:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    ports:
      - "8082:8082"
    networks:
      - cqrs

  db-order-history:
    image: postgres:14
    environment:
      - POSTGRES_DB=orders_history
      - POSTGRES_USER=orders_history_user
      - POSTGRES_PASSWORD=orders_history_password
    ports:
      - "5443:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - cqrs

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    networks:
      - cqrs
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: order_created_v1:1:1,goods_created_v1:1:1
    networks:
      - cqrs

volumes:
  data:

networks:
  cqrs:

Сервис order будет сохранять заказы в таблицу:

CREATE TABLE orders (
    id         BIGSERIAL PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

И реализовывать эндпоинт для создания заказа:

func NewServer(db *pgxpool.Pool, kafkaProducer sarama.SyncProducer) Server {
	s := Server{}
	s.kafkaProducer = kafkaProducer
	s.db = db
	s.router = mux.NewRouter()

	s.router.HandleFunc("/v1/order", s.CreateOrderV1).Methods(http.MethodPost)

	return s
}

Который обрабатывает создание заказа, сохраняет его в БД и шлет событие в топик order_created_v1:

обработчик CreateOrderV1
func (s Server) CreateOrderV1(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been parsed.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	userID := r.Form.Get("userId")

	order := model.Order{}
	err = s.db.QueryRow(context.Background(), `INSERT INTO orders (user_id, created_at) VALUES ($1, NOW()) RETURNING id, user_id, created_at`, userID).Scan(&order.ID, &order.UserID, &order.CreatedAt)
	if err != nil {
		log.Error().Err(err).Msg("Order hasn't been created.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	msg := model.CreatedOrderMsg{Data: order}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been marshaled.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("ORDER_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = s.kafkaProducer.SendMessage(producerMsg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been sent.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
}

Сервис goods реализуется аналогично. Таблица для хранения товаров заказов:

CREATE TABLE goods (
    id         BIGSERIAL PRIMARY KEY,
    order_id   BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

Эндпоинт для создания товара в заказе:

s.router.HandleFunc("/v1/goods", s.CreateGoodsV1).Methods(http.MethodPost)

И обработчик для создания товара в заказе и отсылки события в топик goods_created_v1:

обработчик CreateGoodsV1
func (s Server) CreateGoodsV1(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been parsed.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	orderID := r.Form.Get("orderId")

	goods := model.Goods{}
	err = s.db.QueryRow(context.Background(), `INSERT INTO goods (order_id, created_at) VALUES ($1, NOW()) RETURNING id, order_id, created_at`, orderID).Scan(&goods.ID, &goods.OrderID, &goods.CreatedAt)
	if err != nil {
		log.Error().Err(err).Msg("Goods hasn't been created.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	msg := model.CreatedGoodsMsg{Data: goods}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been marshaled.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = s.kafkaProducer.SendMessage(producerMsg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been sent.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
}

БД сервиса order-history имеет следующую структуру:

CREATE TABLE orders (
    id         BIGINT PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

CREATE TABLE goods (
    id         BIGSERIAL PRIMARY KEY,
    order_id   BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    FOREIGN KEY (order_id) REFERENCES orders (id)
);

Сервис слушает события order_created_v1 и goods_created_v1 и записывает данные в свою БД:

обработка события order_created_v1
...
type OrderCreatedEvent struct {
	Data struct {
		ID        int64     `json:"id"`
		UserID    int64     `json:"user_id"`
		CreatedAt time.Time `json:"created_at"`
	} `json:"data"`
}

func (oh OrderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		oce := OrderCreatedEvent{}
		err := json.Unmarshal(msg.Value, &oce)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		_, err = oh.db.Exec(context.Background(), `INSERT INTO orders (id, user_id, created_at) VALUES ($1, $2, $3)`, oce.Data.ID, oce.Data.UserID, oce.Data.CreatedAt)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been inserted.")
		}

		session.MarkMessage(msg, "")
	}

	return nil
}
...
обработка события goods_created_v1
...
type GoodsCreatedEvent struct {
	Data struct {
		ID        int64     `json:"id"`
		OrderID   int64     `json:"order_id"`
		CreatedAt time.Time `json:"created_at"`
	} `json:"data"`
}

func (gh GoodsHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		gce := GoodsCreatedEvent{}
		err := json.Unmarshal(msg.Value, &gce)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		_, err = gh.db.Exec(context.Background(), `INSERT INTO goods (id, order_id, created_at) VALUES ($1, $2, $3)`, gce.Data.ID, gce.Data.OrderID, gce.Data.CreatedAt)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been inserted.")
		}

		session.MarkMessage(msg, "")
	}

	return nil
}
...

Сервис будет реализовывать эндпоинт получения данных:

	s.router.HandleFunc("/v1/order-history", s.GetOrderHistoryV1).Methods(http.MethodGet)

И сама реализация метода:

обработчик GetOrderHistoryV1
func (s Server) GetOrderHistoryV1(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been parsed.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	threshold := r.Form.Get("threshold")
	offset := r.Form.Get("offset")
	limit := r.Form.Get("limit")

	rows, err := s.db.Query(context.Background(), `SELECT orders.id, orders.user_id, orders.created_at FROM orders
    	INNER JOIN goods ON goods.order_id = orders.id 
		GROUP BY orders.id 
		HAVING COUNT(goods.id) > $1
		LIMIT $2 OFFSET $3`, threshold, limit, offset)
	if err != nil {
		log.Error().Err(err).Msg("Goods haven't been got.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	data := make([]model.Order, 0)
	for rows.Next() {
		o := model.Order{}
		sErr := rows.Scan(&o.ID, &o.UserID, &o.CreatedAt)
		if sErr != nil {
			log.Error().Err(err).Msg("Reading error.")
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		data = append(data, o)
	}

	ordersRsp := model.OrdersResponse{Data: data}
	response, err := json.Marshal(ordersRsp)
	if err != nil {
		log.Error().Err(err).Msg("Response hasn't been marshaled.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Write(response)
}

Теперь мы без лишних накладных расходов можем выполнить запрос на получение заказов, в которых количество товаров больше 1.

curl 'http://localhost:8082/v1/order-history?limit=2&threshold=1&offset=0'

Таким образом, мы получаем данные без сложной обработки, вся логика получения данных и обработки реализована в сервисе order-history.

Для простоты мы реализовали только создание сущностей и чтение их в сервисе order-history. Реализация обновления данных может усложнить логику сервисов, т.к нужно поддерживать конкурентное обновление данных и как-то обрабатывать повторяющиеся события.

Полный листинг реализации данного функционала вы можете найти на моем github.

Заключение

  • Реализация шаблона CQRS позволяет эффективно разделить логику приложения: эффективно реализуются запросы и улучшить общее разделение ответственности

  • В модуле запросов можно использовать другие СУБД помимо PostgreSQL — в том числе и аналитические. Например, clickhouse или vertica. Также можно использовать NoSQL хранилища типа MongoDB или DynamoDB.

  • Несмотря на преимущества, CQRS влечёт за собой усложнение архитектуры (администрирование и обслуживание БД). Может появиться рассинхронизация между представлениями для БД команд и запросов. За этим тоже необходимо следить.

  • Модуль представлений сложен в обслуживании: проблемы конкурентного обновления данных и повторяющихся событий.

  • CQRS хорошо совместим с event sourcing.

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
Как вы работаете с нетривиальными выборками данных в вашей архитектуре?
33.33% Используем CQRS 1
33.33% Объединяем данные с нескольких сервисов в агрегаторе 1
0% У нас нет сложных выборок, сервисы справляются с нужными выборками без CQRS 0
33.33% У нас монолит, нет такой проблемы 1
0% Другое 0
Проголосовали 3 пользователя. Воздержался 1 пользователь.
Источник: https://habr.com/ru/company/karuna/blog/582814/


Интересные статьи

Интересные статьи

Если вы знакомы с Python, то уже сталкивались с Jupyter Notebook или работали в нём по крайней мере один раз. Jupyter Notebook — это удобный инструмент, позволяющий писат...
Генри Форд чуть не прогорел на своей классической фразе про пятьдесят оттенков черного. General Motors стала предлагать разноцветные модели Chevrolet, Pontiac, Buick, ...
Один из ключевых сценариев работы в CRM это общение с клиентом в удобном для него канале. По почте, по телефону, по SMS или в мессенджере. Особенно выделяется WhatsApp — интеграцию с ...
Те, кто собираются открывать интернет-магазин, предварительно начитавшись в интернете о важности уникального контента, о фильтрах, накладываемых поисковиками за копирование материалов с других ресурсо...
Компании переполнили рынок товаров и услуг предложениями. Разнообразие наблюдается не только в офлайне, но и в интернете. Достаточно вбить в поисковик любой запрос, чтобы получить подтверждение насыще...