3

我有一个大型数据库(~100Gb),我需要从中提取每个条目,对其进行一些比较,然后存储这些比较的结果。我试图在单个 R 会话中运行并行查询,但没有成功。我可以一次运行多个 R 会话,但我正在寻找更好的方法。这是我尝试的:

library(RSQLite)
library(data.table)
library(foreach)
library(doMC)



#---------
# SETUP
#---------


#connect to db
db <- dbConnect(SQLite(), dbname="genes_drug_combos.sqlite")


#---------
# QUERY
#---------
# 856086 combos = 1309 * 109 * 6

registerDoMC(8)

#I would run 6 seperate R sessions (one for each i)
res_list <- foreach(i=1:6) %dopar% {

  a <- i*109-108
  b <- i*109

  pb  <- txtProgressBar(min=a, max=b, style=3)
  res <- list()

  for (j in a:b) {

    #get preds for drug combos
    statement   <- paste("SELECT * from combo_tstats WHERE rowid BETWEEN", (j*1309)-1308, "AND", j*1309)
    combo_preds <- dbGetQuery(db, statement)

    #here I do some stuff to the result returned from the query
    combo_names <- combo_preds$drug_combo
    combo_preds <- as.data.frame(t(combo_preds[,-1]))

    colnames(combo_preds)  <- combo_names

    #get top drug combos
    top_combos <- get_top_drugs(query_genes, drug_info=combo_preds, es=T)

    #update progress and store result
    setTxtProgressBar(pb, j)
    res[[ length(res)+1 ]] <- top_combos
  }
  #bind results together
  res <- rbindlist(res)
}

我没有收到任何错误,但只有一个核心启动。相比之下,如果我运行多个 R 会话,我所有的核心都会这样做。我究竟做错了什么?

4

1 回答 1

4

RSQLite我在使用同一个文件 SQLite 数据库同时访问时学到了一些东西:

1. 确保每个工人都有自己的数据库连接。

  parallel::clusterEvalQ(cl = cl, {
    db.conn <- RSQLite::dbConnect(RSQLite::SQLite(), "./export/models.sqlite");
    RSQLite::dbClearResult(RSQLite::dbSendQuery(db.conn, "PRAGMA busy_timeout=5000;"));
  })

2.使用PRAGMA busy_timeout=5000;

默认情况下,它设置为 0,并且每次您的工作人员在数据库锁定时尝试写入数据库时​​,您最终可能会遇到“数据库已锁定”错误。以前的代码PRAGMA在每个工作人员连接中设置了这个。请注意,SELECT操作永远不会被锁定,只有INSERT/DELETE/UPDATE.

3.使用PRAGMA journal_mode=WAL;

这只需要设置一次,并且默认情况下永远保持打开状态。它将向数据库添加两个(或多或少永久)文件。它将提高并发读/写性能。在这里阅读更多。

使用上述设置,我还没有遇到过这个问题。

于 2017-06-08T20:30:09.417 回答