2012-03-26 57 views
6

在下面的测试我试图模拟以下情形:如何在AUTO_ACKNOWLEDGE JMS会话场景中模拟消息重新传递?

  1. 消息队列开始。
  2. 设计为在消息处理过程中失败的消费者已启动。
  3. 生成消息。
  4. 消费者开始处理消息。
  5. 处理期间抛出异常来模拟消息处理失败。失败的消费者停止。
  6. 另一位消费者的意图是选择重新发送的消息。

但是我的测试失败,消息没有重新发送给新的消费者。我会很感激这方面的任何提示。

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig", 
     loader=JavaConfigContextLoader.class) 
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests { 
    @Autowired 
    private FailureReprocessTestScenario testScenario; 

    @Before 
    public void setUp() { 
     testScenario.start(); 
    } 

    @After 
    public void tearDown() throws Exception { 
     testScenario.stop(); 
    } 

    @Test public void 
    should_reprocess_task_after_processing_failure() { 
     try { 
      Thread.sleep(20*1000); 

      assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{ 
        "task-1", 
      }))); 
     } catch (InterruptedException e) { 
      fail(); 
     } 
    } 

    @Configurable 
    public static class FailureReprocessTestScenario { 
     @Autowired 
     public BrokerService broker; 

     @Autowired 
     public MockTaskProducer mockTaskProducer; 

     @Autowired 
     public FailingWorker failingWorker; 

     @Autowired 
     public SucceedingWorker succeedingWorker; 

     @Autowired 
     public TaskScheduler scheduler; 

     public void start() { 
      Date now = new Date(); 
      scheduler.schedule(new Runnable() { 
       public void run() { failingWorker.start(); } 
      }, now); 

      Date after1Seconds = new Date(now.getTime() + 1*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { mockTaskProducer.produceTask(); } 
      }, after1Seconds); 

      Date after2Seconds = new Date(now.getTime() + 2*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { 
        failingWorker.stop(); 
        succeedingWorker.start(); 
       } 
      }, after2Seconds); 
     } 

     public void stop() throws Exception { 
      succeedingWorker.stop(); 
      broker.stop(); 
     } 
    } 

    @Configuration 
    @ImportResource(value={"classpath:applicationContext-jms.xml", 
      "classpath:applicationContext-task.xml"}) 
    public static class ContextConfig { 
     @Autowired 
     private ConnectionFactory jmsFactory; 

     @Bean 
     public FailureReprocessTestScenario testScenario() { 
      return new FailureReprocessTestScenario(); 
     } 

     @Bean 
     public MockTaskProducer mockTaskProducer() { 
      return new MockTaskProducer(); 
     } 

     @Bean 
     public FailingWorker failingWorker() { 
      TaskListener listener = new TaskListener(); 
      FailingWorker worker = new FailingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     @Bean 
     public SucceedingWorker succeedingWorker() { 
      TaskListener listener = new TaskListener(); 
      SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     private DefaultMessageListenerContainer listenerContainer(TaskListener listener) { 
      DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer(); 
      listenerContainer.setConnectionFactory(jmsFactory); 
      listenerContainer.setDestinationName("tasksQueue"); 
      listenerContainer.setMessageListener(listener); 
      listenerContainer.setAutoStartup(false); 
      listenerContainer.initialize(); 
      return listenerContainer; 
     } 

    } 

    public static class FailingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(FailingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public FailingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
     } 

     public void start() { 
      LOG.info("FailingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("FailingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("FailingWorker.processTask(" + task + ")"); 
      try { 
       Thread.sleep(1*1000); 
       throw Throwables.propagate(new Exception("Simulate task processing failure")); 
      } catch (InterruptedException e) { 
       LOG.log(Level.SEVERE, "Unexpected interruption exception"); 
      } 
     } 
    } 

    public static class SucceedingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public final List<String> processedTasks; 

     public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
      this.processedTasks = new ArrayList<String>(); 
     } 

     public void start() { 
      LOG.info("SucceedingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("SucceedingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("SucceedingWorker.processTask(" + task + ")"); 
      try { 
       TextMessage taskText = (TextMessage) task; 
       processedTasks.add(taskText.getText()); 
      } catch (JMSException e) { 
       LOG.log(Level.SEVERE, "Unexpected exception during task processing"); 
      } 
     } 
    } 

} 

