2

最近我一直在使用future(and future.applyand furrr) 在 R 中进行一些并行处理,这主要是很棒的,但是我偶然发现了一些我无法解释的东西。这可能是某个地方的错误,但也可能是我的草率编码。如果有人可以解释这种行为,将不胜感激。

设置

我正在对我的数据的不同子组进行模拟。对于每个组,我想运行模拟n时间,然后计算结果的一些汇总统计数据。这是一些示例代码,用于重现我的基本设置并演示我看到的问题:

library(tidyverse)
library(future)
library(future.apply)

# Helper functions

#' Calls out to `free` to get total system memory used
sys_used <- function() {
  .f <- system2("free", "-b", stdout = TRUE)
  as.numeric(unlist(strsplit(.f[2], " +"))[3])
}

#' Write time, and memory usage to log file in CSV format
#' @param .f the file to write to 
#' @param .id identifier for the row to be written
mem_string <- function(.f, .id) {
  .s <- paste(.id, Sys.time(), sys_used(), Sys.getpid(), sep = ",")
  write_lines(.s, .f, append = TRUE)
}

# Inputs
fake_inputs <- 1:16
nsim <- 100
nrows <- 1e6

log_file <- "future_mem_leak_log.csv"
if (fs::file_exists(log_file)) fs::file_delete(log_file)

test_cases <- list(
  list(
    name = "multisession-sequential",
    plan = list(multisession, sequential)
  ),
  list(
    name = "sequential-multisession",
    plan = list(sequential, multisession)
  )
)

# Test code

for (.t in test_cases) {
  plan(.t$plan)
  
  # loop over subsets of the data 
  final_out <- future_lapply(fake_inputs, function(.i) {
    # loop over simulations
    out <- future_lapply(1:nsim, function(.j) {
      # in real life this would be doing simulations, 
      # but here we just create "results" using rnorm()
      res <- data.frame(
        id = rep(.j, nrows),
        col1 = rnorm(nrows) * .i,
        col2 = rnorm(nrows) * .i,
        col3 = rnorm(nrows) * .i,
        col4 = rnorm(nrows) * .i,
        col5 = rnorm(nrows) * .i,
        col6 = rnorm(nrows) * .i
      )

      # write memory usage to file
      mem_string(log_file, .t$name)

      # in real life I would write res to file to read in later, but here we
      # only return head of df so we know the returned value isn't filling up memory
      res %>% slice_head(n = 10) 
    })
  })

  # clean up any leftover objects before testing the next plan
  try(rm(final_out))
  try(rm(out))
  try(rm(res))
}

外部循环用于测试两种并行化策略:是并行化数据子集还是并行化 100 次模拟。

一些警告

  • 我意识到并行化模拟并不是理想的设计,而且将数据分块以向每个内核发送 10-20 个模拟会更有效,但这并不是重点。我只是想了解内存中发生了什么。
  • 我还认为plan(multicore)这里可能会更好(尽管我确定是否会)但我更感兴趣的是弄清楚发生了什么plan(multisession)

结果

我在 8-vCPU Linux EC2 上运行它(如果人们需要,我可以提供更多规格)并根据结果创建以下图(在底部绘制代码以实现可重复性):

内存曲线图

首先,plan(list(multisession, sequential))速度更快(正如预期的那样,请参见上面的警告),但我对内存配置文件感到困惑。plan(list(multisession, sequential))我期望的总系统内存使用量保持不变,因为我假设res每次循环都会覆盖对象。

但是,随着程序运行,内存使用量会稳步增长。plan(list(sequential, multisession))似乎每次通过循环时res都会创建对象,然后在某个地方徘徊,占用内存。在我的真实示例中,它变得足够大,以至于它填满了我的整个(32GB)系统内存并在大约中途终止了进程。

情节扭曲:它只在嵌套时发生

这是真正让我感到困惑的部分!当我将外部更改为future_lapply常规lapply并设置时plan(multisession),我看不到它!从我对这个“未来:拓扑”小插图的阅读来看,这应该是相同的,plan(list(sequential, multisession))但情节并没有显示内存在增长(事实上,它几乎与plan(list(multisession, sequential))上面的情节相同)

一级情节

注意其他选项

实际上,我最初发现了这个,furrr::future_map_dfr()但为了确保它不是 中的错误furrr,我尝试了它future.apply::future_lapply()并得到了显示的结果。我试图用 just 来编写代码future::future(),得到了非常不同的结果,但很可能是因为我编写的代码实际上并不等效。furrr如果没有or提供的抽象层,我没有太多直接使用期货的经验future.apply

同样,非常感谢对此的任何见解。

绘图代码

library(tidyverse)

logDat <- read_csv("future_mem_leak_log.csv",
                   col_names = c("plan", "time", "sys_used", "pid")) %>%
  group_by(plan) %>% 
  mutate(
    start = min(time), 
    time_elapsed = as.numeric(difftime(time, start, units = "secs"))
  )

ggplot(logDat, aes(x = time_elapsed/60, y = sys_used/1e9, group = plan, colour = plan)) + 
  geom_line() + 
  xlab("Time elapsed (in mins)") + ylab("Memory used (in GB)") +
  ggtitle("Memory Usage\n  list(multisession, sequential) vs list(sequential, multisession)") 

4

0 回答 0