0

我正在使用 foreach 在一些 R 代码中并行化一个简单的循环。一切正常,而且我得到了可接受的加速 - 除了 foreach 循环的输出“缺少”一些结果,因为(似乎)它们被复制了。换句话说,我假设在循环增量之前将相同的工作发送给每个工作人员,而不是在每个工作人员空闲时将其推送给每个工作人员。

我使用 doSNOW 作为并行后端(R 版本 2.15.3,foreach 版本 1.4.1,doSNOW 版本 1.0.9)。代码基本上如下:

library(foreach)
library(doSNOW)
the.cores <- 2
cl <- makeCluster( rep ("localhost",the.cores), type="SOCK" )
registerDoSNOW(cl)

getRows <- function(fileToRead, numberOfRows, rowsToSkip){
return( read.csv(fileToRead, numberOfRows, rowsToSkip, stringsAsFactors=FALSE) )
}

doCalculation <- function(x){
# do some stuff with x
return(result)
}

calculationTest <- function(fileToRead, numberOfRows, rowsToSkip){
theData <- getRows(fileToRead, numberOfRows, rowsToSkip)
calcs <- doCalculation(theData)
return(result)
}


final.results <- foreach(i=1:n) %dopar% {
theResult <- lapply(aFile, calcTest, i=i, nrows=numberOfRows,  rowstoskip=rowsToSkip)
}

问题伴随结果而来。我的机器上有 2 个物理核心和 4 个逻辑核心,结果遵循类似的模式 - 即,将 n 和核心数量设置如下,结果是:

n = 6
the.cores <- 2
unlist(final.results)
1 1 2 2 3 3 

同样,对于

n = 6
the.cores <- 4

我明白了

unlist(final.results)
1 1 1 1 2 2

串行计算并手动检查的正确结果是:

unlist(final.results)
[1] 1 2 3 4 5

其他一切都很好:我只是有点困惑,因为我认为结果会在每个工人空闲时被推送给他们,因此应该准确地复制连续结果。我还假设在这个非常简单的例子中(它只是为了加快一些中等大小的计算!)对于每个工人来说,没有必要将 foreach 循环分成显式块:我是不是在这种想法?由于 lapply 语句中的函数调用其他函数,其中第一个函数从文件中读取数值块,然后调用其他函数对块执行计算,这可能是问题所在吗?

最后,如果我设置

the.cores <- 1

要复制串行计算,结果完全正确 - 即

unlist(final.results)
[1] 1 2 3 4 5

非常感谢任何可以弥补我的无知的解释!:-)

编辑:请注意,在以下测试示例中使用上述代码,一切正常。

library(foreach)
library(doSNOW)
the.cores <- 2
cl <- makeCluster( rep ("localhost",the.cores), type="SOCK" )
registerDoSNOW(cl)

my.fun <- function(x) { x^2 }
the.output <- foreach(i=1:10) %dopar% {
my.fun(i)
}

给出预期的结果:

[1]   1   4   9  16  25  36  49  64  81 100
4

1 回答 1

1

我可能对你想要做什么感到困惑,但我猜你想与每个工作人员并行处理一个文件,每个工作人员都在读取它自己的文件块。为此,我将在 foreach 循环中使用两个迭代变量。这是一个使用虚拟calcTest函数的示例,该函数仅返回两个关键输入参数来演示该技术:

library(doSNOW)
library(iterators)
the.cores <- 4
cl <- makeSOCKcluster(the.cores)
registerDoSNOW(cl)
totalRows <- 1000
nrows <- unlist(as.list(idiv(totalRows, chunks=the.cores)))
skip <- cumsum(c(0, nrows))[1:the.cores]
calcTest <- function(fileToRead, numberOfRows, rowsToSkip) {
  c(numberOfRows, rowsToSkip)
}
aFile <- 'file.dat'
final.results <- foreach(numberOfRows=nrows, rowsToSkip=skip) %dopar% {
  calcTest(aFile, numberOfRows, rowsToSkip)
}

执行此操作时,final.results变为:

> final.results
[[1]]
[1] 250   0

[[2]]
[1] 250 250

[[3]]
[1] 250 500

[[4]]
[1] 250 750

所以第一个工作人员处理 1-250 行,第二个工作人员处理 251-500 等。这基本上是你想要做的吗?

于 2013-11-10T20:08:51.360 回答