Возможности языка Q и KDB+ на примере сервиса реального времени

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.
О том, что такое база KDB+, язык программирования Q, какие у них есть сильные и слабые стороны, можно прочитать в моей предыдущей статье и кратко во введении. В статье же мы реализуем на Q сервис, который будет обрабатывать входящий поток данных и высчитывать поминутно различные агрегирующие функции в режиме “реального времени” (т.е. будет успевать все посчитать до следующей порции данных). Главная особенность Q состоит в том, что это векторный язык, позволяющий оперировать не единичными объектами, а их массивами, массивами массивов и другими сложносоставными объектами. Такие языки как Q и родственные ему K, J, APL знамениты своей краткостью. Нередко программу, занимающую несколько экранов кода на привычном языке типа Java, можно записать на них в несколько строк. Именно это я и хочу продемонстрировать в этой статье.



Введение


KDB+ — это колоночная база данных, ориентированная на очень большие объемы данных, упорядоченные определенным образом (в первую очередь по времени). Используется она, в первую очередь, в финансовых организациях – банках, инвестиционных фондах, страховых компаниях. Язык Q – это внутренний язык KDB+, позволяющий эффективно работать с этими данными. Идеология Q – это краткость и эффективность, понятность при этом приносится в жертву. Обосновывается это тем, что векторный язык в любом случае будет сложен для восприятия, а краткость и насыщенность записи позволяет увидеть на одном экране гораздо большую часть программы, что в итоге облегчает ее понимание.

В статье мы реализуем полноценную программу на Q и вам, возможно, захочется попробовать ее в деле. Для этого вам понадобится собственно Q. Скачать бесплатную 32-битную версию можно на сайте компании kx – www.kx.com. Там же, если вам интересно, вы найдете справочную информацию по Q, книгу Q For Mortals и разнообразные статьи на эту тему.

Постановка задачи


Есть источник, который присылает каждые 25 миллисекунд таблицу с данными. Поскольку KDB+ применяется в первую очередь в финансах, будем считать, что это таблица сделок (trades), в которой есть следующие колонки: time (время в миллисекундах), sym (обозначение компании на бирже – IBM, AAPL,…), price (цена, по которой куплены акции), size (размер сделки). Интервал 25 миллисекунд выбран произвольно, он не слишком маленький и не слишком большой. Его наличие означает, что данные приходят в сервис уже буферизованные. Можно было бы легко реализовать буферизацию на стороне сервиса, в том числе динамическую, зависящую от текущей нагрузки, но для простоты остановимся на фиксированном интервале.

Сервис должен считать поминутно для каждого входящего символа из колонки sym набор агрегирующих функций – max price, avg price, sum size и т.п. полезную информацию. Для простоты мы положим, что все функции можно вычислять инкрементально, т.е. для получения нового значения достаточно знать два числа – старое и входящее значения. Например, функции max, average, sum обладают этим свойством, а функция медиана нет.

Также мы предположим, что входящий поток данных упорядочен по времени. Это даст нам возможность работать только с последней минутой. На практике достаточно уметь работать с текущей и предыдущей минутами на случай, если какие-то апдейты запоздали. Для простоты мы не будем рассматривать этот случай.

Агрегирующие функции


Ниже перечислены необходимые агрегирующие функции. Я взял их как можно больше, чтобы увеличить нагрузку на сервис:

  • high – max price – максимальная цена за минуту.
  • low – min price – минимальная цена за минуту.
  • firstPrice – first price – первая цена за минуту.
  • lastPrice – last price – последняя цена за минуту.
  • firstSize – first size – первый размер сделки за минуту.
  • lastSize – last size — последний размер сделки за минуту.
  • numTrades – count i – число сделок за минуту.
  • volume – sum size – сумма размеров сделок за минуту.
  • pvolume – sum price – сумма цен за минуту, необходимо для avgPrice.
  • turnover – sum price*size – суммарный объем сделок за минуту.
  • avgPrice – pvolume%numTrades – средняя цена за минуту.
  • avgSize – volume%numTrades – средний размер сделки за минуту.
  • vwap – turnover%volume – взвешенная по размеру сделки средняя цена за минуту.
  • cumVolume – sum volume – накопленный размер сделок за все время.

Сразу обсудим один неочевидный момент – как инициализировать эти колонки в первый раз и для каждой следующей минуты. Некоторые колонки типа firstPrice каждый раз нужно инициализировать значением null, их значение не определено. Другие типа volume нужно устанавливать всегда в 0. Еще есть колонки, которые требуют комбинированного подхода – например, cumVolume необходимо копировать из предыдущей минуты, а для первой установить в 0. Зададим все эти параметры используя тип данных словарь (аналог записи):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Я добавил sym и time в словарь для удобства, теперь initWith – это готовая строчка из финальной агрегированной таблицы, где осталось задать правильные sym и time. Можно использовать ее для добавления новых строк в таблицу.

