我正在向卡夫卡生成多个主题。我想从卡夫卡检索所有主题,并且我有不同的喷口和螺栓。并且我想将每个主题发送到相应的喷口和相关联的螺栓(例如,对于topic1,我有相应的spout1和bolt1,对于topic2我有相应的spout2和bolt2等..)
我该怎么做?从卡夫卡检索主题并发送到相应的喷口和螺栓
0
A
回答
0
尽管我没有完全明白你想要做什么(你是否有单独的拓扑为每个主题运行?)通常你可以做的是,在你的spout1中创建一个消费者,订阅topic1并发射只要收到一个值,它就会立即生效。然后将输出链接到相应的螺栓以进一步执行。
但据我了解,你应该看看在github storm-contrib项目下的KafkaSpout实现。它基本上是从Kafka集群中读取的喷口实现,并且您需要的只是正确地创建配置。
从文档就基本这个样子
SpoutConfig spoutConfig = new SpoutConfig(
ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers
8, // number of partitions per host
"clicks", // topic to read from
"/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets
"discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
这里有一点要提的是上面的实现使用卡夫卡0.7这样的情况下,您正在使用最新的工作(0.8,你应该)实现可以找到0.8支持here
3
相关问题
- 1. Storm-路由螺栓来从卡夫卡模式壶口
- 2. 卡夫卡喷口集成
- 3. 监测卡夫卡喷口
- 4. 卡夫卡python检索主题列表
- 5. 卡夫卡喷口的字段分组
- 6. (卡夫卡)喷口风暴返回的东西清单,不能传播此列表螺栓
- 7. 卡夫卡风暴喷口lein或Mvn
- 8. 从卡夫卡消耗并发送到德鲁伊问题
- 9. 将RDD发送到卡夫卡时发生PYspark错误主题
- 10. Apache Storm:支持卡夫卡喷嘴中的主题通配符
- 11. KafkaProducerException上发送消息到一个主题卡夫卡生产
- 12. 如何检查一个字段是由喷口还是螺栓发送?
- 13. 如何检索卡夫卡消息的特定计数主题
- 14. 重复卡夫卡主题
- 15. 卡夫卡主题websocket
- 16. 关联卡夫卡和动态主题
- 17. 发送自定义Java对象卡夫卡主题
- 18. 卡夫卡喷嘴性能不佳
- 19. 卡夫卡消费者无法从卡夫卡的主题订阅
- 20. 比较不透明或交易卡夫卡喷口
- 21. 风暴 - 卡夫卡喷口慢慢消耗
- 22. 将错误发布到卡夫卡主题
- 23. 如何从卡夫卡的数据发送到火花
- 24. 2喷出一个风暴螺栓
- 25. 如何解决风暴卡夫卡喷口只消费卡夫卡的一半数据?
- 26. 计算卡夫卡主题分区
- 27. 卡夫卡流:多主题分区
- 28. 卡夫卡经纪人与主题
- 29. 卡夫卡主题分区火花流
- 30. 卡夫卡多个主题消费