-3

表大师!这是我几天前在这里发布的问题的后续问题。我向您提出一个如何提高以下应用程序性能的问题data.table

该功能(出于设想的最快目的):

prob <- function(a, ie1, b, a1, ie2, b2, ...) {
ipf  <- function(a, b, ...) {
m    <- length(a)
n    <- length(b)
if (m < n) {
    r <- rank(c(a, b), ...)[1:m] - 1:m
} else {
    r <- rank(c(a, b), ...)[(m + 1):(m + n)] - 1:n
}
s <- ifelse((n + m)^2 > 2^31, sum(as.double(r)), sum(r))/(as.double(m) * n)
return(ifelse(m < n, s, 1 - s))
}

expand.grid.alt <- function(seq1, seq2) {
cbind(rep.int(seq1, length(seq2)), c(t(matrix(rep.int(seq2, length(seq1)), nrow =   length(seq2)))))
}

if (missing(a1) | missing(b2) | missing(ie2)) {
if (ie1 == ">") {
    return(ipf(a, b))
} else {
    return(ipf(b, a))
}
} else {
if (ie1 == ">") {
    if (ie2 == ">") {
        return(ipf(a, apply(expand.grid.alt(b, b2), 1, max))/ipf(a1, b2))
    } else {
        return(1 - ipf(apply(expand.grid.alt(b, b2), 1, min), a)/(1 - ipf(a1, b2)))
    }
 } else {
    if (ie2 == ">") {
        return(1 - ipf(a, apply(expand.grid.alt(b, b2), 1, max))/ipf(a1, b2))
    } else {
        return(ipf(apply(expand.grid.alt(b, b2), 1, min), a)/(1 - ipf(a1, b2)))
    }
}
}
}

该函数的一些注意事项:此函数允许通过秩和程序比较不同的样本。它允许有效地计算例如样本 A 的抽取超过样本 B 的抽取的概率,因为样本 A 的抽取超过了样本 C 的抽取。在这种情况下,我只想计算从 A 抽取的概率鉴于 A[.I] 的平局超过了 A[-.I] 的平局,[.I] 超过了 B 的平局。其中 .I 代表所有 ID。除此之外,我想在所有日期都这样做。这就是不好的 .SD 发挥作用的地方。请注意,对于上述任务 prob() 已经是几乎可以找到的最快任务了。

数据集:

dt <- data.table(id=rep(c(rep(1,50),rep(2,50),rep(3,100),rep(4,50),rep(5,100),rep(6,50),rep(7,50),rep(8,50),rep(9,50),rep(10,50)),5),date=c(rep("2004-01-01",600),rep("2004-02-01",600),rep("2004-03-01",600),rep("2004-04-01",600),rep("2004-05-01",600)),A=runif(3000,-5,5),B=runif(3000,-5,5))

data.table的应用:

setkey(dt, id)
setkey(dt, id)
dt[,{
.SD1 <- .SD;
.SD1[,prob(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]), by=id ]
},by=date]

在我的机器上,这个任务的执行大约需要 52.1 秒。考虑到我的真实数据集有几百万行并且全部包含在所有 57 个组(ID)中,这太过分了。您对我在这里提高性能有什么建议吗?我实际上正在寻找一个 data.table 解决方案。我认为数据表语法可能效率低下。也许可以摆脱.SD?但我也对并行化的想法持开放态度。

----------

更新

下面是现状。我并行化了带来性能改进的过程。高度赞赏如何使过程更加高效的每一个提示,因为我认为我在并行化中遗漏了一些东西 - 我预计性能会有更显着的提高。

套餐

library(multicore)
library(doMC)
library(data.table)
registerDoMC(cores=4)

数据集

dt <-     data.table(id=rep(c(rep(1,50),rep(2,50),rep(3,100),rep(4,50),rep(5,100),rep(6,50),rep(7,50),rep(8,50),rep(9,50),rep(10,50)),5),date=c(rep("2004-01-01",600),rep("2004-02-01",600),rep("2004-03-01",600),rep("2004-04-01",600),rep("2004-05-01",600)),A=runif(3000,-5,5),B=runif(3000,-5,5))

prob() 函数 OP

