Calling a function inside an RDD map in a Spark cluster

I am testing a simple string-parser function defined in my code, but one of the worker nodes always fails when the job runs. Here is the dummy code I have been testing:

/* JUST A SIMPLE PARSER TO CLEAN PARENTHESES */
def parseString(field: String): String = { 
    val Pattern = "(.*.)".r 
    field match{ 
     case "null" => "null" 
     case Pattern(field) => field.replace('(',' ').replace(')',' ').replace(" ", "") 
    } 
} 
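
For reference, a quick check of what the parser returns on the two input shapes it will see below (the values come from the join output shown further down):

/* QUICK CHECK OF THE PARSER (not part of the job) */
parseString("null")       // => "null"
parseString("(hive,30)")  // => "hive,30" (parentheses and spaces stripped)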

/* CREATE TWO DISTRIBUTED RDDs TO JOIN THEM */ 
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)), 6) 
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)), 6) 
val manipulated_emp = emp.keyBy(t => t._3) 
val manipulated_dept = dept.keyBy(t => t._2) 
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept) 

/* OUTPUT */ 
left_outer_join_data.collect.foreach(println) 
/* 
(30,((3,matt,30),Some((hive,30)))) 
(30,((5,rhonda,30),Some((hive,30)))) 
(20,((2,ricky,20),Some((spark,20)))) 
(10,((1,jordan,10),Some((hadoop,10)))) 
(35,((4,mince,35),None)) 
*/ 

val res = left_outer_join_data 
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString)) 
.collect 

res 
.map(f => (f._1, f._2, parseString(f._3))) 
.foreach(println) 

/* DESIRED OUTPUT */ 
/* 
(3,matt,hive,30) 
(5,rhonda,hive,30) 
(2,ricky,spark,20) 
(1,jordan,hadoop,10) 
(4,mince,null) 
*/ 

This code works if I collect the results in the driver first. Since this is just a test, collecting is not a problem here, but my actual application will process millions of rows, and collecting results in the driver is discouraged. So if I do the same thing without collecting first, like this:

val res = left_outer_join_data 
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString)) 

res 
.map(f => (f._1, f._2, parseString(f._3))) 
.foreach(println) 

I get the following:

ERROR TaskSetManager: Task 5 in stage 17.0 failed 4 times; aborting job 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 17.0 failed 4 times, most recent failure: Lost task 5.3 in stage 17.0 (TID 166, 192.168.28.101, executor 1): java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$ 
     at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249) 
     at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
     at scala.Option.foreach(Option.scala:257) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:916) 
     at tele.com.SimcardMsisdn$.main(SimcardMsisdn.scala:249) 
     at tele.com.SimcardMsisdn.main(SimcardMsisdn.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$ 
     at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249) 
     at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Why does Spark fail to run my parser on the worker nodes? Can you recommend a solution or a workaround?

UPDATE

I found a solution to this problem (posted below). Nevertheless, I am still puzzled by it; maybe I did something wrong.

Answer

Well, I managed to solve it myself by broadcasting the Pattern variable to the workers:

val Pattern = sc.broadcast("(.*.)".r) 

and doing the match inside the map, rather than in a separate function, without collecting to the driver:

val res = left_outer_join_data
  .map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))

res
  .map(f => (f._1, f._2, f._3 match {
    case "null" => "null"
    case Pattern.value(f._3) => f._3.replace('(', ' ').replace(')', ' ').replace(" ", "")
  }))
  .foreach(println)

Then I got the desired output on the workers' stdout:

(3,matt,hive,30) 
(5,rhonda,hive,30) 
(2,ricky,spark,20) 
(1,jordan,hadoop,10) 
(4,mince,null) 
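
My guess is that this works because Pattern.value only reads the broadcast copy that Spark ships to and caches on each executor, so the map closure no longer references the enclosing object (the one that could not be initialized on the workers). I may be wrong about that, though.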

You can also use 'case (a, ((b, c, d), Some((e, f, g)))) => (b, c, ...)' in your 'left_outer_join_data.map' to improve readability. :) – philantrovert


Indeed. Thanks for the tip ;) – Emiliano
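
For illustration, the destructuring philantrovert suggests might look roughly like this (a sketch with made-up binder names; a None case is added so the match stays exhaustive, and foreach is used to print directly):

/* SKETCH OF THE SUGGESTED DESTRUCTURING (illustrative names) */
left_outer_join_data.foreach {
  case (_, ((id, name, _), Some((dep, num)))) => println((id, name, dep, num))
  case (_, ((id, name, _), None))             => println((id, name, "null"))
}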