Немного о параллельных вычислениях в R

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Публикация очень краткая. Многие думают, что параллельные вычисления в R -- это очень сложно и неприменимо к их текущим задачам.

И да и нет. Если сознательно не вдаваться в теорию, железо и всякие подробности, то можно нарисовать «3 и 1/2» почти универсальных рецепта.

Является продолжением серии предыдущих публикаций.

Используемые пакеты

Загрузка пакетов
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)

library(dqrng)

library(iterators)
library(future)
library(foreach)
library(doFuture)

library(tictoc)
library(futile.logger)
library(lgr) # будем использовать его рутовый логгер `lgr`

library(hrbrthemes)

Паттерны параллелизации

Паттерн 1. Параллелизация tidyverse вычислений

Ситуация. Есть скрипт, содержащий множество пайплайнов на tidyverse.

Пример задачи. Подсчитаем среднее от суммы квадратов чисел. Для повышения эффективности параллельных вычислений важно уменьшить объемы перекачки данных между потоками. Используем пакет furrr.

`tidyverse` pipeline
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)

num_row <- 1:10^6

ff_seq <- function(x) x^2

ff_par <- function(x) mean(x^2)

tic("Считаем последовательно")
lst1 <- num_row %>%
  purrr::map_dbl(ff_seq) %>%
  mean()
toc()

tic("Считаем параллельно, вариант 1")
lst2 <- num_row %>%
  furrr::future_map_dbl(ff_seq) %>%
  mean()
toc() 

tic("Считаем параллельно, вариант 2")
lst2 <- num_row %>%
  split(cut(seq_along(.), workers, labels = FALSE)) %>%
  furrr::future_map_dbl(ff_par) %>%
  mean()
toc()

Естественно, результат зависит от аппаратной платформы и ОС, на которой все запускается. На тестовом прогоне у меня такая раскладка:

Считаем последовательно: 7.23 sec elapsed
Считаем параллельно, вариант 1: 3.43 sec elapsed
Считаем параллельно, вариант 2: 0.64 sec elapsed

Windows и Linux достаточно сильно отличаются по методам параллелизации. Linux в продуктиве сильно предпочтительнее Windows.

Паттерн 2. Локальная ручная параллелизация

Ситация. В ряде случаев при работе скрипта необходимо выполнить незначительное число разовых неунифицируемых операций. Например, загрузка справочников и различных первичных данных. Есть возможное решение, функция %<-%.

Генерация сэмплов
# создаем последовательность, матрица 20 атрибутов на 10^5 событий
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
  # stri_rand_strings(length = 10, pattern = "[a-z]") %>%
  sample(10^4:10^5, .) %>%
  sample(20 * nn, replace = TRUE) %>%
  matrix(byrow = TRUE, ncol = 20) %>%
  as_tibble(.name_repair = "universal") %>%
  mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
  # сгенерируем версию объекта
  mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
  select(user_id, ver, everything())
toc()

# сохраним в файл для последующей демонстрации параллелизации
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
Два варианта загрузки файлов
plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future

# считаем, что воркеров у нас 2
tic("Объединяем последовательно обработанные файлы")
tic("Читаем файлы последовательно")
res_lst <- list()
for (j in 1:6) {
  res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()


tic("Объединяем параллельно обработанные файлы")
tic("Читаем файлы параллельно")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()

all_equal(seq_df, par_df)

Некая разница наблюдается. Пример исключительно для демонстрации принципа. На тестовом прогоне у меня такая раскладка:

Объединяем последовательно обработанные файлы: 46.23 sec elapsed
Объединяем параллельно обработанные файлы: 37.82 sec elapsed

Паттерн 3. Параллелизация сложного процессинга

Ситуация. Много вычислительного данных, много кода, процессинг потенциально независим.

Пример.
Сделаем общее задание. Будем считать число сочетаний $C_n^k$. Дополнительно добавим несколько вариантов логирования при параллельных вычислениях.

Генерация списка заданий.
Подготовка логгеров
# подготовка логгеров
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")

initLogging <- function(log_file){

  lgr <- get_logger_glue("logger")

  lgr$set_propagate(FALSE)
  lgr$set_threshold("all")
  lgr$set_appenders(list(
    console = AppenderConsole$new(
      threshold = "info"
    ),
    file = AppenderFile$new(
      file = log_file,
      threshold = "all"
    )
  ))

  lgr  
}

invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
Многопоточные расчеты
"Start batch processing" %T>%
  flog.info() %T>%
  lgr$info()

# инициализируем параллельную обработку
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)

