您应该在Channel.basicConsume和DefaultConsumer抽象类看一看:https://www.rabbitmq.com/api-guide.html#consuming
Java并发需要回调来处理每个消息线程,但你可以使用一个线程池来重用线程。
static final ExecutorService threadPool;
static {
threadPool = Executors.newCachedThreadPool();
}
现在,你需要创建一个消费者,将通过创建将被传递到线程池来执行一个Runnable实例句柄每次交货。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final byte msgBody = body; // a 'final' copy of the body that you can pass to the runnable
final long msgTag = envelope.getDeliveryTag();
Runnable runnable = new Runnable() {
@Override
public void run() {
// handle the message here
doStuff(msgBody);
channel.basicAck(msgTag, false);
}
};
threadPool.submit(runnable);
}
});
这说明了如何在单个线程的单一连接和频道不while循环将被阻止在每个交货处理并发交付。为了您的理智,您可能希望将您的Runnable
实现分解到它自己的类中,该类可以接受channel
,msgBody
,msgTag
和任何其他数据作为调用run()
方法时可访问的参数。
非常感谢您的帮助。 – crawlero 2014-10-12 22:24:32