2017-09-22 100 views
0

我用用芹菜的烧瓶中的应用,这里是我的配置:芹菜多个队列无法正常工作。所有任务都发送到默认队列

app.config['CELERY_TASK_QUEUES'] = (
    Queue('fast', Exchange('fast'), routing_key='fast'), 
    Queue('default', Exchange('default'), routing_key='default'), 
    Queue('processing', Exchange('processing'), routing_key='processing'), 
) 

app.config['CELERY_TASK_ROUTES'] = { 
    'app.tasks.extract_text': {'queue': 'processing', 'routing_key': 'processing'}, 
    ... 

    'app.tasks.vt_notifications': {'queue': 'default', 'routing_key': 'default'}, 
    ... 

    'app.tasks.update_files_from_search': {'queue': 'fast', 'routing_key': 'fast'}, 
    ... 
} 


app.config['CELERY_DEFAULT_QUEUE'] = 'default' 
app.config['CELERY_DEFAULT_EXCHANGE'] = 'default' 
app.config['CELERY_DEFAULT_ROUTING_KEY'] = 'default' 

我结束了与运行芹菜情况是这样的:

celery -A app.tasks.celery worker -Q 'processing' --concurrency 1 -l debug -n processing 
celery -A app.tasks.celery worker -Q 'fast' --concurrency 1 -l debug -n fast 
celery -A app.tasks.celery worker -Q 'default' --concurrency 1 -l debug -n default 

所以,问题是所有的任务都被发送到'默认'队列。任何帮助,高度赞赏。谢谢!

+0

celery version? – ItayB

回答

3

如果使用芹菜> 4,我会推荐几件事情: 首先,尝试添加name到 你的任务(以确保您在CELERY_TASK_ROUTES使用正确的名称,例如:

@app.task(name='extract_text']) 
    def extract_text(..): 
     pass 

二,试图改变CELERY_TASK_ROUTES到:

CELERY_ROUTES = { 
    'extract_text': { 
     'exchange': 'processing', 
     'exchange_type': 'direct', 
     'routing_key': 'processing' 
    } 
} 

(而不是queue - 尝试添加exchangeexchange_type

最后一件事,你不必使用它,只是为了调试,就可以触发时明确地航线任务:

(extract_text.signature(args=(...), queue='processing')).delay() 

编辑:

你确定你正在使用的配置如所须?这里是一个例子:

celery_app = Celery() 
celeryconfig = {} 
celeryconfig['BROKER_URL'] = 'amqp://' 
celeryconfig['CELERY_RESULT_BACKEND'] = 'redis://localhost' 
celeryconfig['CELERY_QUEUES'] = (
    Queue('fast', Exchange('fast'), routing_key='fast'), 
    Queue('default', Exchange('default'), routing_key='default'), 
    Queue('processing', Exchange('processing'), routing_key='processing'), 
) 
celeryconfig['CELERY_ROUTES'] = { 
    'extract_text': { 
     'exchange': 'processing', 
     'exchange_type': 'direct', 
     'routing_key': 'processing' 
    } 
} 

celery_app.config_from_object(celeryconfig) 
+2

如果我在触发时明确地路由任务,它完美地工作。看起来像CELERY_TASK_ROUTES变量只是被忽略 –

+0

所以,你能接受答案吗? :-) – ItayB

+1

您是否尝试过'CELERY_ROUTES'?也许你没有根据需要使用配置,我会在第二秒更新我的答案 – ItayB