0

foreach我正在尝试使用该包将一个函数应用于 r 中的多个组/ID 。使用并行处理 via 需要永远运行%dopar%,所以我想知道是否可以在via或其他包中运行applyor for 循环部分以使其更快。我不熟悉或其他可以做到这一点的软件包,所以我希望了解这是否可能。示例代码如下。我的实际功能更长,有超过 20 个输入,运行时间比我发布的还要长c++rcppc++

我很感激帮助。

编辑: 我意识到我最初的问题很模糊,所以我会努力做得更好。我有一个按组包含时间序列数据的表。每个组有 > 10K 行。我在c++viarcpp中编写了一个函数,该函数按组过滤表并应用一个函数。我想遍历独特的组并像rbind使用一样组合结果,rcpp以便它运行得更快。请参阅下面的示例代码(我的实际功能更长)

library(data.table)
library(inline)
library(Rcpp)
library(stringi)
library(Runuran)

# Fake data
DT <- data.table(Group = rep(do.call(paste0, Map(stri_rand_strings, n=10, length=c(5, 4, 1),
                                                   pattern = c('[A-Z]', '[0-9]', '[A-Z]'))), 180))

df <- DT[order(Group)][
  , .(Month = seq(1, 180, 1),
      Col1 = urnorm(180, mean = 500, sd = 1, lb = 5, ub = 1000), 
      Col2 = urnorm(180, mean = 1000, sd = 1, lb = 5, ub = 1000), 
      Col3 = urnorm(180, mean = 300, sd = 1, lb = 5, ub = 1000)), 
  by = Group
  ]

# Rcpp function
#include <Rcpp.h>
using namespace Rcpp;
// [[Rcpp::plugins(cpp11)]]

// [[Rcpp::export]]
DataFrame testFunc(DataFrame df, StringVector ids, double var1, double var2) {

  // Filter by group
  using namespace std;  
  StringVector sub = df["Group"];
  std::string level = Rcpp::as<std::string>(ids[0]);
  Rcpp::LogicalVector ind(sub.size());
  for (int i = 0; i < sub.size(); i++){
    ind[i] = (sub[i] == level);
  }

  // Access the columns
  CharacterVector Group = df["Group"];
  DoubleVector Month = df["Month"];
  DoubleVector Col1 = df["Col1"];
  DoubleVector Col2 = df["Col2"];
  DoubleVector Col3 = df["Col3"];


  // Create calculations
  DoubleVector Cola = Col1 * (var1 * var2);
  DoubleVector Colb = Col2 * (var1 * var2);
  DoubleVector Colc = Col3 * (var1 * var2);
  DoubleVector Cold = (Cola + Colb + Colc);

  // Result summary
  std::string Group_ID = level;
  double SumCol1 = sum(Col1);
  double SumCol2 = sum(Col2);
  double SumCol3 = sum(Col3);
  double SumColAll = sum(Cold);

  // return a new data frame
  return DataFrame::create(_["Group_ID"]= Group_ID, _["SumCol1"]= SumCol1,
                            _["SumCol2"]= SumCol2, _["SumCol3"]= SumCol3, _["SumColAll"]= SumColAll);
}

# Test function
Rcpp::sourceCpp('sample.cpp')
testFunc(df, ids = "BFTHU1315C", var1 = 24, var2 = 76) # ideally I would like to loop through all groups (unique(df$Group))

#     Group_ID  SumCol1 SumCol2  SumCol3  SumColAll
# 1 BFTHU1315C 899994.6 1798561 540001.6 5907129174

提前致谢。

4

1 回答 1

2

我建议重新考虑我们的方法。我假设您的测试数据集与您的真实数据集相当,有 3e8 行。我估计大约 10 GB 的数据。您似乎对这些数据执行以下操作:

  • 确定唯一ID列表(约5e5)
  • 每个唯一 ID 创建一个任务
  • 这些任务中的每一个都获取完整的数据集并过滤掉所有不属于相关 ID 的数据
  • 这些任务中的每一个都添加了一些不依赖于 ID 的附加列
  • 每个任务做一个group_b(ID),但数据集中只剩下一个ID
  • 每个任务计算一些简单的方法

