6

下面的代码在 Windows 和 Ubuntu 平台上产生不同的结果。我理解这是因为处理并行处理的方法不同。

总结:
我不能在 Linux 上并行insert/数据( , ),而我可以在 Windows 上做到这一点rbindmclapplymcmapply

感谢@Hong Ooi 指出mclapply不能在 Windows 上并行工作,但下面的问题仍然有效。

当然,same 没有多个插入data.frame,每个插入都在单独的 data.frame 中执行。

library(R6)
library(parallel)

# storage objects generator
cl <- R6Class(
    classname = "cl",
    public = list(
        data = data.frame(NULL),
        initialize = function() invisible(self),
        insert = function(x) self$data <- rbind(self$data, x)
    )
)

N <- 4L # number of entities
i <- setNames(seq_len(N),paste0("n",seq_len(N)))

# random data.frames
set.seed(1)
ldt <- lapply(i, function(i) data.frame(replicate(sample(3:10,1),sample(letters,1e5,rep=TRUE))))

# entity storage
lcl1 <- lapply(i, function(i) cl$new())
lcl2 <- lapply(i, function(i) cl$new())
lcl3 <- lapply(i, function(i) cl$new())

# insert data
invisible({
    mclapply(names(i), FUN = function(n) lcl1[[n]]$insert(ldt[[n]]))
    mcmapply(FUN = function(dt, cl) cl$insert(dt), ldt, lcl2, SIMPLIFY=FALSE)
    lapply(names(i), FUN = function(n) lcl3[[n]]$insert(ldt[[n]]))
})

### Windows

sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000

### Unix

sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#n1 n2 n3 n4
# 0  0  0  0
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#n1 n2 n3 n4
# 0  0  0  0
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000

问题是:

如何在 Linux 平台上实现rbind并行成单独的 s?data.frame

在我的情况下, PS 非内存存储SQLite不能被视为解决方案。

4

2 回答 2

3

问题是mclapply并且mcmapply不打算与具有副作用的功能一起使用。您的函数正在修改列表中的对象,但mclapply不会将修改后的对象发送回主进程:它只返回函数显式返回的值。mclapply这意味着当工人作为回报退出时,您的结果会丢失。

通常我会将代码更改为不依赖于副作用,并返回被修改的对象。这是一种使用方法clusterApply,它也可以在 Windows 上并行工作:

library(R6)
library(parallel)
cl <- R6Class(
    classname = "cl",
    public = list(
        data = data.frame(NULL),
        initialize = function() invisible(self),
        insert = function(x) self$data <- rbind(self$data, x)))

N <- 4L # number of entities
i <- setNames(seq_len(N),paste0("n",seq_len(N)))
set.seed(1)
ldt <- lapply(i, function(i)
  data.frame(replicate(sample(3:10,1),sample(letters,1e5,rep=TRUE))))
nw <- 3  # number of workers
clust <- makePSOCKcluster(nw)
idx <- splitIndices(length(i), nw)
nameslist <- lapply(idx, function(iv) names(i)[iv])

lcl4 <- do.call('c', clusterApply(clust, nameslist, 
  function(nms, cl, ldt) {
    library(R6)
    lcl4 <- lapply(nms, function(n) cl$new())
    names(lcl4) <- nms
    lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
    lcl4
  }, cl, ldt))

如果您想创建一次对象列表然后并行修改对象多次,则此方法不起作用。这也是可能的,但你必须有持久的工人。在这种情况下,您会在所有任务完成后从工作人员那里获取修改后的对象。不幸的是,mclapply它不使用持久化的worker,所以在这种情况下你必须使用基于集群的函数,例如clusterApply. 这是一种方法:

# Initialize the cluster workers
clusterEvalQ(clust, library(R6))
clusterExport(clust, c('cl', 'ldt'))
clusterApply(clust, nameslist, function(nms) {
  x <- lapply(nms, function(n) cl$new())
  names(x) <- nms
  assign('lcl4', x, pos=.GlobalEnv)
  NULL
})

# Insert data into lcl4 on each worker
clusterApply(clust, nameslist, function(nms) {
  lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
  NULL
})

# Concatenate lcl4 from each worker
lcl4 <- do.call('c', clusterEvalQ(clust, lcl4))

这与之前的方法非常相似,只是它将流程分为三个阶段:worker 初始化、任务执行和结果检索。clusterExport我还使用和以更传统的方式初始化了工人clusterEvalQ

于 2015-05-26T18:32:00.767 回答
0

我认为 Windows 版本mclapply正在运行,因为它将其工作委托给lapply. 检查时序或 CPU 核心使用情况可以验证这一点。根据R source, Windowsmclapplymcmapply被顺序版本取代。

看来,代码的并行化方式出了点问题,目前看不出到底是什么。

于 2015-05-22T12:41:25.647 回答