6

我有一个单元格、值和坐标的 data.frame。它驻留在全球环境中。

> head(cont.values)
   cell value   x   y
1 11117    NA -34 322
2 11118    NA -30 322
3 11119    NA -26 322
4 11120    NA -22 322
5 11121    NA -18 322
6 11122    NA -14 322

因为我的自定义函数需要将近一秒钟来计算单个单元格(并且我有数万个单元格要计算),所以我不想重复计算已经有值的单元格。我的以下解决方案试图避免这种情况。每个单元格都可以独立计算,为并行执行而尖叫。

我的函数实际上做的是检查是否有指定单元格编号的值,如果它是 NA,它会计算它并插入它来代替 NA。

我可以使用 apply 系列函数运行我的魔法函数(结果是value对应的),并且从内部,我可以毫无问题地读写(它在全局环境中)。cellapplycont.values

现在,我想并行运行它(使用snowfall),我无法从单个核心读取或写入这个变量。

问题:当并行执行函数时,什么解决方案能够从工作人员(核心)内部读取/写入驻留在全局环境中的动态变量。有没有更好的方法来做到这一点?

4

3 回答 3

4

当然,这将取决于所讨论的功能是什么,但恐怕在snowfall那里不会有太大帮助。问题是,您必须将整个数据帧导出到不同的核心(请参阅 参考资料?sfExport),并且仍然要找到一种方法来组合它。这种改变全局环境中的值的全部目的,因为您可能希望保持尽可能低的内存使用。

snow您可以深入了解to -kind of- get this to work的低级功能。请参见以下示例:

#Some data
Data <- data.frame(
  cell = 1:10,
  value = sample(c(100,NA),10,TRUE),
  x = 1:10,
  y = 1:10
)
# A sample function
sample.func <- function(){
    id <- which(is.na(Data$value)) # get the NA values

    # this splits up the values from the dataframe in a list
    # which will be passed to clusterApply later on.
    parts <- lapply(clusterSplit(cl,id),function(i)Data[i,c("x","y")])

    # Here happens the magic
    Data$value[id] <<-
    unlist(clusterApply(cl,parts,function(x){
        x$x+x$y
      }
    ))
}
#now we run it
require(snow)
cl <- makeCluster(c("localhost","localhost"), type = "SOCK")
sample.func()
stopCluster(cl)
> Data
   cell value  x  y
1     1   100  1  1
2     2   100  2  2
3     3     6  3  3
4     4     8  4  4
5     5    10  5  5
6     6    12  6  6
7     7   100  7  7
8     8   100  8  8
9     9    18  9  9
10   10    20 10 10

您仍然必须复制(部分)数据才能将其发送到核心。但是,当您在数据帧上调用snowfall高级函数时,无论如何都会发生这种情况,因为无论如何都会snowfall使用低级函数snow

另外,不要忘记,如果您更改数据帧中的一个值,整个数据帧也会被复制到内存中。因此,当它们从集群返回时,您不会通过将值一一添加来赢得那么多。您可能想尝试一些不同的方法并进行一些内存分析。

于 2011-06-06T12:54:44.803 回答
4

工人咨询值的中央存储模式在 CRAN 上的rredis包中实现。这个想法是 Redis 服务器维护一个键值对存储(您的全局数据框,重新实现)。工作人员查询服务器以查看该值是否已计算(redisGet),如果没有计算并存储(redisSet),以便其他工作人员可以重新使用它。Worker 可以是 R 脚本,因此很容易扩大劳动力。这是一个非常好的替代并行范例。这是一个使用“记忆”每个结果的概念的示例。我们有一个很慢的函数(休眠一秒钟)

fun <- function(x) { Sys.sleep(1); x }

我们编写了一个'memoizer',它返回一个变体fun,首先检查是否x已经计算了 for 的值,如果是,则使用它

memoize <-
    function(FUN)
{
    force(FUN) # circumvent lazy evaluation
    require(rredis)
    redisConnect()
    function(x)
    {
        key <- as.character(x)
        val <- redisGet(key)
        if (is.null(val)) {
            val <- FUN(x)
            redisSet(key, val)
        }
        val
    }
}

然后我们记住我们的函数

funmem <- memoize(fun)

> system.time(res <- funmem(10)); res
   user  system elapsed 
  0.003   0.000   1.082 
[1] 10
> system.time(res <- funmem(10)); res
   user  system elapsed 
  0.001   0.001   0.040 
[1] 10

这确实需要在 R 之外运行但非常易于安装的 redis 服务器;请参阅 rredis 软件包附带的文档。

R 内并行版本可能是

library(snow)
cl <- makeCluster(c("localhost","localhost"), type = "SOCK")
clusterEvalQ(cl, { require(rredis); redisConnect() })
tasks <- sample(1:5, 100, TRUE)
system.time(res <- parSapply(cl, tasks, funmem))
于 2011-06-06T16:49:41.493 回答
1

我同意 Joris 的观点,即您需要将数据复制到其他内核。从积极的方面来说,您不必担心NA' 是否在数据中,在核心中。如果您的原件data.frame被称为cont.values

nnaidx<-is.na(cont.values$value) #where is missing data originally
dfrnna<-cont.values[nnaidx,] #subset for copying to other cores
calcValForDfrRow<-function(dfrRow){return(dfrRow$x+dfrRow$y)}#or whatever pleases you
sfExport(dfrnna, calcValForDfrRow) #export what is needed to other cores
cont.values$value[nnaidx]<-sfSapply(seq(dim(dfrnna)[1]), function(i){calcValForDfrRow(dfrnna[i,])}) #sfSapply handles 'reordering', so works exactly as if you had called sapply

应该可以很好地工作(除非错别字)

于 2011-06-06T13:41:15.390 回答