0

我对 Rcpp 比较陌生,最近学习了如何从 R 传输我的代码。但是,它仍然不够快,我正在尝试实现 RcppParallel,但运气不佳。基本上,我有一个 Gibbs 采样器通过 Metropolis-Hastings 更新一堆变量,因为我的大多数变量都没有封闭形式的后验。我终于能够让我的代码在没有错误的情况下运行,但不幸的是,我使用 RcppParallel 的代码与我的旧代码的速度相同,我不知道为什么。

为简化起见,假设我想更新一个变量 U,即 ap by r 矩阵。我有一个函数 logFC_U 来计算对数后验,然后有一个函数 MH_U 来执行 Metropolis-Hastings 步骤并返回 U 的更新列。

#include <RcppDist.h>
#include <RcppArmadillo.h>
#include <RcppParallel.h>

#include <cmath>
#include <random>
#include <string>
#include <vector>

using namespace Rcpp;
using namespace arma;
using namespace RcppParallel;

// [[Rcpp::depends(RcppArmadillo, RcppDist, RcppParallel)]]

double logFC_U(arma::vec &var_val, std::size_t j, 
                const arma::mat &Y,     const arma::mat &X, 
                const arma::mat &eta,   const arma::mat &lambda,
                const arma::mat &U0,    const arma::mat &V, 
                const arma::vec &delta, double tau){
  double lp = 0; 
  mat U = U0;
  U.col(j) = var_val;
  mat B = U*diagmat(delta)*V.t();
  double SS = arma::as_scalar(sum(sum(log(1+exp(X*B+eta*lambda.t())))));
  lp += arma::as_scalar(-0.5*tau*U.col(j).t()*U.col(j) 
                          + delta(j)*U.col(j).t()*X.t()*Y*V.col(j)-SS);
  return lp;
}

arma::vec MH_U(std::size_t i, std::size_t  j, 
               const arma::mat &Y,     const arma::mat &X, 
               const arma::mat &eta,   const arma::mat &lambda,
               const arma::mat &U,     const arma::mat &V, 
               const arma::vec &delta, double tau, double eps) {
  arma::mat U_new = U;
  arma::vec var_current, var_new;
  arma::mat B = U*diagmat(delta)*V.t();
  
  int p = U.n_rows;
  var_current = U.col(j); 
  var_new = U.col(j);
  var_new(i) = rnorm(1, var_current(i), eps)(0);
  U_new.col(j) = var_new;
  
  arma::vec res = vec(p+1, arma::fill::zeros); // first element will store acceptance int
  
  // get log posterior for var_current, var_new
  double l0 = logFC_U(var_current, j, Y, X, eta, lambda, U,     V, delta, tau);
  double l1 = logFC_U(var_new,     j, Y, X, eta, lambda, U_new, V, delta, tau);
  
  // acceptance probability
  double prob = std::min(1.0, arma::as_scalar(exp(l1-l0)));
  
  // sample binomial(1,prob)
  res(0) = R::rbinom(1, prob);
  
  // return var_current if acc=0 and var_new if acc=1
  res.subvec(1, p) = res(0)*var_new + (1-res(0))*var_current;
  return res;
}

然后我有一个函数循环遍历 U 的所有元素,一次更新一个元素,并返回另一个矩阵,该矩阵基本上是两个彼此相邻堆叠的矩阵:第一个显示哪些元素已更新或未使用 1 和 0,接下来是更新后的 U 矩阵。

// [[Rcpp::export]]
arma::mat RegUpdateU(const arma::mat &Y,     const arma::mat &X, 
                     const arma::mat &eta,   const arma::mat &lambda,
                     const arma::mat &U,     const arma::mat &V, 
                     const arma::vec &delta, double tau, double eps){
  
  arma::mat res(U.n_rows, 2*U.n_cols);
  
  for(int i=0; i<U.n_rows; i++){
    for(int j=0; j<U.n_cols; j++){
      arma::vec prop = MH_U(i, j, Y, X, eta, lambda, U, V, delta, tau, eps);
      res(i,j) = prop(0);
      res(i,j+U.n_cols) = prop(i+1);
    }
  }
  
  return(res);
}

这是我尝试并行化的步骤。由于我需要使用 RcppArmadillo(我认为)来执行所有矩阵乘法,因此我使用在另一篇文章中找到的一种技术将输入转换为 arma::mat 和 arma::vec。我还尝试将输入保留为 NumericMatrix 和 NumericVector,然后在 MH_U 函数内将它们转换为 arma::mat 和 arma::vec,但出现错误,即没有匹配函数可调用“MH_U”,并且我无法解决这个问题。

struct UpdateU : public Worker {
  
