2011-11-28 49 views
8

我工作的是一个基于多租户云应用程序的Web应用程序(许多客户端,每个客户端都有自己独立的“环境”,但全部位于共享硬件集上)和我们正在引入用户批量处理以供日后处理的功能。批量工作的类型实际上并不重要,只有足够的数量才能在没有工作队列的情况下完成工作并不实际。我们选择RabbitMQ作为我们的底层队列框架。工作者池和多租户队列与RabbitMQ

因为我们是一个多租户应用程序,所以我们不一定希望客户端能够为另一个客户端导致冗长的队列处理时间,所以我们已经浮起来的一个想法是创建一个队列客户基础,并拥有一个共享的工作人员池指向我们所有的客户端队列。问题在于,为了我能想象的最好,工作人员直接与特定的队列绑定,而不是交换。在我们的理想世界中,我们的客户端队列仍然会被处理,而没有一个客户端阻塞另一个客户端,而是从共享的工作池中处理,我们可以通过启动更多工作人员或关闭闲置服务器来增加或减少。将工作人员与特定队列绑定在一起可以防止我们从实际意义上解决这个问题,因为我们经常有很多工作人员只是在没有活动的队列中闲置。

有没有比较直接的做法呢?我对RabbitMQ相当陌生,并没有真正能够完成我们所追求的。我们也不想写一个非常复杂的多线程的消费者应用程序,这是开发和测试时间的沉没,我们可能无法承受。我们的堆栈是基于Windows/.Net/C#的,如果这是germaine,但我不认为这应该对问题有重大影响。

回答

1

您可以让您的工作人员池都使用相同的唯一队列。工作将分布在他们之间,您可以扩大/缩小您的游泳池以增加/减少您的工作处理能力。

+1

我不是在问分配多个工作人员到同一个队列,我有点反问。我想要一个有限的工作池从一个大的队列中消耗(我们称之为500个队列)。 – bakasan

+1

我已经尝试过使用这种方法的第一手资料,它并不漂亮:很难找到合适的启发式来处理所有这些队列。你是否首先处理最完整的队列?还是那些旧信息?在这两种情况下,您都没有使用AMQP协议,必须开始处理Rabbit管理API。然后你认为:让我们拥有与工作人员相同数量的队列,并在500 Q和工作队列之间添加一致的哈希映射。然后你意识到,只需一个队列和n个工人就可以参与竞争。 –

+0

我有类似的要求,但我想确保来自特定客户的消息按顺序处理。联系人在创建之前不会被删除等。是否有一些RabbitMQ的配置或设置可以实现这一点,但共享工作人员之间的队列? (这是一个新的Q ...?) – Aaron

1

我不明白你为什么不使用RabbitMQ的虚拟主机,并让你的应用程序登录到RabbitMQ并在每个用户的单独连接上进行身份验证。

这并不意味着您不能让工作人员主管将工作人员分配给某个用户或另一个用户。但是这意味着每个用户的所有消息都由完全独立的交换和队列处理。

0

工人被分配0+队列,而不是交换。

将通过CELERYD_CONSUMER(默认为celery.worker.consumer.Consumer)指示的类中实现从每个工作人员的哪个队列中获取任务的逻辑。

您可以创建一个自定义消费者类ro来实现您喜欢的任何逻辑。困难的部分将决定你想要使用的“公平”算法的细节;但一旦你决定了,你可以实现它创建一个自定义的消费者类,并分配给适当的工人。

1

你可以看一下优先级队列实现(在这个问题最初是问这是未实现):https://www.rabbitmq.com/priority.html

如果不适合你,你可以尝试一些其他的黑客来实现你想要(它应该与老版本的RabbitMQ一起工作):

您可以将100个队列绑定到话题交换并将路由键设置为用户ID%100的散列,即每个任务将具有1之间的键100和同一用户的任务将具有相同的密钥。每个队列都以1到100之间的唯一模式进行绑定。现在,您有一组工人,它们以随机队列号开始,然后在每次作业后递增该队列号,再次在队列100后循环回队列1.

现在您的工作队可以处理多达100个并行的独特用户,或者如果没有其他工作要做,所有工作人员都可以关注单个用户。如果工作人员需要在每个作业之间遍历所有100个队列,在单个用户在单个队列中只有很多作业的情况下,每个作业之间自然会产生一些开销。少量的队列是解决这个问题的方法之一。您也可以让每个工作人员与每个队列保持连接,并使用每个队列最多一个未确认的消息。只要未确认的消息超时设置得足够高,工作人员就可以更快地循环访问内存中的待处理消息。

或者,您可以创建两个交换机,每个交换机都有一个绑定队列。所有的工作都进入第一次交换和排队,这是一群工人消耗的。如果一个工作单元花费太长时间,工作人员可以取消它并将其推送到第二个队列。当第一个队列中没有任何内容时,工作人员只处理第二个队列。您可能还需要一些具有相反队列优先级的工作人员,以确保在存在永不停止的短任务流时仍处理长时间运行的任务,以便最终始终处理用户批处理。这不会真正将工作人员分配到所有任务中,但它将阻止持有员工的长期运行任务,从而不会为同一用户执行短期运行任务。它还假定您可以取消一项工作,稍后重新运行而不会出现任何问题。这也意味着将会出现超时并需要重新运行为低优先级任务的资源浪费。除非您可以提前识别快速和慢速任务

如果对于单个用户有100个慢速任务,则另一个用户发布一批任务时,100个队列的第一个建议也可能有问题。直到其中一项缓慢的任务完成后,才会查看这些任务。如果事实证明这是一个合理的问题,你可以结合这两种解决方案。