1
我上运行的火花卡夫卡一个流读取器火花 - 卡夫卡流异常 - 对象不是serializableConsumerRecord
以下是依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
当一些数据说“嗨---- 3”生产卡夫卡的话题,收到以下异常(我可以看到,虽然在异常数据) -
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = q_metrics, partition = 0, offset = 26, CreateTime = 1480588636828, checksum = 3939660770, serialized key size = -1, serialized value size = 9, key = null, value = "Hi----3"))
我没有做对任何RDD计算(因为这也抛出同样的异常)。即使stream.print()也抛出异常
以下是代码
import org.apache.spark.streaming._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.Subscribe
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.rdd.RDD
class Metrics {
def readKafka() {
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean))
val topics = Array("q_metrics")
val sc = new SparkContext("local[4]", "ScalaKafkaConsumer")
val streamingContext = new StreamingContext(sc, Seconds(10))
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
stream.print()
streamingContext.start
streamingContext.awaitTermination()
}
def rddReader(rdd: Array[String]) = {
}
}
object MetricsReader {
def main(args: Array[String]): Unit = {
val objMetrics = new Metrics()
objMetrics.readKafka()
}
}
得到任何帮助。
感谢
您能接收卡夫卡消费控制台上的消息? – user4342532
没有。我将该消息看作是该例外的一部分。 – Raaghu
我认为你需要添加一些罐子到你的kafka/lib位置,如metrics-core和kafka-clients(如果不存在)。 – user4342532