2013-04-25 86 views
0

我有一个正在发送关于使用ActiveMQ的一些事件JMS消息消息生产者。 但是,连接到ActiveMQ可能不会一直运行。因此,事件被存储,当连接建立时,它们被假设为被读取并被发送。这里是我的代码:JMS连接建立时如何捕获?

private void sendAndSave(MyEvent event) { 
    boolean sent = sendMessage(event); 
    event.setProcessed(sent); 
    boolean saved = repository.saveEvent(event); 
    if (!sent && !saved) { 
     logger.error("Change event lost for Id = {}", event.getId()); 
    } 
} 

private boolean sendMessage(MyEvent event) { 
    try { 
     messenger.publishEvent(event); 
     return true; 
    } catch (JmsException ex) { 
     return false; 
    } 
} 

我想创建某种ApplicationEventListener当建立和处理未发送的事件连接将被调用。 我通过JMS,Spring框架和ActiveMQ的文件去也没有找到任何线索如何连接我的听众与连接工厂。

如果有人能帮助我,我会很感激。

这里是我的应用程序的Spring上下文说,大约JMS:

<!-- Connection factory to the ActiveMQ broker instance.    --> 
<!-- The URI and credentials must match the values in activemq.xml --> 
<!-- These credentials are shared by ALL producers.     --> 
<bean id="jmsTransportListener" class="com.rhd.ams.service.common.JmsTransportListener" 
     init-method="init" destroy-method="cleanup"/> 
<bean id="amqJmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <property name="brokerURL" value="${jms.publisher.broker.url}"/> 
    <property name="userName" value="${jms.publisher.username}"/> 
    <property name="password" value="${jms.publisher.password}"/> 
    <property name="transportListener" ref="jmsTransportListener"/> 
</bean> 

<!-- JmsTemplate, by default, will create a new connection, session, producer for   --> 
<!-- each message sent, then close them all down again. This is very inefficient!   --> 
<!-- PooledConnectionFactory will pool the JMS resources. It can't be used with consumers.--> 
<bean id="pooledAmqJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 
    <property name="connectionFactory" ref="amqJmsConnectionFactory" /> 
</bean> 

<!-- Although JmsTemplate instance is unique for each message, it is --> 
<!-- thread-safe and therefore can be injected into referenced obj's. --> 
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <constructor-arg ref="pooledAmqJmsConnectionFactory"/> 
</bean> 
+0

您需要详细说明您的设置。你使用故障转移运输等? – 2013-04-25 13:59:02

+0

我已将Spring应用上下文添加到了我的原始消息中。 – 2013-04-25 17:50:07

+0

因为它省略了连接URI,所以它仍然没用。 – 2013-04-25 19:38:05

回答

1

您所描述的问题的方法,它肯定听起来像JMS持久订阅的开和关的情况。在走这条路之前,您可能会考虑更传统的实施。注意事项之外,ActiveMQ的提供Advisory Messages,您可以监听并会举办各种活动,包括新的连接发送。

=========

拍摄,对不起......我不明白的问题了。我认为Advisories根本就不是解决方案......毕竟,你需要连接到经纪人才能获得他们,但连接是你知道的。

因此,如果我理解正确(准备重试#2 ....),您需要的是一个客户端连接,当它失败时,将尝试无限期重新连接。当它重新连接时,你想要触发一个事件(或更多)将未决消息刷新到代理。

所以检测失去的连接是容易的。您只需注册一个JMS ExceptionListener。至于检测重新连接,我能想到的最简单的方法是启动重新连接线程。当它连接时,停止重新连接线程并使用Observer/Observable或JMX通知等通知相关方。您可以使用ActiveMQ Failover Transport,即使您只有一个代理,它也会为您执行连接重试循环。至少,这是supposed to,但它不会为你做那么多,你自己的重新连接线程不会完成... 但是如果你愿意委托它的一些控制,它会缓存你未刷新的消息(请参阅trackMessages选项),然后在重新连接时发送它们,这是您尝试执行的所有操作。

我猜如果你的经纪人宕机了几分钟,这不是一个坏的方法,但如果你在说几个小时,或者你可能在停机时间累积10k +的消息,我只是不知道是否缓存机制与您所需要的一样可靠。

==================

移动应用程序...权。不适合故障转移传输。然后我会实现一个定期连接的定时器(可能是使用http传输的一个好主意,但不相关)。当它连接时,如果没有什么要冲洗的话,那么在x分钟内见到你。如果有,请发送每封邮件,等待握手并清除手机商店的邮件。然后在x分钟再次见到你。

我认为这是Android?如果没有,请停止阅读。我们实际上是在一段时间之前实施的我只做了服务器端,但如果我没有记错的话,连接计时器/轮询器每n分钟就会旋转一次(我认为可变频率,因为变得过于激进就会耗尽电池)。一旦成功建立联系,我相信他们会使用意向广播来推动信息推动者做他们的事情。这个想法是,即使只有一个消息推送者,我们可能会添加更多。

+0

它看起来不像JMS Durable订阅。通过定义持久订阅消息订户可能远离JMS连接,并且当它们最终连接时,它们将接收这些消息。 在我的情况下,消息生成器可能会失去JMS连接,但会存储所有生成的消息。当它连接时,它需要发送所有未发送的消息。 关于ActiveMQ咨询支持,它可能是一个解决方案,但我找不到CONNECTION_ADVISORY_TOPIC消息携带的信息。 谷歌搜索没有帮助。你知道任何代码示例吗? – 2013-04-25 21:33:31

+0

更新的原始答复。 – Nicholas 2013-04-25 22:48:47

+0

如果我将创建自己的重新连接循环,它将干扰ActiveMQ故障切换传输,不是吗? 如果ActiveMQ故障转移传输处理重试循环,我的应用程序可以如何通知连接已重新建立? ActiveMQ Failover Transport提供的缓存大小太小。 我的客户端是移动应用程序,可以远离WIFI数小时。 我将未发送的事件存储在磁盘上。 – 2013-04-26 18:02:05