2017-04-24 101 views
1

我正在尝试学习如何通过 multidplyr::do() 在集群上运行自定义函数。请看下面这个简单的、自包含的示例。举例来说,我想把自定义函数 myWxTest 应用于 flights 数据集中的每个 common_dest(即航班数不少于 365 的目的地,见下面代码中的 n >= 365)。(帖子标题:multidplyr:试用自定义函数)

library(dplyr) 
library(multidplyr) 
library(nycflights13) 
library(quantreg) 

# Median-regression coefficients of arr_time on the departure/schedule columns.
#
# x: a data frame that must contain dep_time, dep_delay, sched_dep_time,
#    sched_arr_time and arr_time (checked below; a missing column is an error).
#
# Returns a named vector with one entry per model term. Entries stay NA when
# x has five or fewer arr_time values, or when the fit reports no coefficient
# under that name.
myWxTest <- function(x) {
  # A single stopifnot() checks the columns in order and stops at the first
  # one that is absent.
  stopifnot(
    !is.null(x$dep_time),
    !is.null(x$dep_delay),
    !is.null(x$sched_dep_time),
    !is.null(x$sched_arr_time),
    !is.null(x$arr_time)
  )

  # All-NA template; fitted values are written into it by name.
  out_mat <- rep(NA, 5L)
  names(out_mat) <- c(
    "(Intercept)", "dep_time", "dep_delay", "sched_dep_time", "sched_arr_time"
  )

  # Too few observations: hand back the untouched template.
  if (length(x$arr_time) <= 5) {
    return(out_mat)
  }

  # tau = 0.5 requests the median (50th percentile) regression.
  fit <- quantreg::rq(
    arr_time ~ dep_time + dep_delay + sched_dep_time + sched_arr_time,
    data = x,
    tau = 0.5
  )
  fitted_coefs <- coef(fit)
  out_mat[names(fitted_coefs)] <- fitted_coefs
  out_mat
}

# Keep only the flights whose destination occurs at least 365 times in
# `flights`, then add the day of the year as `yday`.
common_dest <- flights %>%
  count(dest) %>%
  filter(n >= 365) %>%
  # Name the join key explicitly instead of relying on the natural-join guess
  # (which also prints a "Joining, by = ..." message and would silently change
  # if the two tables ever shared another column name).
  semi_join(flights, ., by = "dest") %>%
  mutate(yday = lubridate::yday(ISOdate(year, month, day)))


# Create a two-node cluster and register it as the default one.
cluster <- create_cluster(2)
set_default_cluster(cluster)

# Partition the filtered flights by destination across that cluster.
by_dest <- partition(common_dest, dest, cluster = cluster)

# Make quantreg available on the nodes that hold the partitions.
cluster_library(by_dest, "quantreg")

到目前为止一切顺利(不过我只是在重现 vignette 中的示例)。现在,我需要把自定义函数发送到每个节点:

# Attempt to send myWxTest to the nodes; this is the call that produces the
# error reported below.
cluster_call(cluster, myWxTest)

但我得到了如下错误:

Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
    2 nodes produced errors; first error: argument "x" is missing, with no default 

最后,我想把 myWxTest 应用到每一个子组:

# Evaluate myWxTest once for every destination group in the partitioned data.
models <- do(by_dest, myWxTest(.))
+1

您需要用 `cluster_copy(by_dest, myWxTest)` 把函数复制到每个节点,而不是用 `cluster_call`——后者用于在每个节点上运行任意代码。这样做之后仍然会失败,报错说某个对象是向量而不是数据框,但这算是一个开端。 – alistaire

+0

这个问题已解决(把返回语句改为 `return(data.frame(out_mat))`)! – user189035

回答

1

我做了一些调整之后让它运行起来了:

library(dplyr) 
library(multidplyr) 
library(nycflights13) 
library(quantreg) 

# Median-regression coefficients of arr_time on the departure/schedule columns,
# returned as a one-column data frame so that do() can stack the results.
#
# x: a data frame that must contain dep_time, dep_delay, sched_dep_time,
#    sched_arr_time and arr_time (checked below; a missing column is an error).
#
# Returns a data frame with five rows (one per model term, carried as row
# names) and a single column `out_mat`. Values stay NA when x has five or
# fewer arr_time values.
myWxTest <- function(x) {
  # A single stopifnot() checks the columns in order and stops at the first
  # one that is absent.
  stopifnot(
    !is.null(x$dep_time),
    !is.null(x$dep_delay),
    !is.null(x$sched_dep_time),
    !is.null(x$sched_arr_time),
    !is.null(x$arr_time)
  )

  # All-NA template vector; fitted values are written into it by name.
  # NOTE: keep the name `out_mat` -- as.data.frame() below uses the argument's
  # name as the column name of the result.
  out_mat <- rep(NA, 5L)
  names(out_mat) <- c(
    "(Intercept)", "dep_time", "dep_delay", "sched_dep_time", "sched_arr_time"
  )

  enough_rows <- length(x$arr_time) > 5
  if (enough_rows) {
    # tau = 0.5 requests the median (50th percentile) regression.
    fit <- quantreg::rq(
      arr_time ~ dep_time + dep_delay + sched_dep_time + sched_arr_time,
      data = x,
      tau = 0.5
    )
    fitted_coefs <- coef(fit)
    out_mat[names(fitted_coefs)] <- fitted_coefs
  }

  # Convert the named vector to a data frame, which is what do() needs.
  as.data.frame(out_mat, stringsAsFactors = FALSE)
}

# Keep only the flights whose destination occurs at least 365 times in
# `flights`, then add the day of the year as `yday`.
common_dest <- flights %>%
  count(dest) %>%
  filter(n >= 365) %>%
  # Name the join key explicitly instead of relying on the natural-join guess
  # (which also prints a "Joining, by = ..." message and would silently change
  # if the two tables ever shared another column name).
  semi_join(flights, ., by = "dest") %>%
  mutate(yday = lubridate::yday(ISOdate(year, month, day)))

# Partition the filtered flights by destination; no cluster is passed, so
# partition() uses whatever default it has.
by_dest <- partition(common_dest, dest)

# Prepare the nodes: load quantreg there and copy the custom function over.
cluster_library(by_dest, "quantreg")
cluster_copy(by_dest, myWxTest)

# Run myWxTest per destination group, then pull the results back into the
# local session.
models <- by_dest %>%
  do(myWxTest(.)) %>%
  collect()

……它返回一个本地 data.frame:

# Print the collected result. Each destination contributes five rows (390 rows
# for 78 groups); the coefficient names are not carried into the output, so the
# rows follow the order of the vector built in myWxTest: (Intercept), dep_time,
# dep_delay, sched_dep_time, sched_arr_time.
models 
#> Source: local data frame [390 x 2] 
#> Groups: dest [78] 
#> 
#>  dest  out_mat 
#> <chr>  <dbl> 
#> 1 CAK 156.5248953 
#> 2 CAK 0.9904261 
#> 3 CAK -0.0767928 
#> 4 CAK -0.3523211 
#> 5 CAK 0.3220386 
#> 6 DCA 74.5959035 
#> 7 DCA 0.2751917 
#> 8 DCA 1.0712483 
#> 9 DCA 0.2874165 
#> 10 DCA 0.4344960 
#> # ... with 380 more rows