0

我正在尝试使用 R 从 MySQL 数据库中并行获取数据。以下代码一一获取数据并且工作正常。但我想通过发送多个查询并将其保存到不同的变量中来加速这个过程。稍后我将合并变量内的时间序列。

library(RMySQL)
dbConnect(MySQL(), user='external', password='xxxxxxx', dbname='GMT_Minute_Data', host='xx.xx.xxx.xxx')

sqlData <-select TradeTime, Open, High, Low, Close from ad where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data1= dbFetch(sqlData, n=-1)
sqlData <-select TradeTime, Open, High, Low, Close from ty where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data2 = dbFetch(sqlData, n=-1)
sqlData <-select TradeTime, Open, High, Low, Close from ax where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data3 = dbFetch(sqlData, n=-1)

connections <- dbListConnections(MySQL())
for(i in connections) {dbDisconnect(i)}

我尝试使用以下代码并行获取数据:

library(foreach)
library(doParallel)
library(RMySQL)

fetchData<- function(nInst, inst1, inst2, inst3, inst4, inst5, startDate, endDate, con1){

  inst<-NULL
  sqlData <-NULL

  if(nInst==1)
    inst<-inst1
  else if(nInst==2)
    inst<-inst2
  else if(nInst==3)
    inst<-inst3
  else if(nInst==4)
    inst<-inst4
  else if(nInst==5)
    inst<-inst5

  sqlData <- dbSendQuery(con1, paste0('select TradeTime, Open, High, Low, Close from ', inst, ' where tradetime between \'',  startDate, '\' and \'',  endDate, '\'' ))
  data1 = dbFetch(sqlData, n=-1)
  print(head(data1))

  data1 
}

cluster = makeCluster(5, type = "SOCK")
registerDoParallel(cluster)
mydb <- NULL
clusterEvalQ(cluster, {

  mydb <- dbConnect(MySQL(), user='external', password='xxxxxx', dbname='GMT_Minute_Data', host='xx.xx.xxx.xxx')
  NULL
})


allDataList<-foreach(n =1:2, .verbose=TRUE, .packages=('RMySQL')) %dopar% {
  fetchData(n, inst1, inst2, inst3, inst4, inst5, startDate, endDate, mydb)

}
stopCluster(cluster)
on.exit(dbDisconnect(mydb))

有时代码只为第一个仪器获取数据,而不是为其余的仪器获取数据。

如果有人知道解决方案,请提供帮助。

谢谢,

4

1 回答 1

0

我认为问题在于 foreach 将mydb变量自动导出给工作人员,从而违背了使用它进行初始化的目的clusterEvalQ。数据库连接无法正确序列化并发送到其他机器,这就是为什么使用clusterEvalQ. foreach.verbose=TRUE选项让您验证它mydb不是自动导出的。如果它说它是自动导出的,则需要阻止它。

在您的示例中,您可以mydb通过简单地删除语句来防止自动导出mydb <- NULL,但我建议您使用 foreach.noexport='mydb'选项来确保它永远不会自动导出。这是一个简化的示例:

library(doParallel)

fetchData <- function(ignore) {
  mydb
}

cluster <- makeCluster(5, type = "SOCK")
registerDoParallel(cluster)

clusterEvalQ(cluster, {
  mydb <- sample(100, 1) # different value for each worker
  NULL
})

r <- foreach(n=1:2, .noexport='mydb', .verbose=TRUE) %dopar% { 
  fetchData(n)
}

在这种情况下,foreach 分析该fetchData函数并注意到它使用了一个名为 的变量mydb。因此,如果mydb在主服务器上定义,它将自动导出它,除非您告诉它不要这样做。这就是为什么我建议使用.noexport='mydb'即使它没有在本地环境中定义。它可以双重确保您的函数不使用损坏的数据库连接。

于 2016-04-22T13:32:49.400 回答