对我来说,这似乎是非常低效的内存使用。一般来说,对于这样的问题,你会想要“共享内存并行”,但foreach只给你“进程并行”。进程并行的缺点是它增加了内存成本。

此外,您正在丢弃基本 R / dplyr / data.table / SQL 引擎 / 中存在的所有分组和聚合代码......您或任何在这里阅读您的问题的人都不太可能能够改进这些现有的代码库。

我的建议:

  • 忘记“进程并行性”(暂时)
  • 如果您有足够的 RAM,请尝试使用带有/ /的简单dplyr管道。mutategroup_bysummarize
  • 如果这还不够快,请了解聚合如何与 一起工作data.table,众所周知,它更快并通过 OpenMP 提供“共享内存并行”。
  • 如果您的计算机没有足够的内存并且正在交换,那么请寻找内存不足计算的可能性。我个人会使用(嵌入式)数据库。

为了使这一点更明确。这是data.table唯一的解决方案:

library(data.table)
library(stringi)

# Fake data
set.seed(42)
var1 <- 24
var2 <- 76

DT <- data.table(Group = rep(do.call(paste0, Map(stri_rand_strings, n=10, length=c(5, 4, 1),
                                                 pattern = c('[A-Z]', '[0-9]', '[A-Z]'))), 180))
setkey(df, Group)

df <- DT[order(Group)][
  , .(Month = seq(1, 180, 1),
      Col1 = rnorm(180, mean = 500, sd = 1), 
      Col2 = rnorm(180, mean = 1000, sd = 1), 
      Col3 = rnorm(180, mean = 300, sd = 1)), 
  by = Group
  ][, c("Cola", "Colb", "Colc") := .(Col1 * (var1 * var2), 
                                     Col2 * (var1 * var2),
                                     Col3 * (var1 * var2))
    ][, Cold := Cola + Colb + Colc]


# aggregagation
df[, .(SumCol1 = sum(Col1),
       SumCol2 = sum(Col2),
       SumCol3 = sum(Col3),
       SumColAll = sum(Cold)), by = Group]

我通过引用添加计算列。聚合步骤使用 提供的分组功能data.table。如果您的聚合更复杂,您还可以使用一个函数:

# aggregation function
mySum <- function(Col1, Col2, Col3, Cold) {
  list(SumCol1 = sum(Col1),
       SumCol2 = sum(Col2),
       SumCol3 = sum(Col3),
       SumColAll = sum(Cold))
}

df[, mySum(Col1, Col2, Col3, Cold), by = Group]

如果在使用 C++ 时聚合可能更快(不是这样的情况sum!),您甚至可以使用它:

# aggregation function in C++
Rcpp::cppFunction('
Rcpp::List mySum(Rcpp::NumericVector Col1, 
                 Rcpp::NumericVector Col2, 
                 Rcpp::NumericVector Col3, 
                 Rcpp::NumericVector Cold) {
    double SumCol1 = Rcpp::sum(Col1);
    double SumCol2 = Rcpp::sum(Col2);
    double SumCol3 = Rcpp::sum(Col3);
    double SumColAll = Rcpp::sum(Cold);             
    return Rcpp::List::create(Rcpp::Named("SumCol1") = SumCol1,
                              Rcpp::Named("SumCol2") = SumCol2,
                              Rcpp::Named("SumCol3") = SumCol3,
                              Rcpp::Named("SumColAll") = SumColAll);
}
')

df[, mySum(Col1, Col2, Col3, Cold), by = Group]

在所有这些示例中,摸索和循环都留给了data.table,因为您自己这样做不会有任何收获。

于 2018-11-18T23:02:53.610 回答