42

在这篇文章之后:multicore and data.table in R,我想知道在使用 data.table 时是否有办法使用所有内核,通常按组进行计算可以并行化。似乎plyr允许这样的操作是设计的。

4

3 回答 3

56

首先要检查的是data.table常见问题 3.1 第 2 点已被纳入:

仅对最大的组进行一次内存分配,然后将该内存重新用于其他组。可以收集的垃圾很少。

这就是 data.table 分组很快的原因之一。但是这种方法不适合并行化。并行化意味着将数据复制到其他线程,而不是花费时间。但是,我的理解是data.table分组通常比plyron.parallel更快。这取决于每组任务的计算时间,以及是否可以轻松减少该计算时间。移动数据通常占主导地位(当对大数据任务的 1 或 3 次运行进行基准测试时)。

更常见的是,到目前为止,实际上是一些问题j[.data.table. 例如,最近我们看到data.table分组性能不佳,但罪魁祸首是min(POSIXct)在 R 中聚合超过 80K 唯一 ID)。避免这个问题产生了超过 50 倍的加速。

所以咒语是: Rprof, Rprof, Rprof.

此外,同一常见问题解答中的第 1 点可能很重要:

只有该列被分组,其他 19 列被忽略,因为 data.table 检查 j 表达式并意识到它不使用其他列。

因此,data.table实际上根本不遵循拆分-应用-组合范式。它的工作方式不同。split-apply-combine 适合并行化,但它确实不能扩展到大数据。

另请参阅 data.table intro vignette 中的脚注 3:

我们想知道有多少人将并行技术部署到矢量扫描代码中

那是在说“当然,并行速度要快得多,但是使用有效的算法真正需要多长时间?”。

但是,如果您已经分析(使用Rprof),并且每个组的任务确实是计算密集型的,那么 datatable-help 上的 3 个帖子(包括“多核”一词可能会有所帮助:

数据表帮助上的多核帖子

当然,有很多任务在 data.table 中并行化会很好,并且有一种方法可以做到这一点。但它还没有完成,因为通常其他因素会咬人,所以它的优先级很低。如果您可以发布带有基准和 Rprof 结果的可重现虚拟数据,那将有助于提高优先级。

于 2013-02-08T00:19:30.660 回答
9

我已经按照@matt dowle 之前的Rprof、Rprof、Rprof 口头禅做了一些测试。

我发现并行化的决定取决于上下文。但可能很重要。根据测试操作(例如foo下面,可以定制)和使用的核心数量(我尝试 8 和 24),我得到不同的结果。

结果如下:

  1. 使用 8 个内核,我看到此示例中的并行化提高了 21%
  2. 使用 24 个核心,我看到了 14% 的改进

我还查看了一些真实世界(不可共享)的数据/操作,这些数据/操作显示了与 24 个内核并行的更大(33%或两个不同的测试)改进。2018 年 5 月编辑一组新的真实示例案例显示,与 1000 个组的并行操作相比,改进了接近 85%。25%

R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods
[8] base

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2          data.table_1.10.4

R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5          data.table_1.10.4     

下面的例子:

library(data.table)
library(stringi)
library(microbenchmark)

set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)

my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)

foo <- function(dt) {
  dt2 <- dt ## needed for .SD lock
  nr <- nrow(dt2)

  idx <- sample.int(nr, 1, replace=FALSE)

  dt2[idx,][, `:=` (
    new_var1= V1 / V2,
    new_var2= V4 * V3 / V10,
    new_var3= sum(V12),
    new_var4= ifelse(V10 > 0, V11 / V13, 1),
    new_var5= ifelse(V9 < 0, V8 / V18, 1)
  )]


  return(dt2[idx,])
}

split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}

foo2 <- function(dt) {
  dt2 <- split_df(dt, "grps")

  require(parallel)
  cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
  clusterExport(cl, varlist= "foo")
  clusterExport(cl, varlist= "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))

  dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)

  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}

print(parallel::detectCores()) # 8

microbenchmark(
  serial= dt[,foo(.SD), by= "grps"],
  parallel= foo2(dt),
  times= 10L
)

Unit: seconds
     expr      min       lq     mean   median       uq      max neval cld
   serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387    10   b
 parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257    10  a 

print(parallel::detectCores()) # 24

Unit: seconds
     expr       min        lq     mean   median       uq      max neval cld
   serial  9.014247  9.804112 12.17843 13.17508 13.56914 14.13133    10   a
 parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353    10   a

分析:

我们可以使用这个答案对@matt dowle 对分析的原始评论提供更直接的回应。

