2013-06-18 118 views
0

我目前正在使用NMS来开发基于应用程序的ActiveMQ(5.6)。ActiveMQ:多用户连接到一个队列,但只有一个用户接收所有消息

我们有几个消费者(exe)试图从相同的队列(而不是主题)接收大量数据。虽然所有的信息都传给了一位消费者,尽管我已经让消费者在收到消息之后睡了几秒钟。顺便说一下,我们不希望消费者收到其他消费者收到的相同消息。

在官方网站中提到,我们应该设置预取限制,以决定在任何时间点可以向消费者流式传输多少条消息。它可以被配置和编码。

我尝试过的一种方法是使用PrefetchPolicy类来绑定ConnectionFactory类,如下图所示。

PrefetchPolicy poli = new PrefetchPolicy(); 
poli.QueuePrefetch = 0; 
ConnectionFactory fac = new ConnectionFactory("activemq:tcp://Localhost:61616?jms.prefetchPolicy.queuePrefetch=1"); 
fac.PrefetchPolicy = poli; 
using (IConnection con = fac.CreateConnection()) 
{ 
    using (ISession se = con.CreateSession()) 
    { 
     IDestination destination = SessionUtil.GetDestination(se, queue, DestinationType.Queue); 
     using (IMessageConsumer consumer = se.CreateConsumer(queue1)) 
     { 
      con.Start(); 
      while (true) 
      { 
       ITextMessage message = consumer.Receive() as ITextMessage; 
       Thread.Sleep(2000); 
       if (message != null) 
       { 
       Task.Factory.StartNew(() => extractAndSend(message.Text)); //do something 
       } 
       else 
       { 
       Console.WriteLine("No message received~"); 
       } 
     } 
     } 
    } 
} 

但是不管设置什么预取值,消费者的行为都和以前一样。

我尝试了第二种方式来获得结果,即配置服务器conf文件。我更改了服务器的activemq.xml,如下图所示。 “producerFlowControl =” 真 “的memoryLimit = ”5MB“/> ”producerFlowControl =“ 真” 的memoryLimit = “5MB”> 不过,虽然我已经设置了dispatchpolicy的消息仍然发给一位消费者。

我想知道: 只需配置服务器xml文件以使所有消费者都能从一个队列中接收消息,是否可以实现此行为?如果是这样,如何配置这个以及我的配置有什么问题?如果不是,我如何使用代码来实现目标? 谢谢。

回答

2

看看“消息组”功能。

我有同样的问题。只有一个消费者处理所有消息。我在我的代码中发现我送过程中使用的组头:

request.Properties["NMSXGroupID"] = "cheese"; 

根据官方文档:

标准JMS的JMSXGroupID头用来定义该消息属于该消息组 。然后,消息组功能会确保同一消息组的所有消息都会发送到同一个JMS 消费者 - 而该消费者仍然活着。只要消费者 死亡另一个将被选中。

查看完整的细节在http://activemq.apache.org/message-groups.html

相关问题