我正在使用 R,并且在 Linux 机器上的 7 个内核上运行并行代码。
使用小型数据集时,我的代码大约需要 2 个小时并且运行良好。当使用 6 倍大的数据集时,代码需要更长的时间(可能是因为需要使用交换空间),但随后会在随机的进度处崩溃:有时在 10%,有时在 18%、20% 或 30% 左右,看起来完全随机。RAM 使用率通常在 90% 左右,SWAP 使用率低于 50%。
我正在使用 foreach 及其 doSNOW 后端。这是错误信息:
Error in unserialize(socklist[[n]]) : error reading from connection
Calls: %dopar% ... tryCatch -> tryCatchList -> tryCatchOne -> <Anonymous>
Execution halted
*** caught bus error ***
address 0x7f829d2adbd0, cause 'non-existent physical address'
An irrecoverable exception occurred. R is aborting now ...
这是通过在 makeCluster 调用中设置 outfile="outfile.out" 获得的 SNOW 输出文件:
starting worker for localhost:11567
Type: EXEC
Loading required package: MASS
Loading required package: survival
Loading required package: sp
Attaching package: 'raster'
The following objects are masked from 'package:MASS':
area, select
Attaching package: 'data.table'
The following object is masked from 'package:raster':
shift
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
Type: EXEC
*** caught bus error ***
address 0x7ffada863636, cause 'non-existent physical address'
Traceback:
1: .Call("Rsx_nc4_get_vara_double", as.integer(ncid), as.integer(varid), as.integer(c.start), as.integer(c.count), fixmiss, imvstate, as.double(passed_missval), PACKAGE = "ncdf4")
2: ncvar_get_inner(ncid2use, varid2use, nc$var[[li]]$missval, addOffset, scaleFact, start = start, count = count, verbose = verbose, signedbyte = signedbyte, collapse_degen = collapse_degen)
3: getfun(nc, varid = zvar, start = start, count = count)
4: .readBrickCellsNetCDF(x, cells, layer, nl)
5: .cellValues(x, i)
6: .doExtract(x, i, drop = drop)
7: por[i]
8: por[i]
9: as.vector(por[i])
10: mainF(as.vector(por[i]))
11: eval(expr, envir, enclos)
12: eval(.doSnowGlobals$expr, envir = .doSnowGlobals$exportenv)
13: doTryCatch(return(expr), name, parentenv, handler)
14: tryCatchOne(expr, names, parentenv, handlers[[1L]])
15: tryCatchList(expr, classes, parentenv, handlers)
16: tryCatch(eval(.doSnowGlobals$expr, envir = .doSnowGlobals$exportenv), error = function(e) e)
17: fun(quote(list(i = 23339L)))
18: do.call("fun", lapply(args, enquote))
19: docall(msg$data$fun, msg$data$args)
20: doTryCatch(return(expr), name, parentenv, handler)
21: tryCatchOne(expr, names, parentenv, handlers[[1L]])
22: tryCatchList(expr, classes, parentenv, handlers)
23: tryCatch(docall(msg$data$fun, msg$data$args), error = handler)
24: doTryCatch(return(expr), name, parentenv, handler)
25: tryCatchOne(expr, names, parentenv, handlers[[1L]])
26: tryCatchList(expr, classes, parentenv, handlers)
27: tryCatch({ msg <- recvData(master) cat(paste("Type:", msg$type, "\n")) if (msg$type == "DONE") { closeNode(master) break } else if (msg$type == "EXEC") { success <- TRUE handler <- function(e) { success <<- FALSE structure(conditionMessage(e), class = c("snow-try-error", "try-error")) } t1 <- proc.time() value <- tryCatch(docall(msg$data$fun, msg$data$args), error = handler) t2 <- proc.time() value <- list(type = "VALUE", value = value, success = success, time = t2 - t1, tag = msg$data$tag) sendData(master, value) }}, interrupt = function(e) NULL)
28: slaveLoop(makeSOCKmaster(master, port))
29: eval(expr, envir, enclos)
30: eval(quote({ master <- "localhost" port <- "" snowlib <- Sys.getenv("R_SNOW_LIB") outfile <- Sys.getenv("R_SNOW_OUTFILE") args <- commandArgs() pos <- match("--args", args) args <- args[-(1:pos)] for (a in args) { pos <- regexpr("=", a) name <- substr(a, 1, pos - 1) value <- substr(a, pos + 1, nchar(a)) switch(name, MASTER = master <- value, PORT = port <- value, SNOWLIB = snowlib <- value, OUT = outfile <- value) } if (!(snowlib %in% .libPaths())) .libPaths(c(snowlib, .libPaths())) library(methods) library(snow) if (port == "") port <- getClusterOption("port") sinkWorkerOutput(outfile) cat("starting worker for", paste(master, port, sep = ":"), "\n") slaveLoop(makeSOCKmaster(master, port))}), new.env())
31: eval(expr, envir, enclos)
32: eval(expr, p)
33: eval.parent(substitute(eval(quote(expr), envir)))
34: local({ master <- "localhost" port <- "" snowlib <- Sys.getenv("R_SNOW_LIB") outfile <- Sys.getenv("R_SNOW_OUTFILE") args <- commandArgs() pos <- match("--args", args) args <- args[-(1:pos)] for (a in args) { pos <- regexpr("=", a) name <- substr(a, 1, pos - 1) value <- substr(a, pos + 1, nchar(a)) switch(name, MASTER = master <- value, PORT = port <- value, SNOWLIB = snowlib <- value, OUT = outfile <- value) } if (!(snowlib %in% .libPaths())) .libPaths(c(snowlib, .libPaths())) library(methods) library(snow) if (port == "") port <- getClusterOption("port") sinkWorkerOutput(outfile) cat("starting worker for", paste(master, port, sep = ":"), "\n") slaveLoop(makeSOCKmaster(master, port))})
An irrecoverable exception occurred. R is aborting now ...
该代码使用本地机器的 8 个线程中的 7 个线程。foreach 调用是这样进行的:
# Packages each worker loads (via .packages) before evaluating the loop body.
packageVec <- c("RcppRoll", "FAdist", "fitdistrplus", "minpack.lm", "raster", "foreach", "data.table")

