6

我正在尝试设置一个并行任务,每个工作人员都需要进行数据库查询。我正在尝试为每个工作人员设置一个连接,如this question中所示,但每次我尝试它都会<Expired PostgreSQLConnection:(2781,0)>为我注册的工作人员返回。

这是我的代码:

cl <- makeCluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(RPostgreSQL)
  drv<-dbDriver("PostgreSQL")
  con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")

})

如果我尝试运行我foreach的错误,它会失败task 1 failed - "expired PostgreSQLConnection"

当我进入 postgres 服务器状态时,它会显示所有已创建的活动会话。

我在与我的主 R 实例中的 postgres 交互时没有任何问题。

如果我跑

clusterEvalQ(cl, {
  library(RPostgreSQL)
  drv<-dbDriver("PostgreSQL")
  con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")
  dbGetQuery(con, "select inet_client_port()")

})

然后它将返回所有客户端端口。它没有给我过期的通知,但是如果我尝试运行我的 foreach 命令,它将失败并出现相同的错误。

编辑:

我已经在 Ubuntu 和 2 台 Windows 计算机上尝试过,它们都给出了相同的错误。

另一个编辑:

现在3台windows电脑

4

1 回答 1

8

我能够在本地重现您的问题。我不完全确定,但我认为问题与clusterEvalQ内部工作方式有关。例如,您说这dbGetQuery(con, "select inet_client_port()) 给了您客户端端口输出。如果查询实际上是在集群节点上评估/执行的,那么您将无法看到此输出(与您无法直接读取在外部集群节点上执行的任何其他输出或打印语句相同的方式)。

因此,据我了解,评估首先在本地环境中执行,相关函数和变量随后被复制/导出到各个集群节点。这适用于任何其他类型的函数/变量,但显然不适用于数据库连接。如果连接/端口映射链接到主 R 实例,则连接将无法从从属实例工作。clusterExport如果您尝试使用该函数来导出在主实例上创建的连接,您也会得到完全相同的错误。

作为替代方案,您可以在各个foreach任务中创建单独的连接。我已经用本地数据库验证了以下工作:

library(doParallel)
nrCores = detectCores()
cl <- makeCluster(nrCores)
registerDoParallel(cl)
clusterEvalQ(cl,library(RPostgreSQL))
clusterEvalQ(cl,library(DBI))

result <- foreach(i=1:nrCores) %dopar%
{
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")
  queryResult <- dbGetQuery(con, "fetch something...")
  dbDisconnect(con)
  return(queryResult)
}
stopCluster(cl)

但是,现在您必须考虑到每次foreach迭代都会创建和断开新连接。因此,您可能会产生一些性能开销。您显然可以通过智能地拆分查询/数据来规避这种情况,以便在同一次迭代中完成大量工作。理想情况下,您应该将工作分配到您可用的尽可能多的内核中。

于 2015-06-28T20:22:19.337 回答