经过一些研究和@MartinMorgan 的非常有用的回答,我决定做一些类似于他的回答的事情。我创建了一些模拟 findInterval 的函数,而不检查 vec 是否已排序。显然,当 x 的长度为 1 并且您一遍又一遍地调用它时,这会产生很大的不同。如果 x 的长度 >> 1 并且您可以利用 vectorizacion,则 findInterval 仅检查一次 vec 是否已排序。在以下代码块中,我创建了一些查找间隔的变体
- findInterval2,它是用 R 编写的 findInterval,作为没有排序检查的二进制搜索
- findInterval2comp,即用 cmpfun 编译的 findInterval2
- findInterval3,即 findInterval 用 C 语言编写,作为使用 inline 包编译的二进制搜索
之后,我创建了 2 个函数来测试
- testByOne,对长度为 1 的 x 运行 findInterval
- testVec,它使用矢量化
对于 testVec,我创建的所有函数都使用 Vectorize 在 x 参数中进行了矢量化。
之后,我用微基准测试了执行时间。
代码
require(inline)
# findInterval written in R as a binary search
findInterval2 <- function(x,v) {
n = length(v)
if (x<v[1])
return (0)
if (x>=v[n])
return (n)
i=1
k=n
while({j = (k-i) %/% 2 + i; !(v[j] <= x && x < v[j+1])}) {
if (x < v[j])
k = j
else
i = j+1
}
return (j)
}
findInterval2Vec = Vectorize(findInterval2,vectorize.args="x")
#findInterval2 compilated with cmpfun
findInterval2Comp <- cmpfun(findInterval2)
findInterval2CompVec <- Vectorize(findInterval2Comp,vectorize.args="x")
findInterval2VecComp <- cmpfun(findInterval2Vec)
findInterval2CompVecComp <- cmpfun(findInterval2CompVec)
sig <-signature(x="numeric",v="numeric",n="integer",idx="integer")
code <- "
if (*x < v[0]) {
*idx = -1;
return;
}
if (*x >= v[*n-1]) {
*idx = *n-1;
return;
}
int i,j,k;
i = 0;
k = *n-1;
while (j = (k-i) / 2 + i, !(v[j] <= *x && *x < v[j+1])) {
if (*x < v[j]) {
k = j;
}
else {
i = j+1;
}
}
*idx=j;
return;
"
fn <- cfunction(sig=sig,body=code,language="C",convention=".C")
# findInterval written in C
findIntervalC <- function(x,v) {
idx = as.integer(-1)
as.integer((fn(x,v,length(v),idx)$idx)+1)
}
findIntervalCVec <- Vectorize(findIntervalC,vectorize.args="x")
# The test case where x is of length 1 and you call findInterval several times
testByOne <- function(f,reps = 100, vlength = 300000, xs = NULL) {
if (is.null(xs))
xs = seq(from=1,to=vlength-1,by=vlength/reps)
v = 1:vlength
for (x in xs)
f(x,v)
}
# The test case where you can take advantage of vectorization
testVec <- function(f,reps = 100, vlength = 300000, xs = NULL) {
if (is.null(xs))
xs = seq(from=1,to=vlength-1,by=vlength/reps)
v = 1:vlength
f(xs,v)
}
基准测试
microbenchmark(fi=testByOne(findInterval),fi2=testByOne(findInterval2),fi2comp=testByOne(findInterval2Comp),fic=testByOne(findIntervalC))
Unit: milliseconds
expr min lq median uq max neval
fi 617.536422 648.19212 659.927784 685.726042 754.12988 100
fi2 11.308138 11.60319 11.734305 12.067857 71.98640 100
fi2comp 2.293874 2.52145 2.637388 5.036558 62.01111 100
fic 368.002442 380.81847 416.137318 424.250337 474.31542 100
microbenchmark(fi=testVec(findInterval),fi2=testVec(findInterval2Vec),fi2compVec=testVec(findInterval2CompVec),fi2vecComp=testVec(findInterval2VecComp),fic=testByOne(findIntervalCVec))
Unit: milliseconds
expr min lq median uq max neval
fi 4.218191 4.986061 6.875732 10.216228 68.51321 100
fi2 12.982914 13.786563 16.738707 19.102777 75.64573 100
fi2compVec 4.264839 4.650925 4.902277 9.892413 13.32756 100
fi2vecComp 13.000124 13.689418 14.072334 18.911659 76.19146 100
fic 840.446529 893.445185 908.549874 919.152187 1047.84978 100
一些观察
- 我的 C 代码一定有问题,不会那么慢
- 最好先编译再向量化,也就是先向量化再编译
- 奇怪的是 fi2comp 跑得比 fi2 快
- 再次编译矢量化编译函数不会提高其性能