python
  • rabbitmq
  • celery
  • 2012-08-09 81 views 8 likes 
    8

    我想调用一个任务,并为该任务创建一个队列,如果它不存在,那么立即插入到该队列的被调用任务。我有以下代码:芹菜动态队列创建和路由

    @task 
    def greet(name): 
        return "Hello %s!" % name 
    
    
    def run(): 
        result = greet.delay(args=['marc'], queue='greet.1', 
         routing_key='greet.1') 
        print result.ready() 
    

    然后我有一个自定义路由器:

    class MyRouter(object): 
    
        def route_for_task(self, task, args=None, kwargs=None): 
         if task == 'tasks.greet': 
          return {'queue': kwargs['queue'], 
            'exchange': 'greet', 
            'exchange_type': 'direct', 
            'routing_key': kwargs['routing_key']} 
         return None 
    

    这将创建一个名为交流和greet.1排队叫greet.1但队列为空。交换机应该叫做greet,该交换机知道如何将路由密钥(如greet.1)路由到名为greet.1的队列。

    任何想法?

    回答

    13

    当你做到以下几点:

    task.apply_async(queue='foo', routing_key='foobar') 
    

    然后芹菜会采取默认值从CELERY_QUEUES, “富”队列或如果不存在,那么使用(队列= FOO,Exchange会自动创建= FOO,routing_key = FOO)

    所以,如果 '富' 不存在CELERY_QUEUES你最终会用:

    queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo') 
    

    生产者将随后宣布,队列,但因为你重写routing_key, 实际发送使用routing_key = 'foobar'

    这似乎很奇怪的消息,但该行为是话题的交流, 在那里你发布到不同主题的真正有用的。

    虽然很难做到你想要的,但你可以自己创建队列 并声明它,但这不适用于自动发布消息的重试。 如果apply_async的队列参数可以支持 自定义kombu.Queue而不是将声明并用作目标,那将会更好。 也许你可以在http://github.com/celery/celery/issues

    +0

    我不再担心手动创建队列打开该问题,而不是只产生一个新的工作自动创建队列和交换这使得更多的意义我的问题。一如既往,感谢您的回复。 :) – Marconi 2012-08-18 06:38:17

    相关问题