2017-05-25 131 views
0

我不确定这个论坛是否正确。 Storm使用Storm KafkaSpout连接器从Kafka话题中消费。它一直工作到现在。现在我们应该连接到一个新的Kafka集群,其升级版本为0.10.x,其版本号为0.10.x使用Storm 0.10.x(KafkaSpout)从卡夫卡0.10.x主题中消费

从风暴文档(http://storm.apache.org/releases/1.1.0/storm-kafka-client.html)我可以看到,风暴1.1.0与卡夫卡0.10.x起支持新的卡夫卡消费者API兼容。但在那种情况下,我将无法在我的最后运行拓扑(如果我错了,请纠正我)。

有没有解决这个问题的方法? 我已经看到,即使新的Kafka Consumer API已经删除了ZooKeeper的依赖关系,但我们仍然可以使用旧的Kafka-console-consumer.sh通过传递--zookeeper标志而不是新的–bootstrap-server标志(推荐)来使用它的消息。我使用卡夫卡0.9运行这个命令并能够从托管在卡夫卡的话题消耗0.10.x

当我们试图连接,提示以下异常:

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions 
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?] 
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?] 

但我们都能够连接到远程ZK服务器并验证该路径存在:

 ./zkCli.sh -server remoteZKServer:2181 

     [zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions 
     [3, 2, 1, 0] 

正如我们上面所看到的,它给我们预期的输出,因为主题中有4个分区。

此时有以下问题:

1)它是在所有可能的连接到卡夫卡0.10.x使用暴风版0.10.x?有人试过这个吗?

2)即使我们能够使用,我们是否需要进行任何代码更改以便在拓扑关闭/重新启动的情况下检索消息偏移量。我这样问,因为我们将传递Zk群集细节,而不是旧KafkaSpout版本支持的代理信息。

暗战选择这里,任何指针将高度赞赏

UPDATE:
我们能够连接,当运行在本地使用eclipse从远程卡夫卡话题消耗。为了确保Storm不使用内存中的zk,我们使用了重载的构造函数LocalCluster("zkServer",port),它工作正常,我们可以看到数据来了。这导致我们得出结论,版本兼容性可能不是这里的问题。

但是,在集群中部署拓扑时仍然没有运气。 我们已验证从风暴箱到zkservers的连通性 该znode似乎也很好..

此时真的需要一些指针,这可能有什么问题,我们该如何调试?从来没有使用卡夫卡0.10x之前,所以不知道我们错过了什么。

真的很感谢一些帮助和建议

回答

0

风暴0.10x与卡夫卡0.10x兼容。我们仍然可以使用旧的KafkaSpout,这取决于基于zookeeper的偏移量存储机制。

连接丢失异常即将到来,因为我们试图到达不允许/接受来自我们端的连接的远程Kafka集群。我们需要打开特定的防火墙端口,以便建立连接。看起来,虽然运行拓扑结构是集群模式,但所有管理节点都应该能够与动物园管理员通话,所以防火墙应该对每个节点开放。