Кластер ElasticSearch на 1Ptb+

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Черновик статьи был написан еще год назад, когда я работал на крупном международном проекте, но из-за разных событий прошлого года он остался неопубликованным.

На проекте в моем ведении находилось несколько on-premise кластеров в нескольких европейских датацентрах. «Мы» в этой статье — небольшая команда DataOps из 5 человек.

Было дело я читал на Хабре статью про «Кластер Elasticsearch на 200 ТБ+» и примерял написанное к нам, у нас такой кластер считался средним, самый маленький кластер под 0,1Ptb, а большой тогда был под 0,5Ptb. Потом была поставлена задача подготовить кластер к увеличению объемов входящих данных в 2-3 раза, а срок хранения в 2 раза, т. е. объем хранимых данных, если грубо экстраполировать, должен был стать в районе 2-3Ptb.

Хочу поделиться нашим опытом, может кому пригодится.

Требования к кластеру

  • Возможность проводить анализ инцидентов, влияния инфраструктурных изменений на предоставляемые услуги, мониторинг качества услуг;

  • Максимальная допустимая задержка в поступающих данных 15 минут;

  • При задержке в данных более 30 минут следует oncall инцидент;

  • Интерфейсы для пользователей должны остаться прежними или схожими, а именно:

    • Grafana для мониторинга;

    • Kibana (Discovery режим) для более детальных расследований.

  • Срок хранения исходных журналов — изначально 1 неделя и довести до 2 недель;

  • Срок хранения метрик — постоянно.

Характеристики входного потока

  • Данные с нескольких сотен серверов собираются и передаются на один входной сервер по SFTP. При этом интервал сбора и отправки данных 5 минут. Т.е. уже заложена задержка в 5 минут в поступающих данных и на обработку данных, обеспечение доступности их, остается 10 минут. Так же данные могут приходить с большой задержкой (например, в сутки) из-за отсутствия связи с удаленным сервером.

  • 4 млн файлов общим объемом в 3Tb (запакованные в GZIP CSV-подобные файлы) в день

  • Размер файлов от нескольких килобайт до 2Gb

  • Общий объем данных после обработки ~63Tb в день

  • Суммарная скорость индексирования сообщений в ElasticSearch на Hot датанодах не выше 800K/s

Поток обработки

У нас есть несколько потоков обработки (pipelines), которые обрабатывают входные данные и заливают их в ElasticSearch. Так же есть другие обработки, которые рассчитывают метрики, но они не связаны с ElasticSearch и поэтому я их не будут указывать.

Parsing → Kafka → Filter / Enrich → Kafka → ElasticSearch

Часть архитектуры
Часть архитектуры

