2010-06-03 76 views
0

我有一个情况,我有一个activemq代理与2个队列,Q1和Q2。我有两个使用activemessaging的基于ruby的消费者。我们称他们为C1和C2。两个消费者都订阅每个队列。订阅每个队列时,我将activemq.prefetchSize设置为1。我也设置了ack =客户端。activemeqaging与跺脚和activemq.prefetchSize = 1

考虑事件的序列如下:

1)触发一个长时间运行的工作被发布到队列Q1的消息。称这个M1。

2)M1发送给消费者C1,开始长时间的操作。

3)两条触发短作业的消息被发布到队列Q2。称这些M2和M3。

4)M2被派遣到C2快速运行短期工作。

5)即使C1仍在运行M1,M3仍被分派到C1。它能够分派到C1,因为在队列订阅上设置了prefetchSize = 1,而不是在连接上。所以Q1消息已经被分派的事实并不会阻止一个Q2消息被分派。

由于主动消费者是单线程的,最终结果是M3坐在C1上等待很长时间直到C1完成处理M1。所以,尽管事实上消费者C2处于空闲状态(因为它很快完成了消息M2),所以M3不被处理很长一段时间。

从本质上讲,无论什么时候运行一个长的Q1作业,然后创建了一大堆短的Q2作业,恰好有一个短的Q2作业卡住了等待第一季度作业完成的消费者。

有没有办法在连接级别而不是在订阅级别设置prefetchSize?我真的不希望在处理M1时向C1发送任何消息。另一种选择是我可以创建一个专门处理Q1的消费者,然后让其他消费者致力于处理Q2。但是,由于Q1消息不多,我宁愿不这样做 - Q1的专门消费者会在一天的大部分时间内闲置以占用内存。

回答

2

根据ActiveMQ文档的扩展脚本头(http://activemq.apache.org/stomp.html),activemq.prefetchSize仅在SUBSCRIBE消息中可用,而不是CONNECT。下面是相关信息:

动词:SUBSCRIBE

头:activemq.prefetchSize

类型:int

描述:指定将 被分派的最大数目 未决消息的给客户。一旦达到最大值 ,将不会再发送更多消息 ,直到客户端 确认消息。设置为1为 消息处理 消息可能很慢,消息的消息的公平分配 。

我对此的阅读和体验是,由于M1没有被确认(b/c你有客户端确认打开),所以这个M1应该是prefetchSize = 1设置允许的1条消息订阅。我很惊讶地听说它不起作用,但也许我需要进行更详细的测试。您的设置应该适合您所需的行为。

我听说过其他人关于activemq dispatch的片状,所以有可能这是你使用的版本的错误。

我的一个建议是嗅探网络流量以查看M1是否由于某种原因得到了回应,或者将一些置入语句放入ruby stomp gem中以观察通信(这是我通常最终在调试跺脚问题时做)。

如果我有机会尝试一下,我会用我自己的结果更新我的评论。

一个建议:很可能会发送多个长处理消息,并且如果长处理消息的数量超过了您的处理数量,那么您将在此修复中处理快速处理消息。

我倾向于至少有一个专门的进程,只是做快速的工作,或者换个说法,专用一组只能做更长时间工作的进程。让所有的轮询器消费者进程听取多空均可能导致次优结果,而不管分派如何。进程组是配置一个消费者听目的地的一个子集的方式:http://code.google.com/p/activemessaging/wiki/Configuration

processor_group名, * list_of_processors

A processor group is a way to run the poller to only execute a subset of 

处理器在轮询组通过的 名称命令行 参数。

You specify the name of the processor as its underscored lowercase 

版本。所以,如果你有一个 FooBarProcessor和BarFooProcessor在 处理器组,它看起来像 这样:

ActiveMessaging::Gateway.define do |s| 
     ... 
     s.processor_group :my_group, :foo_bar_processor, :bar_foo_processor 
    end 

The processor group is passed into the poller like the following: 

    ./script/poller start -- process-group=my_group 
+0

感谢您的回复。我们使用处理器组来区分高优先级和低优先级作业。高优先级的工作是由我们的用户界面启动的,低优先级的工作由cron启动。我们可以在这里使用一个处理器组,但是长时间运行的工作实际上每天只能启动一次。我们可以从activemq中完全删除它。对于长期的工作,我们主要是使用activemq进行容错,因为我们在多个服务器上拥有消费者。 – 2010-06-03 23:19:18

0

我不知道,如果ActiveMessaging支持这一点,但你可以取消你的其他消费者当长处理消息到达,然后在处理后重新订阅它们。

它应该给你想要的效果。

+0

不支持主动消息。 – 2012-02-28 20:30:38