Разработчики потоков обработки данных на сервисе Apache NiFi знают, что все интересующие события процессоров можно вывести в интерфейс в раздел "Bulletin Board". Однако, если вы в текущий момент не следите за интерфейсом, увидеть сообщение становится невозможным.
Apache NiFi хранит логи (по умолчанию) в папке "./logs". Процессоры пишут свои события в файл "nifi-app.log". И если у вас есть доступ к каталогу с логами, то никаких трудностей нет - открываем файл, читаем, фиксим поток. А если у вас нет доступа? Или есть желание оперативно получить сведения о событии, чтобы понять контекст его возникновения? Ситуации бывают разные. Рассмотрим простой способ получить информацию из лога.
Иные варианты получения данных из логов
Применяем filebeat - ставим, настраиваем на папку, шлем логи в ELK.
Настраиваем logback.xml - применяем логгер, отправляющий данные по сети на LogStash, например стандартный или кастомный.
Для чтения файла лога воспользуемся процессором TailFile, далее конвертируем полученные данные, разобьём на отдельные записи, извлечем данные в атрибуты и отправим себе в канал Slack:
Настройки TailFile - читаем один файл, указываем его расположение, начальную позицию и периодичность:
Далее преобразуем полученные данные с помощью ConvertRecord:
Чтение данных выполняется с помощью GrokReader, позволяющего структурировать текстовые данные. О Grok есть много публикаций, например тут от @chemtech.
В настройках указываем шаблон, и выставляем параметр, добавляющий данные к предыдущему сообщению, если они не подходят под шаблон.
Структуре лога соответствует выражение:
%{TIMESTAMP_ISO8601:date} %{LOGLEVEL:Level} \[%{DATA:thread}] %{DATA:logger} %{GREEDYDATA:message}
В итоге имеем Json вида:
{
"date" : "2022-06-01 23:34:02,905",
"Level" : "INFO",
"thread" : "pool-10-thread-1",
"logger" : "o.a.n.c.r.WriteAheadFlowFileRepository",
"message" : "Initiating checkpoint of FlowFile Repository",
"stackTrace" : null,
"_raw" : "ds (Stop-the-world time = 2 milliseconds, ..."
}
Извлекаем данные из контента. в атрибуты с помощью EvaluateJsonPath:
Далее шлем сообщения в канал Slack:
Пример отправки в Slack:
В заключение скажу, что этот подход не является моей личной разработкой, известен давно, и можно найти различные примеры потоков Nifi, читающих логи. Как вариант, можно слать логи не в Slack, а в телеграмм или Elastic.
Такая структура не является панацеей, и не претендует на уровень production. По хорошему, все логи надо обрабатывать через инфраструктуру, для этого предназначенную - ELK, Zabbix и т.д.
Каким образом я использую такой вариант:
При разработке потока создаю свой уровень логирования, и отлавливаю определенные события через LogEvent / LogAttribute.
При долговременом тестировании потоков (test / stage) отлавливаю ошибки.
На проде ловлю ошибки загрузки, чтобы успеть быстро загрузить вручную данные и успеть до старта загрузки DWH.
Полезные ссылки:
Документация Apache Nifi.
Полезный YouTube канал, где я и увидел принцип построения такого потока.
Сообщество в телеграмм
Всем добра.