prob1 <- function(a, ie1, b, a1, ie2, b2, ...) {
ipf  <- function(a, b, ...) {
m    <- length(a)
n    <- length(b)
sm <- seq_len(m)
sn <- seq_len(n)
if (m < n) {
  r <- rank(c(a, b), ...)[sm] - sm
} else {
  r <- rank(c(a, b), ...)[(m + sn)] - sn
}
s <- ifelse((n + m)^2 > 2^31, sum(as.double(r)), sum(r))/(as.double(m) * n)
return(ifelse(m < n, s, 1 - s))
}


if (missing(a1) | missing(b2) | missing(ie2)) {
if (ie1 == ">") {
  return(ipf(a, b))
} else {
  return(ipf(b, a))
}
} else {
if (ie1 == ">") {
  if (ie2 == ">") {
    return(ipf(a,CJ(b, b2)[,pmax(V1,V2)])/ipf(a1, b2))
  } else {
    return(1 - ipf(CJ(b, b2)[,pmin(V1,V2)], a)/(1 - ipf(a1, b2)))
  }
} else {
  if (ie2 == ">") {
    return(1 - ipf(a,CJ(b, b2)[,pmax(V1,V2)])/ipf(a1, b2))
  } else {
    return(ipf(CJ(b, b2)[,pmin(V1,V2)], a)/(1 - ipf(a1, b2)))
  }
}
}
}

prob() 函数

prob2 <- function(a, ie1, b, a1, ie2, b2, ...) { 
ipf  <- function(a, b, ...) {
m    <- length(a)
n    <- length(b)
sm <- seq_len(m)
sn <- seq_len(n)
if (m < n) {
  r <- rank(c(a, b), ...)[sm] - sm
} else {
  r <- rank(c(a, b), ...)[(m + sn)] - sn
}
s <- if((n + m)^2 > 2^31){sum(as.double(r))/(as.double(m) * n)} else{     sum(r)/(as.double(m) * n)}
return(if(m < n){s} else{1 - s})
}

if (missing(a1) | missing(b2) | missing(ie2)) {
if (ie1 == ">") {
  return(ipf(a, b))
} else {
  return(ipf(b, a))
}
} else {
if (ie1 == ">") {
  if (ie2 == ">") {

    ipfb <- pmax(rep.int(b,length(b2)), rep(b2, each = length(b)))
    return(ipf(a, ipfb) /ipf(a1, b2))
  } else {
    ipfb <- pmin(rep.int(b,length(b2)), rep(b2, each = length(b)))
    return(1 - ipf(ipfb, a)/(1 - ipf(a1, b2)))
  }
} else {
  if (ie2 == ">") {
    ipfb <- pmax(rep.int(b,length(b2)), rep(b2, each = length(b)))
    return(1 - ipf(a, ipfb )/ipf(a1, b2))
  } else {
    ipfb <- pmin(rep.int(b,length(b2)), rep(b2, each = length(b)))
    return(ipf(ipfb, a)/(1 - ipf(a1, b2)))
  }
}
}
}

prob 函数 OP - 应用于 data.table

ptm <- proc.time()
setkey(dt, id)
res1 <- dt[,{
.SD1 <- .SD;
.SD1[,prob1(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]),by=id]},by=date]
proc.time() - ptm

user  system elapsed 
6.645   0.110   6.778 

prob 函数 mnel - 应用于 data.table

ptm <- proc.time()
setkey(dt, id)
res2 <- dt[,{
.SD1 <- .SD;
.SD1[,prob2(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]),by=id]},by=date]
proc.time() - ptm

user  system elapsed 
5.914   0.065   5.999

并行化概率函数 - OP - 应用于 data.table

ptm <- proc.time()
jo        <- dt[,list(jobs=list(parallel({.SD1 <- .SD; .SD1[,prob1(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]),by=id]}))),by=date]
res3      <- data.table(date=rep(jo[,date],each=length(unique(dt$id))),rbindlist(collect(jo[,jobs])))
proc.time() - ptm


user  system elapsed 
13.882   0.537   4.715

并行化概率函数 - mnel - 应用于 data.table

ptm <- proc.time()
jo        <- dt[,list(jobs=list(parallel({.SD1 <- .SD; .SD1[,prob2(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]),by=id]}))),by=date]
res4      <- data.table(date=rep(jo[,date],each=length(unique(dt$id))),rbindlist(collect(jo[,jobs])))
proc.time() - ptm

user  system elapsed 
13.682   0.560   4.545
4

1 回答 1

5

data.table这与使用无关.SD 如果您分析原始调用(profr例如使用),您会注意到大部分时间都花在t.default了.FUNapply

library(profr); library(ggplot2
xpr <- profr(dt[,{
  .SD1 <- .SD;
  .SD1[,prob(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]), by=id ]
  },by=date])

ggplot(xpr)

在此处输入图像描述

您可以重写您的函数以删除对和expand.grid.alt的不必要调用matrixt

 expand.grid.alt <- function(seq1,seq2){
   cbind(rep.int(seq1,length(seq2)), rep(seq2,each=length(seq1)))
 }

