最近我一直在使用future
(and future.apply
and furrr
) 在 R 中进行一些并行处理,这主要是很棒的,但是我偶然发现了一些我无法解释的东西。这可能是某个地方的错误,但也可能是我的草率编码。如果有人可以解释这种行为,将不胜感激。
设置
我正在对我的数据的不同子组进行模拟。对于每个组,我想运行模拟n
时间,然后计算结果的一些汇总统计数据。这是一些示例代码,用于重现我的基本设置并演示我看到的问题:
library(tidyverse)
library(future)
library(future.apply)
# Helper functions
#' Calls out to `free` to get total system memory used
sys_used <- function() {
.f <- system2("free", "-b", stdout = TRUE)
as.numeric(unlist(strsplit(.f[2], " +"))[3])
}
#' Write time, and memory usage to log file in CSV format
#' @param .f the file to write to
#' @param .id identifier for the row to be written
mem_string <- function(.f, .id) {
.s <- paste(.id, Sys.time(), sys_used(), Sys.getpid(), sep = ",")
write_lines(.s, .f, append = TRUE)
}
# Inputs
fake_inputs <- 1:16
nsim <- 100
nrows <- 1e6
log_file <- "future_mem_leak_log.csv"
if (fs::file_exists(log_file)) fs::file_delete(log_file)
test_cases <- list(
list(
name = "multisession-sequential",
plan = list(multisession, sequential)
),
list(
name = "sequential-multisession",
plan = list(sequential, multisession)
)
)
# Test code
for (.t in test_cases) {
plan(.t$plan)
# loop over subsets of the data
final_out <- future_lapply(fake_inputs, function(.i) {
# loop over simulations
out <- future_lapply(1:nsim, function(.j) {
# in real life this would be doing simulations,
# but here we just create "results" using rnorm()
res <- data.frame(
id = rep(.j, nrows),
col1 = rnorm(nrows) * .i,
col2 = rnorm(nrows) * .i,
col3 = rnorm(nrows) * .i,
col4 = rnorm(nrows) * .i,
col5 = rnorm(nrows) * .i,
col6 = rnorm(nrows) * .i
)
# write memory usage to file
mem_string(log_file, .t$name)
# in real life I would write res to file to read in later, but here we
# only return head of df so we know the returned value isn't filling up memory
res %>% slice_head(n = 10)
})
})
# clean up any leftover objects before testing the next plan
try(rm(final_out))
try(rm(out))
try(rm(res))
}
外部循环用于测试两种并行化策略:是并行化数据子集还是并行化 100 次模拟。
一些警告
- 我意识到并行化模拟并不是理想的设计,而且将数据分块以向每个内核发送 10-20 个模拟会更有效,但这并不是重点。我只是想了解内存中发生了什么。
- 我还认为
plan(multicore)
这里可能会更好(尽管我确定是否会)但我更感兴趣的是弄清楚发生了什么plan(multisession)
结果
我在 8-vCPU Linux EC2 上运行它(如果人们需要,我可以提供更多规格)并根据结果创建以下图(在底部绘制代码以实现可重复性):
首先,plan(list(multisession, sequential))
速度更快(正如预期的那样,请参见上面的警告),但我对内存配置文件感到困惑。plan(list(multisession, sequential))
我期望的总系统内存使用量保持不变,因为我假设res
每次循环都会覆盖对象。
但是,随着程序运行,内存使用量会稳步增长。plan(list(sequential, multisession))
似乎每次通过循环时res
都会创建对象,然后在某个地方徘徊,占用内存。在我的真实示例中,它变得足够大,以至于它填满了我的整个(32GB)系统内存并在大约中途终止了进程。
情节扭曲:它只在嵌套时发生
这是真正让我感到困惑的部分!当我将外部更改为future_lapply
常规lapply
并设置时plan(multisession)
,我看不到它!从我对这个“未来:拓扑”小插图的阅读来看,这应该是相同的,plan(list(sequential, multisession))
但情节并没有显示内存在增长(事实上,它几乎与plan(list(multisession, sequential))
上面的情节相同)
注意其他选项
实际上,我最初发现了这个,furrr::future_map_dfr()
但为了确保它不是 中的错误furrr
,我尝试了它future.apply::future_lapply()
并得到了显示的结果。我试图用 just 来编写代码future::future()
,得到了非常不同的结果,但很可能是因为我编写的代码实际上并不等效。furrr
如果没有or提供的抽象层,我没有太多直接使用期货的经验future.apply
。
同样,非常感谢对此的任何见解。
绘图代码
library(tidyverse)
logDat <- read_csv("future_mem_leak_log.csv",
col_names = c("plan", "time", "sys_used", "pid")) %>%
group_by(plan) %>%
mutate(
start = min(time),
time_elapsed = as.numeric(difftime(time, start, units = "secs"))
)
ggplot(logDat, aes(x = time_elapsed/60, y = sys_used/1e9, group = plan, colour = plan)) +
geom_line() +
xlab("Time elapsed (in mins)") + ylab("Memory used (in GB)") +
ggtitle("Memory Usage\n list(multisession, sequential) vs list(sequential, multisession)")