Ein bisschen über paralleles Rechnen in R.

Die Veröffentlichung ist sehr kurz. Viele Leute denken, dass paralleles Rechnen in R sehr schwierig ist und nicht auf ihre aktuellen Aufgaben anwendbar ist.





Ja und nein. Wenn Sie nicht absichtlich auf Theorie, Hardware und alle möglichen Details eingehen, können Sie "3 und 1/2" von fast universellen Rezepten zeichnen. Diese Beispiele ähneln bewusst produktiven Aufgaben und nicht einem entmannten Paar von Kunststofflinien.





Es ist eine Fortsetzung einer Reihe früherer Veröffentlichungen .





Gebrauchte Pakete

Pakete laden
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)

library(dqrng)

library(iterators)
library(future)
library(foreach)
library(doFuture)

library(tictoc)
library(futile.logger)
library(lgr) #      `lgr`

library(hrbrthemes)
      
      



Parallelisierungsmuster

Muster 1. Parallelisierung von Tidyverse-Berechnungen

Situation. Es gibt ein Skript, das viele Pipelines für enthält tidyverse



.





Beispielaufgabe. Berechnen wir den Durchschnitt der Summe der Quadrate der Zahlen. Um die Effizienz des parallelen Rechnens zu verbessern, ist es wichtig, den Umfang der Datenübertragung zwischen Threads zu reduzieren. Wir verwenden das furrr- Paket .





`tidyverse` Pipeline
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)

num_row <- 1:10^6

ff_seq <- function(x) x^2

ff_par <- function(x) mean(x^2)

tic(" ")
lst1 <- num_row %>%
  purrr::map_dbl(ff_seq) %>%
  mean()
toc()

tic(" ,  1")
lst2 <- num_row %>%
  furrr::future_map_dbl(ff_seq) %>%
  mean()
toc() 

tic(" ,  2")
lst2 <- num_row %>%
  split(cut(seq_along(.), workers, labels = FALSE)) %>%
  furrr::future_map_dbl(ff_par) %>%
  mean()
toc()
      
      



Das Ergebnis hängt natürlich von der Hardwareplattform und dem Betriebssystem ab, auf dem alles ausgeführt wird. Bei einem Testlauf habe ich folgendes Layout:





 : 7.23 sec elapsed
 ,  1: 3.43 sec elapsed
 ,  2: 0.64 sec elapsed
      
      



Windows und Linux unterscheiden sich in ihrer Parallelisierung erheblich. Linux in der Produktion wird Windows stark vorgezogen.





Muster 2. Lokale manuelle Parallelisierung

. . , . , %<-%



.





#  ,  20   10^5 
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
  # stri_rand_strings(length = 10, pattern = "[a-z]") %>%
  sample(10^4:10^5, .) %>%
  sample(20 * nn, replace = TRUE) %>%
  matrix(byrow = TRUE, ncol = 20) %>%
  as_tibble(.name_repair = "universal") %>%
  mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
  #   
  mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
  select(user_id, ver, everything())
toc()

#       
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
      
      



plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future

# ,     2
tic("   ")
tic("  ")
res_lst <- list()
for (j in 1:6) {
  res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()


tic("   ")
tic("  ")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()

all_equal(seq_df, par_df)
      
      



. . :





   : 46.23 sec elapsed
   : 37.82 sec elapsed
      
      



3.

. , , .





.

. $C_n^k$



. .





.




#  
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")

initLogging <- function(log_file){

  lgr <- get_logger_glue("logger")

  lgr$set_propagate(FALSE)
  lgr$set_threshold("all")
  lgr$set_appenders(list(
    console = AppenderConsole$new(
      threshold = "info"
    ),
    file = AppenderFile$new(
      file = log_file,
      threshold = "all"
    )
  ))

  lgr  
}

invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
      
      



"Start batch processing" %T>%
  flog.info() %T>%
  lgr$info()

#   
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)

tic("Batch processing")
start_time <- Sys.time()

foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"), 
        # .packages = 'futile.logger',
        .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {

          start <- Sys.time() - start_time

          #    
          flog.appender(appender.tee(flog_logname))
          lgr <- initLogging(lgr_logname)

          res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)

          # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
          message("     message from thread")

          glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
               "PID: {Sys.getpid()}",
               "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
               .sep = " ") %T>%
            flog.info() %T>%
            lgr$info()

          #   
          return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
        } -> output_lst
flog.info("Foreach finished")

checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)

#    --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
      
      



() () . windows.





.
#      
output_tbl %>%
  mutate_at("pid", as.factor) %>%
  mutate_at(vars(start, finish), as.numeric) %>%
  ggplot(aes(start, pid, colour = pid)) +
  geom_point(size = 3, alpha = .7) +
  geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
  geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
  ggthemes::scale_fill_tableau("Tableau 10") +
  theme_ipsum_rc() +
  xlim(c(0, 5))
      
      



, , , . , « »:





  1. (worker) . (, , …), . () .





  2. , core - 1, . , reduce , . .





  3. .





  4. , . , , ( , , API ..). .





  5. , . .





  6. Für eine Reihe von Aufgaben im Zusammenhang mit langen synchronen Anforderungen von externen Systemen (typische Vertreter sind REST-API / Web-Scrapping) können Sie viel mehr Taschenrechner als die verfügbaren Kerne erstellen. Sie hängen immer noch die meiste Zeit im Standby-Modus. Kann als separater Betriebssystemprozess ausgeführt werden, indem das entsprechende Backend konfiguriert wird. registerDoFuture();



    plan(future.callr::callr).



    Dies ist die verbleibende Hälfte des Rezepts.





Vorherige Veröffentlichung - "Nuancen des Betriebs von R-Lösungen in einer Unternehmensumgebung?" ...








All Articles