结果,我们确实看到大部分计算时间是由base而不是由 处理的data.tabledata.table正如预期的那样,操作本身非常快。虽然有些人可能会争辩说,这证明了内部不需要并行性data.table,但我认为这个工作流/操作集并不是非典型的。也就是说,我强烈怀疑大多数大型data.table聚合都包含大量非data.table代码;并且这与交互式使用与开发/生产使用相关。因此,我得出结论,并行性data.table对于大型聚合是有价值的。

library(profr)

prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
                       simplify = FALSE)

pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
  fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
  pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}

sort(sapply(fun_timing, sum)) #  no large outliers

fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
  ret <- data.table(fun= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
  ret <- data.table(pkg= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

fun_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "fun"][
  order(avg_time, decreasing = TRUE),][1:10,]

pkg_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "pkg"][
  order(avg_time, decreasing = TRUE),]

结果:

                      fun total_time avg_time avg_pct
 1:               base::[    670.362  6.70362  0.2694
 2:      NA::[.data.table    667.350  6.67350  0.2682
 3:       .GlobalEnv::foo    335.784  3.35784  0.1349
 4:              base::[[    163.044  1.63044  0.0655
 5:   base::[[.data.frame    133.790  1.33790  0.0537
 6:            base::%in%    120.512  1.20512  0.0484
 7:        base::sys.call     86.846  0.86846  0.0348
 8: NA::replace_dot_alias     27.824  0.27824  0.0112
 9:           base::which     23.536  0.23536  0.0095
10:          base::sapply     22.080  0.22080  0.0089

          pkg total_time avg_time avg_pct
1:       base   1397.770 13.97770  0.7938
2: .GlobalEnv    335.784  3.35784  0.1908
3: data.table     27.262  0.27262  0.0155

交叉发布在github/data.table

于 2017-06-28T23:22:57.487 回答
2

的(不过,这可能不值得,正如@Alex W 所指出的那样)。

下面提供了一个简单的模式来做到这一点。为简单起见,我使用了一个不值得的示例(使用mean函数),但它显示了模式。

例子:

假设您想按 iris 数据集中的 Species 计算 Petal.Length 的平均值。

您可以直接使用 data.table 来完成它:

as.data.table(iris)[by=Species,,.(MPL=mean(Petal.Length))]
      Species   MPL
1:     setosa 1.462
2: versicolor 4.260
3:  virginica 5.552

但是,如果mean是一个足够长时间运行且昂贵的计算(可能由分析确定,尽管有时它只是“显而易见的”),您可能喜欢使用parallel::mclapply. 由于最小化与 mclapply 生成的所有子进程的通信可以大大减少整体计算,而不是将选择从 data.table 传递到每个子进程,您只想传递选择的索引。此外,通过首先对 data.table 进行排序,您可以只传递这些索引的范围(最大值和最小值)。像这样:

> o.dt<-as.data.table(iris)[order(Species)] # note: iris happens already to be ordered
> i.dt<-o.dt[,by=Species,.(irange=.(range(.I)))]
> i.dt
      Species  irange
1:     setosa    1,50
2: versicolor  51,100
3:  virginica 101,150


> result<-mclapply(seq(nrow(i.dt)),function(r) o.dt[do.call(seq,as.list(i.dt[r,irange][[1]])),.(MPL=mean(Petal.Length))])
> result
[[1]]
     MPL
1: 1.462

[[2]]
    MPL
1: 4.26

[[3]]
     MPL
1: 5.552

> result.dt<-cbind(i.dt,rbindlist(result))[,-2]
> result.dt
      Species   MPL
1:     setosa 1.462
2: versicolor 4.260
3:  virginica 5.552

回顾模式:

  • 订购输入。
  • 计算每个组的索引范围。
  • 定义一个匿名function来提取包含组成员的行,并执行所需的计算(在本例中为均值)。
  • 在索引范围的行索引上使用 mclapply 将该函数应用于每个组。
  • 用于rbindlist将结果作为 data.table 获取,将其放入cbind输入,然后将其删除索引列(除非您出于其他原因需要保留它们)。

笔记:

  • 决赛rbindlist通常很昂贵,可能会根据您的应用程序跳过)。

去做:

  • 让 data.table 团队相信这个模式足够通用和有用,额外的 data.table 索引选项应该调用它。想象一下,传递 mc=TRUE 将调用此模式,并在...中支持其他并行选项
iris.dt[by=Species,,.(MPL=mean(Petal.Length)), mc=TRUE, mc.preschedule=FALSE, mc.set.seed=TRUE,...]
于 2019-03-10T01:44:49.233 回答