0

我在Spark 2 CDH 5.9中使用Kafka客户端0.8运行流作业。简单的目标是将信息保存在Impala中,并通过记录进行记录。由于InvalidClassException,Spark Kafka Streaming作业失败

我无法摆脱这种错误的,因为我不知道从哪里它来自何处:

16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming 
job 1481726608000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0 
(TID 132, datanode1, executor 1): 
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; 
local class incompatible: stream classdesc serialVersionUID = 1, 
local class serialVersionUID = 2 

直接卡夫卡流用

val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2)) 
val kafkaParams = Map[String, String](
    "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092", 
    "group.id" -> "myconsumergroup", 
    "auto.offset.reset" -> "largest") 
val topics:Set[String] = Set("kafkatest") 
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics) 

简单地创建和处理通过:

val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache() 

directKafkaStream.foreachRDD { rdd => 
    val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo] 

    val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left") 

    deviceEnriched.show(false) 
    spark.sql("use my_database") 
     deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream") 
} 

streamingContext.start() 
streamingContext.awaitTermination() 

回答

2

简短回答:消息被序列化的版本commons-lang3 JAR即与您使用Spark的JAR不兼容

龙回答:如果你刚刚说谷歌搜索的错误消息,则检索Apache的共享源代码,你会发现...

  • this post是挖掘到Java“类不兼容的”序列化的问题,在一般
  • FastDateFormat陈述的源代码serialVersionUID = 1LV3.1直到但V3.2切换到serialVersionUID = 2L(因为二进制结构当时改变)

顺便说一句,我只是检查和鼎晖5.9附带commons-lang3V3.1(对蜂房,黑斑羚,哨兵,蜂房式,Oozie的,Sqoop功能于Oozie的)和V3.3.2(为Spark -in-Oozie)和V3.4(对于Sqoop),而Spark本身根本不需要它。去搞清楚。
而且由于CDH尚未附带Spark 2,我猜你或者下载了“beta”包裹或Apache版本 - 并且我检查了Apache版本(V2.0.2)随附commons-lang3V3.3.2

我的2美分:只需在您的Spark 2命令行中强制--jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar,并查看这是否足以解决您的问题。

编辑  可加2分钱,请确保您的“自定义” JAR得到优先于任何JAR已经在纱线类路径,与--conf spark.yarn.user.classpath.first=true

+0

感谢参孙。这解决了这个问题:)顺便说一下,Spark 2是本周从Cloudera发布的GA,并且带有** V3.3.2 **。正如你所说的:去图。我的根本问题是我无法弄清楚哪个对象正在被序列化,从哪里到哪里,但是按照您指出的方式强制v3.1解决了问题。 –

+0

...一会儿。异常又回来了,不管是包含** V3.1 **还是** V3.3.2 **,这个异常总是相同的,并且在同一个节点中(我在三个节点上运行这个异常)。所以我认为它可能与Spark有关,但与我的工作无关?任何其他想法? –

+0

停止该节点解决了这个问题,所以我猜这个节点有一个陈旧的配置。有什么方法可以刷新它?因为它是一个VM,所以我试图从零开始创建它 –

相关问题