2011-02-08 310 views
17

有一种方法可以抑制在ActiveMQ服务器上定义的队列上的重复消息?在JMS/ActiveMQ上避免重复的消息

我尝试手动定义JMSMessageID(message.setJMSMessageID(“uniqueid”)),但服务器忽略此修改并传递带有内置生成的JMSMessageID的消息。

根据规范,我没有找到关于如何删除重复消息的参考。

在HornetQ中,为了处理这个问题,我们需要在消息定义上声明HQ特有的属性org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。

即:

Message jmsMessage = session.createMessage(); 
String myUniqueID = "This is my unique id"; // Could use a UUID for this 
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID); 

有人知道是否有对应的ActiveMQ类似的解决方案?

回答

7

你应该看看Apache的骆驼,它提供了一个幂等消费组件将与任何JMS提供者的工作,请参阅:http://camel.apache.org/idempotent-consumer.html

使用与ActiveMQ的组件使得使用JMS非常简单组合,请参阅: http://camel.apache.org/activemq.html

+1

我怀疑这种方法是否能解决我的问题。我只需要在队列中使用相同的JMSMessageID保留一个消息实例。我需要它作为一个集合工作。我希望能够将最新的idem元素从队列中移除后,使用相同的JMSMessageID放置其他消息。我需要实施它并进行测试。但是,基于在EAI书中描述的Idempotent,我认为这个概念与我的必要性不符。 BUt,建议的解决方案很好。我会更多地研究它并在这里评论我的结果。谢谢 – apast 2011-02-11 01:31:58

4

我怀疑ActiveMQ本身是否支持它,但它应该很容易实现幂等消费者。做到这一点的一种方法是在生产者端为每个消息添加一个唯一的标识符,现在在消费者端使用一个存储区(db,cache等),可以检查消息是否在之前被接收,以及继续根据该检查进行处理。

我沿同一行看到一个前面的stackoverflow问题 - Apache ActiveMQ 5.3 - How to configure a queue to reject duplicate messages?,这可能也有帮助。

3

有一种方法可以使ActiveMQ根据JMS属性过滤重复项。它涉及编写一个Activemq Plugin。将重复消息发送到死信队列的基本代理过滤器就是这样的

import java.util.List; 
import java.util.regex.Matcher; 
import java.util.regex.Pattern; 

import org.apache.activemq.broker.Broker; 
import org.apache.activemq.command.Message; 
import org.apache.activemq.command.ActiveMQMessage; 
import org.apache.activemq.broker.BrokerFilter; 
import org.apache.activemq.broker.ConnectionContext; 
import org.apache.activemq.command.ConnectionInfo; 
import org.apache.activemq.broker.ProducerBrokerExchange; 

public class DuplicateFilterBroker extends BrokerFilter { 
    String messagePropertyName; 
    boolean switchValue; 

    public DuplicateFilterBroker(Broker next, String messagePropertyName) { 
     super(next); 
     this.messagePropertyName = messagePropertyName; 
    } 

    public boolean hasDuplicate(String propertyValue){ 
     switchValue = propertyValue; 
     return switchValue; 
    } 

    public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception { 
     ActiveMQMessage amqmsg = (ActiveMQMessage)msg; 
     Object msgObj = msg.getMessage(); 
     if (msgObj instanceof javax.jms.Message) { 
      javax.jms.Message jmsMsg = (javax.jms.Message) msgObj; 
      if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName))) { 
       super.send(producerExchange, msg); 
      } 
      else { 
       sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg); 
      } 
     } 
    } 
} 
+0

绝对是一个非常好的方法来解决这个问题。 – user1052080 2013-01-11 14:05:28

3

现在支持删除烘焙到ActiveMQ传输中的重复消息。请参阅Connection Configuration Guide中的配置值auditDepthauditMaximumProducerNumber

+3

您如何配置这些参数以避免重复? – Thomas 2013-08-26 09:44:20

0

看起来问题中提出的方式,适用于ActiveMQ(2016/12)。请参阅activemq-artemis指南。这要求生产者在消息中设置一个特定的属性。

Message jmsMessage = session.createMessage(); 
String myUniqueID = "This is my unique id"; // Could use a UUID for this 
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID); 

但是包含属性的类是不同的: org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID和属性值是_AMQ_DUPL_ID