2015-11-05 148 views
3

(还有一些关于超时和maxSpoutPending的问题)风暴如何知道邮件何时“完全处理”?

我在Storm文档中看到很多关于消息被完全处理的参考文献。但是,我的KafkaSpout如何知道消息何时完全处理?

希望它能认识到我的螺栓连接的方式,所以当我的Stream中的最后一个螺栓夹住一个元组时,喷口知道我的消息何时处理?否则,我会想象在超时时间到期后,检查消息的确认状态,并且如果由确认/锚定XOR指示,则认为它已被处理。但我希望事实并非如此?

我也有关于maxTuplesPending和超时配置的相关问题。

如果我将maxTuplePending设置为10k,那么我是否认为每个spout实例都会继续发射元组,直到spout实例正在跟踪10k元组中的10k个元组,这些元组尚未完全处理?然后当一个正在处理的飞行消息被完全处理时,新的元组被发射出去了吗?

最后,这是否与超时配置有关?喷嘴在发出新消息之前是否以任何方式等待发生配置的超时?或者,如果消息处于停滞/缓慢状态,超时配置才起作用,导致由于超时而失败?

更简洁(或希望更清楚),是否有一个效果来设置我的超时30分钟除非消息不会失败,除非他们在30分钟内被最终的博尔特认可?或者是否还有其他影响,例如影响喷口排放速率的超时配置?

对不起,漫长而漫长的问题。预先感谢任何回应。

*编辑进一步澄清

的原因,这对我来说是一个问题,是因为我的消息并不一定要通过整个流运行。

说我有螺栓A,B,C,D。大多数时间的消息将从A-> B - > - > D传递。但是我有一些信息会故意停在螺栓A上.A会识别它们,但不会发出它们(因为我的业务逻辑,在这种情况下,我希望进一步处理这些信息)。

那么我的KafkaSpout是否知道被Ack发送但未从A发出的消息会被完全处理?因为我希望在螺栓A完成之后马上从喷口发出另一条消息,在这种情况下。

回答

4

Storm通过UDF代码必须使用的锚定机制跟踪整个拓扑中的元组。该锚定导致所谓的元组树(tuple-tree),如果树的根是由喷口发出的元组,则所有其他节点(以树结构连接)代表来自螺栓的发射元组,其使用输入元组作为锚(这只是一个逻辑模型,并没有在Storm中以这种方式实现)。

例如,Spout会发出一个由第一个螺栓分开的句子元组,第二个螺栓将过滤一些单词,第三个螺栓将应用一个单词计数。最后,水槽螺栓将结果写入文件。树是这样的:

"this is an example sentence" -+-> "this" 
           +-> "is" 
           +-> "an" 
           +-> "example" -> "example",1 -> "example",1 
           +-> "sentence" -> "sentence",1 -> "sentence",1 

,头一句是通过口发出,通过bolt1对于那些发出的所有令牌作为锚,并且得到由bolt1获得确认。 Bolt2过滤出“this”,“is”和“an”,并且仅仅包含三个元组。 “例子”和“句子”只是被转发,用作输出元组的锚点并在之后进行搜索。同样的情况发生在bolt2中,并且最后的sink螺栓只是对所有传入的元组进行确认。此外,Storm追踪所有元组的所有元组,即从中间螺栓和下沉螺栓中抽取所有元组。首先,喷口将输出元组的ID发送给acker任务。每次元组被用作定位点时,acker也会收到一个消息,其中包含锚元组标识和输出元组ID(由Storm自动生成)。来自螺栓的ackes也会转到与XORs相同的acker任务。如果所有的ack都被接收到 - 即,对于喷口和所有递归锚定的输出元组 - - (XOR结果将为零),则acker向喷口发送消息,该元组已被完全处理,并且发生后退到Spout.ack(MessageId)即当元组被完全处理时立即完成回呼)。此外,ackers会定期检查,如果有一个由acker注册的元组比超时长。如果发生这种情况,acker将丢弃元组ID,并向元组发送一条消息,指出元组失败(导致调用Spout.fail(MessageId))。

此外,Spouts保留在飞行中的所有元组的计数,并且如果该计数超过maxTuplesPending参数,则停止呼叫Spout.nextTuple()。据我所知,这个参数是全局应用的,也就是说,每个喷口任务的局部计数被总结,并且全局计数与参数进行比较(不知道如何详细实现这个)。因此timeout参数独立于maxTuplesPending

+0

非常感谢您的详细解答。你能解决我在编辑中提到的情况吗?风暴如何知道“所有的贿赂都已收到”。如果我故意在A后插入A,但不要将元组发送到B,那么这个消息是否会超时? – ab11

+0

看看我的例子。元组“this”,“is”和“an”将被一个中间螺栓过滤掉,即只有acked和没有输出。这工作得很好。如果一个句子中的所有单词都会被过滤掉,那么这棵树就不那么深了,但是初始句子对喷口来说会很好。 –

+0

再次感谢。我仍然对Storm如何处理这个问题感到困惑。如果螺栓A调用ack并且不发射,Storm如何知道调用Spout.ack(messageId)?我认为只有当Tuple树中的最后一个螺栓确定了一个元组时,它才会调用Spout.ack(messageId)(它怎么知道在这种情况下,我没有发出这个树,比典型的深)。我问这是因为即使我的拓扑在超时时间内完全处理了它的消息,我也看到了喷口故障,所以我怀疑这些故障来自螺栓A确认但没有发出的消息。 – ab11