2

尝试此代码

library(future)
library(foreach)

ncores <- 3
cl <- parallel::makeCluster(3)
avail <- bigstatsr::FBM(ncores, 1, type = "integer", init = 1)
doFuture::registerDoFuture()

res <- vector("list", 5)
for (i in seq_along(res)) {

  while (sum(avail[]) == 0) {
    cat("Waiting..\n")
    Sys.sleep(0.5)
  }
  ind.avail <- which(avail[] == 1)
  cat("Available:", length(ind.avail), "\n")

  plan(cluster, workers = cl[ind.avail])
  foo <- foreach(i = 3:1) %dopar% {
    Sys.sleep(i)
  }

  print(one <- ind.avail[1])
  avail[one] <- 0; print(avail[])
  res[[i]] <- cluster(workers = cl[one], {
    Sys.sleep(5)
    avail[one] <- 1
    i
  })
}

sapply(res, resolved)
parallel::stopCluster(cl)

我得到的错误:Initialization of plan() failed, because the test future used for validation failed. The reason was: Unexpected result (of class ‘NULL’ != ‘FutureResult’) retrieved for ClusterFuture future (label = ‘&lt;none>’, expression = ‘NA’)

我试图重现我的真实问题的示例的解释:

  • 我在两个步骤中循环了很多次(这里是 5 次)
  • 第一步很容易与 foreach 并行
  • 第二步不容易并行化,依赖于第一步

所以我的想法是在所有可用集群上并行化第一步,并仅使用一个集群异步运行第二步。在此异步作业完成之前,此集群将不再可用。然后下一步将减少一个可用的集群,依此类推。当第一步没有可用的集群时,它会等待一些异步作业完成并释放一些集群。

4

1 回答 1

1

我可以重现这个。我相信您正在设法通过调用一个集群节点来破坏与主 R 进程和集群节点的通信,该plan()集群节点包含尚未带回主 R 进程的未来结果。(我试图想出一个这种类型的腐败的更简单的例子,但如果不花更多的时间,这并不明显。)

未来的框架已经检测到这一点(因此出现错误)。我已经更新了未来的开发版本,以提供更多关于正在发生的事情的线索和证据:

Error: Initialization of plan() failed, because the test future used for
  validation failed. The reason was: Unexpected result (of class ‘character’
  != ‘FutureResult’) retrieved for ClusterFuture future (label = 
  ‘future-plan-test’, expression = ‘NA’): future-grmall. This suggests that
  the communication with ClusterFuture worker (‘SOCKnode’ #1) is out of sync.

我认为你可以通过确保在再次使用它们的工人之前收集已解决的期货的价值来解决这个问题。该plan(cluster, ...)调用验证至少可以成功解决一个未来。

于 2019-01-07T04:59:47.230 回答