2012-01-15 95 views
5

我有一个系统实现了一些服务器之间通信的Camel和ActiveMQ。我想知道是否有办法在X时间段后自动过期并清除发送到队列的消息。由于始发服务器(填充队列)不知道是否有人正在收集消息,我不希望我的队列增长到其大到某个东西崩溃为止。 Bonus karma指向可以帮助并提供java dsl方式来实现此功能的人员。Camel中消息的自动到期

解决方案

// expire message after 2 minutes 
long ttl = System.currentTimeMillis() + 120000; 
// send our info one-way to the group topic 
camelTemplate.sendBodyAndHeader("jms:queue:stats", ExchangePattern.InOnly, stats, "JMSExpiration", ttl); 

回答

1

嘛 setJMSExpiration(长过期):

是什么东西一定不要当你是客户时打电话。在ActiveMQ论坛上查看我的演讲。

http://apache-qpid-developers.2158895.n2.nabble.com/MRG-Java-JMS-Expiration-td7171854.html

+0

在论坛上的留言中,看起来好像你想要10分钟过期,而是设置了10秒。恩。 (10 *(60 * 1000))vs(10 * 1000) – 2012-01-15 19:45:25

+0

@Mondain nope,我想要10秒钟。 10分钟是从队列中清除过期消息时的Qpid默认设置。例如:你发送了一条消息到队列过期设置为10秒,所以在10秒后消息被QPid过期,并且没有消费者可以使用它,但事实上这个消息是删除bu QPid,当删除策略线程s)踢了,默认设置是他们踢了10分钟后 – Eugene 2012-01-16 07:01:16

+0

好吧,我误解了你在那里发布的内容。 – 2012-01-16 15:44:07

3

还介意客户端之间的时钟 - 经纪人需要保持同步,为到期的正常工作。如果时钟不同步,则在代理上收到消息时,客户端设置的失效可能已经过期。或者客户时间超过经纪人,所以到期时间超过10秒。

它有点击败我为什么到期是客户端时间为基础。所以AMQ提供了一个插件,通过重新调整时间来解决这个问题,只能以经纪人为基础。请参阅http://activemq.apache.org/timestampplugin.html

1

就我们而言,我们选择使用部署在ActiveMQ服务本身中的骆驼路线向特定目的地添加到期时间。

唯一要做的就是创建一个如下名称的XML文件,例如: setJMSExpiration.xml

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation=" 
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd 
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <camelContext id="camel-set-expiration" xmlns="http://camel.apache.org/schema/spring"> 
    <!-- Copy route for each destination to expire --> 
    <route id="setJMSExpiration.my.queue.dlq"> 
     <from uri="broker:queue:MY.QUEUE.DLQ"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 1 day --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 86400000}</spel> 
     </setHeader> 
     <to uri="broker:queue:MY.QUEUE.DLQ"/> 
    </route> 
    <route id="setJMSExpiration.another.queue"> 
     <from uri="broker:queue:ANOTHER.QUEUE"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 5 days --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 432000000}</spel> 
     </setHeader> 
     <to uri="broker:queue:ANOTHER.QUEUE"/> 
    </route> 
    </camelContext> 
</beans> 

,并导入它在你的activemq.xml配置:

<!-- Add default Expiration (file in the same directory) --> 
<import resource="setJMSExpiration.xml"/> 

或者您也可以提供特定per destination policies,如果你不想让过期消息到达ActiveMQ.DLQ队列。

<policyEntry queue="MY.QUEUE.DLQ"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 
<policyEntry queue="ANOTHER.QUEUE"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 

这种方式的唯一的限制就是你不能轻易使用通配符,因为它是在这里编码(你可以,但它会通过在骆驼航线使用JMS目的地标题需要一些调整)。

我们尝试让生产者定义timeToLive(并尽可能强制它们),但并不总是可以强制他们更改其代码,这样可以最大限度地减少此类路由的数量。