2016-01-21 66 views
0

在请求/响应模式下,我需要一个临时队列来接收应答。我希望只创建一次队列并让它始终保持打开状态(而不是通过 SessionCallback.doInJms() 为每个请求都新建一个队列)。如何在 Spring 中创建临时 JMS 队列?

我如何用Spring的JMS支持来做到这一点?

回答

1

我没有找到现成的办法,所以写了一个变通方案。这个类会让会话和连接一直保持打开,直到上下文被销毁。这样可以确保不会漏掉任何回复。常见的其他写法是先发送消息、再打开回复队列,结果有时会收不到回复,因为回复在发送方打开回复队列之前就已经发出了。

用法:

/**
 * Builds the {@link JmsTemplate} used to receive replies.
 *
 * <p>The template's default destination is the queue held open by
 * {@link #replyQueueProvider()}, and its receive timeout is set to 10000
 * (milliseconds, per the {@code JmsTemplate} documentation).
 *
 * @return a template bound to the long-lived reply queue
 */
@Bean
public JmsTemplate replyJmsTemplate() {
    final JmsTemplate template = new JmsTemplate(jmsConnectionFactory());
    template.setDefaultDestination(replyQueueProvider().getQueue());
    template.setReceiveTimeout(10000);
    return template;
}

/**
 * Creates the {@link QueueProvider} that owns the reply queue and opens it
 * immediately.
 *
 * @return an initialized provider for a temporary queue
 */
@Bean
public QueueProvider replyQueueProvider() {
    final QueueProvider provider = new QueueProvider(jmsConnectionFactory());
    // init() has to be invoked explicitly here: QueueProvider declares no
    // @PostConstruct hook, so nothing else will open the queue.
    provider.init();
    return provider;
}

实现:

import java.util.concurrent.atomic.AtomicInteger; 

import javax.annotation.PreDestroy; 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Queue; 
import javax.jms.Session; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.jms.UncategorizedJmsException; 
import org.springframework.jms.support.JmsUtils; 

/**
 * Opens a JMS connection, session and queue in {@link #init()} and keeps them
 * open until {@link #close()} is called, so that the same queue instance can
 * be handed out repeatedly via {@link #getQueue()}.
 *
 * <p>The provider manages either a named queue or a temporary queue,
 * depending on which constructor is used. {@link #init()} must be called
 * explicitly before {@link #getQueue()}; {@link #close()} is annotated with
 * {@code @PreDestroy}.
 *
 * <p>Calling {@link #init()} again without an intervening {@link #close()}
 * replaces the stored connection, session and queue without closing the
 * previous ones.
 *
 * <p>NOTE(review): the fields are neither {@code volatile} nor guarded by a
 * lock; confirm that init/close and getQueue are not raced across threads.
 */
public class QueueProvider {

    private static final Logger log = LoggerFactory.getLogger(QueueProvider.class);

    /** Supplies the numeric suffix of the label used for temporary queues. */
    private static final AtomicInteger COUNT = new AtomicInteger();

    private final ConnectionFactory connectionFactory;
    /** Queue name; for temporary queues this is only a label for log and error messages. */
    private final String queueName;
    private final boolean isTemporary;
    private Connection connection;
    private Session session;
    private Queue queue;
    private boolean transacted;
    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;

    /**
     * Creates a provider for the named queue.
     *
     * @param connectionFactory factory used by {@link #init()} to open the connection
     * @param queueName name passed to {@code Session.createQueue} in {@link #init()}
     */
    public QueueProvider(ConnectionFactory connectionFactory, String queueName) {
        this.connectionFactory = connectionFactory;
        this.queueName = queueName;
        this.isTemporary = false;
    }

    /**
     * Creates a provider for a temporary queue. The queue is labelled
     * {@code "TemporaryQueue-<n>"} in log and error messages, where {@code n}
     * comes from a class-wide counter.
     *
     * @param connectionFactory factory used by {@link #init()} to open the connection
     */
    public QueueProvider(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
        this.isTemporary = true;
        this.queueName = "TemporaryQueue-" + COUNT.incrementAndGet();
    }

    /**
     * Sets the {@code transacted} flag passed to {@code Connection.createSession}.
     * Only takes effect for a subsequent {@link #init()} call.
     *
     * @param transacted whether the session should be transacted
     */
    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    /**
     * @return the {@code transacted} flag that {@link #init()} passes to
     *         {@code Connection.createSession}
     */
    public boolean getTransacted() {
        return transacted;
    }

    /**
     * Sets the acknowledge mode passed to {@code Connection.createSession}.
     * Only takes effect for a subsequent {@link #init()} call. Defaults to
     * {@link Session#AUTO_ACKNOWLEDGE}.
     *
     * @param acknowledgeMode one of the {@link Session} acknowledge-mode constants
     */
    public void setAcknowledgeMode(int acknowledgeMode) {
        this.acknowledgeMode = acknowledgeMode;
    }

    /**
     * @return the acknowledge mode that {@link #init()} passes to
     *         {@code Connection.createSession}
     */
    public int getAcknowledgeMode() {
        return acknowledgeMode;
    }

    /**
     * Opens and starts a connection, creates a session on it, and creates the
     * temporary or named queue.
     *
     * <p>If any step fails, whatever was opened during this call is closed
     * again before the exception is thrown, and the provider's stored state is
     * left untouched.
     *
     * @throws UncategorizedJmsException if any step fails; the original
     *         exception is attached as the cause
     */
    public void init() {
        // Work on locals and publish to the fields only once everything has
        // succeeded, so a failure half-way through cannot leave an open
        // connection behind that nobody will ever close.
        Connection newConnection = null;
        Session newSession = null;
        try {
            newConnection = connectionFactory.createConnection();
            newConnection.start();

            newSession = newConnection.createSession(transacted, acknowledgeMode);

            log.debug("Opening queue {}", queueName);
            final Queue newQueue;
            if (isTemporary) {
                newQueue = newSession.createTemporaryQueue();
            } else {
                newQueue = newSession.createQueue(queueName);
            }

            connection = newConnection;
            session = newSession;
            queue = newQueue;
        } catch (Exception e) {
            // Both helpers accept null and do not propagate close failures.
            JmsUtils.closeSession(newSession);
            JmsUtils.closeConnection(newConnection);
            throw new UncategorizedJmsException("Error creating queue " + queueName, e);
        }
    }

    /**
     * Closes the session and the connection and forgets the queue. After this
     * call {@link #getQueue()} throws until {@link #init()} is called again.
     * Calling it when nothing is open is harmless.
     */
    @PreDestroy
    public void close() {
        log.debug("Closing queue {}", queueName);
        queue = null;
        JmsUtils.closeSession(session);
        JmsUtils.closeConnection(connection);
        // Drop the closed handles so they are not closed a second time.
        session = null;
        connection = null;
    }

    /**
     * Returns the queue created by {@link #init()}.
     *
     * @return the open queue
     * @throws IllegalStateException if {@link #init()} has not been called or
     *         {@link #close()} has already been called
     */
    public Queue getQueue() {
        if (null == queue) {
            throw new IllegalStateException("Either init() wasn't called or close() was already called");
        }
        return queue;
    }
}