2017-03-23 24 views
0

我正在使用Spring Boot和ActiveMQ。我想发送和接收来自主题的消息。这很好。我的代码如下所示:用于Spring Boot的DLQ主题

@RunWith(SpringRunner.class) 
@SpringBootTest(classes = { 
     JmsSpike.TestListener1.class, 
     JmsSpike.TestListener2.class, 
     JmsSpike.Config.class 
}) 
@TestPropertySource(properties = { 
     "spring.activemq.broker-url: tcp://localhost:61616", 
     "spring.activemq.password: admin", 
     "spring.activemq.user: admin", 
     "spring.jms.pub-sub-domain: true", // queue vs. topic 
}) 
@EnableJms 
@EnableAutoConfiguration 
public class JmsSpike { 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    @Test 
    public void sendMessage() throws Exception { 
     sendMessageInThread(); 
     Thread.sleep(10000); 
    } 

    private void sendMessageInThread() { 
     new Thread() { 
      public void run() { 
       jmsTemplate.convertAndSend("asx2ras", "I'm a test"); 
      } 
     }.start(); 
    } 

    @TestComponent 
    protected static class TestListener1 { 

     @JmsListener(destination = "asx2ras") 
     public void receiveMessage(String message) { 
      System.out.println("****************** 1 *******************"); 
      System.out.println("Hey 1! I got a message: " + message); 
      System.out.println("****************** 1 *******************"); 
     } 
    } 

    @TestComponent 
    protected static class TestListener2 { 

     @JmsListener(destination = "asx2ras") 
     public void receiveMessage(String message) { 
      throw new RuntimeException("Nope"); 
     } 
    } 

    @Configuration 
    protected static class Config { 

     @Bean 
     public RedeliveryPolicy redeliveryPolicy() { 
      RedeliveryPolicy topicPolicy = new RedeliveryPolicy(); 
      topicPolicy.setMaximumRedeliveries(1); 
      return topicPolicy; 
     } 

     @Bean 
     public ConnectionFactory connectionFactory(@Value("${spring.activemq.user}") final String username, 
                @Value("${spring.activemq.password}") final String password, 
                @Value("${spring.activemq.broker-url}") final String brokerUrl) { 

      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(username, password, brokerUrl); 
      cf.setRedeliveryPolicy(redeliveryPolicy()); 
      return cf; 
     } 
    } 
} 
  • 我可以发送消息
  • 我收到消息既听众
  • 一个听者永远是可行的,只是打印一些控制台消息
  • 其他监听器将总是会抛出异常

我将重试设置为“1”,因此失败的侦听器将在exc之后再次被调用eption被抛出。但是,在重试之后,消息不会传递到错误队列(或错误主题)。如何将消息发送到错误队列,以便稍后再次调用失败的监听器?

请注意,我只想再次调用失败的侦听器,而不是所有主题上的侦听器。那可能吗?


编辑

这里是我的activemq.xml(只是broker标签):

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> 

    <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
      <policyEntry topic=">" > 
       <pendingMessageLimitStrategy> 
       <constantPendingMessageLimitStrategy limit="1000"/> 
       </pendingMessageLimitStrategy> 
      </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 

    <managementContext> 
     <managementContext createConnector="false"/> 
    </managementContext> 

    <persistenceAdapter> 
     <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter> 

     <systemUsage> 
     <systemUsage> 
      <memoryUsage> 
       <memoryUsage percentOfJvmHeap="70" /> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="100 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="50 gb"/> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 

    <transportConnectors> 
     <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
     <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
    </transportConnectors> 

    <shutdownHooks> 
     <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> 
    </shutdownHooks> 

</broker> 
+0

你能在这里发表什么是在activemq.xml中的代理配置。你在使用持久性吗?来自官方文档:“默认情况下,ActiveMQ不会将无法传递的非持久消息放在死信队列中。” –

+0

@IulianRosca当然。我更新了我的问题。 –

+0

感谢您的更新。我现在要做的就是在activemq.xml中设置maximumRedeliveries为1。默认情况下,这些值设置为6(http://activemq.apache.org/redelivery-policy.html)。如果在此配置之后,消息以DLQ结束,则意味着客户机覆盖代理重新传送策略时机制存在问题。如果启用调试日志记录,则可以查看协商值(WireFormat)。 –

回答

1

按照official documentation,可以覆盖在客户端代理配置:

经纪人将他偏好的默认投递策略发送给 客户端连接在他的BrokerInfo命令包中。但是,客户端可以 使用 ActiveMQConnection.getRedeliveryPolicy()方法来覆盖的策略设置:

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

因此,您所配置的再分发政策的方式似乎确定。

我看到的唯一问题是当您创建RedeliveryPolicy的新实例并仅设置单个字段topicPolicy.setMaximumRedeliveries(1);所有其他原语字段都将被分配默认值。你或许应该设置最大重复传递就交还政策的现有实例:

RedeliveryPolicy policy = cf.getRedeliveryPolicy(); 
policy.setMaximumRedeliveries(1); 

编辑

同时,确保使用@JmsListener没有使用CLIENT_ACKNOWLEDGE。根据此thread,当使用CLIENT_ACKNOWLEDGE时,消息不会被重新发送。