2014-10-28 901 views
13

我有一个应用程序使用RabbitMQ作为消息队列来发送/接收两个组件之间的消息:发送者和接收者。发送者以非常快的方式发送消息。接收器接收到该消息,然后执行一些非常耗时的任务(主要是为非常大的数据大小编写数据库)。由于接收者需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送者将会继续快速填充队列。所以我的问题是:这会导致消息队列溢出吗?RabbitMQ:快速生产者和慢速消费者

的消息消费者看起来如下:

public void onMessage() throws IOException, InterruptedException { 
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue(); 
    channel.queueBind(queueName, EXCHANGE_NAME, ""); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(queueName, true, consumer); 

    while (true) { 
     QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
     String message = new String(delivery.getBody()); 
     System.out.println(" [x] Received '" + message + "'"); 

     JSONObject json = new JSONObject(message); 
     String caseID = json.getString("caseID"); 
     //following takes very long time    
     dao.saveToDB(caseID); 
    } 
} 

消费者收到的每条消息都含有caseID。对于每个caseID,它会将大量数据保存到数据库,这需要很长时间。目前只有一个消费者为RabbitMQ设置,因为生产者/消费者使用相同的队列来发布/订阅caseID。那么如何加快消费者吞吐量,让消费者赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消费速度吗?还是应该使用多个消费者同时使用收到的消息?或者是否有任何异步方式让消费者异步使用消息而不等待完成?欢迎任何建议。

回答

1

“那么,如何加快消费者吞吐量,让消费者赶上生产者并避免队列中的消息溢出?”这是“使用多个消费者同时消费传入消息”的答案,使用多线程并行运行这些消费者实现原则共享任何东西,http://www.eaipatterns.com/CompetingConsumers.html

+0

从[RabbitMQ的文档](http://www.rabbitmq.com/tutorials/tutorial-three-python.html),这里有两种方法:工作者队列,发布/订阅。我现在正在使用pub/sub模型。我应该使用工作者队列来代替多个消费者吗? – tonga 2014-10-28 20:30:46

+0

对于你需要的应该是工作队列。这是如何实现https://github.com/victorpictor/Hotel/blob/master/Infrastructure/MessageTransport/Receivers/Subscriber.cs#L29 – voutrin 2014-10-28 20:35:53

+0

但是,如果我想用几个队列用于不同的目的呢?现在,caseID消息只有一个队列。除caseID外,可能还有更多的数据。所以我可能需要使用发布/订阅模式来拥有多个队列。 – tonga 2014-10-28 20:42:29

0

作为答案,我建议:两者。

您可以利用多个接收器,以及设置每个接收器在单独的线程中执行任务,从而允许接收器接受队列中的下一条消息。

当然,这种方法假定每个操作的结果(如果我理解正确的话,在db上写入)不会以任何方式影响后续操作对其他消息的响应结果。

1

你有很多方法来提高你的表现。

  1. 您可以创建一个更多生产者的工作队列,这样您就可以创建一个简单的负载均衡系统。不要使用交换--->队列,但只能排队。阅读本帖RabbitMQ Non-Round Robin Dispatching

  2. 当你收到一条消息时,你可以创建一个poolthread用于在数据库中插入数据,但在这种情况下,你必须管理失败。

但我认为主要问题是数据库而不是RabbitMQ。通过良好的调优,多线程和工作队列,您可以拥有可扩展且快速的解决方案。

让我知道

14

“这会不会导致消息队列溢出?”

是的。随着队列长度的增加,RabbitMQ将进入“流量控制”状态以防止过多的内存消耗。它也会开始将消息保存到磁盘,而不是将它们保存在内存中。

“所以,我怎么能加快消费者的吞吐量,使消费者 可以与制片人赶上并避免 队列中的消息溢出”

你有2种选择:

  1. 添加更多消费者。请记住,如果您选择此选项,您的数据库现在将被多个并发进程操纵。确保数据库能够承受额外的压力。
  2. 增加消费渠道的价值QOS。这将从队列中提取更多消息并将其缓存在消费者上。这会增加总体处理时间;如果缓冲了5条消息,则第5条消息将完成消息1 ... 5的处理时间。

“我应该使用多线程在消费部分,以加快 消费率是多少?”

不除非你有一个设计良好的解决方案。向应用程序添加并行性会在消费者方面增加很多开销。您可能会耗尽ThreadPool或限制内存使用量。

在处理AMQP时,您确实需要考虑每个流程的业务需求,以设计最佳解决方案。您的传入消息的时间敏感程度如何?他们是否需要坚持到数据库尽快,或者这对您的用户是否重要,无论这些数据是否立即可用?

如果数据不需要立即保存,则可以修改应用程序,以便使用者只需从队列中移除消息并将其保存到Redis中的缓存集合中。引入第二个进程,然后依次读取和处理缓存的消息。这将确保您的队列长度不会增长到足以导致流量控制,同时防止数据库被写入请求轰炸,写入请求通常比读取请求更昂贵。您的消费者现在只需从队列中移除消息,稍后再由另一个进程处理。

+0

谢谢保罗。这是一个非常好的建议。我的数据不需要立即保存在数据库中。数据库持久部分需要很长时间,因为它涉及每种情况的数据解析,然后在一个数据库插入中保存大量数据(〜10000行)。因此,使用Redis是一个好主意,因为它是内存缓存。但最终我仍然需要将数据保存到数据库。那么在消费者接收消息并保存到Redis后,如何使用Redis完成数据库写入任务?如果数据库插入速度很慢,消费者是否会溢出Redis缓存大小限制? – tonga 2014-10-30 21:43:12

+0

我会消耗来自单个或多个进程的每条消息,一旦它被提交给数据库,就会清除来自Redis的消息。 Redis中没有缓存限制 - 您受主机上RAM数量的限制。 1,000,000个相对较小的密钥大约是200Mb。如果您担心内存不足,请查看:http://redis.io/topics/memory-optimization – 2014-11-01 15:04:00

+0

我添加了一篇文章,概述了扩展AMQP的方法以及相关的奖励和缺点:http://insidethecpu.com/2014/11/11/rabbitmq-qos-vs-competing-consumers/ – 2014-11-11 14:16:03

1

虽然确实如此,但增加更多消费者可能会加快速度,真正的问题将会保存到数据库中。

这里已经有很多关于添加消费者(线程和/或机器)和改变QoS的答案,所以我不打算重申这一点。相反,您应该认真考虑使用Aggregator模式将消息聚合为一组消息,然后一次性批量插入数据库。

您的每条消息的当前代码可能会打开一个连接,插入数据并关闭该连接(或返回到池)。更糟的是,它甚至可能使用交易。

通过使用聚合器模式,您基本上在刷新之前缓冲数据。

现在写一个好的聚合器是棘手的。您将需要决定如何缓冲(即每个工作人员都有自己的缓冲区或Redis等中央缓冲区)。我认为Spring集成有一个聚合器。