2016-03-08 42 views

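Parallel data.table: what is the correct syntax?

Following up on some earlier data.table parallelism questions (1) (2) (3), I am trying to figure this out. What is wrong with this syntax?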

library(data.table) 
set.seed(1234) 
dt <- data.table(id= factor(sample(1L:10000L, size= 1e6, replace= TRUE)), 
     val= rnorm(n= 1e6), key="id") 

foo <- function(l) sum(l) 

dt2 <- dt[, foo(.SD), by= "id"] 

library(parallel) 
cl <- makeCluster(detectCores()) 
dt3 <- clusterApply(cl, x= parallel:::splitRows(dt, detectCores()), 
      fun=lapply, FUN= function(x,foo) { 
      x[, foo(data.table:::".SD"), by= "id"] 
      }, foo= foo) 
stopCluster(cl) 
# note that library(parallel) is annoying and you often have to do this type ("::", ":::") of exporting to the parallel package 

Error in checkForRemoteErrors(val) : 4 nodes produced errors; first error: incorrect number of dimensions

cl <- makeCluster(detectCores()) 
dt3 <- clusterApply(cl, x= parallel:::splitRows(dt, detectCores()), 
      fun=lapply, FUN= function(x,foo) { 
      x <- data.table::data.table(x) 
      x[, foo(data.table:::".SD"), by= "id"] 
      }, foo= foo) 
stopCluster(cl) 

Error in checkForRemoteErrors(val) : 4 nodes produced errors; first error: object 'id' not found

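I have played around with the syntax quite a bit. These two attempts seem to be the closest I can get. Clearly something is still not right.

My real problem has a similar structure but many more rows, and I am working on a machine with 24 cores / 48 logical processors. So it is really annoying to watch my computer use about 4% of its computing power (only 1 core in use).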
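One possible reading of the two errors above: `clusterApply(..., fun = lapply, FUN = ...)` runs `lapply` over each chunk, and `lapply` over a data.table iterates over its columns, so `x` arrives as a plain vector ("incorrect number of dimensions"); wrapping it in `data.table(x)` then yields a one-column table named `x`, hence "object 'id' not found". Splitting by row position would also cut some `id` groups across chunks. The following is a minimal sketch, not a definitive fix, that assigns whole `id` groups to chunks and loads data.table on each worker. The names `n_workers`, `chunk_of_row`, `chunks` and `res` are illustrative assumptions, and the data is shrunk to 1e5 rows to keep the sketch quick.

```r
library(data.table)
library(parallel)

set.seed(1234)
dt <- data.table(id  = factor(sample(1L:10000L, size = 1e5, replace = TRUE)),
                 val = rnorm(n = 1e5), key = "id")
foo <- function(l) sum(l)

n_workers <- 2L                      # assumption: fixed small count for the sketch
cl <- makeCluster(n_workers)
invisible(clusterEvalQ(cl, library(data.table)))  # load data.table on every worker

# Assign each id (via its factor code) to one chunk, so no group is split
chunk_of_row <- as.integer(dt$id) %% n_workers
chunks <- lapply(split(seq_len(nrow(dt)), chunk_of_row), function(i) dt[i])

# Each worker receives one whole data.table chunk and aggregates it by id
res <- parLapply(cl, chunks, function(x, foo) x[, foo(.SD), by = "id"], foo = foo)
stopCluster(cl)

# Combine the per-chunk results and restore the id ordering
dt3 <- rbindlist(res)
setkey(dt3, id)
```

Whether this beats the single-process `dt[, foo(.SD), by = "id"]` depends on how expensive `foo` is relative to the cost of shipping the chunks to the workers, so it is worth timing both on the real data.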

Answer


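You may want to evaluate Rserve as a solution for parallelism.

See the example below, which runs in parallel on 2 R nodes started locally with Rserve. The nodes can also be distributed across remote instances.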

library(data.table) 
set.seed(1234) 
dt <- data.table(id= factor(sample(1L:10000L, size= 1e6, replace= TRUE)), 
       val= rnorm(n= 1e6), key="id") 
foo <- function(l) sum(l) 

library(big.data.table) 
# start 2 R instances 
library(Rserve) 
port = 6311:6312 
invisible(sapply(port, function(port) Rserve(debug = FALSE, port = port, args = c("--no-save")))) 
# client side 
rscl = rscl.connect(port = port, pkgs = "data.table") # connect and auto require packages 
bdt = as.big.data.table(dt, rscl) # create big.data.table from local data.table and list of connections to R nodes 
rscl.assign(rscl, "foo", foo) # assign `foo` function to nodes 
bdt[, foo(.SD), by="id"][, foo(.SD), by="id"] # first query is run remotely, second locally 
#   id   V1 
# 1:  1 10.328998 
# 2:  2 -8.448441 
# 3:  3 21.475910 
# 4:  4 -5.302411 
# 5:  5 -11.929699 
# ---     
# 9996: 9996 -4.905192 
# 9997: 9997 -4.293194 
# 9998: 9998 -2.387100 
# 9999: 9999 16.530731 
#10000: 10000 -15.390543 

# optionally with special care 
# bdt[, foo(.SD), by= "id", outer.aggregate = TRUE] 

Session info:

R version 3.2.3 (2015-12-10) 
Platform: x86_64-pc-linux-gnu (64-bit) 
Running under: Ubuntu 14.04.4 LTS 

locale: 
[1] LC_CTYPE=en_GB.UTF-8  LC_NUMERIC=C    LC_TIME=en_GB.UTF-8  LC_COLLATE=en_GB.UTF-8  LC_MONETARY=en_GB.UTF-8 LC_MESSAGES=en_GB.UTF-8 LC_PAPER=en_GB.UTF-8  
[8] LC_NAME=C     LC_ADDRESS=C    LC_TELEPHONE=C    LC_MEASUREMENT=en_GB.UTF-8 LC_IDENTIFICATION=C  

attached base packages: 
[1] stats  graphics grDevices utils  datasets methods base  

other attached packages: 
[1] Rserve_1.8-5   big.data.table_0.3.3 data.table_1.9.7  

loaded via a namespace (and not attached): 
[1] RSclient_0.7-3 tools_3.2.3 

Looks interesting. How does one correctly specify the ports? –


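@Alex You can specify the port in the 'Rserve::Rserve()' call and then use the same port when connecting with 'rscl.connect'. You can use a single port and have multiple connections handled separately, but I prefer to isolate each R node on its own port. – jangorecki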


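My question relates to the fact that 'library(Rserve)' is not particularly clearly documented. So, suppose I have 24 cores. Do I modify the code to 'ports <- 6311:6324'? Anything else? I tried it and there is a long pause at 'rscl.assign(...)'; there seems to be a lot of ethernet activity but no CPU/RAM activity. Is this expected? –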
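On the port arithmetic in the comment above: `6311:6324` covers 14 ports, not 24. A small sketch of generating one port per node follows; `node_ports` is a hypothetical helper introduced here for illustration, and the commented-out lines simply reuse the calls shown in the answer, assuming Rserve and big.data.table are installed.

```r
# Hypothetical helper: n consecutive ports starting at 6311 (the port used in the answer)
node_ports <- function(n, first = 6311L) seq.int(from = first, length.out = n)

ports <- node_ports(24L)   # one port per R node
range(ports)               # first and last port in the range
length(6311:6324)          # number of ports in the range from the comment

# Starting the nodes and connecting, with the same calls as in the answer
# (left commented out because they need Rserve and big.data.table):
# invisible(sapply(ports, function(p) Rserve::Rserve(debug = FALSE, port = p, args = c("--no-save"))))
# rscl <- big.data.table::rscl.connect(port = ports, pkgs = "data.table")
```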