2014-09-04 125 views
0

我试图找到一个问题的解决方案,以保证消息只被一个消费者完全处理。ActiveMQ故障转移重新连接消息回滚

队列上的许多消息和一些消费者读取消息并处理它们写入数据库。我的消息是经过处理的,所以如果消费者死亡,那么消息就会返回到另一个消费者处理的队列中。

我们必须对activemq进行主动/被动配置,这会导致问题。如果我停止激活的activemq,那么当我使用故障转移传输时,使用者将重新连接到另一个activemq。这很好,但在重新连接期间,消息被放回队列中,并且消费者没有意识到这种重新连接并继续处理。这导致2个消费者处理相同消息的情况。

我本来希望使用分布式事务管理器,这可能发生在将来,但现在我需要一个不同的解决方案。

如果我不使用故障转移传输,那么我可以挂接到JMSException侦听器并中止使用者。不幸的是,这在使用故障转移传输时不起作用。

我想要使用故障转移传输进行初始连接(发现哪个activemq正在运行),然后强制故障转移不要重新连接...或者使用允许服务器列表尝试但不连接的不同传输不要重新连接...或者去找听听重新连接。

请注意,有时只有一台服务器使用故障切换(重新连接)。

我可以做我在初始连接逻辑(狩猎活动服务器),但要检查是否有另一种选择

+0

刚才我发现我可以使用发现来启动一个有效的连接 – Paul 2014-09-04 09:45:33

回答

0

您可以收听通过使用监听器传输的ActiveMQConnection事件:

connection = (ActiveMQConnection)factory.createConnection(); 
    connection.addTransportListener(new TransportListener() { 
    public void onCommand(Object command) { 
     // Do something 
    } 

    public void onException(IOException error) { 
     // Do something 
    } 

    public void transportInterupted() { 
     // Do something 
    } 

    public void transportResumed() { 
     // Do something 
    } 
}); 
connection.start(); 

请注意,在此示例中,监听器直接设置在Connection上;但是,您可以在ActiveMQConnectionFactory上设置一个实例,该实例将分配给它创建的每个Connection实例。

+0

不幸的是,我没有连接的句柄,因为我通过DefaultMessageListenerContainer从PooledConnectionFactory获取它。 – Paul 2014-09-04 11:44:11

+0

看编辑,你也可以在你的ActiveMQConnectionFactory上设置一个监听器。 – 2014-09-04 13:23:14

+0

虽然PooledConnectionfactory不公开它,但这是Spring-Boot为您提供的。我可以解决这个问题,但是我已经进行了测试,以获得有效的连接,然后侦听jms异常。 – Paul 2014-09-04 14:04:56