2014-10-11 64 views
1

我想创建一个服务器来处理来自用户的套接字连接,并且在我的服务器中,我希望每个连接都有一个到rabbitmq的连接,但是在他们的网页提供的示例中我只看到“while”循环等待消息,在这种情况下,我将需要为每个连接创建一个线程,以便处理来自rabbitmq的消息。java如何处理rabbitmq消息的回调

有没有办法在java中使用spring或任何框架来做到这一点,我只是为rabbitmq而不是使用while循环创建回调?我正在使用node.js,这是非常简单的, ,我想知道一些关于java的建议。

回答

1

您应该在Channel.basicConsume和DefaultConsumer抽象类看一看:https://www.rabbitmq.com/api-guide.html#consuming

Java并发需要回调来处理每个消息线程,但你可以使用一个线程池来重用线程。

static final ExecutorService threadPool; 

static { 
    threadPool = Executors.newCachedThreadPool(); 
} 

现在,你需要创建一个消费者,将通过创建将被传递到线程池来执行一个Runnable实例句柄每次交货。

channel.basicConsume(queueName, false, new DefaultConsumer(channel) { 
    @Override 
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
     final byte msgBody = body; // a 'final' copy of the body that you can pass to the runnable 
     final long msgTag = envelope.getDeliveryTag(); 
     Runnable runnable = new Runnable() { 
      @Override 
      public void run() { 
       // handle the message here 
       doStuff(msgBody); 
       channel.basicAck(msgTag, false); 
      } 
     }; 
     threadPool.submit(runnable); 
    } 
}); 

这说明了如何在单个线程的单一连接和频道不while循环将被阻止在每个交货处理并发交付。为了您的理智,您可能希望将您的Runnable实现分解到它自己的类中,该类可以接受channel,msgBody,msgTag和任何其他数据作为调用run()方法时可访问的参数。

+0

非常感谢您的帮助。 – crawlero 2014-10-12 22:24:32