0

我的问题是在两个非常大的矩阵的配对列之间应用欧几里得距离,即(只有 x[,1] 和 y[,1],...,x[,n] 和 y[,n])。这很简单,但我需要为非常大的文件执行此操作。使用 parLapply 以正常方式将其设置为并行化需要很长时间才能将 x,y 输入 clusterExport。我尝试使用 bigmemory 包,但不断收到错误报告(找不到文件 PRED.desc)。为了进一步提高速度,我尝试分块处理它们。

library(parallel)
library(doParallel)
library(bigmemory) 

### Fake Random Data ##
PRED<-matrix(rnorm(10**9,0,1),nrow=1000)
ACTUAL<-matrix(rnorm(10**9,0,1),nrow=1000)
Names<-paste0("Stock_Number_",1:10**6)
###

big_PRED<-bigmemory::as.big.matrix(PRED,type="double", descriptorfile="PRED.desc")
big_ACTUAL<-bigmemory::as.big.matrix(ACTUAL,type="double", descriptorfile="ACTUAL.desc")

NUMCores<-parallel::detectCores()-1
cl <- parallel::makePSOCKcluster(NUMCores); doParallel::registerDoParallel(cl)
L<- ncol(PRED); inds <- split(seq_len(L), sort(rep_len(seq_len(NUMCores), L)))
DistFunction<-function(xi,yi){PRED=attach.big.matrix("PRED.desc")
                              ACTUAL=attach.big.matrix("ACTUAL.desc")
                              VEC<-Vectorize(function(xi,yi){dist(rbind(PRED[,xi],Actual[,yi]))}) 
                              return(VEC(xi,yi))
}

clusterExport(cl, varlist=c("DistFunction","inds","attach.big.matrix"))
clusterEvalQ(cl, library(bigmemory))
parEucDist<-function(clVAR){parallel::parSapply(clVAR,seq_along(inds), function(UU){Index=inds[[UU]];return(DistFunction(Index,Index))})}
full_EDist<-parEucDist(clVAR=cl)
parallel::stopCluster(cl)

我也尝试了以下方法,但它们都给了我 Serialize(data, node$con) 中的错误:写入连接时出错

#### bigmemory ###
NUMCores<-parallel::detectCores()-1
cl <- parallel::makePSOCKcluster(NUMCores); 
doParallel::registerDoParallel(cl)
L<- ncol(PRED); inds <- split(seq_len(L), sort(rep_len(seq_len(NUMCores), L)))
DistFunction<-function(xi,yi){VEC<-Vectorize(function(xi,yi){dist(rbind(PRED[,xi],Actual[,yi]))}); return(VEC(xi,yi))}
distVEC=matrix(NA,ncol=length(seq_along(inds)),nrow=length(inds[[1]]))
clusterExport(cl, varlist=c("DistFunction","inds","attach.big.matrix"))
clusterEvalQ(cl, library(bigmemory))
full_EDist<-foreach(i=seq_along(inds), .combine=c) 
                %dopar% { 
                     Index=inds[[i]]
                     PRED=attach.big.matrix("PRED.desc")
                     ACTUAL=attach.big.matrix("ACTUAL.desc")
                     return(DistFunction(Index,Index))}
parallel::stopCluster(cl)

#### bigstatsr ###
require(bigstatsr)
FBM_PRED<-bigstatsr::as_FBM(PRED,type="double")
FBM_ACTUAL<-bigstatsr::as_FBM(ACTUAL,type="double")

NUMCores<-parallel::detectCores()-1
cl <- parallel::makePSOCKcluster(NUMCores)
doParallel::registerDoParallel(cl)
L<- ncol(PRED); inds <- split(seq_len(L), sort(rep_len(seq_len(NUMCores), L)))
DistFunction<-function(xi,yi){VEC<-Vectorize(function(xi,yi) {dist(rbind(PRED[,xi],Actual[,yi]))}); return(VEC(xi,yi))}
distVEC=matrix(NA,ncol=length(seq_along(inds)),nrow=length(inds[[1]]))
clusterExport(cl, varlist=c("DistFunction","inds"))
clusterEvalQ(cl, library(bigstatsr))
full_EDist<-foreach(i=seq_along(inds), .combine=c) 
             %dopar% { Index=inds[[i]]
                       PRED=FBM_PRED[,Index]
                       ACTUAL=FBM_ACTUAL[,Index]
                       distVEC[,i]<-DistFunction(Index,Index)}
parallel::stopCluster(cl)
4

1 回答 1

0

没关系。我让它工作。

### Fake Random Data ##
PRED<-matrix(rnorm(10**9,0,1),nrow=1000)
ACTUAL<-matrix(rnorm(10**9,0,1),nrow=1000)
Names<-paste0("Stock_Number_",1:10**6)
###


require(bigstatsr)
FBM_PRED<-bigstatsr::as_FBM(PRED,type="double")
FBM_ACTUAL<-bigstatsr::as_FBM(ACTUAL,type="double")

NUMCores<-parallel::detectCores()-1
cl <- parallel::makePSOCKcluster(NUMCores); doParallel::registerDoParallel(cl)
L<- ncol(PRED); inds <- split(seq_len(L), sort(rep_len(seq_len(NUMCores), L)))
DistFunction<-function(xi,yi,PRED,ACTUAL){VEC<-Vectorize(function(xi,yi){dist(rbind(PRED[,xi],ACTUAL[,yi]))}); return(VEC(xi,yi))}
clusterExport(cl, varlist=c("DistFunction","inds","FBM_PRED","FBM_ACTUAL"))
clusterEvalQ(cl, library(bigstatsr))
parEucDist<-function(clVAR){parallel::parSapply(clVAR,seq_along(inds), 
                               function(UU){Index=inds[[UU]]
                                            return(DistFunction(Index,Index,FBM_PRED,FBM_ACTUAL))})}
full_EDist<-parEucDist(clVAR=cl)
full_EDist<-do.call(c,full_EDist)
parallel::stopCluster(cl)
于 2021-10-28T16:45:36.027 回答