Golang-генератор TSV данных для импорта в ClickHouse

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Занимаясь написанием статьи о Data Vault по разведению кроликов, возникла потребность сгенерировать много данных для ClickHouse. Все генераторы, что смотрел - так и не придумал как сделать 50ГБ данных быстро и эффективно с их помощью. Поэтому решил развлечься и субботний день провести со старым другом. Сразу скажу - я не занимаюсь разработкой на Golang. Это скорее хобби. Так что прошу не судить строго.

Итак, задача:

  • сгенерировать данные для Data Vault от точки N до текущего дня

  • сделать это в формате TSV, так как ClickHouse просто молниеносно грузит данный формат

  • генерация должна быть быстрой

  • данных должно быть много

  • должны быть поддержаны связи между таблицами

Также приложу схему Vault, для которого все делается. Описание можно почитать по ссылке в начале статьи.

В общем звучит все отвратительно. Если зависимости - значит память. А как без них в БД.

После долгих раздумий за чашкой кофе, я сформулировал для себя концепт будущей утилиты:

  • данные должны поточно писаться на диск в момент формирования, и пропадать из памяти

  • в памяти должен храниться только какой то небольшой интервал времени

Далее было принято решение реализовывать подобие пайпов с использованием потоков Golang, которые должны были генерировать первичные ключи и временной ряд.

Первым делом набросал структуру, которая будет занята генерацией:

type Context struct {
    pk int
    timestamp_ time.Time
    hash string
    source_ string
    RandomHash []string
}

type TableGenerator struct {
    files map[string]*os.File
    numPipe <-chan int
    datePipe <-chan time.Time
    context Context
}

Основная структура TableGenerator собственно будет хранить ссылки на файлы, в которые идет запись, пайпы, а также контекст.

Первым дело создадим два метода, которые позволят инициализировать структуру, а также добавлять файлы для записи.

// Инициалиазция
func (t *TableGenerator) Init() {
    t.files = make(map[string]*os.File)
}

// Открыть файл для очередной таблицы
func (t TableGenerator) AddFile(tableName string) *os.File {
    f, err := os.Create(fmt.Sprintf("tables/%s.tsv", tableName))    
    if err != nil {
        panic(fmt.Sprintf("TableGenerator.AddFile: %s", err.Error()))
    }
    t.files[tableName] = f
    return t.files[tableName]
}

Первичная инициализация структуры и открытие файлов в главном методе приложения выглядит так:

var generator TableGenerator
generator.Init()
for _, tbl := range tableList {
    f := generator.AddFile(tbl)
    defer  f.Close()
}

После того как файлы открыты, нам потребуется рекурсивный метод по наполнению таблиц. На верхнем уровне мы будем бежать по таблицам родителям - h_cities - и спускаться в каждом новом уровне ниже до h_animals. Таким образом на каждую строку верхнего уровня, будет получен набор дочерних. И мы сможем последовательно писать данные спускаясь вниз и возвращаясь обратно к родителю. Таким образом на нижнем уровне рекурсии достаточно знать контекст только одного родителя, что нам и требуется - меньшее потребление памяти.

  • горда (1 уровень)

  • фермы для городов (2 уровень)

  • кролики для ферм (3 уровень)

Сигнатура метода проста:

func WriteTable (table Table, generator TableGenerator) {
  ...
}

table - это некие метаданные таблицы. Их я принял решение генерировать из JSON на базе такой структуры:

type Field struct {
    Name string `json:"name"`
    Type string `json:"type"`
    Values []string `json:"values"`
    Const string `json:"const"`
    Delta int `json:"delta"`
    ConcatWithParent int `json:"concatWithParent"`
    LinkField string `json:"linkField"`
}

type Table struct {
    Name string `json:"name"`
    LinkName string `json:"linkName"`
    Rownum int `json:"rownum"`
    MinRownum int `json:"minRownum"`
    MaxRownum int `json:"maxRownum"`
    Fields []Field `json:"fields"`
    Childs []Table `json:"childs"`
}