aggCols нам понадобится при создании агрегирующей функции. Список необходимо инвертировать из-за особенностей порядка вычислений выражений в Q (справа налево). Цель – обеспечить вычисление в направлении от high к cumVolume, поскольку некоторые колонки зависят от предыдущих.

Колонки, которые нужно скопировать в новую минуту из предыдущей, колонка sym добавлена для удобства:

rollColumns:`sym`cumVolume;

Теперь разделим колонки на группы согласно тому, как их следует обновлять. Можно выделить три типа:

  1. Аккумуляторы (volume, turnover,..) – мы должны прибавить входящее значение к предыдущему.
  2. С особой точкой (high, low, ..) – первое значение в минуте берется из входящих данных, остальные считаются с помощью функции.
  3. Остальные. Всегда считаются с помощью функции.

Определим переменные для этих классов:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Порядок вычислений


Обновлять агрегированную таблицу мы будем в два этапа. Для эффективности мы сначала ужмем входящую таблицу так, чтобы там осталась одна строка для каждого символа и минуты. То, что все наши функции инкрементальные и ассоциативные, гарантирует нам, что результат от этого дополнительного шага не изменится. Ужать таблицу можно было бы с помощью селекта:

select high:max price, low:min price … by sym,time.minute from table

У этого способа есть минус – набор вычисляемых колонок задан заранее. К счастью, в Q селект реализован и как функция, куда можно подставить динамически созданные аргументы:

?[table;whereClause;byClause;selectClause]

Я не буду подробно описывать формат аргументов, в нашем случае нетривиальными будут только by и select выражения и они должны быть словарями вида columns!expressions. Таким образом, ужимающую функцию можно задать так:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Для понятности я использовал функцию parse, которая превращает строку с Q выражением в значение, которое может быть передано в функцию eval и которое требуется в функциональном селекте. Также отметим, что preprocess задана как проекция (т.е. функция с частично определенными аргументами) функции селект, один аргумент (таблица) отсутствует. Если мы применим preprocess к таблице, то получим ужатую таблицу.

Второй этап – это обновление агрегированной таблицы. Напишем сначала алгоритм в псевдокоде:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

В Q вместо циклов принято использовать функции map/reduce. Но поскольку Q – векторный язык и все операции мы можем спокойно применять ко всем символам сразу, то в первом приближении мы можем обойтись вообще без цикла, проделывая операции со всеми символами сразу:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Но мы можем пойти и дальше, в Q есть уникальный и исключительно мощный оператор – оператор обобщенного присваивания. Он позволяет изменить набор значений в сложной структуре данных используя список индексов, функций и аргументов. В нашем случае он выглядит так:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

К сожалению, для присвоения в таблицу нужен список строк, а не колонок, и приходится транспонировать матрицу (список колонок в список строк) с помощью функции flip. Для большой таблицы это накладно, поэтому вместо этого применим обобщенное присваивание к каждой колонке отдельно, используя функцию map (которая выглядит как апостроф):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Мы снова используем проекцию функции. Также заметьте, что в Q создание списка – это тоже функция и мы можем вызвать ее с помощью функции each(map), чтобы получить список списков.

