我想使用parallelReduce
of并行化一个 for 循环,RcppParallel
但join
对结果很困难,因为每次迭代的并行输出的维度是先验未知的。使用foreach
R 中的包,这可以通过foreach
使用来完成.combine=rbind
:
library(foreach)
library(doParallel)
# some dummy function which, as a function of the iteration index, returns a Boolean
lengthy_test <- function(row_of_data){
if(runif(1)<0.5){
return(TRUE)
}else{
return(FALSE)
}
}
input <- matrix(NA,1000,1)
output = foreach(i=1:1000,.combine=rbind)%dopar%{
if(lengthy_test(input[i,])){
n_rows_i = 1 # n_rows_i is the number of rows that we would like to add
}else{
n_rows_i = 2
}
matrix(rnorm(n_rows_i*3),nrow =n_rows_i, ncol = 3)
}
一个人怎么能达到同样的效果parallelReduce
?关键问题是编写Split
构造函数和join
运算符,因为先验未知矩阵的哪一行与input
矩阵的哪一行相关联output
。
在我的用例中, 的维度output
是先验已知的,因此输出矩阵的预分配(或不带 的构造Worker
)Split
没有问题。
output
C++ 中的串行实现不是问题,因为可以简单地携带一个计数器来计算已经填充了多少行矩阵。如何并行完成这对我来说并不明显。
编辑
由于 F. Privé 问:这是我尝试改编RcppParallel 文档中的代码的方法
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;
struct Test : public Worker
{
// source matrix
const RMatrix<double> input;
// number of rows of output matrix
int nrows;
// output matrix
RMatrix<double> output;
// constructors
Test(const NumericMatrix input) : input(input), nrows(input.nrow()*2) output(nrows, 3) {}
Test(const Test& test, Split) : input(test.input) {
nrows = 0;
for (int i = 0; i < input.nrow(); i++) {
if(Rcpp::runif(1)<0.5){
nrows = nrows + 1;
}else{
nrows = nrows + 2;
}
// now I know the dimension I have to assign to output
// i.e. nrows rows and 3 columns
// but I don't know _which_ rows I have to assign
// this is why I thought that parallelReduce might be better than parallelFor
// since I think I can just do this and worry about the joining later
output = Rmatrix(Rcpp::rnorm(nrows*3),nrows,3);
}
// accumulate just the element of the range I've been asked to
void operator()(std::size_t begin, std::size_t end) {
// perhaps I can leave this empty
// since all the work is done in the split constructor
// but I'm not sure that the split constructor is always called
}
// join my output with that of another Test
void join(const Test& rhs) {
// here the rbind equivalent is needed
// the order of binding is not imporant for me (maybe for others, it is)
output = rbind(output,rhs.output);
}
};
然后,可以这样称呼它
// [[Rcpp::export]]
double Testcombine(NumericMatrix x){
// declare the Test instance
Test test(x);
// call parallel_reduce to start the work
parallelReduce(0, x.nrow(), test);
// return the computed test
return test.output;
}