2017-01-02 94 views
1

我需要从使用Scala和Spark的远程Kafka队列主题中消费消息。默认情况下,远程计算机上的Kafka端口设置为7072,而不是9092。此外,远程机器上有安装以下版本:无法将代理列表参数从Scala传递到Kafka:属性bootstrap.servers无效

  1. 卡夫卡0.10.1.0
  2. 斯卡拉2.11

这意味着我应该通过经纪人列表(与该端口7072)从斯卡拉远程卡夫卡,因为否则它会尝试使用默认端口。

问题是根据日志,参数bootstrap.servers无法被远程机器识别。我还尝试将此参数重命名为metadata.broker.list,broker.listlisteners,但始终在日志中出现相同的错误Property bootstrap.servers is not valid,然后默认使用端口9092(并且显然不会消耗这些消息)。

在POM文件我用卡夫卡以下依赖和星火:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kafka_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

所以,我用Scala的2.10,而不是2.11。

这是我的Scala代码(它工作绝对没问题,如果我使用安装在亚马逊云我自己卡夫卡在那里我有EMR机(那里我已经用了卡夫卡的港口9092)):

val testTopicMap = testTopic.split(",").map((_, kafkaNumThreads.toInt)).toMap 

    val kafkaParams = Map[String, String](
     "broker.list" -> "XXX.XX.XXX.XX:7072", 
     "zookeeper.connect" -> "XXX.XX.XXX.XX:2181", 
     "group.id" -> "test", 
     "zookeeper.connection.timeout.ms" -> "10000", 
     "auto.offset.reset" -> "smallest") 

    val testEvents: DStream[String] = 
     KafkaUtils 
     .createStream[String, String, StringDecoder, StringDecoder](
     ssc, 
     kafkaParams, 
     testTopicMap, 
     StorageLevel.MEMORY_AND_DISK_SER_2 
    ).map(_._2) 

我正在阅读this Documentation,但看起来我所做的一切都是正确的。我应该使用其他一些Kafka客户端API(其他Maven依赖项)吗?

更新#1:

我也试过直接流(无动物园管理员),但它运行我进了错误:

val testTopicMap = testTopic.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072","bootstrap.servers" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072", 
             "auto.offset.reset" -> "smallest") 
val testEvents = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, testTopicMap).map(_._2) 

testEvents.print() 

17/01/02 12:23:15 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 

更新#2:

我发现了这个相关的话题。建议的解决方案说Fixed it by setting the property 'advertised.host.name' as instructed by the comments in the kafka configuration (config/server.properties)。我是否正确理解config/server.properties应该在安装有Kafka的远程机器上更改

Kafka : How to connect kafka-console-consumer to fetch remote broker topic content?

回答

0

我觉得我碰到了同样的问题,最近(EOFException类),原因是卡夫卡的版本不匹配。

如果我看这里https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10/1.6.2 kafka streaming版本的编译时间依赖性是0.8而您使用0.10。

就我所知0.9已经不兼容0.8。你可以尝试设置一个本地的0.8或0.9代理并尝试连接?

+0

为了进行测试,我在本地安装了Kafka 0.10.1.0-2.11,然后在config/server中更改了参数'listeners'中的端口。属性“到”7072“,最后我用问题中提到的POM执行我的代码。我能够没有任何问题地获得消息。所以,我放弃了版本的兼容性问题。为了确保我能够很好地解释我自己:我在AWS EMR群集上运行消费者代码,而Kafka群集是AWS之外的另一台机器。 – Dinosaurius

+0

我在想这与'security.protocol'有关。例如,如果远程Kafka集群使用'SSL'协议,那么显然我的连接将失败。在这种情况下,我唯一会误解的是为什么我能够使用'curl'和Kafka-Rest-API(http://docs.confluent.io/2.0.0/kafka-rest/docs/api)来检索邮件。 HTML#消费者)从终端(没有斯卡拉)。也许它与远程集群中Confluent API的设置有关(即它不使用SSL)。 – Dinosaurius