# Register cluster; worker output is redirected to "<dischargefile>.out".
cl <- makeCluster(nThreads, outfile = paste0(dischargefile, ".out"))
registerDoSNOW(cl)

# Create progress bar; the callback is handed to doSNOW through .options.snow.
pb <- txtProgressBar(max = ncells, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
SNOWopts <- list(progress = progress)

# Compute: one mainF() call per cell, results row-bound into `a`.
# The loop is wrapped in tryCatch(..., finally = ) so that the progress bar is
# closed and the cluster is stopped even when %dopar% fails; otherwise an
# error in the loop would skip stopCluster() and leave the workers running.
# The original error (if any) still propagates after the cleanup has run.
# NOTE(review): the worker traceback shows the crash inside the ncdf4 read
# triggered by por[i]; the file-backed read on the workers looks like the
# place to investigate -- TODO confirm.
a <- tryCatch(
  foreach(i = cells, .packages = packageVec, .combine = "rbind",
          .options.snow = SNOWopts) %dopar% {
    mainF(as.vector(por[i]))
  },
  finally = {
    close(pb)
    stopCluster(cl)
  }
)
mainF() 函数说明
很难为这个函数生成一个最小可复现示例(MRE),因为它很复杂。代码在这里,我将在下面描述该函数的作用。我想强调一点:所有这些在小数据集上都运行良好,但在大数据集上却会失败,即使大数据集只是把小数据集的相同数据重复了多次。
mainF() 是一个函数,其输入 por[i] 是一个 125000 个元素的向量,其输出是一个 244 个元素的向量。该函数基本上对输入向量执行 240 个滚动均值(使用 RcppRoll),并为每个均值获取每个时间片(通常为 15 个)的最大值(使用 data.table),然后计算另外 240 个值,并将这些值拟合(使用 FAdist、raster、fitdistrplus 和 minpack.lm)到函数以获得更多参数。最后返回这 240 个元素和 4 个拟合参数(每个拟合 2 个)。
如何解决上述错误?这是什么意思?