channel.basicQos(1);
while (true) {
GetResponse res = channel.basicGet(TEST_QUEUE, false);
if (res != null) {
deliveryTag = res.getEnvelope().getDeliveryTag();
}
// Handle all messages If the condition is true
if (condition) {
// nack all messages unhandled previously
channel.basicNack(deliveryTag - 1, true, true);
// ack current message only
channel.basicAck(deliveryTag, false);
}
else {
// Do not handle current message and continue to get next one
}
}
Q1。
我不确定如果我可以同时使用nack和ack。
我可以使用deliveryTag - 1来表示以前的所有消息吗?Nack当前消息和Ack当前消息(rabbitmq,java)之前的所有消息
总之,我想跳过所有不符合条件的消息。
如果当前消息符合条件,那么消除所有跳过的消息并确认当前消息。
通过这样做,我想延迟处理一些特定的消息。 Q2302。
恐怕如果我写作while(true)并且有多个工作人员在运行,那么channel.basicQos(1)将无法按预期工作。
我应该编写这样的代码来限制计数吗?或者我应该如何编写以确保所有其他工作人员均匀地收到邮件?
int prefetch = 1;
int count = 0;
while (count++ <= prefetch) {
}
Q3。
我注意到只要连接处于打开状态,工作程序就不会终止。
连接将会打开多久,我是否需要手动关闭它?
最后, 的RabbitMQ Java客户端API和AmqpTemplate VS RabbitTemplate哪一个更适合在该情况下(不使用了MessageListener(ChannelAwareMessageListener)模型)?