我不确定这个论坛是否正确。 Storm使用Storm KafkaSpout连接器从Kafka话题中消费。它一直工作到现在。现在我们应该连接到一个新的Kafka集群,其升级版本为0.10.x
,其版本号为0.10.x
。使用Storm 0.10.x(KafkaSpout)从卡夫卡0.10.x主题中消费
从风暴文档(http://storm.apache.org/releases/1.1.0/storm-kafka-client.html)我可以看到,风暴1.1.0
与卡夫卡0.10.x
起支持新的卡夫卡消费者API兼容。但在那种情况下,我将无法在我的最后运行拓扑(如果我错了,请纠正我)。
有没有解决这个问题的方法? 我已经看到,即使新的Kafka Consumer API已经删除了ZooKeeper的依赖关系,但我们仍然可以使用旧的Kafka-console-consumer.sh
通过传递--zookeeper
标志而不是新的–bootstrap-server
标志(推荐)来使用它的消息。我使用卡夫卡0.9运行这个命令并能够从托管在卡夫卡的话题消耗0.10.x
当我们试图连接,提示以下异常:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?]
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?]
但我们都能够连接到远程ZK服务器并验证该路径存在:
./zkCli.sh -server remoteZKServer:2181
[zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions
[3, 2, 1, 0]
正如我们上面所看到的,它给我们预期的输出,因为主题中有4个分区。
此时有以下问题:
1)它是在所有可能的连接到卡夫卡0.10.x
使用暴风版0.10.x
?有人试过这个吗?
2)即使我们能够使用,我们是否需要进行任何代码更改以便在拓扑关闭/重新启动的情况下检索消息偏移量。我这样问,因为我们将传递Zk群集细节,而不是旧KafkaSpout版本支持的代理信息。
暗战选择这里,任何指针将高度赞赏
UPDATE:
我们能够连接,当运行在本地使用eclipse从远程卡夫卡话题消耗。为了确保Storm不使用内存中的zk,我们使用了重载的构造函数LocalCluster("zkServer",port)
,它工作正常,我们可以看到数据来了。这导致我们得出结论,版本兼容性可能不是这里的问题。
但是,在集群中部署拓扑时仍然没有运气。 我们已验证从风暴箱到zkservers的连通性 该znode似乎也很好..
此时真的需要一些指针,这可能有什么问题,我们该如何调试?从来没有使用卡夫卡0.10x之前,所以不知道我们错过了什么。
真的很感谢一些帮助和建议