2016-12-01

I am running a Spark Kafka streaming reader, and it fails with a Spark-Kafka streaming exception: object not serializable (ConsumerRecord).

Below are the dependencies:

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.0.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.0.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.0.1</version> 
    </dependency> 
</dependencies> 

When some data, say "Hi----3", is produced to the Kafka topic, I get the following exception (although I can see the data inside the exception):

Serialization stack: 
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = q_metrics, partition = 0, offset = 26, CreateTime = 1480588636828, checksum = 3939660770, serialized key size = -1, serialized value size = 9, key = null, value = "Hi----3")) 

I am not doing any computation on the RDD (that also throws the same exception). Even stream.print() throws the exception.
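The root cause can be reproduced outside Spark: Kafka's ConsumerRecord does not implement java.io.Serializable, so any operation that makes Spark ship the record itself (including print(), which serializes records to bring them to the driver) fails, while plain Strings extracted from it serialize fine. A minimal sketch of that difference, using the same Java serialization mechanism Spark relies on (FakeRecord, canSerialize, and SerializationDemo are hypothetical names for illustration, not Kafka or Spark API):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for Kafka's ConsumerRecord: it does not extend
// Serializable, so Java serialization cannot write it.
class FakeRecord(val key: String, val value: String)

object SerializationDemo {
  // Returns true if Java serialization can write the object.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val record = new FakeRecord(null, "Hi----3")
    println(canSerialize(record))                     // the record itself fails
    println(canSerialize((record.key, record.value))) // a (key, value) tuple of Strings succeeds
  }
}
```

This is why extracting key and value before any action avoids the exception: Tuple2 and String are both serializable.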

Here is the code:

import org.apache.spark.streaming._ 
import org.apache.spark.SparkContext 
import org.apache.spark.streaming.kafka010._ 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.rdd.RDD 

class Metrics {

  def readKafka() {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topics = Array("q_metrics")
    val sc = new SparkContext("local[4]", "ScalaKafkaConsumer")
    val streamingContext = new StreamingContext(sc, Seconds(10))

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    stream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

  def rddReader(rdd: Array[String]) = {
  }
}

object MetricsReader { 
    def main(args: Array[String]): Unit = { 
    val objMetrics = new Metrics() 
    objMetrics.readKafka() 
    } 
} 

Any help is appreciated.

Thanks


Can you receive the messages with the Kafka console consumer? – user4342532


No. I see the message as part of the exception. – Raaghu


I think you need to add some jars to your kafka/lib location, such as metrics-core and kafka-clients (if not already present). – user4342532

Answer


Found the problem: we cannot print a ConsumerRecord directly, because print() serializes the ConsumerRecord. So I used map to extract the records, collected the key/value pairs, and then printed them:

stream.foreachRDD { rdd =>
  val collected = rdd.map(record => (record.key(), record.value())).collect()
  for (c <- collected) {
    println(c)
  }
}
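Note that collect() pulls every record of the batch to the driver, which can be expensive for large batches. If the goal is only to print a sample, a lighter variant is to map first and print the transformed DStream, since a (key, value) tuple of Strings is serializable. A sketch only, assuming the same `stream` from the question and a running Kafka broker:

```scala
// Map each ConsumerRecord to a serializable (key, value) tuple *before* print,
// so ConsumerRecord itself is never serialized.
val pairs = stream.map(record => (record.key(), record.value()))
pairs.print()
```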