1

我想使用parallelReduceof并行化一个 for 循环,RcppParalleljoin对结果很困难,因为每次迭代的并行输出的维度是先验未知的。使用foreachR 中的包,这可以通过foreach使用来完成.combine=rbind

library(foreach)
library(doParallel)

# some dummy function which, as a function of the iteration index, returns a Boolean
lengthy_test <- function(row_of_data){
  if(runif(1)<0.5){
    return(TRUE)
  }else{
    return(FALSE)
  }
}

input <- matrix(NA,1000,1)

output = foreach(i=1:1000,.combine=rbind)%dopar%{
  if(lengthy_test(input[i,])){
    n_rows_i = 1            # n_rows_i is the number of rows that we would like to add
  }else{
    n_rows_i = 2
  }
  matrix(rnorm(n_rows_i*3),nrow =n_rows_i, ncol = 3)
}

一个人怎么能达到同样的效果parallelReduce?关键问题是编写Split构造函数和join运算符,因为先验未知矩阵的哪一行与input矩阵的哪一行相关联output

在我的用例中, 的维度output是先验已知的,因此输出矩阵的预分配(或不带 的构造WorkerSplit没有问题。

outputC++ 中的串行实现不是问题,因为可以简单地携带一个计数器来计算已经填充了多少行矩阵。如何并行完成这对我来说并不明显。

编辑

由于 F. Privé 问:这是我尝试改编RcppParallel 文档中的代码的方法

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

struct Test : public Worker
{   
   // source matrix
   const RMatrix<double> input;

   // number of rows of output matrix
   int nrows;   

   // output matrix

   RMatrix<double> output;
   
   // constructors
   Test(const NumericMatrix input) : input(input), nrows(input.nrow()*2) output(nrows, 3) {}
   Test(const Test& test, Split) : input(test.input) {
     nrows = 0;
     for (int i = 0; i < input.nrow(); i++) {
       if(Rcpp::runif(1)<0.5){
         nrows = nrows + 1;
       }else{
         nrows = nrows + 2;
       }
       // now I know the dimension I have to assign to output
       // i.e. nrows rows and 3 columns 
       // but I don't know _which_ rows I have to assign
       // this is why I thought that parallelReduce might be better than parallelFor 
       // since I think I can just do this and worry about the joining later    

       output = Rmatrix(Rcpp::rnorm(nrows*3),nrows,3);
   }
   
   // accumulate just the element of the range I've been asked to
   void operator()(std::size_t begin, std::size_t end) {
      // perhaps I can leave this empty 
      // since all the work is done in the split constructor
      // but I'm not sure that the split constructor is always called
   }
     
   // join my output with that of another Test
   void join(const Test& rhs) { 
      //  here the rbind equivalent is needed
      // the order of binding is not imporant for me (maybe for others, it is)

      output = rbind(output,rhs.output); 
   }
};

然后,可以这样称呼它

// [[Rcpp::export]]
double Testcombine(NumericMatrix x){
   
   // declare the Test instance 
   Test test(x);
   
   // call parallel_reduce to start the work
   parallelReduce(0, x.nrow(), test);
   
   // return the computed test
   return test.output;
}
4

0 回答 0