但这并不能解决您apply用来获取两行矢量化最大值和最小值的事实。您可以使用pminorpmax来执行此操作(即使是一个小示例,也可以增加 8-9 倍)

pm <- function(seq1,seq2){
 pmax(rep.int(seq1,length(seq2)), rep(seq2, each = length(seq1)))}
ea <- function(seq1, seq2) {
  apply(cbind(rep.int(seq1, length(seq2)), rep(seq2, each = length(seq1))),1,max)
  }
 eaorig <- function(seq1, seq2) {
   cbind(rep.int(seq1, length(seq2)), 
         c(t(matrix(rep.int(seq2, length(seq1)), nrow =   length(seq2)))))
    }
 eao <- function(seq1,seq2) {apply( eaorig(seq1,seq2), 1, max)}

 library(microbenchmark)

 microbenchmark(pm(1:5,2:8), ea(1:5,2:8),eao(1:5,2:8))
Unit: microseconds
          expr    min      lq  median      uq     max neval
  pm(1:5, 2:8) 10.867 11.7730 12.6790 13.2820  56.446   100
  ea(1:5, 2:8) 80.895 83.6130 85.5745 88.4420 172.054   100
 eao(1:5, 2:8) 91.460 94.0265 95.6860 99.3085 137.341   100

然后我们可以重新定义

prob <- function(a, ie1, b, a1, ie2, b2, ...) {
  ipf  <- function(a, b, ...) {
    m    <- length(a)
    n    <- length(b)
    sm <- seq_len(m)
    sn <- seq_len(n)
    if (m < n) {
      r <- rank(c(a, b), ...)[sm] - sm
    } else {
      r <- rank(c(a, b), ...)[(m + sn)] - sn
    }
    s <- if((n + m)^2 > 2^31){sum(as.double(r))/(as.double(m) * n)} else{ sum(r)/(as.double(m) * n)}
    return(if(m < n){s} else{1 - s})
  }

  if (missing(a1) | missing(b2) | missing(ie2)) {
    if (ie1 == ">") {
      return(ipf(a, b))
    } else {
      return(ipf(b, a))
    }
  } else {
    if (ie1 == ">") {
      if (ie2 == ">") {

        ipfb <- pmax(rep.int(b,length(b2)), rep(b2, each = length(b)))
        return(ipf(a, ipfb) /ipf(a1, b2))
      } else {
        ipfb <- pmin(rep.int(b,length(b2)), rep(b2, each = length(b)))
        return(1 - ipf(ipfb, a)/(1 - ipf(a1, b2)))
      }
    } else {
      if (ie2 == ">") {
        ipfb <- pmax(rep.int(b,length(b2)), rep(b2, each = length(b)))
        return(1 - ipf(a, ipfb )/ipf(a1, b2))
      } else {
        ipfb <- pmin(rep.int(b,length(b2)), rep(b2, each = length(b)))
        return(ipf(ipfb, a)/(1 - ipf(a1, b2)))
      }
    }
  }
}

重写后prob,我们得到了 10 倍的速度提升

# avoiding .SD
system.time({ 
 # create lists of B by 'date
 Bs <- dt[,list(x = list(B)),keyby='date'];

 dt[,{
   z <- .BY
   BB <- Bs[z[[1]]][['x']][[1]]
   AA <- dt[!list(z[['id']]), A[date == z[['date']]]]
  prob(A, '>',BB,A,'>',AA) 
 },by='date,id']  
 })
   user  system elapsed 
   4.66    0.00    4.67 

# using original .SD approach 
system.time( dt[,{
  .SD1 <- .SD;
  .SD1[,prob(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]), by=id ]
 },by=date])
   user  system elapsed 
   4.51    0.00    4.52 

 # using probo == original function

system.time( dt[,{
 .SD1 <- .SD;
 .SD1[,probo(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]), by=id ]
},by=date])

 user  system elapsed 
  43.98    0.02   44.01

使用 OP 的更新功能进行编辑

CJ在调用中使用和 pmax[.

# if we compare this with the updated version using `CJ`

system.time( dt[,{
     .SD1 <- .SD;
     .SD1[,prob1(.SD1$A[.I],">",.SD1$B,.SD1$A[.I],">",.SD1$A[-.I]), by=id ]
 },by=date])
   user  system elapsed 
  17.23    0.00   17.27 

由于调用的开销以及在创建的键上设置键[.data.table的事实,这会更慢CJdata.table

于 2013-10-08T23:56:27.453 回答