2012-07-21 99 views
1

我想了解RabbitMQ服务器与发件人和接收器程序。现在,当发送者发送单个消息并且接收者将收到相同的消息时,整个设置工作良好。发送者和接收器对在RabbitMQ

但是,当我发送两条消息(通过运行发送器两次)并运行接收器程序两次,我只得到第一条消息。

发件人

ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 


     channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
     String message = "He12!"; 
     channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
     System.out.println("Sent "+message); 
     channel.close(); 
     connection.close(); 

接收机

ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(QUEUE_NAME, true, consumer); 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    /*channel.basicCancel(consumer.getConsumerTag()); */ 

    String message; 
    if (delivery != null) { 
     message = new String(delivery.getBody()); 
     System.out.println("Reciever .."+message); 

    } 

    channel.close(); 
    connection.close(); 

回答

0

你有没有尝试过没有

channel.basicCancel(consumer.getConsumerTag()); 

线?

此外,它会更好做这样的事情:

while(true){ 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    String message; 
    if (delivery != null) { 
    message = new String(delivery.getBody()); 
    System.out.println("Reciever .."+message); 
    } 
} 

,让你只有一个消费者的消费在循环中的所有消息。这只是一个例子,我只是有一种干净的方式来打破循环,而不是必须ctrl-c。

+0

感谢您的回复..我曾尝试没有basicCancel(编辑相同的代码),但问题仍然相同。我认为更重要的是没有一个while循环,这样就可以让Web服务一次发送一个消息。每个请求跨越一位消费者是不是一个好主意? – 2012-07-22 13:51:05

+0

通过以调试模式执行接收器并在RabitMq服务器中运行几个命令,我观察到在执行basicConsume函数调用后不久,队列大小变为零(命令rabbitmqctl list_queues) – 2012-07-22 16:55:28

+0

@praveena_kd在执行使用者之前,队列大小是多少? – robthewolf 2012-07-22 18:20:51

1

当我用no_Ack = false和一个对BasicAck的调用修改了Receiver时,我解决了这个问题。感谢@robthewolf和@Card(通过twitter)提供帮助。

PFB现在修改过的接收器。

 ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(QUEUE_NAME, false, consumer); 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(10); 
    /*channel.basicCancel(consumer.getConsumerTag()); */ 

    String message = null; 

    if (delivery != null) { 
     message = new String(delivery.getBody()); 
     System.out.println("Reciever .."+message); 
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
    } 

    channel.close(); 
    connection.close(); 
    return message;