2017-06-26 52 views
1

我需要基于来自Kafka的处理数据,使用GraphX构建一个图。然而,似乎sc.parallelize()引发错误java.io.NotSerializableException: org.apache.spark.SparkContext为什么DStream.foreachRDD使用java.io.NotSerializableException失败:org.apache.spark.SparkContext?

...... 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet) 
val lines = messages.map(_._2) 

lines.foreachRDD(rdd => { 
    rdd.foreachPartition(partition => { 
    ...... 
    // Build a graph 
    val vertRDD = sc.parallelize(vertices) 
    val edgeRDD = sc.parallelize(edge) 
    val graph = Graph(vertRDD, edgeRDD, defaultUser) 
    } 
    }) 
}) 

以何种方式,我应该解决什么问题?

+1

您不能在RDD内部创建RDD。在这个网站上已经问过很多次了。 – eliasah

+1

你正在尝试在'RDD' 里面存储你的'RDD',这是不可能的 - 所有'RDD's必须在驱动程序中才能提供更完整的代码示例? – lev

+0

谢谢。我已经移动了'foreachRDD'外的'sc.parallelize()',并解决了这个问题。 – bila

回答

1

foreachRDD Spark Streaming中的运算符在驱动程序的每个批处理间隔运行处理RDD,然后使用它(通过其RDD)编写代码,最终将代码转换为Spark作业。

foreachRDD(foreachFunc:(RDD [T])⇒单位):单位在此DSTREAM应用一个函数应用于每个RDD。这是一个输出运算符,因此'this'DStream将被注册为输出流并因此被物化。

RDD.foreachPartition是一个只发生在执行者身上的动作。

foreachPartition(F:(迭代[T])⇒单位):单位应用一个函数f本RDD的每个分区。

在任务可用于执行程序上执行之前,它必须被序列化,并且因为SparkContext不是可序列化的,因此是例外。这就是Spark如何确保SparkContext(如sc)由于Spark中的设计决定而永不出现。无论如何,因为整个调度基础设施都在驱动程序上,所以没有任何意义。

SparkContextRDD只适用于驱动程序。它们只是描述分布式计算的一种方式,最终将被“翻译”为在Spark执行程序上运行的任务。

这就是为什么你看到错误消息:

java.io.NotSerializableException:org.apache.spark.SparkContext

顺便说一句,我回答了类似的问题今日(见Can SparkContext.textFile be used with a custom receiver?)等等它看起来像今天是SparkContext日:)