Kafkabeat — это внутренее решение, написаное на Go с использованием родных библиотек от ElasticSearch и Sarama для Apache Kafka ( https://github.com/Shopify/sarama ).

Железо (34 сервера (CPU:80, RAM:384Gb) , 1 датацентр) и роли

  • 1 — Ingest

  • 8 — Hot ES (8x8 = 64 датаноды)

  • 12 — Warm ES (12x8 = 96 датанод)

  • 8 — Kafka

  • 5 — Processing (KafkaStreams)

  • 10Gb сетевые интерфейсы

Первые шаги

Архитектура кластера досталась мне в наследство as is и требовалось вносить изменения без остановки всего кластера. На момент начала планирования работ по расширению кластера он уже был перегружен и работал на максимуме возможностей этого архитектурного решения.

Поэтому первое что сделали:

  • Провели ревизию имеющегося железа

  • Измерили характеристики входного потока

  • Спрогнозировали будущие характеристики входного потока

и сели думать что делать дальше.

Варианты:

  1. Линейное увеличение железа

  2. Пересмотр процессинга

  3. Уход в облака

  4. Замена системы хранения

Вариант 1. Увеличение количества серверов

Был проведен стоимостной анализ с учетом стоимости серверов, жестких дисков, дисковых полок, стоимости аренды и обслуживания стоек в датацентрах, вендорное обслуживани и т. п. в горизонте на 10 лет.

Результат: дорого.

Но дополнительные 14 серверов мы получили для того, чтобы решить как текущие проблемы с работой кластера, так и с расчетом на изменения, которые требовалось придумать и провести

Вариант 2. Пересмотр процессинга

Была проведена большая работа по переработке процессинга, но с точки зрения ElasticSearch кластера важна только часть, касающаяся индексирования документов в ElasticSearch.

В начале нам надо было решить проблему с индексированием имеющихся объемов при имеющемся оборудовании. Как решение мы использование разделение потока на две части с учетом возраста входных данных (но можно было использовать и другие критерии). Критерий был выбран таким образом, чтобы существующий процессинг и скорость работы ElasticSearch кластера обеспечивали гарантированное индексирование данных в течение получаса после момента события, а оставшиеся данные обрабатываются в другом потоке и заносятся в ElasticSearch в течение дня.

Этот подход обеспечил наличие порядка 20-40% входных данных в праймтайм и получение всех данных с задержкой в 2-3 часа. И если раньше весь поток работал по принципу FIFO, что приводило к единому лагу для всех данных, то новое разделение на два FIFO потока позволило «приоритизировать» данные и обеспечить их оперативное наличие в конечном хранилище (т. е. принцип «хороша ложка к обеду» был соблюден — лучше иметь какие-либо данные сейчас, чем все данные, но через несколько часов)

Возраст входящих файлов
Возраст входящих файлов

Параллельно решали вопросы с тюнингом ElasticSearch и индексированием даных.

И в каких-то случаях рассматривали самые различные варианты подготовки данных и использование различных существующих инструментов для их занесения в ElasticSearch. В нашем случае Apache Kafka осталась непревзойденным механизмом для хранения данных и расспараллеливания загрузки этих данных в ElasticSearch.

Взяли Kafkabeat (внутренняя разработка), взяли расхваливаемый Vector (https://vector.dev/) и я накидал небольшую утилиту на C++ с использованием cppkafka (https://github.com/mfontanini/cppkafka) и elasticlient (https://github.com/seznam/elasticlient), rapidjson (https://github.com/Tencent/rapidjson/). Провели тестирование и получили что Kafkabeat самый медленный, Vector быстрее Kafkabeat на ~10% , а мое решение, получившее название Baikalbeat, перспективно, но требует доделки.

Замеры производительности решений Kafka -> ElasticSearch
Замеры производительности решений Kafka -> ElasticSearch

Пришлось глубже залезть в нутро этих механизмов и в следующих версиях, отказаться от: парсинга JSON, библиотек для ElasticSearch. И добавить несколько потоков. Парсинг JSON нужен был для того, чтобы из kafka-сообщений извлекать название ElasticSearch-индекса для вставки данных, но решил заменить это на поиск по подстроке. Библиотеку работы с kafka не стал трогать, так как к ней никаких претензий не было — тестовый миллион записей выкачивался за 9 секунд. А библиотеку для ElasticSearch заменил на небольшую имплементацию ElasticSearch Bulk API с POST запросами.

В результате тестов выяснили: Baikalbeat при пачках в 50K сообщений может легко прибить тестовые инстансы ElasticSearch, при пачках в 25-35K наибольшая производительность.

Когда мы заменили все наши инстансы Kafkabeat-ов, на Baikalbeat, то сразу получили требуемую скорость индексирования в 1500-1600K/s. Т.е. мы добились скорости индексирования данные в режиме реального времени и практически полное отсутствие лага на индексировании.

Лаг (разница между временем события и временем попаданием в ES) данных в ElasticSearch
Лаг (разница между временем события и временем попаданием в ES) данных в ElasticSearch

(Данные обычно к нам приходят с минимальной задержкой в 5 и более минут от момента события)

Скорость индексирования ElasticSearch (документов в секунду)
Скорость индексирования ElasticSearch (документов в секунду)

Вариант 3. Уход в облака

В ходе анализа облаков изучили разные варианты, провели консультирование с разными специалистами и вывод получился один: на текущий момент для нас альтернативы нет.

Что изучили более-менее детально:

  • Google Cloud

  • Microsoft Azure

  • Amazon AWS (включая managed ElasticSearch)

  • ElasticSearch Cloud

  • Yandex Cloud (включая Clickhouse, см. вариант 4)

Какие минусы:

  • Альтернативы Kibana Discovery режиму пока нет

  • Облака не подходят для хранения и обработки требуемых нам объемов данных в ElasticSearch

  • Все облачные ElasticSearch расчитаны на «средние» кластера и они хуже в управляемости, чем наши bare metal.

  • Стоимость решения эквивалентна или выше (sic!), чем просто покупка железа (вариант 1).

Вариант 4. Замена системы хранения

Мы изучили несколько вариантов:

  • Другие системы хранения и обработки данных и остановились на Clickhouse;

  • Другие системы хранения данных (NetApp).

Hidden text

Offtopic

Clickhouse — на мой взгляд чудесная система, создаваемая гениальными троешниками, которые создали СУБД мало зная про СУБД. (Ребята, если вы будете читать мою статью, то вы создали действительно хорошую систему, но за некоторые ее выкрутасы и архитектурные решения, особенности обновления и т.п. хотелось ее сразу выкинуть и забыть, так как детские проблемы из-за отсутствия опыта работы и знания крупных СУБД, а вот уши MySQL там торчали везде)

Clickhouse мы опробовали на части наших данных и он показал хорошую производительность, быстрые ответы на запросы, изумительный объем хранения данных (в ~6 меньше,чем ElasticSearch). Но затем начались сложности:

  • Как спланировать размер кластера с учетом нашей нагрузки. Пришлось делать замеры и свою методику расчета.

  • Как решать вопросы стабильности работы с kafka (решили)

  • Как решать вопросы стабильной работы TTL (решили)

  • Как восстанавливать после сбоев — тут всего не решили, опять же порой восстановление кластера через Zookeeper напоминало танцы с бубном.

  • Документации по сопровождению и решению проблем — практически нет.

  • Нет нормального визуального интерфейса, разве только Grafana. Всякие Tabix, Metabase, Redash — сыро и малопригодно для инженеров из других специализаций. Думал что хотя бы Yandex DataLens спасет — нет, там тоже многое печально и другая цель в визуализации информации.

Большое спасибо тем ребятам из Яндекс, кто общался со мной по вопросам создания Clickhouse кластера на 0,5-1Ptb, консультировал по DataLens.

В результате Clickhouse мы используем, но только как backend для graphite.

Про другие системы хранения. Посмотрели на SAS, SSD, NVME носители, NetApp. Нам предоставили тестовые хранилища на разных носителях — провели замеры для оценки производительности, а затем стали думать как вписаться в бюджет.

В результате остановились на NetApp хранилище с гибридной реализацией в объеме на >1Ptb для начала.

Реорганизация кластера

Получили мы дополнительные 14 серверов (CPU: 96, RAM: 1,5Tb) и NetApp для хранения данных. NetApp подключен ко всем 48 серверам.

Шаги:

  1. Подготовка Ansible ролей и плейбуков

  2. Первоначальная настройка серверов

  3. Настройка сервисов и создание нового кластера (Kafka, processing, ElasticSearch)

  4. Перенастройка процессинга для перенаправления части пайплайнов в новый кластер

  5. Мониторинг

Запустили новый кластер на части серверов (3-5) и стали изучать его работу. В начале в новом кластере подняли весь процессинг, а потом запустили туда копирование данных. В начале это сделали через процессинг на входном сервера, а потом улучшили схему: подключили NetApp через NFS на входной сервер, стали туда складывать файлы, информацию о файла в Kafka topic, затем с помощью kafkamirror копируем данные в новый кластер новую Kafka. В результате новый кластер начал получать данные без серьезной нагрузки на старый кластер.

Дальнейшие шаги должны были быть такими:

  1. Мониторинг в течение 1 недели;

  2. Отключение процессинга данных в старом кластере;

  3. Подготовка нового пайплайна, включение его и мониторинга в новом кластере;

  4. Перераспределение ролей серверов для добавления дополнительных Kafka / ElasticSearch нод, если потребуется.

Но пришлось внести коррективы, так как после переноса еще пары пайплайнов мы столкнулись с деградацией скорости работы всего нового кластера:

  • Kafka стала медленнее работать;

  • ElasticSearch стал медленнее проводить индексацию и остальные операции;

  • Включение дополнительных серверов в Kafka / ElasticSearch кластеры лишь усугубили ситуацию.

Стали изучать ситуацию и выяснили что мы вышли на пороговые значения производительности NetApp хранилища. Родного мониторинга NetApp у нас нет, поэтому сделали запрос к вендору и от него получили отчет по почти 100% утилизации NetApp. Тем самым мы должны были пересмотреть архитектуру кластеров.

Провели анализ использования дисковых операций и выяснили что наиболее прожорливым оказался ElasticSearch (я вот предполагал что это будет Kafka). В качестве решения проблемы мы перенастроили 2-3 старых сервера на SAS винтах в качестве Hot датанод для ElasticSearch, а ElasticSearch датаноды на новых серверах с NetApp в качестве хранилища стали Warm. И проблема с утилизацией NetApp сразу исчезла — утилизация снизилась до 30-40%. На удивление производительность датанод на SAS оказалась не сильно меньше производительности датанод на SSD.

Рекомендации

  • Внимательно читать документацию на EleastiSearch, документация них настолько хороша (https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html), что толку от платной поддержки особо нет.

  • Разделять физически или ограничивать лимитами датаноды на использование ресурсов, оптимальнее всего использовать docker контейнеры (в чистом виде или в k8s), к сожалению, ElasticSearch не может использовать эффективно большие объемы памяти.

  • Для максимальной скорости индексации желательно:

    • Использовать роли Hot, Warm в рамках ILM

    • Под Hot выделять наиболее быстрые хранилища, причем следует учитывать что размер этих хранилищ следует рассчитывать исходя из требуемой скорости индексирования (у нас это 30-35K/s на датаноду), количества датанод и времени хранения данных в стадии Hot (в нашем случае это 1 день), т. е.:

      • Общий объем хранилища для N часов = Размер записи * Количество записей за период N часов * K1

      • Количество датанод = Требуемая производительность / Производительность одной датаноды

      • Объем хранилища для одной датаноды = Общий объем хранилища для N часов / Количество датанод

      • K1 — коэффициент для учета watermark-ов, времени миграции данных с Hot датанод на Warm датаноды. Рекомендую использовать равным 1.2, но если у вас большой поток и пока первый индекс переливается, то успеваете получить новые данные в схожем объеме, то используйте коэффициент 2-2.2

    • Индексы разделять на шарды по количеству Hot-датанод, следует так же не забывать про параметры total_shards_per_node (> 1), чтобы при выходе из строя 1-2 датанод не положить весь процессинг.

    • Исключить реплицирование в Hot индексах

    • Использовать ILM для мигрирования индексов на Warm / Cold датаноды без переиндексирования, но с созданием реплик при необходимости (в нашем случае на Netapp наличие реплик не требуется)

    • Рекомендации для индексов:

      • использовать mappings

      • установить refresh_interval в достаточное для работы значение (у нас это сейчас 300s)

      • уменьшить количество скидываний translog ( translog.sync_interval в минуты)

    • При больших объемах данных не следует забыть о параметре max_shards_per_node в cluster/settings

    • Так же будет проблема в ElasticSearch с обработкой большого количества входящих запросов, следовательно надо внести правки для pool_threads

  • Для оперативного мониторинга достаточно использовать несколько характеристик:

    • статус кластера и время пребывания в этих статусах

    • количество датанод

    • количество мастернод

    • количество ILM ошибок

    • индексы с размером более 50Гб

    • датаноды с наибольшим заполнением хранилища

  • Для расследований и оптимизации желательно мониторить почти все характеристики как ElasticSearch, так и хостов: от скорости индексирования, состояния каждой датаноды, использования памяти, процессора и т. д. Встроенный мониторинг в ElasticSearch порой не дает достаточной информации.

Заключение

Большое спасибо всем ребятам, с которым я сопровождал этот и другие кластеры:

Дмитрий (за настройку всего и решение всевозможных проблем), Стас (за разработку и модернизацию процессинга), Николай (за разработку и модернизацию мониторинга, процессинга), Михаил (за тестирование, отладку Baikalbeat и нового кластера в целом).

Источник: https://habr.com/ru/post/716430/


Интересные статьи

Интересные статьи

Привет, Хабр!Меня зовут Сергей Исупов, я Data Scientist и являюсь участником профессионального сообщества NTA. В рамках данной публикации я постарался не только поделиться своим практическим опы...
Привет, мы команда СберМегаМаркета, и это обзорная статья о нашей площадке, пробный камень для блога Хабре. За нашими плечами спешный переезд с PHP на GO, ребрендинг и решение таких задач, с которыми ...
Всем привет! Представляю вашему вниманию новую версию программы KeyClusterer, предназначенной для группировки семантического ядра методами Hard и Soft. Читать дальше &rarr...
Последние ~полгода для работы с Cassandra в Kubernetes мы использовали Rook operator. Однако, когда нам потребовалось выполнить весьма тривиальную, казалось бы, операцию: поменять параметры в...
Здравствуйте. Я уже давно не пишу на php, но то и дело натыкаюсь на интернет-магазины на системе управления сайтами Битрикс. И я вспоминаю о своих исследованиях. Битрикс не любят примерно так,...