О чем статья?
Всем привет. Меня зовут Дмитрий, я системный архитектор в компании Arenadata, проектирую и разрабатываю системы мониторинга запросов ADCC (Arenadata Command Center) для Arenadata DB.
Часто при работе с разными базами данных необходимо отслеживать выполнение текущих запросов. В основном это связано с задачами администрирования или аналитики. Средства мониторинга, позволяющие управлять и наблюдать за выполнением запросов, сильно помогают в этом. Я расскажу о том, с какими задачами мы столкнулись при проектировании и реализации системы мониторинга запросов для Arenadata DB.
Arenadata DB — это аналитическая распределённая MPP-СУБД с открытым исходным кодом, построенная на основе СУБД Greenplum. В связи с этим мониторинг запросов сильно усложняется, потому что выполняются они на множестве сегментов. При этом нагрузка на каждый сегмент зависит от равномерности распределения данных и типов операций, выполняемых над ними. Прямым аналогом ADCC является система VMware Tanzu Greenplum Command Center, которая входит в состав платной версии платформы.
Я буду говорить о второй версии ADCC. Функциональные требования в новой версии по большей части остались прежними, однако были существенно повышены нефункциональные требования, которые кратко можно выразить известной триадой: производительность, отказоустойчивость, расширяемость. Это повлекло за собой глубокие изменения архитектуры; система фактически была переписана с нуля.
В результате получилось решение для детального мониторинга распределенной системы обработки данных.
Анализ архитектуры
Начали мы с анализа системной архитектуры первой версии ADCC. После изучения работы системы и кодовой базы составили диаграмму системной архитектуры.
Здесь:
Extensions — набор расширений Arenadata DB, собирающих метрики состояния запросов, блокировок, spill-файлов и пр. Написан на С.
Agent — сервис получения и обработки метрик, написанный на C++. Собирает сообщения, состоящие из нескольких UDP-пакетов. Агент устанавливается в единственном экземпляре на каждый хост (т.е. один агент обрабатывает метрики со всех сегментов в рамках одного хоста).
Router — обратный прокси-сервер для маршрутизации UI-запросов и метрик.
Web Server — основное приложение, которое отвечает за прием, фильтрацию, обработку и сохранение метрик. Также обрабатывает запросы от Web UI (который является частью Web Server) и отправляет на него обновления при помощи websockets.
Database — экземпляр PostgreSQL, в котором хранятся все данные по запросам.
Процесс обработки метрик:
Упрощенно работу системы можно описать так:
Модуль Extensions получает метрики различных типов, формирует на их основе сообщения в формате protocol buffers, разделяет каждое сообщение на части размером не больше датаграммы UDP, и отправляет их агенту на локальном хосте по UDS-сокету либо по UDP-сокету.
Agent восстанавливает из полученных датаграмм protobuf-сообщения и пересылает их по протоколу HTTP 1.1 в Web Server.
Web Server обрабатывает каждое сообщение в отдельности:
отправляет полученную метрику на Web UI;
асинхронно записывает новые данные в базу данных.
Основные проблемы этой архитектуры:
Есть немасштабируемое приложение (Web Server) для обработки метрик, получаемых с горизонтально масштабируемой системы. Более того, это же приложение обрабатывает пользовательские запросы и шлет обновления на Web UI через websockets.
Agent принимает и разбирает пакеты в одном потоке, что влияет на производительность как самого агента, так и сегментов на хосте из-за синхронной работы при отправке через UDS-сокет.
Agent отправляет сообщения на Web Server синхронно, что может влиять на производительность агента и сегментов, если Web Server недоступен, потому что на агенте быстро заполняются внутренние очереди сообщений.
Каждая метрика порождает отдельную транзакцию на запись в базу данных.
Если есть активные пользовательские сессии, то многие метрики отправляются по websockets на Web UI по одной.
Значительная часть метрик просто игнорируется на стороне Web Server. Для примера можно привести сообщения со статусом запроса. В этом случае учитываются только статусы, полученные от сегмента-координатора. При прогоне тестов оказалось, что из общего количества в 872 906 сообщений учитывалось менее 10 % из них (82 963). В итоге имеем лишнюю нагрузку на сеть и все компоненты системы.
Естественно, в этой схеме есть и другие проблемы, но их решение уже не сильно влияло на изначальную цель.
Немного о реализации расширения
Я упоминал о компоненте Extensions, но не уточнял, как он устроен и какие причины стоят за особенностями его работы.
Extensions — это два компонента Arenadata DB: gpperfmon и расширение Arenadata DB — gpADCC. Они собирают внутреннюю телеметрию Arenadata DB, которая включает:
план запроса;
параметры исполнения запроса в целом;
параметры работы узлов запроса;
блокировки объектов базы данных;
выгруженные на диск временные данные запроса, и др.
Сбор метрик выполняется двумя подходами — push- и pull-моделью.
Перехватчики
Сбор данных в push-модели выполняют перехватчики (hooks). В случае Arenadata DB (и PostgreSQL) перехватчик — это указатель на функцию, который может быть изменён расширениями. Функция вызывается в определенных местах, которые можно проинспектировать, а результат отправить вовне.
Перехватчики имеют доступ ко всей памяти процесса, который исполняет запрос; притом этот доступ безопасный и не требует блокировок: перехватчик вызывается в основном потоке исполнения. Вызов в основном потоке позволяет получать данные немедленно, на заранее известной стадии исполнения запроса. Однако это их же недостаток в случае ADCC: каждая инструкция перехватчика отнимает процессорное время у процесса исполнения запроса. Если действовать неаккуратно, то перехватчики замедлят работу системы, для мониторинга которой они предназначены.
В ADCC большинство параметров собираются перехватчиками в push-модели. Поэтому для взаимодействия Extensions и Agent используется протокол UDP: можно отправлять метрики, не дожидаясь подтверждения их получения от Agent. Благодаря этому гарантируется, что максимальное время исполнения каждого перехватчика конечно.
Несколько реальных перехватчиков в PostgreSQL, используемых ADCC на практике:
ExecutorStart_hook. Начинает исполнение SQL-команды. Позволяет изменять процесс начала исполнения SQL-команды (после планирования). ADCC использует этот hook для получения плана SQL-команды, а также определяет её уникальный идентификатор, который будет использоваться и далее.
ExecutorRun_hook. Исполняет запрос. Вызывается однократно. В ADCC используется для сбора статуса запроса; фактически, запрос начинает выполняться уже при вызове ExecutorStart_hook.
ExecutorEnd_hook. Завершает исполнение запроса. Вызывается только в случае нормального завершения, когда ошибок при выполнении не произошло. В случае аварийного завершения этот перехватчик не вызывается. В ADCC используется для сбора статуса запроса.
Процессы периодического сбора метрик
Pull-модель — это использование отдельного процесса, собирающего метрики по запросу или периодически. ADCC для этого использует Background Worker, отдельный процесс экземпляра Arenadata DB, запускаемый и останавливаемый совместно с основным процессом СУБД.
Отдельный процесс может собирать метрики асинхронно, минимально влияя на работу системы. Но есть нюанс: для сбора могут требоваться блокировки (эксклюзивный доступ к ресурсам, например, переменной); в результате мониторинг может полностью парализовать работу системы. Недостаток pull-модели: параметры снимаются не в реальном времени, а с задержкой (равной периоду сбора метрик или задержке между отправкой запроса о сборе метрик и началом сбора).
Background Worker в ADCC собирает данные (список блокировок и другую информацию) с помощью SQL-запросов. Запросы исполняются периодически, с определяемым пользователями периодом. Для единообразия протокол взаимодействия с Agent тот же, что в перехватчиках.
Данные
Далее мы занялись исследованием состава данных и их объемов, которые снимаются с Arenadata DB для средств мониторинга. Метрики могут быть переданы через два типа сокетов: UDS и UDP (последний используется, если недоступен первый). Из-за ограничения размера одного пакета для UDP (чуть менее 64 Кб для Linux) используется собственный протокол разделения сообщений на части и их последующей сборки.
Чтобы собрать полезную нагрузку, мы отключили передачу данных по UDS, запустили регрессионные тесты и при помощи утилиты tcpdump сняли дампы файлов с сообщениями на мастере и одном сегменте. Далее полученные файлы можно было использовать как источники данных с помощью udpreplay.
Анализ дампов показал следующее:
нужны не все метрики, собираемые расширениями;
нагрузка неравномерна по времени;
средняя нагрузка составила 1650 пакетов/с. или 1 Кбит/с, пиковая нагрузка достигала 45000 пакетов в секунду или 52 Кбит/с.;
некоторые типы сообщений бывают очень большого объема (порядка 50 Мб, это сообщения с планом запроса в виде форматированной строки).
Имеем неприятные последствия:
Значительную часть метрик можно отфильтровывать до отправки, а мы этого не сделали.
В случае отправки по UDP часть пакетов теряется на сетевом уровне при пиковых нагрузках.
Если хотя бы один пакет сообщения не был доставлен (если размер всего сообщения не умещается в один UDP-пакет), то всё сообщение не может быть восстановлено, а уже принятые пакеты подолгу занимают память.
Сообщение записывается в UDS синхронно. Это означает, что если по каким-то причинам агент долго обрабатывает пакет или не отвечает, то все сегменты на этом хосте в этот момент ждут завершения отправки, что замедляет базу в целом.
Требования к системе
Перед началом проектирования системы важно составить список требований, которым она должна соответствовать. Иначе можно разработать то, что просто уйдет в стол. Но не всегда требования означают ограничения, некоторые из них можно расценивать как послабления, которые, в свою очередь, дают больше свободы при проектировании и реализации. Рассмотрим пример одного из таких свойств.
В нашем случае система мониторинга должна быть:
отказоустойчивой;
доступной;
высокопроизводительной;
как можно менее сложной;
просто обновляемой;
предоставлять метрики в режиме, максимально приближенном к реальному времени.
Рассмотрю последние три пункта подробнее.
Сложность системы означает многое: это и общее количество компонентов (как своих, так и сторонних), и взаимодействие между ними, и трудоёмкость в поддержке, и практически неизбежные изменения в инфраструктурных требованиях к системе.
Обновление системы также должно быть продумано с точки зрения миграции данных, времени простоя, возможности возврата к предыдущей версии и пр.
Так как ADCC является системой мониторинга, то естественно, что к ней предъявляется и требование по времени отклика. В нашем случае оно может быть мягким. Задержка отображения метрик на клиенте в размере 1 секунды для пользователя не критична, поскольку очень быстрые запросы отслеживать в online не имеет смысла (их можно посмотреть в истории запросов), а для продолжительных запросов такая задержка незначительна. И это послабление дает гораздо больше свободы при реализации.
Что мы сделали
В итоге мы составили концепт новой архитектуры ADCC:
Компоненты новой архитектуры:
JNI library — нативная библиотека для работы с сокетами.
Extensions — компонент с расширениями. Не менялся с предыдущей версии.
Agent — новое приложение, реализованное на Java, которое принимает и фильтрует метрики от Extensions, балансирует нагрузку при отправке с помощью распределения метрик между Backend Server’ами по уникальному идентификатору запроса. Java был выбран по нескольким причинам:
Backend server реализован на Java, а Agent разрабатывается той же командой. Поэтому не нужно менять контекст при разработке, что, помимо прочего, упрощает реализацию взаимодействия Agent и Backend server.
JRE уже доступна на хостах Arenadata DB, поскольку она обеспечивает работу средства импорта/экспорта данных из внешних систем — PXF.
Backend Server — новый горизонтально масштабируемый сервис обработки и агрегации метрик, также написан на Java. Он же записывает информацию по запросам в базу данных и отправляет обновления на Web Server.
Service Registry — каталог зарегистрированных в системе сервисов ADCC.
Web Server — предыдущая версия Web Server, из которой убрана функциональность по обработке метрик и записи их в базу данных.
Web UI — клиентское web-приложение не изменилось с предыдущей версии.
Database — компонент базы данных остался без изменений с предыдущей версии.
В целом последовательность получения и обработки метрик не изменилась, но если вдаваться в детали, то отличительных особенностей довольно много, ниже мы рассмотрим часть из них.
Agent обрабатывает принятые пакеты асинхронно за счет внутренней очереди, куда складываются все принятые пакеты. Сообщения из очереди обрабатываются пулом потоков, что также увеличивает производительность агента.
Работа с сокетами реализована за счет native-библиотеки и обращение к ней через JNI. Это обусловлено сохранением протокола взаимодействия между Extensions и Agent. Как говорилось выше, он включает в себя также самописный протокол-надстройку над UDP. Экспериментально мы выяснили, что вызовы через JNI справляются с этой задачей примерно на 20 % лучше, чем аналогичная реализация на Java, поскольку до Java 16 нельзя было работать с датаграммами напрямую.
В Agent добавили фильтрацию метрик, которые не обрабатываются системой, что снижает нагрузку на сеть и компоненты.
Обмен сообщениями между Agent и Backend server заменили на gRPC, что позволило не только переиспользовать protocol buffer, но и применить мультиплексирование, дополнительные возможности HTTP 2, а также асинхронную отправку данных «из коробки».
На стороне Agent реализовали балансировку нагрузки между несколькими экземплярами Backend Server при помощи шардирования метрик по уникальному идентификатору запроса. При этом сохраняется консистентность данных при добавлении новых экземпляров Backend Server. Эта функциональность еще и помогает избавиться от неявного ограничения пропускной способности из-за мультиплексирования TCP-соединения. При наличии нескольких Backend Server’ов мы создаем по одному TCP-соединению на каждый экземпляр, что увеличивает общую пропускную способность.
Обработка метрик в Backend Server стала пакетной. Полученное сообщение складывается во внутреннюю очередь (в соответствии с типом сообщения). Отдельный фоновый процесс батчами вычитывает из всех очередей сообщения и свёртывает метрики: формирует одно insert/update-выражение из метрик, относящихся к одному запросу. Далее полученные метрики отправляются асинхронно на Web Server и записываются в базу данных с использованием JDBC batch processing.
Результаты
Нам удалось реализовать систему, которая не только решает проблемы производительности предыдущей версии, но и готова к обработке гораздо больших объемов данных. Первая версия не справлялась с нагрузкой, если включить GUC gp_enable_gpperfmon
, который позволяет периодически собирать информацию о прогрессе выполнения запроса (без этой опции состояние запроса приходит только на момент начала его выполнения и по его завершении). В новой версии даже при минимальной конфигурации системы включение gp_enable_gpperfmon
не приводит к проблемам, что позволило при помощи графического представления отслеживать состояние выполнения узлов запроса в режиме near-online.
Планы
Мы взялись за переписывание UI (с сохранением существующей функциональности), чтобы изменить протокол обмена данными между ним и Backend Server. Основная идея заключается в том, чтобы отправлять на UI только те данные, которые необходимы. Также в планах обработка получаемых от Arenadata DB метрик в отдельном потоке, замена канала обмена между Extensions и Agent на POSIX-очередь сообщений, сжатие больших сообщений и ряд других изменений.
В дальнейшей перспективе будем добавлять новые типы метрик для обработки, повысим детализацию по уже обрабатываемым метрикам, улучшим UI и UX, а также разделим хранилища метрик на холодное и горячее.
Хочу поблагодарить за помощь в подготовке этого материала моего коллегу, разработчика Greenplum, Ивана Лескина (@leskin-in). Иван очень хорошо знает устройство и тонкости работы Greenplum. Он составил детальное описание работы расширений и провел тщательное рецензирование данной статьи.