2013-09-23 38 views
0

我正在向卡夫卡生成多个主题。我想从卡夫卡检索所有主题,并且我有不同的喷口和螺栓。并且我想将每个主题发送到相应的喷口和相关联的螺栓(例如,对于topic1,我有相应的spout1和bolt1,对于topic2我有相应的spout2和bolt2等..)
我该怎么做?从卡夫卡检索主题并发送到相应的喷口和螺栓

回答

0

尽管我没有完全明白你想要做什么(你是否有单独的拓扑为每个主题运行?)通常你可以做的是,在你的spout1中创建一个消费者,订阅topic1并发射只要收到一个值,它就会立即生效。然后将输出链接到相应的螺栓以进一步执行。

但据我了解,你应该看看在github storm-contrib项目下的KafkaSpout实现。它基本上是从Kafka集群中读取的喷口实现,并且您需要的只是正确地创建配置。

从文档就基本这个样子

SpoutConfig spoutConfig = new SpoutConfig(
       ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers 
       8, // number of partitions per host 
       "clicks", // topic to read from 
       "/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets 
       "discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper 
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

这里有一点要提的是上面的实现使用卡夫卡0.7这样的情况下,您正在使用最新的工作(0.8,你应该)实现可以找到0.8支持here