0

代码(通过火花流运行)设立消费,创建后的道具对象卡夫卡消费者无法从卡夫卡的主题订阅

val consumer = new KafkaConsumer[String, String](props) 
consumer.subscribe(util.Arrays.asList(topic)) 

代码拥有自营如下

package main.scala 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.kafka.KafkaUtils 
import org.apache.kafka.clients.consumer.KafkaConsumer 
import java.util 
import java.util.Properties 
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} 
import java.io.IOException 

我通过创建一个SBT组装罐子

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0-kafka-2.1.1" 

u能请建议什么,我在这里缺少

错误消息 -

用户类抛出异常:java.lang.NoSuchMethodError:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/UTIL /收藏;)V

+0

很难说没有看到Scaladoc,但为什么你不能只做'List(topic)'?是什么让你使用Java'List'? –

+0

你好Abhijit,谢谢你的回复。就像我如何打印,我只需要看看数据是否来临。做你可以在这里使用的代码片段 –

+0

你不回答与另一个问题的问题。 –

回答

-2

subscribe取输入类型java.util.Collections而不是java.util.Arrays.asList

尝试

consumer.subscribe(java.util.Arrays.asList("topic")) 

它应该工作...

+0

没有hitesh,它没有帮助。我仍然面临这个问题。 –

+0

可以添加“org.apache.kafka”%“kafka-clients”%“0.10.0.0”dependency – hitesh

+0

'List'是一个'Collection'。继承101。 –

0

我曾与星火2.2.0和卡夫卡0.10.0 问题是因为在不同的spark2-卡夫卡默认版本同样的问题提交(也spark2壳)

我发现决定here

1. Before spark2-submit you have to export kafka version 
$ export SPARK_KAFKA_VERSION=0.10 
$ spark2-submit ...