我已经阅读了一些关于这些主题的问题以及一些教程,但未能解决我的问题,所以决定问自己。
我收集了大量类型为 A、B、C 的大文件;在某些情况下,我需要将 B、C 与 A 分开。我在具有 64 个 CPU 和 240GB 的远程服务器上工作,所以我很自然地想同时使用它的电源和处理。我拥有的一个重要知识是,如果 a_i 文件只能与 b_i 成功连接,b_(i+1) 来自 B,对于 C 也是如此。我最初的尝试是为 'a_i' 文件创建一个 'join_i' 函数,然后运行它并行(我有 448 个文件)。然而,没有显着的时间改进,事实上,当我观察性能时——可悲的是,CPU 的负载百分比非常低。据我所知,我认为瓶颈是 IO,尤其是因为所有文件都很大。这是一个有效的假设吗?无论如何,在第二次尝试时,我决定按顺序浏览每个文件,但在每次迭代中使用并行优势。但是,经过多次尝试,我也没有在这里获得任何运气。我试图在下面做一个最小的例子,其中并行要慢得多(实际上在我的真实数据上它冻结了)。这里有什么问题?是代码错误还是对 R 中的并行工作原理有更深的误解?此外,我尝试了一些 multidplyr 和 mclapply,但在这两种情况下也没有运气。我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而连接大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!我试图在下面做一个最小的例子,其中并行要慢得多(实际上在我的真实数据上它冻结了)。这里有什么问题?是代码错误还是对 R 中的并行工作原理有更深的误解?此外,我尝试了一些 multidplyr 和 mclapply,但在这两种情况下也没有运气。我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而连接大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!我试图在下面做一个最小的例子,其中并行要慢得多(实际上在我的真实数据上它冻结了)。这里有什么问题?是代码错误还是对 R 中的并行工作原理有更深的误解?此外,我尝试了一些 multidplyr 和 mclapply,但在这两种情况下也没有运气。我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而连接大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而加入大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩),而加入大约需要 10 秒。鉴于此,这里最好的策略是什么?提前致谢!
library(dplyr)
A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
chunk_join=function(i, A, B, C)
{
A_i=A %>% filter(X2==i)
B_i=B %>% filter(X2==i) %>% select(X1, X3)
C_i=C %>% filter(X2==i) %>% select(X1, X3)
join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}
library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)
# not parallel
s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %do%
{
join_i=chunk_join(j, A, B, C)
}
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))
# parallel
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %dopar%
{
join_i=chunk_join(j, A, B, C)
}
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))
R=rbind(r1, r2)
T=rbind(t1, t2)
R
T
在我的服务器上,%do% 大约需要 5s,%dopar% 需要超过 1m。请注意,这是为了加入本身,甚至不考虑制作集群的时间。顺便说一句,有人也可以评论我应该有多少个集群吗?比如说,我在 X 个均匀大小的块上划分数据并有 Y 个 CPU 可用——我应该尽可能多地放置 Y,还是 X,或其他数量的集群?