2011-06-06 51 views
6

我有一个data.frame的单元格,值和坐标。它驻留在全球环境中。并行运行时写入全局环境

> head(cont.values) 
    cell value x y 
1 11117 NA -34 322 
2 11118 NA -30 322 
3 11119 NA -26 322 
4 11120 NA -22 322 
5 11121 NA -18 322 
6 11122 NA -14 322 

因为我的自定义函数需要近一个第二计算单个细胞(和我有细胞数以万计的计算),我不想重复那些已经有一个值单元格计算。我的以下解决方案试图避免这一点。每个单元格可以独立计算,尖叫并行执行。

我的功能实际上是做的是检查是否有一个指定的单元格号码的值,如果它是NA,它会计算它并插入它的位置NA。

我可以使用申请家庭的功能和apply内,我可以读取和写入cont.values没有问题(这是在全球环境中)运行我的神奇功能(结果是value了相应cell)。

现在,我想并行运行这个(使用snowfall),我无法读取或写入/从这个​​变量从个别核心。

问题:当并行执行一个函数时,什么解决方案可以从worker/core中的全局环境中读取/写入/写入全局环境中的动态变量。有没有更好的方法来做到这一点?

回答

4

中央存储的工人的价值协商的模式在rredis包CRAN上实现。这个想法是,Redis服务器维护一系列键值对(您的全局数据框,重新实现)。工人向服务器查询是否计算了该值(redisGet),如果不计算并存储(redisSet),以便其他工人可以重新使用它。工人可以是R脚本,所以扩展劳动力很容易。这是一个非常好的替代平行范例。以下是一个使用“记忆”每个结果的概念的示例。我们有一个功能,就是慢(休眠第二)

fun <- function(x) { Sys.sleep(1); x } 

我们写一个“memoizer”返回的fun一个变种,首先检查是否为x值已经计算出,如果是这样使用该

memoize <- 
    function(FUN) 
{ 
    force(FUN) # circumvent lazy evaluation 
    require(rredis) 
    redisConnect() 
    function(x) 
    { 
     key <- as.character(x) 
     val <- redisGet(key) 
     if (is.null(val)) { 
      val <- FUN(x) 
      redisSet(key, val) 
     } 
     val 
    } 
} 

然后我们memoize的我们的函数

funmem <- memoize(fun) 

> system.time(res <- funmem(10)); res 
    user system elapsed 
    0.003 0.000 1.082 
[1] 10 
> system.time(res <- funmem(10)); res 
    user system elapsed 
    0.001 0.001 0.040 
[1] 10 

这确实需要在R之外运行的Redis服务器,但安装非常简单;请参阅rredis软件包随附的文档。

A内-R并行版本可能

library(snow) 
cl <- makeCluster(c("localhost","localhost"), type = "SOCK") 
clusterEvalQ(cl, { require(rredis); redisConnect() }) 
tasks <- sample(1:5, 100, TRUE) 
system.time(res <- parSapply(cl, tasks, funmem)) 
+0

我可以实现这一些日子,但我目前官方没有访问POSIX型系统(卡在Windows上),这意味着我可以”还没有运行服务器。 – 2011-06-07 08:53:32

4

这将取决于问题的功能是什么,当然,但恐怕snowfall不会有很大的帮助。事情是,您必须将整个数据帧导出到不同的核心(请参阅?sfExport),并仍然找到合并它的方法。这种做法违背了改变全球环境价值的全部目的,因为您可能希望尽可能降低内存使用量。

您可以深入到snow的低级功能中以获得此功能。请参阅以下示例:

#Some data 
Data <- data.frame(
    cell = 1:10, 
    value = sample(c(100,NA),10,TRUE), 
    x = 1:10, 
    y = 1:10 
) 
# A sample function 
sample.func <- function(){ 
    id <- which(is.na(Data$value)) # get the NA values 

    # this splits up the values from the dataframe in a list 
    # which will be passed to clusterApply later on. 
    parts <- lapply(clusterSplit(cl,id),function(i)Data[i,c("x","y")]) 

    # Here happens the magic 
    Data$value[id] <<- 
    unlist(clusterApply(cl,parts,function(x){ 
     x$x+x$y 
     } 
    )) 
} 
#now we run it 
require(snow) 
cl <- makeCluster(c("localhost","localhost"), type = "SOCK") 
sample.func() 
stopCluster(cl) 
> Data 
    cell value x y 
1  1 100 1 1 
2  2 100 2 2 
3  3  6 3 3 
4  4  8 4 4 
5  5 10 5 5 
6  6 12 6 6 
7  7 100 7 7 
8  8 100 8 8 
9  9 18 9 9 
10 10 20 10 10 

您仍然必须复制(部分)您的数据,以便将其传送到核心。但是,无论如何,当您在数据帧上调用snowfall高级函数时,无论如何,snowfall总是会使用snow的低级函数。

另外,人们不应该忘记,如果您更改数据框中的一个值,则整个数据框也会复制到内存中。所以当他们从群集中回来时,通过逐个添加值,您不会赢得那么多。你可能想尝试一些不同的方法,并做一些内存分析。

1

我同意Joris您需要将您的数据复制到其他内核。从积极的角度来看,您不必担心NA在核心中是否在数据中。 如果你原来data.frame被称为cont.values

nnaidx<-is.na(cont.values$value) #where is missing data originally 
dfrnna<-cont.values[nnaidx,] #subset for copying to other cores 
calcValForDfrRow<-function(dfrRow){return(dfrRow$x+dfrRow$y)}#or whatever pleases you 
sfExport(dfrnna, calcValForDfrRow) #export what is needed to other cores 
cont.values$value[nnaidx]<-sfSapply(seq(dim(dfrnna)[1]), function(i){calcValForDfrRow(dfrnna[i,])}) #sfSapply handles 'reordering', so works exactly as if you had called sapply 

应该很好地工作(不包括错别字)