TaskListener.java

public class TaskListener implements MessageListener { 

    private TaskProcessor processor; 

    @Override 
    public void onMessage(Message message) { 
     processor.processTask(message); 
    } 

    public void setProcessor(TaskProcessor processor) { 
     this.processor = processor; 
    } 

} 

MockTaskProducer.java

@Configurable 
public class MockTaskProducer implements ApplicationContextAware { 
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName()); 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    private Destination destination; 

    private int taskCounter = 0; 

    public void produceTask() { 
     LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")"); 

     taskCounter++; 

     jmsTemplate.send(destination, new MessageCreator() { 
      @Override 
      public Message createMessage(Session session) throws JMSException { 
       TextMessage message = session.createTextMessage("task-" + taskCounter); 
       return message; 
      } 
     }); 
    } 

    @Override 
    public void setApplicationContext(ApplicationContext applicationContext) 
      throws BeansException { 
     destination = applicationContext.getBean("tasksQueue", Destination.class); 
    } 
} 
+1

当我设置'listenerContainer.setSessionTransacted(true)'我看到消息被重新传递,但只传递给'FailingWorker'。在停止相应的侦听器容器之后的事件中,'SucceedingWorker'永远不会获得重新发送的消息。 – 2012-03-26 13:57:47

+1

我出现了'listenerContainer.stop()' - 方法不关闭与提供的连接,因此JMS提供程序继续尝试将失败的消息重新传递给同一个使用者。为了避免失败的使用者在某个时候应该调用listenerContainer.shutdown()。 – 2012-03-27 08:36:02

回答

7

显然我昨天看到的文档的来源Creating Robust JMS Applications以某种方式误导了我(或者我可能错误地理解了它)。特别是摘录如下:

在确认JMS消息之前,它不被认为是成功使用了 。成功消费 通常发生在三个阶段。

  1. 客户端收到该消息。
  2. 客户端处理消息。
  3. 该消息被确认。确认由JMS提供商或客户端启动,具体取决于会话 确认模式。

我认为AUTO_ACKNOWLEDGE正是这么做的 - 确认该消息监听器方法返回一个结果之后。但是根据JMS规范,它有点不同,而Spring收听器的容器并不会试图改变JMS规范的行为。这是用AbstractMessageListenerContainer的Javadoc,不得不说 - 我已经强调了重要的句子:

监听器容器提供了以下消息确认 选项:

  • “SessionAcknowledgeMode来”设为“ AUTO_ACKNOWLEDGE“(默认):在侦听器执行之前自动发送消息确认;在抛出异常情况下无法重新投递。
  • “sessionAcknowledgeMode”设置为“CLIENT_ACKNOWLEDGE”:成功侦听器执行后自动确认消息;没有 在发生异常情况下重新发送。
  • “sessionAcknowledgeMode”设置为“DUPS_OK_ACKNOWLEDGE”:在侦听器执行期间或之后的惰性消息确认;潜在的 在发生异常情况下重新发送。
  • “sessionTransacted”设置为“true”:成功侦听器执行后的事务确认;保证在发生异常情况下的重新投递。

所以关键要我的解决方案是listenerContainer.setSessionTransacted(true);

我面临的另一个问题是,JMS提供者保留重新传送失败的消息发送回了邮件的处理过程中失败的相同的消费者。我不知道JMS规范是否给出了处方提供者应该在这种情况下应该做什么,但是对我来说有效的是使用listenerContainer.shutdown();来断开发生故障的消费者并允许提供者重新传递消息并给予一个机会到另一个消费者。