Сколько школ во всем мире? Чтение сотен гигабайт данных в JVM из Apache Arrow

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

JVM основная платформа для Big Data решений, таких как Hadoop, Spark, Presto, NiFi но на производительность значительно влияют копирование/сериализация данных "на каждый чих" с последующей сборкой мусора и отсутствие SIMD оптимизаций при работе с данными.

А можно ли в программе на JVM прочитать сотни гигабайт Parquet файлов без Spark/Hadoop ? В этом нам поможет библиотека Apache Arrow - решение которым объединяются десятки решений для работы с Большими Данными.

В качестве примера измерим с неизвестной точностью "среднюю температуру по больнице" - мы посчитаем число школьных зданий по всему миру в проекте OpenStreetMap. И когда говорят что образование избыточно и в школе дают много лишних знаний, то сразу же хочется задать вопрос. Предположите куда устремятся люди освободившиеся от "оков образования" и что они смогут делать во взрослой жизни - быть только потребителем контента?

Итак, исходные данные для примера - OpenStreetMap planet-220704.osm.pbf Как их преобразовать в parquet файлы здесь рассматривать не буду, могу лишь порекомендовать OpenStreetMap Parquetizer как один из вариантов.

Начнем с классического решения для обработки - в PostGIS мы посчитем что по всему миру 1005638 зданий помеченных как школа.

Данные планеты возьмем из PostGIS в схеме совместимой с pgsnapshot, с установленным расширением h3-pg . База данных занимает 588 GB и запущена в докер контейнере на ноутбуке с 16Гб ОЗУ и M.2 накопителем Samsung 970 EVO Plus:

Для картинки с КДПВ данные были подготовлены запросом и для желающих повторить визуализацию сохранил из вместе с полигонами границ в gist:

create table school 
    as select h3_3, count(*) as "count" from ways 
        where closed and tags->'building' = 'school' 
         or (tags->'building' is not null and tags->'amenity'='school') 
       group by h3_3 order by 2 desc

Разбивка данных на регионы для агрегации производилась в иерархической системе H3 на уровне разбивки 3. И визуализированы в QGIS только те регионы в которых больше 100 зданий школ на регион

select 
   h3_to_geo_boundary_geometry(h3_3::h3index), 
   count 
  from school 
      where count>100

Насколько полны данные по школолам в OpenStreetMap - большой вопрос. Обычно данные более актуальны в городах-миллионниках. OpenStreetMap работает на тех же принципах что и википедия, так что в случае неточностей все вопросы к сообществу.

