2016-03-01 104 views
0

我是Java新手,正在研究消耗多个(不同)主题并将其发送到另一个服务器的项目。我想知道处理多个主题的最佳方式是什么。JMS消耗多个主题

据我了解每个消费者被绑定到一个话题,所以,如果我不得不消耗多个主题,我需要一个消费者对每一个不同的主题。由于消费者进行阻塞调用,我需要调用每个消费者的线程来并行使用这些主题。

如果我想提高吞吐量进一步它是一个很好的做法,每个消费者(附加到主题)一个老板线程,并让每个老板线程设置辅助线程相应地提高性能?

请咨询,如果这是一个很好的做法,如果不是什么其他的替代选择?以及是否有任何众所周知的设计模式,为什么我选择了一个消费模式,而不是一个侦听器模型来处理这个问题

我还有一个约束是,消费者接收到一个消息后,它需要将消息发送到另一个接收服务器。如果接收服务器关闭(在新版本推送期间),那么我必须暂停使用消息,直到接收服务器启动。在这种情况下,有一个消息监听器不会有帮助,因为当接收服务器关闭时我无法暂停监听器。我说得对,还是有一种方法可以暂停侦听器,并在接收服务器启动之前停止使用消息?

回答

1

我会这样做的方式是使用侦听器功能。

你的对象实现了MessageListener接口,然后添加到您的消费者消息监听器。在这种情况下,客户端库将为您读取队列中的消息并将其发送给侦听器。

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 

import org.apache.activemq.ActiveMQConnectionFactory; 

public class MyMessageConsumer implements MessageListener { 

    public static void main() { 
     try { 

      MyMessageConsumer myMessageConsumer = new MyMessageConsumer(); 

      // This example is using the ActiveMQ client library 
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616"); 
      Connection connection = connectionFactory.createConnection(); 
      connection.start(); 

      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

      Destination destination1 = session.createQueue("MyTopic1"); 
      MessageConsumer consumer1 = session.createConsumer(destination1); 
      consumer1.setMessageListener(myMessageConsumer); 

      Destination destination2 = session.createQueue("MyTopic2"); 
      MessageConsumer consumer2 = session.createConsumer(destination2); 
      consumer2.setMessageListener(myMessageConsumer); 

     } catch (Exception e) { 
      System.out.println("Caught: " + e); 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void onMessage(Message message) { 
     // Handle my messages here 
    } 
} 

会话事务

在这个选项中,我们使用事务性的消息,如果session.rollback()被调用时,它将传递邮件。您确认()您的操作何时成功,否则确认(rollback())。

package io.bessel.test;

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnectionFactory; 

public class MyMessageConsumer implements MessageListener { 

    public static void main(String ... arguments) { 
     try { 
      // This example is using the ActiveMQ client library 
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616"); 
      Connection connection = connectionFactory.createConnection(); 
      connection.start(); 

      Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 

      MyMessageConsumer myMessageConsumer = new MyMessageConsumer(session); 

      Destination destination1 = session.createQueue("MyTopic1"); 
      MessageConsumer consumer1 = session.createConsumer(destination1); 
      consumer1.setMessageListener(myMessageConsumer); 

      Destination destination2 = session.createQueue("MyTopic2"); 
      MessageConsumer consumer2 = session.createConsumer(destination2); 
      consumer2.setMessageListener(myMessageConsumer); 

     } catch (Exception e) { 
      System.out.println("Caught: " + e); 
      e.printStackTrace(); 
     } 
    } 

    private final Session session; 

    public MyMessageConsumer(Session session) { 
     this.session = session; 
    } 

    public void onMessage(Message message) { 
     if (message instanceof TextMessage) { 
      try { 
       String text = ((TextMessage) message).getText(); 
       System.out.println(String.format("Received message: %s", text)); 
       this.session.rollback(); 

      } catch (JMSException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

} 
+0

我想过使用一个监听器,我没有提到这一点。我还有一个约束,即在消费者收到消息之后,它需要将消息发送到另一个接收服务器。如果接收服务器关闭(在新版本推送期间),那么我必须暂停使用消息,直到接收服务器启动。在这种情况下,有一个消息监听器不会有帮助,因为当接收服务器关闭时我无法暂停监听器。我说得对,还是有一种方法可以暂停侦听器,并在接收服务器启动之前停止使用消息?谢谢。 – mariner

+0

为事务处理消息添加了第二个代码示例。将一遍又一遍地重传信息直到它被确认。这会在某处引入开销(就像所有事情一样),只是不确定它将在应用程序中出现问题,但它是最直接的。 – mookins