Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Сердцем любого backend являются данные. Существует два сценария использования данных. В одном из них данные изменяются редко, но при этом активно используются в сыром или агрегированном виде и применяются для целей аналитики в реальном времени (такие системы принято называть OLAP). В других системах важно обеспечить сохранение с высокой скоростью большого количество неструктурированных или полуструктурированных объектов, поступающих от устройств Интернета вещей, из источников произвольных событий, наблюдений за активностью пользователя (такие системы называются OLTP - Online Transaction Processing, ориентированные на большое количество транзакций с минимальной задержкой обработки). Для таких систем важно обеспечить надежность хранения данных, поддержку распределенного хранения на нескольких серверах и/или дата-центрах и сохранение консистентности распределенного хранилища.
При этом сами объекты могут отличаться от привычной реляционной модели данных и представляться, например, в виде json-документов с произвольной схемой, объектов с полями со множественными значениями или графов. Разумеется это приводит к необходимости изучения новых подходов к поиску и добавлению данных, использованию специальных драйверов. Но что если соединить распределенное надежное хранилище и синтаксис запросов, близкий к SQL? В этой статье мы познакомимся с проектом Apache Cassandra и обсудим на примере разработки API на Kotlin для сбора телеметрии с датчиков, расположенных по всему миру (с поддержкой отказоустойчивости и управляемой репликации между дата-центрами).
Начнем с рассмотрения основной идеи Cassandra (мы говорим об актуальной версии 4.0.4). Согласно современным представлениям Cassandra можно отнести к категории NoSQL баз данных, основанных на идее автоматического шардирования гигантских таблиц. Основной единицей Cassandra является узел (node), который координируется с другими узлами и организует хранение и доступ к данным, сохраненным в виде записей в таблицах. Важно отметить, что для кластера Cassandra не используются выделенные управляющие узлы и все узлы являются основными и содержат информацию о топологии сети (как в пределах одного дата-центра, так и между дата-центрами), размещении хэшей по разделам, состоянию соседних узлов (replica node), содержащих реплики записей. Для Cassandra предусмотрены механизмы поддержки согласованности (начиная от hinted handoff, когда при временной недоступности одного из узлов, другие носители реплики создают отложенный лог действий для последующего воспроизведения после восстановления доступа для пропавшего узла, и заканчивая механизмами полной репликации и восстановления узла (даже из полностью очищенного состояния, на основании данных из других реплик).
Коллекции столбцов (таблица) описываются схемой, при этом таблица создается внутри пространства ключей (keyspace). В свою очередь keyspace может определять стратегию репликации (с возможностью задать количество реплик на дата-центры), а также настройки кэширования и буферов поиска. Важно отметить, что для каждой таблицы обязательно указывает ключ - одно или несколько полей, использующихся для размещения хэша (записи) в keyset, при этом поля могут определять как отнесение записи к разделу (partition), так и кластеризацию внутри раздела. При этом записи одного раздела хранятся на одних и тех же серверах и их расположение известно (что позволяет оптимизировать выполнение запроса, поскольку каждый узел знает о карте размещения записей по узлам). Cassandra может выполнять отбор записей и без использования keyspace, но это может быть неоптимально, поскольку из запроса будет непонятно, какие из серверов должны быть вовлечены в процесс поиска и обращение будет отправлено на все узлы кластера.
При создании таблицы могут использоваться не только примитивные типы данных (int, boolean, float, double), но и составные структуры (list<T> для упорядоченных коллекций объектов указанного типа, map<T,V> для словарей с типом ключа T и типом значения V, set<T> для неупорядоченных множеств). Также поддерживаются типы данных для хранения времени (duration - продолжительность, timestamp - временная метка, date - дата, time - время без даты), text для хранения строки произвольной длины, blob для двоичных объектов, inet для хранения ip-адресов, uuid для уникальных идентификаторов, counter для хранения счетчика с атомарными функциями изменения, а также можно создать собственные типы данных через CREATE TYPE.
Cassandra хорошо подходит для хранения поточных событий, данных телеметрии, временных рядов (существует даже отдельная база данных KairosDB, работающая поверх Cassandra и ориентированная на хранение последовательности значений с привязкой ко времени), при этом близкие по времени записи могут интерпретироваться как единый набор, который должен быть помечен на удаление одновременно (задается через стратегию TimeWindowCompactionStrategy). Также для записей в Cassandra может быть извлечена дата последней модификации поля WRITETIME(name) и определено время до истечения TTL(name), при этом время существования записи определяется при добавлении или модификации. Также доступны методы сравнения для использования с полями типа timestamp, при этом можно использовать как временные метки, так и строковое представление ISO даты-времени и встроенные функции для преобразования представления.
Для выполнения запросов используется язык Cassandra Query Language (CQL), который синтаксически близок к SQL (SELECT, UPDATE, INSERT, DELETE) с возможностью отбора записей (WHERE), сортировки (ORDER BY), ограничения количества LIMIT, но при этом не поддерживает JOIN. Мы рассмотрим несколько подходов к использованию Cassandra в коде backend-приложения на Kotlin (начиная от использования низкоуровневого драйвера и до использования Spring Data). Но начнем мы с подготовки модели данных и развертывания демонстрационного кластера.
В нашем приложении предполагается сохранение данных о температуре и влажности от датчиков, расположенных по всему миру, а также сообщения об экологических проблемах, поступающие от пользователей системы. Информация от датчика будет иметь следующую структуру:
дата-время замера (с точностью до секунды)
местоположение (широта и долгота)
серийный номер датчика (int)
тип датчика (сама cassandra не поддерживает enum-типы, но их можно использовать на уровне драйвера)
значение замера (число с плавающей точкой)
организация-владелец датчика (uuid)
Для сохранения сообщений о проблемах будет использоваться структура:
автор сообщения
текст сообщения
категория проблемы
местоположение (широта и долгота)
тэги сообщения (список строк)
статус обработки
По описанию схемы видно, что здесь вполне можно было и обойтись обычной реляционной базой данных, но в действительности у нас есть еще одно важное требование - данные должны храниться в нескольких дата-центрах по всему миру и суммарный объем данных существенно превышает возможности хранения любого дата-центра.
Начнем с установки кластера из трех узлов Cassandra, для этого будем использовать официальный образ Docker-контейнера на Docker Hub. Для отладки также можно использовать бесплатный продукт Datastax Desktop для запуска отладочного сервера и взаимодействия с ним. Также для развертывания кластеров поверх Kubernetes существует несколько операторов: K8ssandra, Casskop, Sky Cassandra Operator для автоматизации развертывания, сбора и визуализации метрик, резервного копирования и восстановления данных, а также иных операций по обслуживанию кластера. Также облачные провайдеры предлагают свои решения, основанные на Apache Cassandra: Amazon Keyspaces, DataStax Astra DB, Azure Cosmos DB (совместима с API Cassandra и MongoDB).
Перед запуском кластера создадим файл конфигурации для поддержки ролевого доступа и управления учетными записями:
cassandra1.yaml
# название кластера
cluster_name: 'Demo Cluster'
num_tokens: 16
# allocate_tokens_for_local_replication_factor: 3
# конфигурация восстановления при сбое до 3 часов
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000 # 3 hours
# настройка ролевого доступа и управления пользователями
authenticator: PasswordAuthenticator
authorizer: CassandraAuthorizer
role_manager: CassandraRoleManager
network_authorizer: AllowAllNetworkAuthorizer
# механизм разделения по первичному ключу на разделы
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
# сохранение лога транзакций в памяти и выгрузка в хранилище каждые 10 секунд
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
# параметры seeds будут перезаписаны на реальный ip адрес после запуска сервера
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "cassandra1"
# конфигурация публикации
storage_port: 7000
ssl_storage_port: 7001
listen_address: 0.0.0.0
broadcast_address: 0.0.0.0
start_native_transport: true
native_transport_port: 9042
native_transport_allow_older_protocols: true
rpc_address: 0.0.0.0
broadcast_rpc_address: 172.28.0.2
# время ожидания ответа и лимит для медленных запросов
request_timeout_in_ms: 10000
slow_query_log_timeout_in_ms: 500
# режим определения привязки к дата-центру (SimpleSnitch для одного дата-центра, может использоваться PropertyFileSnitch)
# и список ip-адресов и дата-центров
endpoint_snitch: SimpleSnitch
# разрешить добавление пользовательских функций в Cassandra
enable_user_defined_functions: true
enable_scripted_user_defined_functions: true
# разрешить подробное протоколирование запросов (с версии 4.0)
full_query_logging_options:
enabled: true
log_dir: /tmp/cassandrafullquerylog
roll_cycle: HOURLY
# разрешить аудит попыток входа (с версии 4.0)
audit_logging_options:
enabled: true
logger:
- class_name: BinAuditLogger
Дополнительно в конфигурации мы добавили автоматический аудит попыток входа (появилось в версии Cassandra 4.0) и выполненных запросов, а также разрешили добавление пользовательских функций на Java в базу данных Cassandra. Поскольку docker-контейнер модифицирует непосредственно conf-файл при запуске, создадим три копии этого файла и запустим наш кластер:
cp cassandra1.yaml cassandra2.yaml
cp cassandra1.yaml cassandra3.yaml
docker create network cassandra
docker run -itd --name cassandra1 --hostname cassandra1 --network=cassandra -p 9042:9042 -v ./cassandra1.yaml:/opt/cassandra/conf/cassandra.yaml cassandra
docker run -itd --name cassandra2 --hostname cassandra2 --network=cassandra -e CASSANDRA_SEEDS=cassandra1 -v ./cassandra2.yaml:/opt/cassandra/conf/cassandra.yaml cassandra
docker run -itd --name cassandra3 --hostname cassandra3 --network=cassandra -e CASSANDRA_SEEDS=cassandra1 -v ./cassandra3.yaml:/opt/cassandra/conf/cassandra.yaml cassandra
Для проверки состояния кластера (и выполнению работ по обслуживанию узлов) используется утилита nodetool, доступная внутри контейнера:
$ docker exec cassandra1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.28.0.2 136.47 KiB 16 100.0% 0f42781b-79a8-46f0-9446-287f3b9bd2e5 rack1
UN 172.28.0.4 20.75 KiB 16 100.0% c831f85c-f64d-468e-9681-b19413a78f23 rack1
UN 172.28.0.3 91.83 KiB 16 100.0% 8bb65a75-7130-4d58-9a42-de112b44906c rack1
После запуска кластера мы можем установить утилиту cqlsh или использовать ее в одном из контейнеров. Поскольку мы разрешили контроль доступа, при подключении необходимо указать имя пользователя и пароль (по умолчанию cassandra / cassandra). Cqlsh подключается по умолчанию к localhost, но параметром командной строки может быть указан адрес или имя любого из доступных серверов.
$ docker exec -it cassandra1 bash
cqlsh -u cassandra -p cassandra
При успешном подключении будет отображен заголовок с версиями сервера и протокола и приглашение для ввода команд.
Создадим новое keyspace с фактором репликации 3 (в этом случае для достижения консенсуса при чтении или записи в режиме кворум должно быть доступно (F+1)/2 серверов, т.е. выход при F=3, достаточно двух серверов для успешного завершения операции чтения или записи.
CREATE KEYSPACE IF NOT EXISTS ecology WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 } AND durable_writes = true;
Здесь используется класс для определения стратегии SimpleStrategy, который подразумевает единую политику репликации для всех дата-центров. Альтернативно может быть задан класс NetworkTopologyStrategy для которого задается соответствие между дата-центрами и политикой репликации создаваемого keyspace. Флаг durable_writes гарантирует успешную запись данных в постоянное хранилище.
Следующим действием создадим необходимые таблицы, но прежде создадим пользовательский тип данных, на который мы будем ссылаться в определении таблицы (местоположение):
CREATE TYPE ecology.location (latitude float, longitude float);
CREATE TABLE ecology.problem (id uuid, author uuid, message text, place location, status int, tags list<text>, PRIMARY KEY ((author), id));
Обратите внимание, что при создании типа и таблицы мы дополнительно указываем к какому keyspace относится данный объект. Определение таблицы создает схему данных для колонок: id (тип uuid), будет использоваться для уникальной идентификации сообщения, author (тип uuid) - идентификатор автора в сторонней системе (либо может быть создана дополнительная таблица с пользователями), message (текст произвольной длины) - сообщение от пользователя, place (пользовательский тип location), status (статус обращения, будет обновляться позднее). В определении первичного ключа можно увидеть, что поле author будет использоваться для partitioning (т.е. все сообщения от одного автора будут находиться на одних и тех же replica node), внутри которых выполняется кластеризация по id. Такой выбор первичного ключа обусловлен необходимостью его уникальности, а также возможностью быстрого поиска по автору и/или идентификатору сообщения.
Создадим также таблицу для хранения телеметрии с датчиков:
CREATE TABLE ecology.telemetry (datetime timestamp, place location, sensor_id uuid, sensor_type int, value double, sensor_owner uuid, PRIMARY KEY ((sensor_owner,sensor_id), sensor_type, datetime));
Здесь структура чуть более сложная, поскольку sensor_serial (серийный номер датчика) может повторяться у разных владельцев (sensor_owner), поэтому partition-ключ является составным. Также дополнительно мы делаем кластеризацию по типу датчика и дате-времени, что важно для возможности отбора значений в указанном временном промежутке для соответствующего измеренного значения. В остальном определение таблицы аналогично ранее созданному списку событий. Отдельный идентификатор для замера не создается, поскольку сочетание "владелец датчика" + "серийный номер датчика" + "дата-время" всегда является уникальным.
Давайте теперь наполним таблицу несколькими тестовыми записями:
insert into ecology.problem (id, author, message, place, tags, status) values
(uuid(), 00000000-0000-0000-0000-000000000000, 'На солнце появились пятна',
{longitude:30.3201503, latitude:60.03692}, ['bugs','sun'], 0);
insert into ecology.problem (id, author, message, place, tags) values
(uuid(), 00000000-0000-0000-0000-000000000000, 'Снова пропал корабль в бермудском треугольнике',
{longitude:-71.2112568, latitude:24.9391274}, ['shiplost']);
Как можно видеть, специальные типы данных (например, uuid) не являются строковыми литералами, поэтому указываются без кавычек. Для заполнения списка используется форма записи [набор значений], для множеств - {набор значений}, для словарей (map) - {'ключ':'значение',...}. Функция uuid() создает новое случайное значение уникального идентификатора. Запросим все данные из таблицы:
cassandra@cqlsh> select * from ecology.problem;
author | id | message | place | status | tags
--------------------------------------+--------------------------------------+------------------------------------------------+--------------------------------------------+--------+-----------------
00000000-0000-0000-0000-000000000000 | 643d58b1-37fc-4b18-8c46-b4744b17a938 | На солнце появились пятна | {latitude: 60.03692, longitude: 30.32015} | 0 | ['bugs', 'sun']
00000000-0000-0000-0000-000000000000 | 5c9e52e9-df34-42fe-b340-62a29a9d90c5 | Снова пропал корабль в бермудском треугольнике | {latitude: 24.93913, longitude: -71.21126} | null | ['shiplost']
(2 rows)
И теперь попробуем поискать все записи автора (это действие допустимо, поскольку автор указан как partition key):
select * from ecology.problem where author=00000000-0000-0000-0000-000000000000;
Аналогично можно получить отдельную запись автора (т.к. мы указали id записи как clustering key):
cassandra@cqlsh> select * from ecology.problem where author=00000000-0000-0000-0000-000000000000 and id=643d58b1-37fc-4b18-8c46-b4744b17a938
В то же время нам не получится непосредственно найти записи по статусу или по тэгу, поскольку из запроса нельзя получить информацию о расположении реплик (так как эти поля не входят в partition/clustering key). При попытке выполнить запрос по одному из полей будет выдана ошибка:
cassandra@cqlsh> select * from ecology.problem where author=00000000-0000-0000-0000-000000000000 AND status=0;
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"
Можно добавить к запросу суффикс ALLOW FILTERING для явного указания, что мы готовы к непредсказуемому времени отклика (поскольку запрос будет отправлен на весь кластер):
select * from ecology.problem where author=00000000-0000-0000-0000-000000000000 AND status=0 ALLOW FILTERING;
author | id | message | place | status | tags
--------------------------------------+--------------------------------------+---------------------------+-------------------------------------------+--------+-----------------
00000000-0000-0000-0000-000000000000 | 643d58b1-37fc-4b18-8c46-b4744b17a938 | На солнце появились пятна | {latitude: 60.03692, longitude: 30.32015} | 0 | ['bugs', 'sun']
Аналогичная ситуация с поиском по списку (в этом случае используется предикат value contains tag):
select * from ecology.problem where author=00000000-0000-0000-0000-000000000000 AND tags contains 'sun' ALLOW FILTERING;
При сохранении данных датчиков мы можем указать срок актуальности (например, мы хотим сохранять только последнюю неделю):
insert into ecology.telemetry (sensor_owner,sensor_serial,sensor_type,datetime,value,place) VALUES (00000000-0000-0000-0000-000000000000,1,1,totimestamp(now()),25.1,{longitude:-95.5493073,latitude:44.8127629}) using ttl 604800;
Здесь мы использовали функцию now() для получения текущего времени и totimestamp(dt) для преобразования во временную метку. Дополнительно при вставке мы уточнили срок актуальности записи (604800 = 1 неделя в секундах). Проверим дату-время последнего обновления значения и время до истечения срока действия:
cassandra@cqlsh> select toDate(writetime(value)), ttl(value) from ecology.telemetry;
writetime(value) | ttl(value)
-----------------+------------
1653836623289664 | 604628
После истечения срока действия или при выполнении delete с подходящим условием where записи помечаются на удаление и будут очищены во время выполнения очередного действия по сжатию базу compaction (также может быть вызван вручную через nodetool compact
).
Поля с типом timestamp могут использоваться при поиске для отбора записей, находящихся в указанном временном интервале. Но иногда нужно получить относительно время (от текущего), например извлечь записи, созданные за 15 минут. Здесь нам могут помочь возможности определения собственных функций в Cassandra:
CREATE FUNCTION IF NOT EXISTS timeAgo(minutes int)
CALLED ON NULL INPUT
RETURNS timestamp
LANGUAGE java AS '
long now = System.currentTimeMillis();
if (minutes == null)
return new Date(now);
return new Date(now - (minutes.intValue() * 60 * 1000));
';
Здесь мы создаем функцию на Java, которая использует метод из пакета java.lang.System для получения текущего времени и возвращает временную метку, отстоящую на указанное количество минут в прошлом. Теперь мы можем использовать этот результат в отборе записей:
select sensor_owner,sensor_serial,sensor_type,value FROM ecology.telemetry WHERE sensor_type=1 AND datetime>timeAgo(18) allow filtering;
sensor_owner | sensor_serial | sensor_type | value
--------------------------------------+---------------+-------------+-------
00000000-0000-0000-0000-000000000000 | 1 | 1 | 25.1
Здесь мы использовали allow filtering, поскольку в условиях отбора нет ни одного фильтра, который бы помог выбрать правильные узлы с репликами по partition-ключу.
Также данные могут быть извлечены как json-объект или обновлены/добавлены из json. Структура json должна соответствовать схеме таблицы. Например, для извлечения данных о датчиках в JSON можно использовать такой запрос:
select json sensor_owner,sensor_serial,datetime,place,sensor_type,value from ecolo
gy.telemetry;
[json]
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"sensor_owner": "00000000-0000-0000-0000-000000000000", "sensor_serial": 1, "datetime": "2022-05-29 15:03:03.253Z", "place": {"latitude": 44.812763, "longitude": -95.54931}, "sensor_type": 1, "value": 25.1}
Аналогично может быть выполнено добавление записи. Обратите внимание, что в случае полного совпадения ключей ошибка не будет выводиться, но будет обновлена (перезаписана) существующая запись:
insert into ecology.telemetry json '{"sensor_owner": "00000000-0000-0000-0000-000000000000", "sensor_serial": 1, "datetime": "2022-05-29 15:03:43.295Z", "place": {"latitude": 44.812
763, "longitude": -95.54931}, "sensor_type": 1, "value": 25.1}';
Для диагностики работы самого кластера можно использовать виртуальные таблицы, расположенные в keyspace system_views. Получить информацию о доступных ресурсах и их текущую конфигурацию можно также командами describe:
describe keyspaces - вывести список доступных пространств ключей, describe keyspace <id> - подробное описание состояния и настроек указанного keyspace;
describe tables - вывести список доступных таблиц, describe table <keyspace>.<name> - подробное описание состояния и настроек указанной таблицы;
describe cluster - выводим информацию о конфигурации кластера;
describe types - список пользовательских типов, describe type <keyspace>.<name> - подробная информация об определении типа;
describe functions - список пользовательских функций, describe function <keyspace>.<name> - подробная информация об определении функции;
В целом мы посмотрели возможности Apache Cassandra по работе с данными и теперь самое время перейти к обсуждению методов программного взаимодействия с кластером Cassandra на Kotlin. Но перед этим добавим новую роль и назначим для нее права для работы с нашими таблицами:
CREATE ROLE ecology_user WITH PASSWORD = 'WaterIsLife!' AND LOGIN = true;
GRANT MODIFY ON ecology.problem TO ecology_user;
GRANT SELECT ON ecology.problem TO ecology_user;
GRANT MODIFY ON ecology.telemetry TO ecology_user;
Созданный пользователь будет иметь права на изменение и чтение таблицы ecology.problem и только на отправку телеметрии в ecology.telemetry. Следующим шагом мы добавим поддержку Cassandra в наше приложение и попробуем добавить и получить данные из нашего API.
Драйвер Datastax для Cassandra
Для доступа к базе данных сообществом разработчиков и компанией, которая занимается коммерческой поддержкой развертываний Cassandra и Datastax Enterprise, был предоставлен свободный драйвер для использования в приложениях на JVM. По своему назначению его назначение максимально похоже на JDBC-драйверы к другим СУБД, но все же здесь используется свой нестандартный набор примитивов и концепций.
Добавим поддержку драйвера в наше приложение и подключим зависимости в build.gradle.kts dependencies:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
kotlin("jvm") version "1.6.21"
}
group = "tech.dzolotov.cassandra"
version = "1.0-SNAPSHOT"
val cassandraDriverVersion by extra("4.14.1")
repositories {
mavenCentral()
}
dependencies {
testImplementation(kotlin("test"))
implementation("com.datastax.oss:java-driver-core:$cassandraDriverVersion")
implementation("com.datastax.oss:java-driver-query-builder:$cassandraDriverVersion")
implementation("com.datastax.oss:java-driver-mapper-runtime:$cassandraDriverVersion")
}
tasks.withType<KotlinCompile> {
kotlinOptions.jvmTarget = "1.8"
}
Для установки подключения используется статический метод builder из класса CqlSession. При настройке builder может быть указаны следующие параметры конфигурации:
.addContactPoint(...) - добавить адрес сервера Cassandra, например
.addContactPoint(InetSocketAddress("cassandra1.local.domain", 9042))
addNodeStateListener(...) - добавить обработчик для отслеживания изменения состояния узла
addSchemaChangeListener(...) - добавить обработчик для отслеживания изменения схемы данных
.addTypeCodecs(...) - добавить преобразователи для специальных типов данных
.withAuthCredentials(login, pass) - указать имя и пароль для подключения к сессии
.withKeyspace(name) - подключиться к указанному пространству ключей по умолчанию
.withLocalDatacenter(name) - указать предпочтительный ближайший дата-центр
.withNodeDistanceEvaluator(profile, func) - использовать нестандартный способ определения ближайших доступных узлов
.withConfigLoader(...) - загрузка конфигурации из внешнего ресурса (например, application.conf), может использоваться совместно с DriverConfigLoader.fromClasspath("<название_файла_конфигурации>")
После завершения настройки вызывается метод build() для получения объекта сессии, через который будут выполняться все последующие запросы.
Мы будем использовать файл конфигурации и дополнительно определим профили, которые помогут определить подходящие настройки для сетевого подключения и таймаутов (application.conf размещается в /src/resources):
datastax-java-driver {
basic {
contact-points = [ "127.0.0.1:9042" ]
load-balancing-policy.local-datacenter = datacenter1
request {
timeout = 2 seconds
consistency = LOCAL_QUORUM
}
profiles {
olap {
basic.request.timeout = 5 seconds
basic.request.consistency = QUORUM
}
}
}
advanced.auth-provider {
class = PlainTextAuthProvider
username = ecology_user
password = "WaterIsLife!"
}
}
Файл конфигурации определяет настройки запроса по умолчанию (таймаут 2 секунды, режим поддержки консистентности при чтении - локальный кворум, для которого достаточно иметь доступными более половины реплика-узлов в местном дата-центре). Для отдельных длительных аналитических запросов создан дополнительный профиль olap, который может быть указан при создании объекта запроса. Также здесь указываются адреса точек подключения для получения первоначальной информации о топологии (у нас localhost), название ближайшего дата-центра и дополнительные данные для аутентификации.
Установим подключение к кластеру с использованием файла конфигурации:
fun main() {
val session = CqlSession.builder()
.withConfigLoader(DriverConfigLoader.fromClasspath("application.conf"))
.build()
println(session)
}
Объект сессии может использоваться как для непосредственного вызова запросов - синхронным/блокирующим способом (execute) или асинхронным с использованием механизмов java.util.Concurrent (CompletionStage), так и для создания prepared-запросов (аналогично PreparedStatement в JDBC) и извлечения метрик и метаданных кластера. Начнем с выполнения простого запроса - получим все зарегистрированные обращения об экологических проблемах:
data class Problem(val author:UUID?, val message:String?)
fun getProblems(session: CqlSession):List<Problem> {
return session.execute("SELECT author, message FROM ecology.problem").map {
Problem(it.getUuid("author"), it.getString("message"))
}.toList()
}
fun main() {
val session = CqlSession.builder().withConfigLoader(DriverConfigLoader.fromClasspath("application.conf")).build()
println(getProblems(session))
session.close()
}
Здесь мы использовали синхронный метод для выполнения запроса к базе данных и затем построчно преобразовали его в объекты дата-класса, используя типизованные get-методы от каждого объекта Row. Кроме get*-методов есть возможность получить из поля произвольного класса через метод get, которому первым параметром передается порядковый номер или название поля в результате запроса, а вторым - Java-класс, который должен быть создан из исходных данных). Например, в нашей записи есть вложенный объект (пользовательский тип location) и мы бы тоже хотели его добавить к нашей информации о проблеме. Добавим новый тип и явно обозначим, что хотели бы получить объект этого типа:
data class Location(val longitude:Float, val latitude:Float)
data class Problem(val author:UUID?, val message:String?, val place:Location?)
fun getProblems(session: CqlSession):List<Problem> {
return session.execute("SELECT author, message, place FROM ecology.problem").map {
Problem(it.getUuid("author"), it.getString("message"), it.get("place", Location::class.java))
}.toList()
}
Но, к сожалению, при запуске произойдет исключение CodecNotFoundException: Codec not found for requested operation: [UDT(ecology.location) <-> Location], поскольку драйвер не знает о способе отображения данных из Cassandra в наш новый класс. Здесь нам поможет создание и регистрация TypeCodec, который отвечает за преобразование сериализованного представления объекта в Kotlin-объект и обратно, а также за вывод форматированного JSON-представления. Для обработки JSON-представления добавим поддержку kotlinx.serialization для json-формата:
buildscript {
repositories { mavenCentral() }
dependencies {
val kotlinVersion by extra("1.6.21")
classpath(kotlin("gradle-plugin", version = kotlinVersion))
classpath(kotlin("serialization", version = kotlinVersion))
}
}
plugins {
kotlin("jvm") version "1.6.21"
kotlin("plugin.serialization") version "1.6.21"
}
group = "tech.dzolotov.cassandra"
version = "1.0-SNAPSHOT"
val cassandraDriverVersion by extra("4.14.1")
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.3")
implementation("com.datastax.oss:java-driver-core:$cassandraDriverVersion")
implementation("com.datastax.oss:java-driver-query-builder:$cassandraDriverVersion")
implementation("com.datastax.oss:java-driver-mapper-runtime:$cassandraDriverVersion")
}
При определении TypeCodec необходимо определить методы проверки обработки CQL и Java-типа, десериализации-сериализации decode/encode, форматирования и разбора JSON-представления format/parse. При обработке двоичного представления на вход поступает буфер байтов, где содержание записывается последовательностью из 4-байтовой длины и последовательности байт, кодирующих содержание (в случае с float используется 32-битовое представление).
class LocationTypeCodec : TypeCodec<Location> {
override fun accepts(javaClass: Class<*>): Boolean {
return javaClass == Location::class.java //используется в update/insert/where
}
override fun accepts(cqlType: DataType): Boolean {
val type = cqlType.asCql(false, true) //используется в select
return type == "ecology.location"
}
override fun getJavaType(): GenericType<Location> = GenericType.of(Location::class.java) //Связанный java-тип
override fun getCqlType(): DataType = DataTypes.custom("ecology.location") //Связанный CQL-тип
override fun decode(bytes: ByteBuffer?, protocolVersion: ProtocolVersion): Location? {
if (bytes == null) return null
val arr = bytes.array()
//первые 4 байта - длина записи, затем с 4 по 7 - float-представление широты
val latBytes = ByteBuffer.wrap(ByteArray(4) { arr[4 + it] })
//следующие 4 байта - длина записи, затем с 12 по 15 - float-представление долготы
val longBytes = ByteBuffer.wrap(ByteArray(4) { arr[12 + it] })
//декодирование делегируем в кодек FLOAT
val latitudeValue = TypeCodecs.FLOAT.decode(latBytes, protocolVersion)
val longitudeValue = TypeCodecs.FLOAT.decode(longBytes, protocolVersion)
return Location(latitudeValue!!, longitudeValue!!)
}
override fun parse(value: String?): Location? {
if (value == null) return null
return Json.decodeFromString<Location>(value)
}
override fun format(value: Location?): String {
if (value == null) return "{}"
return Json.encodeToString(Location.serializer(), value)
}
override fun encode(value: Location?, protocolVersion: ProtocolVersion): ByteBuffer? {
if (value == null) return null
val latLen = TypeCodecs.INT.encodePrimitive(4, protocolVersion) //длина float - 4 байта
val longLen = TypeCodecs.INT.encodePrimitive(4, protocolVersion) //длина float - 4 байта
val latBuf = TypeCodecs.FLOAT.encodePrimitive(value.latitude, protocolVersion) //кодируем широту
val longBuf = TypeCodecs.FLOAT.encodePrimitive(value.longitude, protocolVersion) //кодируем долготу
return ByteBuffer.allocate(16).put(latLen).put(latBuf).put(longLen).put(longBuf) //собираем сериализацию
}
}
fun main() {
val session = CqlSession.builder().withConfigLoader(DriverConfigLoader.fromClasspath("application.conf"))
.addTypeCodecs(LocationTypeCodec()).build()
println(getProblems(session))
session.close()
}
[Problem(author=00000000-0000-0000-0000-000000000000, message=Снова пропал корабль в бермудском треугольнике, place=Location(latitude=24.939127, longitude=-71.21126)), Problem(author=00000000-0000-0000-0000-000000000000, message=На солнце появились пятна, place=Location(latitude=60.03692, longitude=30.32015))]
Хотелось бы также получить дату-время добавления записи, но к сожалению в структуре такого поля нет. Здесь мы можем использовать функцию WRITETIME для определения времени последней модификации поля, но она возвратит результат с типом BIGINT, а нам бы хотелось получить дату-время (например LocalDateTime). Создадим еще один кодек для выполнения этого преобразования:
class DateTimeCodec : TypeCodec<LocalDateTime> {
override fun getJavaType(): GenericType<LocalDateTime> {
return GenericType.LOCAL_DATE_TIME
}
override fun getCqlType(): DataType {
return DataTypes.BIGINT
}
override fun accepts(javaClass: Class<*>): Boolean {
return javaClass == LocalDateTime::class.java
}
override fun accepts(cqlType: DataType): Boolean {
val type = cqlType.asCql(false, true)
return type == "bigint"
}
override fun decode(bytes: ByteBuffer?, protocolVersion: ProtocolVersion): LocalDateTime? {
val dateTime = TypeCodecs.BIGINT.decodePrimitive(bytes, protocolVersion)
return LocalDateTime.ofEpochSecond(dateTime/1000000, 0, ZoneOffset.UTC)
}
override fun parse(value: String?): LocalDateTime? {
if (value==null) return null
return LocalDateTime.parse(value)
}
override fun format(value: LocalDateTime?): String {
if (value==null) return "1900-01-01T00:00:00Z"
return value.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
}
override fun encode(value: LocalDateTime?, protocolVersion: ProtocolVersion): ByteBuffer? {
if (value == null) return null
val dtLen = TypeCodecs.INT.encodePrimitive(8, protocolVersion) //длина float - 4 байта
val dtBuf =
TypeCodecs.BIGINT.encodePrimitive(value.toInstant(ZoneOffset.UTC).epochSecond * 1000000, protocolVersion)
return ByteBuffer.allocate(12).put(dtLen).put(dtBuf)
}
}
Добавим регистрацию кодека при создании сессии:
val session = CqlSession.builder().withConfigLoader(DriverConfigLoader.fromClasspath("application.conf"))
.addTypeCodecs(LocationTypeCodec(), DateTimeCodec()).build()
И добавим в запрос получение времени последнего обновления записи:
fun getProblems(session: CqlSession): List<Problem> {
return session.execute("SELECT author, message, place, WRITETIME(message) as datetime FROM ecology.problem").map {
Problem(
it.getUuid("author"),
it.getString("message"),
it.get("place", Location::class.java),
it.get("datetime", LocalDateTime::class.java),
)
}.toList()
}
Теперь мы можем получить любые необходимые данные из Cassandra. Перейдем к рассмотрению методов создания запросов манипуляции данными. Одним из вариантов является использование метода execute с передачей SQL-запроса INSERT, но это в большинстве случаев неудобно из-за необходимости подстановки значений в корректном синтаксисе, несмотря на наличие метода format в кодеке типа данных. Чтобы избежать прямых подстановок значений можно использовать создание PreparedStatement, в котором значения аргументов подставляются в корректном синтаксисе в позиции вопросительных знаков в запросе после выполнения связывания bind. Созданный в bind объект класса BoundStatement может быть выполнен через уже известный execute:
val prepared = session.prepare("INSERT INTO ecology.problem (author,id,message) VALUES (?,?,?)")
session.execute(prepared.bind(testAuthor, UUID.randomUUID(), "Обнаружена шахта к центру Земли2"))
Однако, если в PreparedStatement поместить наш класс Location, то мы обнаружим, что он не обрабатывается через метод format и в запись не попадает (остается null). PreparedStatement ожидает, что значения полей, реализующих пользовательские типы, будут созданы как реализации интерфейса UdtValue, которую можно сделать вручную (реализовать методы в Location) или создать описание структуры данных через UserDefinedTypeBuilder:
val prepared2 = session.prepare("INSERT INTO ecology.problem (author,id,message,place) VALUES (?,?,?,?)")
val locationValue = UserDefinedTypeBuilder("ecology", "location").withField("latitude", DataTypes.FLOAT)
.withField("longitude", DataTypes.FLOAT).build()
session.execute(prepared2.bind(testAuthor, UUID.randomUUID(), "Обнаружена шахта к центру Земли", locationValue.newValue(0.0f, 0.0f)))
Теперь мы можем перенести это определение в data-класс и сделать генератор значения UdtValue для использования в запросах:
@Serializable
data class Location(val latitude: Float, val longitude: Float) {
fun value(): UdtValue = UserDefinedTypeBuilder("ecology", "location").withField("latitude", DataTypes.FLOAT)
.withField("longitude", DataTypes.FLOAT).build().newValue(latitude, longitude)
}
...
val prepared2 = session.prepare("INSERT INTO ecology.problem (author,id,message,place) VALUES (?,?,?,?)")
session.execute(prepared2.bind(testAuthor, UUID.randomUUID(), "Обнаружена шахта к центру Земли", Location(0.0f, 0.0f).value()))
...
Для диагностики ошибок и настройки запросов можно использовать builder-метод через класс SimpleStatement.builder("запрос") и далее использовать set-методы для конфигурации:
setExecutionProfile("name") - применяет профиль с преднастроенными параметрами запроса (определяется в application.conf)
setTracing(bool) - включает или отключает режим трассировки запроса (в лог вывод выполняется через log4j, поэтому требуется конфигурация log4j.xml)
setTimeout(duration) - переопределяет таймаут для ожидания ответа
setKeyspace("name") - изменяет пространство ключей по умолчанию
setIdempotence(bool) - помечает запрос как идемпотентный
setSerialConsistencyLevel(level) - изменяет режим ожидания консистентности (может быть ANY, ONE, TWO, THREE, LOCAL_QUORUM, QUORUM, ALL - расположены по увеличению надежности)
Кроме синхронных (блокирующих) методов драйвер предлагает варианты executeAsync (использует механизмы CompletableFuture из java.util.concurrent) и executeReactive/executeContinouslyReactive с моделью реактивной обработки, похожей на подход RxJava.
Для упрощения работы с запросами драйвер предлагает еще два компонента: QueryBuilder и Mapper. Начнем с рассмотрения QueryBuilder.
Основой запроса является функция selectFrom(table), к которой могут по цепочке добавляться модификаторы:
all()
- получить все записиcountAll()
- получить количество записейlimit(n)
- ограничить количество записей nttl(name)
- получить значение времени жизни для колонкиwhereColumn("name").isEqualTo(value)
- добавить проверку равенства значения поля (предикат для сравнения может быть другим)column("name")
- добавить колонку в ответcolumns("name1", "name2", ...)
- добавить несколько колонок в ответdistinct()
- выбрать неповторяющиеся записиjson()
- получить ответ в формате jsonas(alias)
- задать псевдоним для последнего выраженияallowFiltering()
- разрешить поиск по полям, не входящим в первичный ключ (partitioning/clustering)
Объект запроса формируется после вызова метода build и может быть передан в вызов session.execute(). Так предыдущие запросы можно переписать с использованием QueryBuilder:
//запрос содержания проблем
return session.execute(
selectFrom("ecology", "problem").all().column("author").column("message").column("place").writeTime("message").`as`("datetime")
.build()
).map {
Problem(
it.getUuid("author"),
it.getString("message"),
it.get("place", Location::class.java),
it.get("datetime", LocalDateTime::class.java),
)
}.toList()
//добавление новой проблемы
session.execute(
session.prepare(
insertInto("ecology", "problem").value("author", literal(testAuthor)).value("id", bindMarker())
.value("message", literal("Аномальная гравитация"))
.value("place", literal(Location(34.98293f, 32.40596f).value())).build()
).bind(UUID.randomUUID())
)
Компонент Mapper позволяет выполнить автоматическое преобразование полей класса в структуру таблиц базы данных Cassandra. Для использования Mapper добавим аннотацию @Entity к классу Problem, дополнительную аннотацию @PartitionKey к полю author и аннотацию @ClusteringColumn к новому полю id. Теперь определение класса выглядит так:
@Entity
data class Problem(
@PartitionKey val author: UUID?,
@ClusteringColumn val id: UUID?,
val message: String?,
val place: Location?,
val tags: List<String>?
)
Дальше создадим интерфейс для описания методов работы с базой данных и пометим его аннотацией @Dao, а его методы аннотациями:
@Select
- используется при поиске записей@GetEntity
- для получения одиночной записи@Insert
- описывает добавление записей@Delete
- применяется для удаления записей@Update
- для описания запросов изменения@Query
- произвольный запрос@QueryProvider
- делегировать запрос на разбор вложенного элемента коллекции указанному классу
И дополнительно создадим интерфейс с аннотацией @Mapper:
@Dao
interface Problems {
@Query("SELECT * FROM ecology.problem")
fun getAllProblems():PagingIterable<Problem>
}
@Mapper
interface InventoryMapper {
@DaoFactory
fun problemsDao():Problems
}
Для корректной обработки аннотаций подключим gradle plugin kapt и соответствующий процессор аннотаций от Mapper:
buildscript {
repositories { mavenCentral() }
dependencies {
val kotlinVersion by extra("1.6.21")
classpath(kotlin("gradle-plugin", version = kotlinVersion))
classpath(kotlin("serialization", version = kotlinVersion))
}
}
plugins {
kotlin("jvm") version "1.6.21"
kotlin("plugin.serialization") version "1.6.21"
kotlin("kapt") version "1.6.21"
}
group = "tech.dzolotov.cassandra"
version = "1.0-SNAPSHOT"
val cassandraDriverVersion by extra("4.14.1")
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.3")
implementation("com.datastax.oss:java-driver-core:$cassandraDriverVersion")
implementation("com.datastax.oss:java-driver-query-builder:$cassandraDriverVersion")
implementation("com.datastax.oss:java-driver-mapper-runtime:$cassandraDriverVersion")
implementation("org.slf4j:slf4j-log4j12:1.7.36")
kapt("com.datastax.oss:java-driver-mapper-processor:$cassandraDriverVersion")
}
Выполним сборку приложения (на этом этапе будет запущена кодогенерация) и заменим реализацию функции getProblems:
fun getProblems(session: CqlSession): List<Problem> = InventoryMapperBuilder(session)
.build()
.problemsDao()
.getAllProblems()
.toList()
Мы рассмотрели все основные вопросы по использованию драйвера для Cassandra для Kotlin и теперь готовы к созданию простого REST API на Spring Data и удобных привязках, которые упрощают использование Cassandra для backend-приложений.
Spring Data
Над драйвером DataStax в Spring Data реализована удобная обертка, которая позволяет генерировать автоматически запросы к базе данных для CRUD REST-сервисов и выполнять конфигурацию с использованием общих механизмов Spring.
Создадим проект с использованием конструктора Spring Initializr и добавим поддержку Spring Web (или Spring Web Reactive для использования webflux) и Spring Data for Apache Cassandra (может быть также добавлен реактивный вариант). Добавим конфигурацию запуска в src/resources/application.properties:
spring.data.cassandra.keyspace-name=ecology
spring.data.cassandra.port=9042
spring.data.cassandra.contact-points=127.0.0.1
spring.data.cassandra.username=cassandra
spring.data.cassandra.password=cassandra
spring.data.cassandra.local-datacenter=datacenter1
И зарегистрируем data-классы для наших записей в таблицах. Обратите внимание, что для определения записи используется аннотация @Table, а ключи помечаются @PrimaryKey с уточением типа ключа (type=PrimaryKeyType.Partitioned для ключа разделения на разделы, type=PrimaryKeyType.Clustered для ключа кластеризации внутри раздела).
@Table
data class TelemetryData(
val sensorType: Int,
@PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED) val sensorOwner: UUID,
@PrimaryKeyColumn(type = PrimaryKeyType.CLUSTERED) val sensorSerial: Int,
val value: Double
)
@Table
data class ProblemData(
@PrimaryKeyColumn(type=PrimaryKeyType.PARTITIONED)
val author:UUID,
@PrimaryKeyColumn(type=PrimaryKeyType.CLUSTERED)
val id:UUID,
val message:String,
val tags:List<String>,
)
Для использования специальных типов данных (например, Location) используются классы, реализующие интерфейс Converter<S,T> для двухстороннего преобразования значений в формат базы данных и в java-объект. Для корректного связывания объекты преобразования должны быть маркированы аннотации @WritingConverter и @ReadingConverter, а также конверторы необходимо зарегистрировать в классе конфигурации Cassandra. Также добавим поддержку kotlinx.serialization.
@Serializable
data class Location(val latitude:Float, val longitude:Float)
@Component
@WritingConverter
class LocationWritingConvertor : Converter<String?, Location?> {
override fun convert(source: String): Location? {
return Json.decodeFromString(Location.serializer(), source)
}
}
@Component
@ReadingConverter
class LocationReadingConvertor : Converter<Location?, String?> {
override fun convert(source: Location): String? {
return Json.encodeToString(Location.serializer(), source)
}
}
@Table
data class TelemetryData(
val sensorType: Int?,
@PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED) val sensorOwner: UUID,
@PrimaryKeyColumn(type = PrimaryKeyType.CLUSTERED) val sensorSerial: Int,
val value: Double?,
val place: Location?
)
@Table
data class ProblemData(
@PrimaryKeyColumn(type=PrimaryKeyType.PARTITIONED)
val author:UUID?,
@PrimaryKeyColumn(type=PrimaryKeyType.CLUSTERED)
val id:UUID?,
val message:String?,
val tags:List<String>?,
val place: Location?
)
Для взаимодействия с Cassandra используются репозитории - интерфейсы, помеченные как @Repository и унаследованные от CrudRepository<класс_данных, класс_ключа> или от CassandraRepository<класс_данных, класс_ключа>.
Конфигурация Cassandra должна перечислить используемые репозитории и необходимые настройки для драйвера (альтернативно настройки могут быть указаны в properties-файле):
@Repository
interface TelemetryRepository : CassandraRepository<TelemetryData, UUID>
@Repository
interface ProblemRepository : CassandraRepository<ProblemData, UUID>
@Configuration
@EnableCassandraRepositories(basePackageClasses = [TelemetryRepository::class, ProblemRepository::class])
class CassandraConfig : AbstractCassandraConfiguration() {
override fun getKeyspaceName() = "ecology"
override fun customConversions(): CassandraCustomConversions {
return CassandraCustomConversions(mutableListOf(LocationWritingConvertor::class, LocationReadingConvertor::class) )
}
}
Репозиторий предлагает набор методов для получения коллекции объектов, поиска отдельного объекта, создании новых объектов (а также можно создать свои методы в определении интерфейсы). Эти методы можно непосредственно использовать в реализации методов обработки REST-запросов:
@RestController
@RequestMapping("telemetry")
class Telemetry {
@Autowired
lateinit var repository: ProblemRepository
@GetMapping("/")
suspend fun getAll(): List<ProblemData> {
return repository.findAll()
}
@GetMapping("/{id}")
suspend fun getProblem(id:String): ProblemData? {
return repository.findByIdOrNull(UUID.fromString(id))
}
@PostMapping()
suspend fun addProblem(problemData: ProblemData) {
repository.insert(problemData)
}
}
Таким образом, Spring Data представляет удобные интерфейсы для доступа к Cassandra и, в сочетании с Spring Web, позволяет быстро создавать REST-интерфейсы для манипуляции данными из распределенного хранилища. Конечно, рассмотренный подход в production-системах вызовет проблемы из-за блокирующих вызовов и нужно либо использовать ThreadPoolExecutor, либо применять реактивную модель вызовов (в Spring Data или в драйвере Datastax) и использовать их совместно с корутинами, но это уже тема для следующей статьи.
Все разработчики знают, что код очень часто превращается со временем в "большой комок грязи" (Big Ball of Mud), поддерживать который очень тяжело и дорого. Хочу пригласить всех, кто дочитал до этого момента на бесплатный вебинар, который проведет мой коллега Сергей Окатов. На вебинаре обсудим как поддерживать чистую архитектуру приложения и контролируемо внедрять изменения. Также мы исследуем библиотеку для реализации бизнес-процессов, написанную на Kotlin. В завершении посмотрим пример модуля бизнес-логики, в котором сконцентрированы все требования заказчика.
Зарегистрироваться на вебинар