1
我试图通过集群上的multidplyr::do()
来学习运行定制功能。考虑这个简单的自包含示例。例如的缘故,我想我的自定义功能myWxTest
适用于每个common_dest
(目的地有超过50个航班)在flight
数据集:multidplyr:试用定制功能
library(dplyr)
library(multidplyr)
library(nycflights13)
library(quantreg)
myWxTest <- function(x){
stopifnot(!is.null(x$dep_time))
stopifnot(!is.null(x$dep_delay))
stopifnot(!is.null(x$sched_dep_time))
stopifnot(!is.null(x$sched_arr_time))
stopifnot(!is.null(x$arr_time))
out_mat <- c('(Intercept)' = NA, dep_time = NA, dep_delay = NA, sched_dep_time = NA, sched_arr_time = NA)
if(length(x$arr_time)>5){
model_1 <- quantreg::rq(arr_time ~ dep_time + dep_delay + sched_dep_time + sched_arr_time, data = x, tau = .5)
out_mat[names(coef(model_1))] <- coef(model_1)
}
return(out_mat)
}
common_dest <- flights %>%
count(dest) %>%
filter(n >= 365) %>%
semi_join(flights, .) %>%
mutate(yday = lubridate::yday(ISOdate(year, month, day)))
cluster <- create_cluster(2)
set_default_cluster(cluster)
by_dest <- common_dest %>%
partition(dest, cluster = cluster)
cluster_library(by_dest, "quantreg")
到目前为止好(但我只是再现例子从小插曲)。现在,我有我的自定义函数发送到每个节点:
cluster %>% cluster_call(myWxTest)
,但我得到:
Error in checkForRemoteErrors(lapply(cl, recvResult)) :
2 nodes produced errors; first error: argument "x" is missing, with no default
最后,我想申请myWxTest
每一个亚组:
models <- by_dest %>%
do(myWxTest(.))
您需要将文件复制到每个节点用'cluster_copy(by_dest,myWxTest)','不cluster_call',这是运行任意每个群集上的代码。它仍然失败,抱怨某些东西是矢量而不是数据框架,但这是一个开始。 – alistaire
已解决的问题(将返回类型更改为'return(data.frame(out_mat))')。 ! – user189035