在这篇文章之后:multicore and data.table in R,我想知道在使用 data.table 时是否有办法使用所有内核,通常按组进行计算可以并行化。似乎plyr
允许这样的操作是设计的。
3 回答
首先要检查的是data.table
常见问题 3.1 第 2 点已被纳入:
仅对最大的组进行一次内存分配,然后将该内存重新用于其他组。可以收集的垃圾很少。
这就是 data.table 分组很快的原因之一。但是这种方法不适合并行化。并行化意味着将数据复制到其他线程,而不是花费时间。但是,我的理解是data.table
分组通常比plyr
on.parallel
更快。这取决于每组任务的计算时间,以及是否可以轻松减少该计算时间。移动数据通常占主导地位(当对大数据任务的 1 或 3 次运行进行基准测试时)。
更常见的是,到目前为止,实际上是一些问题j
在[.data.table
. 例如,最近我们看到data.table
分组性能不佳,但罪魁祸首是min(POSIXct)
(在 R 中聚合超过 80K 唯一 ID)。避免这个问题产生了超过 50 倍的加速。
所以咒语是: Rprof
, Rprof
, Rprof
.
此外,同一常见问题解答中的第 1 点可能很重要:
只有该列被分组,其他 19 列被忽略,因为 data.table 检查 j 表达式并意识到它不使用其他列。
因此,data.table
实际上根本不遵循拆分-应用-组合范式。它的工作方式不同。split-apply-combine 适合并行化,但它确实不能扩展到大数据。
另请参阅 data.table intro vignette 中的脚注 3:
我们想知道有多少人将并行技术部署到矢量扫描代码中
那是在说“当然,并行速度要快得多,但是使用有效的算法真正需要多长时间?”。
但是,如果您已经分析(使用Rprof
),并且每个组的任务确实是计算密集型的,那么 datatable-help 上的 3 个帖子(包括“多核”一词可能会有所帮助:
当然,有很多任务在 data.table 中并行化会很好,并且有一种方法可以做到这一点。但它还没有完成,因为通常其他因素会咬人,所以它的优先级很低。如果您可以发布带有基准和 Rprof 结果的可重现虚拟数据,那将有助于提高优先级。
我已经按照@matt dowle 之前的Rprof、Rprof、Rprof 口头禅做了一些测试。
我发现并行化的决定取决于上下文。但可能很重要。根据测试操作(例如foo
下面,可以定制)和使用的核心数量(我尝试 8 和 24),我得到不同的结果。
结果如下:
- 使用 8 个内核,我看到此示例中的并行化提高了 21%
- 使用 24 个核心,我看到了 14% 的改进。
我还查看了一些真实世界(不可共享)的数据/操作,这些数据/操作显示了与 24 个内核并行的更大(33%
或两个不同的测试)改进。2018 年 5 月编辑一组新的真实示例案例显示,与 1000 个组的并行操作相比,改进了接近 85%。25%
R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods
[8] base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2 data.table_1.10.4
R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5 data.table_1.10.4
下面的例子:
library(data.table)
library(stringi)
library(microbenchmark)
set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)
my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)
foo <- function(dt) {
dt2 <- dt ## needed for .SD lock
nr <- nrow(dt2)
idx <- sample.int(nr, 1, replace=FALSE)
dt2[idx,][, `:=` (
new_var1= V1 / V2,
new_var2= V4 * V3 / V10,
new_var3= sum(V12),
new_var4= ifelse(V10 > 0, V11 / V13, 1),
new_var5= ifelse(V9 < 0, V8 / V18, 1)
)]
return(dt2[idx,])
}
split_df <- function(d, var) {
base::split(d, get(var, as.environment(d)))
}
foo2 <- function(dt) {
dt2 <- split_df(dt, "grps")
require(parallel)
cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
clusterExport(cl, varlist= "foo")
clusterExport(cl, varlist= "dt2", envir = environment())
clusterEvalQ(cl, library("data.table"))
dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)
parallel::stopCluster(cl)
return(rbindlist(dt2))
}
print(parallel::detectCores()) # 8
microbenchmark(
serial= dt[,foo(.SD), by= "grps"],
parallel= foo2(dt),
times= 10L
)
Unit: seconds
expr min lq mean median uq max neval cld
serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387 10 b
parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257 10 a
print(parallel::detectCores()) # 24
Unit: seconds
expr min lq mean median uq max neval cld
serial 9.014247 9.804112 12.17843 13.17508 13.56914 14.13133 10 a
parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353 10 a
分析:
我们可以使用这个答案对@matt dowle 对分析的原始评论提供更直接的回应。
结果,我们确实看到大部分计算时间是由base
而不是由 处理的data.table
。data.table
正如预期的那样,操作本身非常快。虽然有些人可能会争辩说,这证明了内部不需要并行性data.table
,但我认为这个工作流/操作集并不是非典型的。也就是说,我强烈怀疑大多数大型data.table
聚合都包含大量非data.table
代码;并且这与交互式使用与开发/生产使用相关。因此,我得出结论,并行性data.table
对于大型聚合是有价值的。
library(profr)
prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
simplify = FALSE)
pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}
sort(sapply(fun_timing, sum)) # no large outliers
fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
ret <- data.table(fun= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
ret <- data.table(pkg= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
fun_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "fun"][
order(avg_time, decreasing = TRUE),][1:10,]
pkg_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "pkg"][
order(avg_time, decreasing = TRUE),]
结果:
fun total_time avg_time avg_pct
1: base::[ 670.362 6.70362 0.2694
2: NA::[.data.table 667.350 6.67350 0.2682
3: .GlobalEnv::foo 335.784 3.35784 0.1349
4: base::[[ 163.044 1.63044 0.0655
5: base::[[.data.frame 133.790 1.33790 0.0537
6: base::%in% 120.512 1.20512 0.0484
7: base::sys.call 86.846 0.86846 0.0348
8: NA::replace_dot_alias 27.824 0.27824 0.0112
9: base::which 23.536 0.23536 0.0095
10: base::sapply 22.080 0.22080 0.0089
pkg total_time avg_time avg_pct
1: base 1397.770 13.97770 0.7938
2: .GlobalEnv 335.784 3.35784 0.1908
3: data.table 27.262 0.27262 0.0155
交叉发布在github/data.table
是的(不过,这可能不值得,正如@Alex W 所指出的那样)。
下面提供了一个简单的模式来做到这一点。为简单起见,我使用了一个不值得的示例(使用mean
函数),但它显示了模式。
例子:
假设您想按 iris 数据集中的 Species 计算 Petal.Length 的平均值。
您可以直接使用 data.table 来完成它:
as.data.table(iris)[by=Species,,.(MPL=mean(Petal.Length))]
Species MPL
1: setosa 1.462
2: versicolor 4.260
3: virginica 5.552
但是,如果mean
是一个足够长时间运行且昂贵的计算(可能由分析确定,尽管有时它只是“显而易见的”),您可能喜欢使用parallel::mclapply
. 由于最小化与 mclapply 生成的所有子进程的通信可以大大减少整体计算,而不是将选择从 data.table 传递到每个子进程,您只想传递选择的索引。此外,通过首先对 data.table 进行排序,您可以只传递这些索引的范围(最大值和最小值)。像这样:
> o.dt<-as.data.table(iris)[order(Species)] # note: iris happens already to be ordered
> i.dt<-o.dt[,by=Species,.(irange=.(range(.I)))]
> i.dt
Species irange
1: setosa 1,50
2: versicolor 51,100
3: virginica 101,150
> result<-mclapply(seq(nrow(i.dt)),function(r) o.dt[do.call(seq,as.list(i.dt[r,irange][[1]])),.(MPL=mean(Petal.Length))])
> result
[[1]]
MPL
1: 1.462
[[2]]
MPL
1: 4.26
[[3]]
MPL
1: 5.552
> result.dt<-cbind(i.dt,rbindlist(result))[,-2]
> result.dt
Species MPL
1: setosa 1.462
2: versicolor 4.260
3: virginica 5.552
回顾模式:
- 订购输入。
- 计算每个组的索引范围。
- 定义一个匿名
function
来提取包含组成员的行,并执行所需的计算(在本例中为均值)。 - 在索引范围的行索引上使用 mclapply 将该函数应用于每个组。
- 用于
rbindlist
将结果作为 data.table 获取,将其放入cbind
输入,然后将其删除索引列(除非您出于其他原因需要保留它们)。
笔记:
- 决赛
rbindlist
通常很昂贵,可能会根据您的应用程序跳过)。
去做:
- 让 data.table 团队相信这个模式足够通用和有用,额外的 data.table 索引选项应该调用它。想象一下,传递 mc=TRUE 将调用此模式,并在...中支持其他并行选项
iris.dt[by=Species,,.(MPL=mean(Petal.Length)), mc=TRUE, mc.preschedule=FALSE, mc.set.seed=TRUE,...]