2016-07-28 73 views
0

我使用Python 2.7(叹气),芹菜== 3.1.19,librabbitmq == 1.6.1,rabbitmq-server-3.5.6-1.noarch和redis 2.8.24(来自redis-cli info) 。Celery,RabbitMQ,Redis:芹菜消息进入交换,但不排队?

我试图从芹菜生产者发送消息给芹菜消费者,并在生产者中获得结果。有1个生产者和1个消费者,但是2个rabbitmq(作为经纪人)和1个redis(用于结果)之间。

我现在面临的问题是:

  1. 在消费,我回去得通过async_result = ZipUp.delay(unique_directory)的AsyncResult,但async_result.ready()从未 真(至少在9秒钟内它不会) - 即使是一个 消费者任务,它基本上什么也不做,只是返回一个字符串。
  2. 我可以看到,在rabbitmq管理Web界面中,我的消息 正在被rabbitmq交换机接收,但它并没有显示在 对应的rabbitmq队列中。另外,ZipUp任务开始时 发送的日志消息似乎没有被登录到 。

如果我不试图从AsyncResult获得结果,那么事情就会奏效!但我有点希望能够得到通话的结果 - 这很有用:)。

以下是配置细节。

我们正在建立芹菜回报如下:

CELERY_RESULT_BACKEND = 'redis://%s' % _SHARED_WRITE_CACHE_HOST_INTERNAL 
CELERY_RESULT = Celery('TEST', broker=CELERY_BROKER) 
CELERY_RESULT.conf.update(
    BROKER_HEARTBEAT=60, 
    CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND, 
    CELERY_TASK_RESULT_EXPIRES=100, 
    CELERY_IGNORE_RESULT=False, 
    CELERY_RESULT_PERSISTENT=False, 
    CELERY_ACCEPT_CONTENT=['json'], 
    CELERY_TASK_SERIALIZER='json', 
    CELERY_RESULT_SERIALIZER='json', 
    ) 

我们还有芹菜的配置,并不期望回报值,并且该工程 - 在相同的程序。它看起来像:

CELERY = Celery('TEST', broker=CELERY_BROKER) 
CELERY.conf.update(
    BROKER_HEARTBEAT=60, 
    CELERY_RESULT_BACKEND=CELERY_BROKER, 
    CELERY_TASK_RESULT_EXPIRES=100, 
    CELERY_STORE_ERRORS_EVEN_IF_IGNORED=False, 
    CELERY_IGNORE_RESULT=True, 
    CELERY_ACCEPT_CONTENT=['json'], 
    CELERY_TASK_SERIALIZER='json', 
    CELERY_RESULT_SERIALIZER='json', 
    ) 

的芹菜生产的存根样子:

@CELERY_RESULT.task(name='ZipUp', exchange='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION) 
def ZipUp(directory): # pylint: disable=invalid-name 
    """ Task stub """ 
    _unused_directory = directory 
    raise NotImplementedError 

它已经提到,使用队列=代替交换=此存根会更简单。任何人都可以证实(我使用Google搜索,但在主题上完全没有发现)?显然你可以使用队列=除非你想使用扇出或类似的东西,因为不是所有的芹菜后端都有交换的概念。

无论如何,芹菜消费者开始时用:

@task(queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION, name='ZipUp') 
@StatsInstrument('workflow.ZipUp') 
def ZipUp(directory): # pylint: disable=invalid-name 
    ''' 
    Zip all files in directory, password protected, and return the pathname of the new zip archive. 
    :param directory Directory to zip 
    ''' 
    try: 
     LOGGER.info('zipping up {}'.format(directory)) 

但“拉上了”没有取得任何记录。我搜索了芹菜服务器上的每个(磁盘备份)文件,并获得了两个命中:/ usr/bin/zip和我的芹菜任务的代码 - 并且没有日志消息。

有什么建议吗?

感谢您的阅读!

回答

0

看来,用制片以下任务存根解决了这个问题:

@CELERY_RESULT.task(name='ZipUp', queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION) 
def ZipUp(directory): # pylint: disable=invalid-name 
    """ Task stub """ 
    _unused_directory = directory 
    raise NotImplementedError 

总之,它使用=代替交换队列=。