type DataBase struct {
    Tables []Table `json:"tables"`
}
Кому интересен пример моего JSON

{
	"tables": [
		{
			"name": "h_cities",
			"rownum": 10,
			"fields": [
				{"name": "city_code", "type": "pk"},
				{"name": "city_hsh", "type": "hash"}, 
				{"name": "timestamp_", "type": "timestamp", "delta": 3000}, 
				{"name": "source_", "type": "string", "const":"R"}
			],
			"childs": [
				{
					"name": "h_farms",
					"linkName": "l_city_farms",
					"minRownum": 1,
					"maxRownum": 10,
					"fields": [
						{"name": "farm_num", "type": "pk", "concatWithParent": 1},
						{"name": "farm_hsh", "type": "hash"}, 
						{"name": "timestamp_", "type": "timestamp"}, 
						{"name": "source_", "type": "string", "const":"R"}
					],
					"childs": [
						{
							"name": "h_animals",
							"linkName": "l_animal_farms",
							"minRownum": 1,
							"maxRownum": 100,
							"fields": [
								{"name": "animal_num", "type": "pk", "concatWithParent": 1},
								{"name": "animal_hsh", "type": "hash"}, 
								{"name": "timestamp_", "type": "timestamp", "start": "parent"}, 
								{"name": "source_", "type": "string", "const":"R"}
							],
							"childs": [
								{
									"name": "s_animal_attrs",
									"type": "satellite",
									"rownum": 1,
									"fields": [
										{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"},
										{"name": "sex", "type": "list", "values": ["М", "Ж"]},
										{"name": "color", "type": "list", "values": ["Черный", "Белый", "Красный"]},
										{"name": "birthdate", "type": "copy", "linkField": "timestamp_"}
									]
								},
								{
									"name": "s_animal_lifecycle",
									"type": "satellite",
									"rownum": 1,
									"fields": [
										{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"},
										{"name": "status", "type": "list", "values": ["Жив", "Мертв", "Продан", "Продан живым"]}
									]
								},
								{
									"name": "l_animal_tree",
									"type": "recursion",
									"rownum": 1,
									"fields": [
										{"name": "animal_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "animal_mother_hsh", "type": "parentRandom"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"}
									]
								}
							]
						},
						{
							"name": "l_farm_referals",
							"rownum": 1,
							"fields": [
								{"name": "ref_farm_hsh", "type": "copy", "linkField": "parent.hash"},
								{"name": "attract_farm_hsh", "type": "parentRandom"},
								{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
								{"name": "source_", "type": "copy", "linkField": "parent.source_"}
							]
						}
					]
				},
				{
					"name": "s_city_attrs",
					"type": "satellite",
					"rownum": 1,
					"fields": [
						{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
						{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
						{"name": "source_", "type": "copy", "linkField": "parent.source_"},
						{"name": "name_", "type": "string"}
					]
				}
			]
		}
	]
}

Итак файлы открыты, начинаем обход JSON. Первым идет заполнение таблицы верхнего уровня - h_cities - она самая маленькая.

Каждая таблица, перед тем как писаться должна инициализировать пайпы в соответствии с типами полей. Поэтому обходим структуру таблиц:

// откроем пайпы для полей разного типа
for _, fld := range table.Fields {
    switch (fld.Type) {
        case "pk":
            generator.numPipe = GenKey(1, rownum)
        case "timestamp":
            sdate := generator.context.timestamp_
            // если есть дельта - то нужно отнять от текущего ее и использовать эту дату для старта timeline
            if (fld.Delta > 0) {
                sdate = time.Now().AddDate(0, 0, -fld.Delta)
            }
            generator.datePipe = GenTimeline(1, rownum, sdate)
    }
}

Т.е если для генерируемого поля - указан тип pk (первичный ключ), то стартуем пайпу с генерацией числа от 1 до количества строк в таблице. Если тип timestamp - то стартуем временной ряд.

Какие правила выбраны для генерации даты:

  • для таблиц верхнего уровня (h_cities) указывается delta -N дней от текущей даты. От нее начинают генерироваться даты создания городов

  • для таблиц уровнем ниже - h_farms берется интервал от даты генерации города до текущего дня. Т.е пока город существует и до сего дня - могут открываться фермы

  • ну и так далее вниз по иерархии. Переменная generator.context.timestamp_ как раз и хранит дату родительской таблицы.

Ну и собственно сама реализация пайпов:

// Генерируем последовательность чисел от snum до enum
func GenKey(snum, enum int) <-chan int {
    numPipe := make(chan int)
    // функция - поток, которая будет играть роль пайпы для генерации числового ряда
    go func() { 
        for i := snum; i <= enum; i++ {
            numPipe <- i
        }
        close(numPipe)
    }()
    return numPipe
}

// Данный пайплайн нужен, чтобы реализовать механизм генерации по временному ряду
func GenTimeline(snum, enum int, sdate time.Time) <-chan time.Time {
    timePipe := make(chan time.Time)
    // функция - поток, которая будет играть роль пайпы для генерации временного ряда
    go func() {
        // первым этапом сделаем равномерный ряд (неравномеризацию будем накручивать следом)
        // просто рассчитаем шаг каждой строки и будем его прибавлять к стартовой дате
        step := (time.Now().Sub(sdate).Hours() / 24)/(float64)(enum - snum + 1)
        for i := snum; i <= enum; i++ { 
            timePipe <- sdate.AddDate(0, 0, (int)(step * (float64)(i)))
        }
        close(timePipe)
    }()
    return timePipe
}

Первый метод прост - он выкидывает по требованию основного потока число от 1 до N. Это сделано для того, чтобы разгрузить основной код от счетчиков и прочего ненужного хлама.

Второй поинтереснее. Он генерирует дату от T до текущей. Делает это равномерно. Таким образом, работая в паре, оба эти метода будут возвращать в основной код данные для последовательной записи, но при этом основной алгоритм остается чист от хранения какой либо ненужной промежуточной информации о текущем этапе генерации данных для конкретной таблицы.

После того как пайпы открыты - приступаем к формированию строки

for _, fld := range table.Fields {
  ...
  switch (fld.Type) {
      case "pk":
          str = strconv.Itoa(<-generator.numPipe)
          pk = str
      case "timestamp":
          ts = <-generator.datePipe
          str = ts.Format("2006-01-02")
          timestamp = str
      case "string":
          str = GenString()
      case "list":
          str = GenStringByList(fld.Values)
      case "copy":
          str = fieldsMap[fld.LinkField]
      case "parentRandom":
          str = generator.context.RandomHash[rand.Intn(len(generator.context.RandomHash))]
      default:
          str = "UNKNOWN_COLUMN_TYPE"
  }
  ...
  fields = append(fields, str)
}

Все сгенерированные поля сохраняются в срез fields.

Часть генераторов - это просто функции, а не потоки. Сделано это потому, что они не должны хранить контекста на протяжении всего заполнения таблицы - поэтому и не нужно усложнять. А вот первичный ключ и временной ряд содержат контекст и его удобнее хранить в отдельном параллельном методе.

Особое внимание стоит обратить на тип поля parentRandom. Это механизм обеспечения связки со случайным родителем. Поле generator.context.RandomHash хранит случайные N хэшей родителя, и когда надо мы просто читаем один случайный. Таким образом рекурсия + небольшое окно хэшей помогает нам рандомизировать связи в БД. Что дает интересный эффект. Так как данные пишутся упорядочено, то данные Хэши родителей всегда созданы ранее, чем дети. Что делает данные еще и логичными.

После того как срез полей заполнен, раскрываем его в метод записи - это просто запись данных на ФС в формате TSV, ничего интересного внутри нет:

// записываем основную таблицу
TSVWriteLine(generator.files[table.Name], fields...)

Последним самым интересным моментом является отправка всего этого в рекурсию, чтобы создавать сложны структуры одним универсальным методом:

// уходим в рекурсию по дочерним таблицам
for _, tbl := range table.Childs {
    WriteTable(tbl, generator)
}

Как итог. 200 строк кода способные генерировать 1ГБ связанных данных в минуту. Примерно с такой же скоростью эти данные грузятся в ClickHouse, мне показалось даже быстрее.

Потребление ресурсов:

В основном запись на диск. Пока формировались 50ГБ данных ничего не менялось.

Сами данные выглядят как то так:

h_cities
h_cities

Возьмем 69 город и посмотрим его фермы:

h_farms
h_farms

Как видите, несмотря на то, что в коде вообще нет огромных таблиц для создания связей, связи создались успешно + время создания ферм стартует от даты создания города.

Как итог - мы добились последовательной генерации больших объемов связанных данных в 200 строк кода. За счет:

  • пайпов, которые вынесли контекст и не пришлось создавать кучу переменных для каждой таблицы

  • последовательной генерации по временному ряду

  • рекурсии - каждая дочерняя таблица создается тем же кодом что и родительская

  • помещения конфига в адский JSON

Ну и в основной статье я обещал поведать откуда взялись такие странные названия городов - страстный гусь и страстный тапок. Тут все просто. Чтобы все сделать быстро и хорошо провести время, я использовал простой метод:

// Генерируем случайную строку
func GenString() string {
    // TODO утащить это в настроечный файл
    rndTable := [][]string {{"Милый", "Красный", "Малый", "Большой", "Страстный", "Кривой", "Высокий", "Томный", "Хромой","Отличный",
                             "Ужасный", "Великий", "Нижний", "Верхний", "Суровый", "Крошечный", "Мытарский",
                             "Упоротый", "Пьяный", "Шебутной", "Воскресший", "Наивный", "Хвостатый", "Няшный"},
                            {"Рог", "Нос", "Хряк", "Жук", "Зуб", "Рот", "Утес", "Яр", "Мост","Журавль", "Слон", "Конь", "Тапок", "Танк",
                             "Люк", "Мух", "Хряк", "Гусь", "Жбан", "Клоп", "Сон", "Портвейн"}}

    n0 := rand.Intn(len(rndTable[0]))
    n1 := rand.Intn(len(rndTable[1]))

    return fmt.Sprintf("%s %s", rndTable[0][n0], rndTable[1][n1])
}

Так что, когда я говорю своим друзьям неайтишникам, что писать код еще и весело - они не верят. А зря. Милый Жбан и Ужасный Клоп с ними поспорили бы :)

Источник: https://habr.com/ru/articles/764678/


Интересные статьи

Интересные статьи

Всем привет. Мы продолжаем цикл публикаций о том, как наша BI-платформа «Форсайт» работает с данными. В этой статье мы бы хотели продолжить рассказ про виртуализацию данных. И рассказать о том, как с ...
Обожаю свою френдленту в Facebook. Каждый квартал непременно появляются пафосные посты о запрете несчастному Марку забирать персональную информацию в свою метавселенную. Они непременно перемежаются фо...
Сегодня на простом примере рассмотрим – как провести краткий обзор неструктурированных данных в виде графа знаний.Для примера возьмем набор текстов из обращений с портала...
Недавно столкнулся с проблемой выбора квартиры и конечно первым делом решил узнать, что происходит на рынке недвижимости и, как это обычно бывает, половина экспертов с youtube.com говорят, что не...
Что делать, если ваш запрос к базе выполняется недостаточно быстро? Как узнать, оптимально ли запрос использует вычислительные ресурсы или его можно ускорить? На последней конференции HighLoad++ ...