  // inputs
  const double eps;
  const double tau;
  const RMatrix<double> Y;
  const RMatrix<double> X;
  const RMatrix<double> Eta;
  const RMatrix<double> Lambda;
  const RMatrix<double> U;
  const RMatrix<double> V;
  const RVector<double> Delta;
  std::string proposal;
  std::size_t n, d, p, r, q;
  
  // output matrix to write to
  RMatrix<double> res;
  
  UpdateU(const NumericMatrix Y,     const NumericMatrix X,
          const NumericMatrix Eta,   const NumericMatrix Lambda,
          const NumericMatrix U,     const NumericMatrix V,
          const NumericVector Delta, const double tau, const double eps,
          std::size_t n, std::size_t d, std::size_t p, 
          std::size_t q, std::size_t r, NumericMatrix res)
    : Y(Y), X(X), Eta(Eta), Lambda(Lambda), U(U), V(V), Delta(Delta), 
      tau(tau), eps(eps), n(n), d(d), p(p), q(q), r(r), res(res) {}
  
  // convert inputs to arma::mat
  arma::mat convertY(){
    RMatrix<double> y = Y;
    arma::mat MAT(y.begin(), n, d, false);
    return MAT;
  }
  arma::mat convertX(){
    RMatrix<double> x = X;
    arma::mat MAT(x.begin(), n, p, false);
    return MAT;
  }
  arma::mat convertEta(){
    RMatrix<double> eta = Eta;
    arma::mat MAT(eta.begin(), n, q, false);
    return MAT;
  }
  arma::mat convertLambda(){
    RMatrix<double> lambda = Lambda;
    arma::mat MAT(lambda.begin(), d, q, false);
    return MAT;
  }
  arma::mat convertU(){
    RMatrix<double> u = U;
    arma::mat MAT(u.begin(), p, r, false);
    return MAT;
  }
  arma::mat convertV(){
    RMatrix<double> v = V;
    arma::mat MAT(v.begin(), d, r, false);
    return MAT;
  }
  arma::vec convertDelta(){
    RVector<double> delta = Delta;
    arma::vec VEC(delta.begin(), r, false);
    return VEC;
  }
  
  // function call operator that work for the specified range (begin/end)
  void operator()(std::size_t begin, std::size_t end) {
    for (std::size_t i = begin; i < end; i++) {
      for(std::size_t j=0; j<U.ncol(); j++){
        arma::mat y = convertY();
        arma::mat x = convertX();
        arma::mat eta = convertEta();
        arma::mat lambda = convertLambda();
        arma::mat u = convertU();
        arma::mat v = convertV();
        arma::vec delta = convertDelta();
        
        arma::vec prop = MH_U(i, j, y, x, eta, lambda, u, v, delta, tau, eps);
        
        // write to output matrix
        res(i,j) = prop(0);
        res(i,j+U.ncol()) = prop(i+1);
      }
    }
  }
};

NumericMatrix ParallelUpdateU(NumericMatrix Y, NumericMatrix X, NumericMatrix eta, NumericMatrix lambda, 
                          NumericMatrix U, NumericMatrix V, NumericVector delta, double tau, double eps) {
  
  // allocate the matrix we will return
  NumericMatrix res(U.nrow(), 2*U.ncol());
  
  std::size_t n = Y.nrow(), d = Y.ncol(), p = X.ncol(), q = eta.ncol(), r = U.ncol();
  
  // create the worker
  UpdateU updateU(Y, X, eta, lambda, U, V, delta, tau, eps, n, d, p, q, r, res);
  
  // call it with parallelFor
  std::size_t numThreads=8;
  parallelFor(0, U.nrow(), updateU, numThreads);
  
  return res;
}

有谁知道为什么我的原始函数 RegUpdateU 和并行化版本 ParallelUpdateU 的速度相同?

library(rbenchmark)
n=1000
d=5
p=3
q=2
r=3
U=matrix(0.1, nrow=p, ncol=r)
V=matrix(0.1, nrow=d, ncol=r)
delta=rep(0.1, length=r)
Y=matrix(rbinom(n*d, 1, 0.5), nrow=n)
X=matrix(rbinom(n*p, 1, 0.5), nrow=n)
b=matrix(rnorm(n*q), nrow=n)
lambda=matrix(1, nrow=d, ncol=q)
tauU=1
epsU=1

benchmark(ParallelUpdateU(Y, X, b, lambda, U, V, delta, tauU, epsU),
          RegUpdateU(Y, X, b, lambda, U, V, delta, tauU, epsU))

如果有人可以帮助我,我将不胜感激。我已经在这上面花了几个星期,但还没有弄清楚。现在,我的整个 Gibbs 采样器需要 90 多个小时才能运行的数据远不及我想用于解决问题的真实数据的大小。这是我在 stackoverflow 上的第一篇文章,所以如果我需要提供任何其他信息,请告诉我。谢谢!!!!!

4

0 回答 0