Чтобы набор вычисляемых колонок не был фиксирован, создадим выражение выше динамически. Определим сначала функции для вычисления каждой колонки, используя переменные row и inp для ссылки на агрегированные и входные данные:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Некоторые колонки особые, их первое значение не должно вычисляться функцией. Мы можем определить, что оно первое по колонке row[`numTrades] – если в ней 0, то значение первое. В Q есть функция выбора — ?[Boolean list;list1;list2] – которая выбирает значение из списка 1 или 2 в зависимости от условия в первом аргументе:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Тут я вызвал обобщенное присваивание с моей функцией (выражение в фигурных скобках). В нее передается текущее значение (первый аргумент) и дополнительный аргумент, который я передаю в 4-м параметре.

Отдельно добавим аккумуляторные колонки, поскольку для них функция одна и та же:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Это обычное по меркам Q присваивание, только присваиваю я сразу список значений. Наконец, создадим главную функцию:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Этим выражением я динамически создаю функцию из строки, которая содержит выражение, которое я приводил выше. Результат будет выглядеть так:

{[aggTable;idx;inp] rows:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Порядок вычисления колонок инвертирован, поскольку в Q порядок вычисления справа налево.

Теперь у нас есть две основные функции, необходимые для вычислений, осталось добавить немного инфраструктуры и сервис готов.

Финальные шаги


У нас есть функции preprocess и updateAgg, которые делают всю работу. Но необходимо еще обеспечить правильный переход через минуты и вычислить индексы для агрегации. В первую очередь определим функцию init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Также определим функцию roll, которая будет менять текущую минуту:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Нам понадобится функция для добавления новых символов:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

И, наконец, функция upd (традиционное название этой функции для Q сервисов), которая вызывается клиентом, для добавления данных:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Вот и все. Вот полный код нашего сервиса, как и обещалось, всего несколько строк:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Тестирование


Проверим производительность сервиса. Для этого запустим его в отдельном процессе (поместите код в файл service.q) и вызовите функцию init:

q service.q –p 5566

q)init[]

В другой консоли запустите второй Q процесс и подсоединитесь к первому:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Сначала создадим список символов – 10000 штук и добавим функцию для создания случайной таблицы. Во второй консоли:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Я добавил в список символов три настоящих, чтобы было удобнее искать их в таблице. Функция rnd создает случайную таблицу с n строками, где время меняется от t до t+25 миллисекунд.

Теперь можно попробовать послать данные в сервис (добавим первые десять часов):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Можно проверить в сервисе, что таблица обновилась:

\c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Результат:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Проведем теперь нагрузочное тестирование, чтобы выяснить сколько данных сервис может обрабатывать в минуту. Напомню, что мы установили интервал для апдейтов в 25 миллисекунд. Соответственно, сервис должен (в среднем) укладываться хотя бы в 20 миллисекунд на апдейт, чтобы дать время пользователям запросить данные. Введите следующее во втором процессе:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 – это две минуты. Можно попробовать запустить сначала для 1000 строк каждые 25 миллисекунд:

start 1000

В моем случае результат получается в районе пары миллисекунд на апдейт. Так что я сразу увеличу количество строк до 10.000:

start 10000

Результат:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Снова ничего особенного, а ведь это 24 миллиона строк в минуту, 400 тысяч в секунду. Больше 25 миллисекунд апдейт тормозил только 5 раз, видимо при смене минуты. Увеличим до 100.000:

start 100000

Результат:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Как видим, сервис едва справляется, но тем не менее ему удается удержаться на плаву. Такой объем данных (240 миллионов строк в минуту) чрезвычайно велик, в таких случаях принято запускать несколько клонов (или даже десятков клонов) сервиса, каждый из которых обрабатывает только часть символов. Тем не менее, результат впечатляющий для интерпретируемого языка, который ориентирован в первую очередь на хранение данных.

Может возникнуть вопрос, почему время растет нелинейно вместе с размером каждого апдейта. Причина в том, что ужимающая функция – это фактически С функция, которая работает гораздо эффективнее updateAgg. Начиная с какого-то размера апдейта (в районе 10.000), updateAgg достигает своего потолка и дальше ее время выполнения не зависит от размера апдейта. Именно за счет предварительного шага Q сервис в состоянии переваривать такие объемы данных. Это подчеркивает, насколько важно, работая с большими данными, выбирать правильный алгоритм. Еще один момент – правильное хранение данных в памяти. Если бы данные хранились не по-колоночно или не были упорядочены по времени, то мы бы познакомились с такой вещью, как TLB cache miss – отсутствие адреса страницы памяти в кэше адресов процессора. Поиск адреса занимает где-то в 30 раз больше времени в случае неудачи и в случае рассеянных данных может замедлить сервис в несколько раз.

Заключение


В этой статье я показал, что база KDB+ и Q пригодны не только для хранения больших данных и простого доступа к ним через селект, но и для создания сервисов обработки данных, которые способны переваривать сотни миллионов строк/гигабайты данных даже в одном отдельно взятом Q процессе. Сам язык Q позволяет исключительно кратко и эффективно реализовывать алгоритмы, связанные с обработкой данных за счет своей векторной природы, встроенного интерпретатора диалекта SQL и очень удачного набора библиотечных функций.

Я замечу, что изложенное выше, это лишь часть возможностей Q, у него есть и другие уникальные особенности. Например, чрезвычайно простой IPC протокол, который стирает границу между отдельными Q процессами и позволяет объединять сотни этих процессов в единую сеть, которая может располагаться на десятках серверов в разных концах света.
Источник: https://habr.com/ru/company/dbtc/blog/470596/


Интересные статьи

Интересные статьи

Недавно мы анонсировали ряд улучшений для Mac, в частности новые версии приложений Microsoft 365 для компьютеров Mac на базе процессора M1. Благодаря этому повысится производительност...
Для обычных процессорных языков уже написано довольно много спецификаций для Ghidra, однако для графических ничего нет. Оно и понятно, ведь там своя специфика: предикаты, константы, ч...
В процессе работы над очередным проектом в команде возникли споры по поводу использования формата XML или SQL в Liquibase. Естественно про Liquibase уже написано много статей, но как всегда хочет...
В отличие от рассылки, подписчик сам определял, когда он хочет получить следующую запись — мгновенно после предыдущей или через неделю. Для этого он отправлял пустое письмо на специальный адрес. ...
В предыдущей статье Где хранятся ваши константы на микроконтроллере CortexM (на примере С++ IAR компилятора), был разобран вопрос о том, как расположить константные объекты в ROM. Теперь же я...