1

我知道我遇到的问题是线程安全问题。因为我现在拥有的代码将使用“seThreadOptions(1)”执行。我的问题是什么是克服这个问题的好习惯。

我知道这一点:带有 Rcpp 和 RcppParallel via std::shared_ptr 的线程安全函数指针将以某种方式发挥作用。而且我也一直在思考/尝试将内部功能作为并行工作者结构的一部分。实际上,我正在调用两个内部函数,我希望一个是可变的,另一个是恒定的,这让我倾向于认为我需要 2 个解决方案。

错误是 rstudio 中的 R 会话崩溃。这里有两件事需要注意: 1. 如果我设置线程选项(1),它运行良好。2. 如果我将“myfunc”移动到主 cpp 文件并简单地调用“myfunc”,这也可以正常运行。

下面是一个详细的例子:

第一个cpp文件:

// [[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::interfaces(cpp)]]
// [[Rcpp::plugins(cpp11)]]
#include "RcppArmadillo.h"
using namespace arma;
using namespace std;

// [[Rcpp::export]]
double myfunc(arma::vec vec_in){

  int Len = arma::size(vec_in)[0];
  return (vec_in[0] +vec_in[1])/Len;
}

二、cpp文件:

// [[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(ParallelExample)]]

#include "RcppArmadillo.h"
#include "RcppParallel.h"
#include "ParallelExample.h"
#include <random>
#include <memory>
#include <math.h>

using namespace Rcpp;
using namespace arma;
using namespace RcppParallel;
using namespace std;

struct PARALLEL_WORKER : public Worker{

  const arma::vec &input;
  arma::vec &output;

  PARALLEL_WORKER(const arma::vec &input, arma::vec &output) : input(input), output(output) {}

  void operator()(std::size_t begin, std::size_t end){


    std::mt19937 engine(1);

    // Create a loop that runs through a selected section of the total Boot_reps
    for( int k = begin; k < end; k ++){
      engine.seed(k);
      arma::vec index = input;
      std::shuffle( index.begin(), index.end(), engine);

      output[k] = ParallelExample::myfunc(index);
  }
}

};

// [[Rcpp::export]]
arma::vec Parallelfunc(int Len_in){

  arma::vec input = arma::regspace(0, 500);
  arma::vec output(Len_in);

  PARALLEL_WORKER  parallel_woker(input, output);
  parallelFor( 0, Len_in, parallel_woker);
  return output;
}

Makevars,因为我使用的是 Macintosh:

CXX_STD = CXX11

PKG_CXXFLAGS +=  -I../inst/include

和命名空间:

exportPattern("^[[:alpha:]]+")
importFrom(Rcpp, evalCpp)
importFrom(RcppParallel,RcppParallelLibs)
useDynLib(ParallelExample, .registration = TRUE)

export(Parallelfunc)
4

2 回答 2

1

当您调用 时ParallelExample::myfunc,您正在调用 中定义的函数inst/include/ParallelExample_RcppExport.h,该函数使用 R API。这是在并行环境中不能做的事情。我看到两种可能性:

  1. 转换myfunc为仅标题并将其包含在int/include/ParallelExample.h.
  2. 如果第二个 cpp 文件在同一个包中,请为myfuncinto放置一个合适的声明src/first.h,将该文件包含在两个src/first.cppandsrc/second.cpp中,并调用myfunc而不是ParallelExample::myfunc. 毕竟,如果您只想在同一个包中调用它,则无需向 R 注册函数。使用 R 注册是针对从外部调用的函数。
于 2018-08-23T10:38:19.643 回答
0

在某些方面,这有点违背了 Rcpp 的内置接口 cpp 功能的目的。

首先,cpp 保存为“ExampleInternal.h”:

// [[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::plugins(cpp11)]]
#include "RcppArmadillo.h"
using namespace arma;
using namespace std;

namespace ExampleInternal
{

  double myfunc3(arma::vec vec_in){

    int Len = arma::size(vec_in)[0];
    return (vec_in[0] +vec_in[1])/Len;
  }


}

第二:

#include "ParallelExample.h"
#include "ExampleInternal.h"
#include <random>
#include <memory>
#include <math.h>

using namespace Rcpp;
using namespace arma;
using namespace RcppParallel;
using namespace ExampleInternal;
using namespace std;

struct PARALLEL_WORKER : public Worker{

  const arma::vec &input;
  arma::vec &output;

  PARALLEL_WORKER(const arma::vec &input, arma::vec &output) : input(input), output(output) {}

  void operator()(std::size_t begin, std::size_t end){


    std::mt19937 engine(1);

    // Create a loop that runs through a selected section of the total Boot_reps
    for( int k = begin; k < end; k ++){
      engine.seed(k);
      arma::vec index = input;
      std::shuffle( index.begin(), index.end(), engine);

      output[k] = ExampleInternal::myfunc3(index);
  }
}

};

// [[Rcpp::export]]
arma::vec Parallelfunc(int Len_in){

  arma::vec input = arma::regspace(0, 500);
  arma::vec output(Len_in);

  PARALLEL_WORKER  parallel_woker(input, output);
  parallelFor( 0, Len_in, parallel_woker);
  return output;
}
于 2018-08-25T06:43:01.003 回答