2015-12-02 69 views
0

我是新来的风暴,所以请忍受我。(卡夫卡)喷口风暴返回的东西清单,不能传播此列表螺栓

我有一项服务,每当我们与结果列表打电话时,都会向卡夫卡发送一条消息。

有一个KafkaSpout读取每一条消息,消息中包含beforementioned列表。

这只是一个JSON,我可以解开它。现在,这里的问题:

我在杰克逊计划做这个转换操作,但是这个计划可以基本上返回一个Values对象,它不是一个对象列表,但基本上是一个字段值对列表。

另一件事可能是一个博尔特这个值(这只是一个扩展的ArrayList)对象获取并在单一元素解开它,并给他们的每一个到下一个博尔特。这是一个解决方案吗?我可以从一次呼叫发射多个物体到一个螺栓吗?

是否有更智能的解决方案?

回答

2

是的,你可以发出从单一博尔特多个元组。在位于here

public void execute(Tuple tuple) { 
    String sentence = tuple.getString(0); 
    for(String word: sentence.split(" ")) { 
    _collector.emit(tuple, new Values(word)); //emits multiple tuples 
    } 
    _collector.ack(tuple); 
} 

正如可以看到for循环可以同时发射的多个元组的示例螺栓通过执行方法寻找。这样做会创建一个更大的消息树。这可能会导致问题,这取决于您的可靠性保证和数据大小。

根据我的经验,很难/不可能操纵KafkaSpout里面的数据。所以有几个关于你的设置的笔记。

  • 我想要做的第一件事就是改变服务发送给卡夫卡的内容。你可以发送它在单个项目而不是一个大项目。如果你无法改变这一点。
  • 像你刚才提到的那样设置多个Bolts,Spout => UnwrapBolt => ProcessBolt,其中UnwrapBolt将你的一个数据源作为单独的元组发送出去,然后ProcessBolt获取每个单独的元组,然后处理它们,但是你需要。
+0

谢谢morganw09dev。我实际上使用了第一种方法,我只用一条消息减少了消息传递。一个问题:如果我发出每个元组而不是最后一个元组,会发生什么? – dierre

+1

我不知道这是可能的/可取的吗???我的解释一直是,通常使用aple元组来表示你已经完成了为特定的Bolt处理这个元组。所以,当你在同一个Bolt中使用它时,确定一个元组没有什么意义。我不知道这个观点是如何官方的。 – morganw09dev

+0

哦,好的。我会尝试,我会让你知道发生了什么:) – dierre