我正在使用Spring Boot和ActiveMQ。我想发送和接收来自主题的消息。这很好。我的代码如下所示:用于Spring Boot的DLQ主题
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {
JmsSpike.TestListener1.class,
JmsSpike.TestListener2.class,
JmsSpike.Config.class
})
@TestPropertySource(properties = {
"spring.activemq.broker-url: tcp://localhost:61616",
"spring.activemq.password: admin",
"spring.activemq.user: admin",
"spring.jms.pub-sub-domain: true", // queue vs. topic
})
@EnableJms
@EnableAutoConfiguration
public class JmsSpike {
@Autowired
private JmsTemplate jmsTemplate;
@Test
public void sendMessage() throws Exception {
sendMessageInThread();
Thread.sleep(10000);
}
private void sendMessageInThread() {
new Thread() {
public void run() {
jmsTemplate.convertAndSend("asx2ras", "I'm a test");
}
}.start();
}
@TestComponent
protected static class TestListener1 {
@JmsListener(destination = "asx2ras")
public void receiveMessage(String message) {
System.out.println("****************** 1 *******************");
System.out.println("Hey 1! I got a message: " + message);
System.out.println("****************** 1 *******************");
}
}
@TestComponent
protected static class TestListener2 {
@JmsListener(destination = "asx2ras")
public void receiveMessage(String message) {
throw new RuntimeException("Nope");
}
}
@Configuration
protected static class Config {
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setMaximumRedeliveries(1);
return topicPolicy;
}
@Bean
public ConnectionFactory connectionFactory(@Value("${spring.activemq.user}") final String username,
@Value("${spring.activemq.password}") final String password,
@Value("${spring.activemq.broker-url}") final String brokerUrl) {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(username, password, brokerUrl);
cf.setRedeliveryPolicy(redeliveryPolicy());
return cf;
}
}
}
- 我可以发送消息
- 我收到消息既听众
- 一个听者永远是可行的,只是打印一些控制台消息
- 其他监听器将总是会抛出异常
我将重试设置为“1”,因此失败的侦听器将在exc之后再次被调用eption被抛出。但是,在重试之后,消息不会传递到错误队列(或错误主题)。如何将消息发送到错误队列,以便稍后再次调用失败的监听器?
请注意,我只想再次调用失败的侦听器,而不是所有主题上的侦听器。那可能吗?
编辑
这里是我的activemq.xml
(只是broker
标签):
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
你能在这里发表什么是在activemq.xml中的代理配置。你在使用持久性吗?来自官方文档:“默认情况下,ActiveMQ不会将无法传递的非持久消息放在死信队列中。” –
@IulianRosca当然。我更新了我的问题。 –
感谢您的更新。我现在要做的就是在activemq.xml中设置maximumRedeliveries为1。默认情况下,这些值设置为6(http://activemq.apache.org/redelivery-policy.html)。如果在此配置之后,消息以DLQ结束,则意味着客户机覆盖代理重新传送策略时机制存在问题。如果启用调试日志记录,则可以查看协商值(WireFormat)。 –