2015-11-02 92 views
2

我在Storm中实现了一个接收来自RabbitMQ喷口消息(https://github.com/ppat/storm-rabbitmq)的消息。Apache Storm加入模式 - 至少一次

我必须在Storm中处理的每个事件都会作为来自Rabbit的两条消息到达,因此我在bolt上有一个fieldsGrouping,以便两个消息到达同一个螺栓。

我的第一个我想办法:

  1. 收到第一个元组,并在内存中保存的信息
  2. 确认的第一个元组
  3. 当第二元组到达从存储器中取第一,并发出新的记录从喷口锚定到第二个。

这个工作,但我可以放松消息,如果一名工人死亡,因为我会确认第一个元组,然后才得到第二个和处理。

我改变这对:

  1. 接收所述第一元组并将其保存在存储器
  2. 当第二元组到达从存储器中取出的第一,发射锚固到两个输入元组一个新的元组和ACK两个输入元组。

内存中高速缓存是一个Guava高速缓存,有时间到期,当Tuple因超时而被驱逐时,我将在拓扑中失败(),以便它在后面进行重新处理。

这似乎工作,但是当我做了一些测试后,我得到了一个系统停止从Rabbit Queue获取消息的情况。

队列上的预取设置为5,并且在7处用setMaxSpoutPending喷出。在Rabbit接口中,我看到5个Unacked消息。

在风暴日志中,我看到相同的元组一次又一次从缓存中被逐出。

据我所知,问题是壶嘴只能取出5个消息,这些消息都是一对的第一部分。我可以增加预取,但不保证这不会在生产中发生。

所以我的问题是:如何在Storm中处理这些问题时实现联接?

回答

1

风暴并没有提供一个很好的解决方案......什么,你会需要的是一个可靠的存储该缓冲第一元组(即,有状态的操作)。因此,您可以立即确认第一个元组,并在发生故障后恢复状态。

  1. 据我所知,Trident支持一些状态处理。但我从来没有用过它。
  2. 作为第二种选择,您可以使用分布式键值存储(如Casandra)作为缓冲区。当然,这将是一个手写解决方案,即您需要自己编写所有Casandra交互。
  3. 最后但并非最不重要的一点,您可以切换到支持Apache Flink等有状态运算符的流处理系统。(免责声明:我是Flink的提交者)
相关问题