
I have a large body of code in which the aggregation step is the current speed bottleneck.

In my code I would like to speed up the data-grouping step. A SNOTE (simple non-trivial example) of my data looks like this:

library(data.table)
a = sample(1:10000000, 50000000, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e)
system.time(c.dt <- dt[, list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by = a])
   user  system elapsed 
 60.107   3.143  63.534

This is pretty fast for such a large data example, but in my case I am still looking for further speedups. I have multiple cores available, so I am almost certain there must be a way to use that computing power.

I am open to changing my data type to data.frame or idata.frame objects (in theory idata.frame is supposed to be faster than data.frames).

I did some research, and it seems the plyr package has some parallel capabilities that could help, but I am still struggling with how to apply them to the grouping I am trying to do (a sketch of what I mean follows). In another SO post they discuss some of these ideas. I am still unsure how much I would gain from this kind of parallelization, since it uses the foreach function. In my experience, foreach is not a good idea for millions of fast operations, because the communication effort between cores ends up slowing the parallel work down.
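Concretely, the plyr route I have in mind looks like the sketch below: ddply with its .parallel argument and a registered doMC backend. This is my reading of the plyr docs, not something I have verified at scale, and an idata.frame built with plyr::idata.frame could presumably be passed in the same way:

library(plyr)
library(doMC)
registerDoMC(cores = 4)

# group with ddply, distributing groups across cores via the registered backend
df <- as.data.frame(dt)
c.df <- ddply(df, .(a), summarise,
              b = paste(b, collapse = ""),
              d = paste(d, collapse = ""),
              e = e[1],
              .parallel = TRUE)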


2 Answers


Can you parallelize aggregation with a data.table? Yes.

Is it worth it? No. This is a key point the previous answer failed to highlight.

As Matt Dowle explains in data.table and parallel computing, copies ("chunks") need to be made before being distributed when running operations in parallel. This slows things down. In some cases, when you cannot use data.table (e.g. running many linear regressions), it is worth splitting tasks across cores. But not aggregation, at least not when data.table is involved.
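For contrast, here is the kind of per-group work where splitting across cores does pay off: a minimal sketch of the many-regressions case just mentioned (my example, not Matt's; it uses parallel::mclapply, which forks and is therefore Unix-alike only):

library(parallel)
# one comparatively expensive model fit per group; the computation
# dwarfs the cost of copying each chunk out to a worker
fits <- mclapply(split(mtcars, mtcars$cyl),
                 function(g) lm(mpg ~ wt, data = g),
                 mc.cores = 4)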

In short (and until proven otherwise), aggregate using data.table and stop worrying about doMC. data.table is already blazing fast compared to anything else available when it comes to aggregation, even though it isn't multicore!


Here are some benchmarks you can run for yourself, comparing data.table's internal aggregation using `by` against foreach and mclapply. The results are listed first.

#-----------------------------------------------

# TL;DR FINAL RESULTS (Best to Worst)
# 3 replications, N = 10000:
# (1)  0.007 -- data.table using `by`
# (2)  3.548 -- mclapply with rbindlist
# (3)  5.557 -- foreach with rbindlist
# (4)  5.959 -- foreach with .combine = "rbind"
# (5) 14.029 -- lapply

# ----------------------------------------------

library(data.table)

## And used the following to create the dt
N <- 1e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
setkey(dt, "a")

# TEST AGGREGATION WITHOUT PARALLELIZATION ---------------------------
## using data.tables `by` to aggregate
round(rowMeans(replicate(3, system.time({
    dt[, list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by = a]
}))), 3)
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
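
## FOR CONTEXT (my addition, not one of the original benchmarks): essentially
## the same aggregation via base aggregate(), dropping `e` for simplicity.
## In my experience data.table's `by` beats it by orders of magnitude here.
round(rowMeans(replicate(3, system.time({
    aggregate(cbind(b, d) ~ a, data = as.data.frame(dt),
              FUN = paste, collapse = "")
}))), 3)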

## using `lapply`
round(rowMeans(replicate(3, system.time({
    results <- lapply(unique(dt[["a"]]), function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])]
    })
    rbindlist(results)
}))), 3)
# [1] 14.029 elapsed for N == 10,000

# USING `mclapply` FORKING ---------------------------------
## use mclapply (from the parallel package)
library(parallel)
round(rowMeans(replicate(3, system.time({
    results <- mclapply(unique(dt[["a"]]),
    function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    }, mc.cores=4)
    rbindlist(results)
}))), 3)
# [1] 3.548 elapsed for N == 10,000
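
## A variant worth trying (my sketch, not benchmarked in the original answer):
## one mclapply task per *chunk* of whole groups instead of one per group,
## aggregating each chunk with `by`, which amortizes the per-task overhead.
round(rowMeans(replicate(3, system.time({
    ids    <- unique(dt[["a"]])
    chunks <- split(ids, cut(seq_along(ids), 4, labels = FALSE))
    results <- mclapply(chunks, function(ix) {
        dt[.(ix), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by = a]
    }, mc.cores = 4)
    rbindlist(results)
}))), 3)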


# PARALLELIZATION USING `doMC` PACKAGE ---------------------------------
library(doMC)
mc = 4
registerDoMC(cores=mc)
getDoParWorkers()
# [1] 4

## (option a) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
    dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}))), 3)
# [1] 5.959 elapsed for N == 10,000

## (option b) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    results <-
      foreach(x=unique(dt[["a"]])) %dopar%
        dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
}))), 3)
# [1] 5.557 elapsed for N == 10,000

registerDoSEQ()
getDoParWorkers()
# [1] 1
Answered 2015-06-23T17:18:12.147

If you have multiple cores available, why not leverage the fact that you can quickly filter and group rows in a data.table using its key:

library(doMC)
registerDoMC(cores=4)


setkey(dt, "a")

finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]

Note that if the number of unique groups (i.e. length(unique(a))) is relatively small, it is faster to drop the .combine argument, get the results back in a list, and then call rbindlist on the results. In my testing on two cores and 8GB of RAM, the threshold was at about 9,000 unique values. Here is what I used to benchmark:

# (option a)
round(rowMeans(replicate(3, system.time({
# ------- #
  foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
# ------- #
}))), 3) 
# [1]  1.243 elapsed for N ==  1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000



# (option b)
round(rowMeans(replicate(3, system.time({
# ------- #
    results <- 
      foreach(x=unique(dt[["a"]])) %dopar% 
         dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
# ------- #
}))), 3)
# [1]  1.117 elapsed for N ==  1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000


## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
Answered 2013-10-06T04:00:55.277