
I have the following simple SparkR program, which creates a SparkR DataFrame and then retrieves/collects data from it; I am unable to retrieve the data from the SparkR-created DataFrame.

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn") 
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6") 
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library(SparkR) 
sc <- sparkR.init(master="yarn-client",sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40")) 
hiveContext <- sparkRHive.init(sc) 

n = 1000 
x = data.frame(id = 1:n, val = rnorm(n)) 
xs <- createDataFrame(hiveContext, x) 

xs 

head(xs) 
collect(xs) 

I am able to create the DataFrame and view its information successfully, but any operation that actually fetches the data throws an error.

16/07/25 16:33:59 WARN TaskSetManager: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out 
 at java.net.PlainSocketImpl.socketAccept(Native Method) 
 at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) 
 at java.net.ServerSocket.implAccept(ServerSocket.java:530) 
 at java.net.ServerSocket.accept(ServerSocket.java:498) 
 at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432) 
 at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
 at org.apache.spark.scheduler.Task.run(Task.scala:89) 
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
 at java.lang.Thread.run(Thread.java:745)

16/07/25 16:33:59 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job 
16/07/25 16:33:59 ERROR RBackendHandler: dfToCols on org.apache.spark.sql.api.r.SQLUtils failed 
Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out 
 at java.net.PlainSocketImpl.socketAccept(Native Method) 
 at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) 
 at java.net.ServerSocket.implAccept(ServerSocket.java:530) 
 at java.net.ServerSocket.accept(ServerSocket.java:498) 
 at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432) 
 at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPar

If I run the same code from the sparkR command line as shown below, it executes fine:

~/Downloads/spark-1.6.1-bin-hadoop2.6/bin/sparkR --master yarn-client 

But when I run it from plain R and call sparkR.init(master="yarn-client"), it throws the errors above.

Can someone help resolve these errors?


I have the same problem. How did you fix it? –

Answer


Adding this line made the difference:

Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell") 

Here is the complete code:

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn") 
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6") 
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library(SparkR) 
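# Key change: pass "--master yarn-client" via SPARKR_SUBMIT_ARGS so that 
# sparkR.init() submits through sparkr-shell instead of taking the master 
# from the init call itself 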
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell") 
sc <- sparkR.init(sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40")) 
hiveContext <- sparkRHive.init(sc) 

n = 1000 
x = data.frame(id = 1:n, val = rnorm(n)) 
xs <- createDataFrame(hiveContext, x) 

xs 

head(xs) 
collect(xs)
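Once the job runs, a quick sanity check like the one below confirms that data can actually be pulled back to the driver. This is just a minimal sketch using standard SparkR 1.6 API calls; local_df is an illustrative variable name, not something from the code above.

# Assumes the SparkR session and xs from the code above 
printSchema(xs)          # shows the schema of the DataFrame (id, val) 
count(xs)                # should return 1000; forces a job that reads data back 
local_df <- collect(xs)  # brings the full DataFrame back as a local R data.frame 
str(local_df)            # verify it is a plain data.frame with 1000 rows 
sparkR.stop()            # shut down the SparkR backend when done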