2014-01-12 156 views
1

我们试图将ActiveMQ 5.9.0作为使用JMS主题的消息代理进行设置,但我们在使用消息时遇到了一些问题。带有JMS主题的ActiveMQ - 消费者未出队的一些消息

出于测试目的,我们有1个主题,1个事件生产者和1个消费者的简单配置。我们一个接一个地发送10条消息,但每次运行应用程序时,都会消耗1-3条消息!其他消息被消耗并且处理得很好。 我们可以看到,即使我们重新启动应用程序(我们可以看到“Enqueue”和“Dequeue”中的数字),我们发布到ActiveMQ管理控制台中的主题的所有消息,但他们永远不会到达消费者,列是不同的)。

编辑:我还应该提到,当使用队列而不是主题时,不会出现此问题。

这是怎么发生的?它可能与atomikos(这是交易管理器)有关吗?或者也许在配置中的其他东西?任何想法/建议都欢迎。 :)

这是的ActiveMQ/JMS弹簧配置:

<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" 
    init-method="init" destroy-method="close"> 
    <property name="uniqueResourceName" value="amq" /> 
    <property name="xaConnectionFactory"> 
     <bean class="org.apache.activemq.spring.ActiveMQXAConnectionFactory" 
      p:brokerURL="${activemq_url}" /> 
    </property> 
    <property name="maxPoolSize" value="10" /> 
    <property name="localTransactionMode" value="false" /> 
</bean> 

<bean id="cachedConnectionFactory" 
    class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory" ref="connectionFactory" /> 
</bean> 

<!-- A JmsTemplate instance that uses the cached connection and destination --> 
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="cachedConnectionFactory" /> 
    <property name="sessionTransacted" value="true" /> 
    <property name="pubSubDomain" value="true"/> 
</bean> 

<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic"> 
    <constructor-arg value="test.topic" /> 
</bean> 

<!-- The Spring message listener container configuration --> 
<jms:listener-container destination-type="topic" 
    connection-factory="connectionFactory" transaction-manager="transactionManager" 
    acknowledge="transacted" concurrency="1"> 
    <jms:listener destination="test.topic" ref="testReceiver" 
     method="receive" /> 
</jms:listener-container> 

生产者:

@Component("producer") 
public class EventProducer { 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    @Transactional 
    public void produceEvent(String message) { 
     this.jmsTemplate.convertAndSend("test.topic", message); 
    } 
} 

消费者:

@Component("testReceiver") 
public class EventListener { 

    @Transactional 
    public void receive(String message) { 
     System.out.println(message); 
    } 
} 

测试:

@Autowired 
    private EventProducer eventProducer; 

    public void testMessages() { 

    for (int i = 1; i <= 10; i++) { 
     this.eventProducer.produceEvent("message" + i); 
    } 

回答

3

这就是JMS主题的本质 - 只有当前订阅者默认接收邮件。在容器启动后,您有竞争条件并在消费者建立其订阅之前发送消息。这是单元/集成测试中常见的错误,其中包含您在同一应用程序中发送和接收的主题。

随着Spring的更新版本,有一个method you can poll to wait until the subscriber is established(从3.1,我认为)。或者,您可以在开始发送之前稍等片刻,或者您可以使订阅持久。

+0

嗨加里, 谢谢你的回答,但我不知道我理解它。 首先,我们没有使用JUnit,这只是我们创建的一个简单场景,用于在添加更多主题,侦听器等之前检查配置是否工作。 此外,丢失的消息不一定是我们发送的第一个(So一些消息在丢失之前已经被接收)。 在我们开始发送之前,我们尝试添加Thread.sleep(2000),但它没有解决问题,并且我们也不会在真实场景中执行它...... 我会检查您提供的链接并看看它是否有帮助。 :) – Ayelet

+0

另外,您是否可以举例说明如何在spring xmls中配置持久订阅? 谢谢!:) – Ayelet

+1

我改变了我的答案,删除对JUnit的引用;我的意思是测试一般你在哪里发送和接收在同一个应用程序。确保在发送消息之前上下文已完全刷新并且消费者已启动。查看TRACE级别的日志以观察消费者开始消费。持久订阅:请参阅此处的clientId,destinationType和订阅:http://docs.spring.io/spring-framework/docs/current/spring-framework-reference/html/jms.html#jms-namespace记住订阅在第一次订阅完成之前不会持续(因此您的第一次尝试可能会失败)。 –