4

我必须从 Web API (NCBI entrez) 中检索一个大型数据集,该数据集将我限制为每秒一定数量的请求,例如 10 个(示例代码将限制您在没有 API 密钥的情况下为三个)。我正在使用 furrr 的 future_* 函数来并行化请求以尽快获取它们,如下所示:

library(tidyverse)
library(rentrez)
library(furrr)

plan(multiprocess)

api_key <- "<api key>"
# this will return a crap-ton of results
srch <- entrez_search("nuccore", "Homo sapiens", use_history=T, api_key=api_key)

total <- srch$count
per_request <- 500 # get 500 records per parallel request
nrequest <- total %/% per_request + as.logical(total %% per_request)

result <- future_map(seq(nrequest),function(x) {
  rstart <- (x - 1) * per_request
  return(entrez_fetch(
    "nuccore",
    web_history = srch$web_history,
    rettype="fasta",
    retmode="xml",
    retstart=rstart,
    retmax=per_request,
    api_key=api_key
  ))
}

显然,对于nrequest > 10(或任何限制)的情况,我们将立即违反速率限制。

我看到了两个看似显而易见的简单解决方案,这两个似乎都有效。一种是在发出请求之前引入一个随机的短延迟,如下所示:

future_map(seq(nrequest),function(x) {
  Sys.sleep(runif(1,0,5))
  # ...do the request...
}

第二种是将并发请求的数量限制在速率限制内,通过plan(multiprocess,workers=<max_concurrent_requests>)或使用将信号量设置为速率限制的semaphore包,如下所示:

# this sort of assumes individual requests take long enough to cause
# a wait for the semaphore to be long enough
# for this case, they do
rate_limit <- 10
lock = semaphore(rate_limit)
result <- future_map(seq(nrequest),function(x) {
  rstart <- (x - 1) * per_request
  acquire(lock)
  s <- entrez_fetch(
    "nuccore",
    web_history = srch$web_history,
    rettype="fasta",
    retmode="xml",
    retstart=rstart,
    retmax=per_request,
    api_key=api_key
  )
  release(lock)
  return(s)
}

但是,我真正想做的是限制请求率而不是并发请求的数量。Quentin Pradet有一篇很棒的文章,介绍了如何在 python 中使用异步 io http 请求来做到这一点。我试图将其调整为 R,但遇到了这样的问题:future_* 函数中跨线程/进程共享的任何变量都被复制而不是实际共享,因此修改(即使受信号量锁保护)不会在之间共享线程/进程,因此无法实现我们所依赖的计数器存储桶以使该方法起作用!

有没有一种巧妙的方法来限制并行请求的速率,而不必限制同时请求的数量?还是我想太多了,应该坚持限制数量?

4

0 回答 0