tic("Batch processing")
start_time <- Sys.time()

foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"), 
        # .packages = 'futile.logger',
        .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {

          start <- Sys.time() - start_time

          # инициализируем логгер в потоке
          flog.appender(appender.tee(flog_logname))
          lgr <- initLogging(lgr_logname)

          res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)

          # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
          message("     message from thread")

          glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
               "PID: {Sys.getpid()}",
               "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
               .sep = " ") %T>%
            flog.info() %T>%
            lgr$info()

          # вернем тайминги тоже
          return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
        } -> output_lst
flog.info("Foreach finished")

checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)

# терминируем параллельную обработку --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))

Для иллюстрации процесса нарисуем график запуска (точка) и завершения (крестик) задач на вычислителях. Хорошо видны первичные затраты на старт потоков windows.

Код построения графика.
# посмотрим графически на порядок запуска вычислителей
output_tbl %>%
  mutate_at("pid", as.factor) %>%
  mutate_at(vars(start, finish), as.numeric) %>%
  ggplot(aes(start, pid, colour = pid)) +
  geom_point(size = 3, alpha = .7) +
  geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
  geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
  ggthemes::scale_fill_tableau("Tableau 10") +
  theme_ipsum_rc() +
  xlim(c(0, 5))

Заключение

При параллелизации задач, для достижения максимальной эффективности вычислений следует учитывать ряд важных моментов, вытекающих из принципов работы компьютеров, ОС и теоретических пределов. Если не погружаться глубоко в детали, резюмируем в виде «проверочных пунктов»:

  1. Инициализация вычислителей (worker) является достаточно дорогостоящей. Требуется породить новое окружение (поток, кластер, …), его инициализировать. Для коротких вычислений (секунды) затраты на инициализацию могут оказаться существенно выше однопоточного вычисления.

  2. При выделении потоков на одной машине, рекомендуется отдавать под вычислители core - 1, или чуть меньше. Один поток выполняет роль мастера, раздающего задания и выполняющего reduce ответов, получаемых от вычислителей. Ну и самой операционке тоже могут быть нужны ресурсы.

  3. Дескрипторы файлов и коннектов к БД не переходят границы потоков.

  4. Накладные расходы на перегон больших объемов данных из мастер потока в вычислитель и обратно могут оказаться по времени существенно выше, чем время вычисления. Оптимально, если мастер поток дает метаинформацию по заданию, а вычислитель уже сам загружает эти данные (из БД, из файлов, из API и т.д.). Ну и результат наверх должен уходить максимально агрегированный.

  5. Состав заданий надо максимально готовить в мастер потоке, а вниз спускать грубую вычислительную подзадачу. Повышается прозрачность и воспроизводимость.

  6. Для ряда задач, связанных с длинными синхронными запросами внешних системы (типичные представители -- REST API/Web scrapping), можно создавать вычислителей много больше чем доступных ядер. Они все равно висят большую часть времени в режиме ожидания. Можно запускать в виде отдельных процессов ОС с помощью настройки соответствующего бэкенда registerDoFuture(); plan(future.callr::callr)

Предыдущая публикация -- «Нюансы эксплуатации R решений в enterprise окружении?».

Источник: https://habr.com/ru/post/543940/


Интересные статьи

Интересные статьи

Тема COVID-19 сейчас звучит из каждого утюга, и каждый, кто был военным аналитиком в январе, экономистом в нефтяной отрасли в феврале, сейчас внезапно превратился в вирусолога. Тем не менее из ин...
Параллельные или распределенные вычисления — вещь сама по себе весьма нетривиальная. И среда разработки должна поддерживать, и DS специалист должен обладать навыками проведения параллельных вычис...
И снова здравствуйте. В комментариях к предыдущей статье я обещал написать о защите от радона и его ДПР. Что ж, выполняю это обещание. Как я уже говорил в предыдущей статье, радон представ...
Тема статьи навеяна результатами наблюдений за методикой создания шаблонов различными разработчиками, чьи проекты попадали мне на поддержку. Порой разобраться в, казалось бы, такой простой сущности ка...
С версии 12.0 в Bitrix Framework доступно создание резервных копий в автоматическом режиме. Задание параметров автоматического резервного копирования производится в Административной части на странице ...