2

我已经阅读了一些关于这些主题的问题以及一些教程,但未能解决我的问题,所以决定问自己。

我收集了大量类型为 A、B、C 的大文件;在某些情况下,我需要将 B、C 与 A 分开。我在具有 64 个 CPU 和 240GB 的远程服务器上工作,所以我很自然地想同时使用它的电源和处理。我拥有的一个重要知识是,如果 a_i 文件只能与 b_i 成功连接,b_(i+1) 来自 B,对于 C 也是如此。我最初的尝试是为 'a_i' 文件创建一个 'join_i' 函数,然后运行它并行(我有 448 个文件)。然而,没有显着的时间改进,事实上,当我观察性能时——可悲的是,CPU 的负载百分比非常低。据我所知,我认为瓶颈是 IO,尤其是因为所有文件都很大。这是一个有效的假设吗?无论如何,在第二次尝试时,我决定按顺序浏览每个文件,但在每次迭代中使用并行优势。但是,经过多次尝试,我也没有在这里获得任何运气。我试图在下面做一个最小的例子,其中并行要慢得多(实际上在我的真实数据上它冻结了)。这里有什么问题?是代码错误还是对 R 中的并行工作原理有更深的误解?此外,我尝试了一些 multidplyr 和 mclapply,但在这两种情况下也没有运气。我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而连接大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!我试图在下面做一个最小的例子,其中并行要慢得多(实际上在我的真实数据上它冻结了)。这里有什么问题?是代码错误还是对 R 中的并行工作原理有更深的误解?此外,我尝试了一些 multidplyr 和 mclapply,但在这两种情况下也没有运气。我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而连接大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!我试图在下面做一个最小的例子,其中并行要慢得多(实际上在我的真实数据上它冻结了)。这里有什么问题?是代码错误还是对 R 中的并行工作原理有更深的误解?此外,我尝试了一些 multidplyr 和 mclapply,但在这两种情况下也没有运气。我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而连接大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而加入大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而加入大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!

library(dplyr) 

A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)


chunk_join=function(i, A, B, C)
{
  A_i=A %>% filter(X2==i)
  B_i=B %>% filter(X2==i) %>% select(X1, X3)
  C_i=C %>% filter(X2==i) %>% select(X1, X3)
  join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}

library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)

# not parallel 

s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind', 
                .packages=c('dplyr'), 
                .export=c('chunk_join','A', 'B', 'C')) %do%
                {
                  join_i=chunk_join(j, A, B, C)
                }
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))

# parallel 
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind', 
                .packages=c('dplyr'), 
                .export=c('chunk_join','A', 'B', 'C')) %dopar%
                {
                  join_i=chunk_join(j, A, B, C)
                }
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))

R=rbind(r1, r2)
T=rbind(t1, t2)

R
T

在我的服务器上,%do% 大约需要 5s,%dopar% 需要超过 1m。请注意,这是为了加入本身,甚至不考虑制作集群的时间。顺便说一句,有人也可以评论我应该有多少个集群吗?比如说,我在 X 个均匀大小的块上划分数据并有 Y 个 CPU 可用——我应该尽可能多地放置 Y,还是 X,或其他数量的集群?

4

1 回答 1

2

多线程速度慢的原因有两个:

1) 数据传输到新线程 2) 数据从新线程传输回主线程

在 unix 系统上使用 可以完全避免问题 #1 mclapply,除非数据被修改,否则它不会复制数据。(makeCluster默认情况下使用套接字传输数据)。

使用 无法避免问题 #2 mclapply,但您可以做的是尽量减少传输回主线程的数据量。

天真的麦克拉普利:

join3 = mclapply(1:10, function(j) {
  join_i=chunk_join(j, A, B, C)
}, mc.cores=4) %>% rbindlist

稍微聪明一点的 mclapply:

chunk_join2=function(i, A, B, C)
{
  A_i=A %>% filter(X2==i)
  B_i=B %>% filter(X2==i) %>% select(X1, X3)
  C_i=C %>% filter(X2==i) %>% select(X1, X3)
  join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
  join_i[,c(-1,-2,-3)]
}
A <- arrange(A, X2)
join5 = mclapply(1:10, function(j) {
  join_i=chunk_join2(j, A, B, C)
}, mc.cores=4) %>% rbindlist
join5 <- cbind(A, join5)

基准:

Single threaded: 4.014s 

Naive mclapply: 1.860 s

Slightly smarter mclapply: 1.363 s

如果您的数据有很多列,您可以看到问题 #2 将如何完全使系统陷入困境。例如,您可以通过返回 B 和 C 的索引而不是整个 data.frame 子集来做得更好。

于 2019-04-10T22:16:31.647 回答