我在Storm中实现了一个接收来自RabbitMQ喷口消息(https://github.com/ppat/storm-rabbitmq)的消息。Apache Storm加入模式 - 至少一次
我必须在Storm中处理的每个事件都会作为来自Rabbit的两条消息到达,因此我在bolt上有一个fieldsGrouping,以便两个消息到达同一个螺栓。
我的第一个我想办法:
- 收到第一个元组,并在内存中保存的信息
- 确认的第一个元组
- 当第二元组到达从存储器中取第一,并发出新的记录从喷口锚定到第二个。
这个工作,但我可以放松消息,如果一名工人死亡,因为我会确认第一个元组,然后才得到第二个和处理。
我改变这对:
- 接收所述第一元组并将其保存在存储器
- 当第二元组到达从存储器中取出的第一,发射锚固到两个输入元组一个新的元组和ACK两个输入元组。
内存中高速缓存是一个Guava高速缓存,有时间到期,当Tuple因超时而被驱逐时,我将在拓扑中失败(),以便它在后面进行重新处理。
这似乎工作,但是当我做了一些测试后,我得到了一个系统停止从Rabbit Queue获取消息的情况。
队列上的预取设置为5,并且在7处用setMaxSpoutPending喷出。在Rabbit接口中,我看到5个Unacked消息。
在风暴日志中,我看到相同的元组一次又一次从缓存中被逐出。
据我所知,问题是壶嘴只能取出5个消息,这些消息都是一对的第一部分。我可以增加预取,但不保证这不会在生产中发生。
所以我的问题是:如何在Storm中处理这些问题时实现联接?