А теперь решение по подсчету количества школ во всем мире на Apache Arrow. Добавляем зависимости в maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.igor-suhorukov</groupId>
    <artifactId>osm_parquet_dataset_example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <arrow.version>9.0.0</arrow.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-dataset</artifactId>
            <version>${arrow.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-unsafe</artifactId>
            <version>${arrow.version}</version>
        </dependency>
    </dependencies>

</project>

И считаем школы по миру:

package com.github.igorsuhorukov.arrow.osm.example;

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.UnionMapReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.util.Text;

import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;

public class CalculateSchoolCount {

    public static final int BATCH_SIZE = 100000;
    public static final Text BUILDING_KEY = new Text("building");
    public static final Text SCHOOL_VALUE = new Text("school");
    public static final Text AMENITY_KEY = new Text("amenity");

    public static void main(String[] args) throws Exception{
        if(args.length!=1){
            throw new IllegalArgumentException("Specify source dataset path for parquet files");
        }
        File datasetPath = new File(args[0]);
        if(!datasetPath.exists()){
            throw new IllegalArgumentException();
        }

        long startTime = System.currentTimeMillis();
        try (BufferAllocator allocator = new RootAllocator()) {
            FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
                    NativeMemoryPool.getDefault(), FileFormat.PARQUET, datasetPath.toURI().toURL().toExternalForm());
            final Dataset dataset = factory.finish();
            ScanOptions options = new ScanOptions(BATCH_SIZE);
            final Scanner scanner = dataset.newScan(options);
            try {
                AtomicLong totalRows = new AtomicLong();
                StreamSupport.stream(scanner.scan().spliterator(), true).forEach(scanTask -> {
                    long rowCount=0;
                    try (ArrowReader reader = scanTask.execute()) {
                        while (reader.loadNextBatch()) {
                            VectorSchemaRoot root = reader.getVectorSchemaRoot();
                            BitVector closed = (BitVector) root.getVector("closed");
                            MapVector tags = (MapVector) root.getVector("tags");
                            UnionMapReader tagsReader = tags.getReader();
                            for(int row=0, size = root.getRowCount(); row < size; row++){
                                if(closed.get(row) != 0){
                                    tagsReader.setPosition(row);
                                    boolean building=false;
                                    boolean buildingSchool=false;
                                    boolean amenitySchool=false;
                                    while (tagsReader.next()){
                                        Text key = (Text) tagsReader.key().readObject();
                                        Text value = (Text) tagsReader.value().readObject();
                                        if(key!=null && key.equals(BUILDING_KEY)){
                                            if(value!=null && value.equals(SCHOOL_VALUE)){
                                                buildingSchool = true;
                                                break;
                                            }
                                            building=true;
                                        } else if(key!=null && value!=null && key.equals(AMENITY_KEY) && value.equals(SCHOOL_VALUE)){
                                            amenitySchool = true;
                                            break;
                                        }
                                    }
                                    if(buildingSchool || (building && amenitySchool)){
                                        rowCount++;
                                    }
                                }
                            }
                            tags.close();
                            closed.close();
                            root.close();
                        }
                        totalRows.addAndGet(rowCount);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    try {
                        scanTask.close();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                long executionTime = System.currentTimeMillis() - startTime;
                System.out.println(totalRows.get()+" ("+executionTime+" ms)");

            } finally {
                AutoCloseables.close(scanner, dataset);
            }
        }
    }
}

Итак, решение далеко от идеального как минимум по лишним аллокациям и отсутствующим фичам в java Dataset API - не хватает чтения только требуемых для расчета колонок, filters pushdown. Но и идея была показать что обработать 188,2Гб сжатых zstd parquet файлов можно в java без Hadoop/Spark!

Dataset API в Java еще далек от функционала Python, так что желающих помочь проекту с 10К звездочками на Github тут полный простор для действий! Из пока отсутствующих полезных фич - работа со схемами партиционирования данных Hive/directory/file name style. Еще ждет слияния мой PR на поддержку Apache ORC формата в java Dataset API.

Для интерсующихся как читать данные в Arrow Dataset API можете посмотреть мою реализацию arrow_to_database загрузчика Apache Parquet и Arrow IPC файлов в базу данных через JDBC ну и конечно же документацию Apache Arrow.

Если тема работы с Apache Arrow или геоданными интересна, пишите про что хотели бы чтобы я рассказал в следующей статье, то добро пожаловать в комментарии к посту!

Источник: https://habr.com/ru/post/686860/


Интересные статьи

Интересные статьи

Objection.js — сравнительно молодая и минималистичная ORM-библиотека для Node.js, которая сильно упрощает взаимодействие с базами данных и не перегружена дополнительными функциями, как Sequelize или T...
Привет! Подошел к концу двенадцатый набор в Школу Программистов hh.ru. Самое время рассказать, как Петр Васильевич раздавал премии менеджерам, кто вышел победителем из "Релиза до выходных" благодаря р...
Привет!Согласие на обработку персональных данных теперь запрашивают на самых разных ресурсах при регистрации. Уведомления вида «Регистрируясь здесь, я даю согласие...» стали почти таким же привычным, ...
Прим. перев.: в этой статье сербский «инженер по масштабируемости» нагруженного онлайн-проекта в подробностях рассказывает о своем опыте оптимизации большой БД на базе MySQL. Проведена он...
Однажды, предо мной встала интересная задача — сделать подсветку для электронной книжки. Ну, вообще, это было побочным квестом к восстановлению кнопок на ней, но этот апгрейд меня серьёзно ув...