2016-11-07 28 views
3

TL; DRPython的芹菜增加并发对工人

是否有可能增加或在运行芹菜工人减少并发无重新启动呢?

我用芹菜4.0.0与RabbitMQ的作为经纪人在Ubuntu 14.10

我USECASE

我经常面临的一个大队列的任务,其中大部分主要执行的基于HTTP请求并做一些小处理。我让工作人员在相当强大的机器上运行,并希望最大限度地利用资源。这在大多数情况下都不是问题,除非处理大量的HTTP请求,这可能会超时或需要很长时间才能响应等。处理这些问题时,我想暂时增加--concurrency - 参数,而不必重新启动worker。

目前我正在运行芹菜--concurrency 150,但这只会让服务器瓶颈(CPU)的利用率降低约10%。我想一种解决方案是在那段时间内产生另一个150个并发工作者,并在稍后将其杀死,但这可能会增加复杂性。如果可能的话,我想坚持1个工人/机器。

+1

如何对自动缩放选项:http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling –

+0

原则上我喜欢使用自动缩放的想法比我想出的更好。然而,这些文档并不清楚如何进行子类化和添加自己的逻辑。 请注意,当我试图超越169个进程时,芹菜在任何地方都会抛出错误,但我认为这个事实可能是GitHub的一个特例。 – MoorzTech

回答

2

这可能是可能的使用内置的autoscaling(感谢菲利普Tzou)的子类。不幸的是,自动缩放功能的文档记录很差。

但是,在做了一些更多的挖掘之后,我遇到了celery.app.control,这些(除其他外)允许通过RabbitMQ将消息发送给工作人员进行缩放。这里有一个如何去这一个小例子:

import os, time 
from celery import Celery 
from celery.app.control import Control 

app = Celery() 
controller = Control(app) 

while True: 
    n=5 # the numer of processes to add/remove 
    upper_load_threshold = 6 
    lower_load_threshold = 4 
    if os.getloadavg()[0] <= lower_load_threshold: # we're looking at the 5 min load avg here 
     controller.pool_grow(n) 
    elif os.getloadavg()[0] >= upper_load_threshold: 
     controller.pool_shrink(n) 
    time.sleep(10)