2015-07-21 93 views
7

I've run into a strange error in one of my Celery workers (Celery with a Redis result backend):

2015-07-21T15:02:04.010066+00:00 app[worker.1]: Traceback (most recent call last): 
2015-07-21T15:02:04.010069+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/app/trace.py", line 296, in trace_task 
2015-07-21T15:02:04.010070+00:00 app[worker.1]:  on_chord_part_return(task, state, R) 
2015-07-21T15:02:04.010071+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 587, in on_chord_part_return 
2015-07-21T15:02:04.010073+00:00 app[worker.1]:  deps.delete() 
2015-07-21T15:02:04.010074+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/result.py", line 773, in delete 
2015-07-21T15:02:04.010076+00:00 app[worker.1]:  (backend or self.app.backend).delete_group(self.id) 
2015-07-21T15:02:04.010078+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 329, in delete_group 
2015-07-21T15:02:04.010079+00:00 app[worker.1]:  return self._delete_group(group_id) 
2015-07-21T15:02:04.010081+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 499, in _delete_group 
2015-07-21T15:02:04.010082+00:00 app[worker.1]:  self.delete(self.get_key_for_group(group_id)) 
2015-07-21T15:02:04.010083+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/redis.py", line 172, in delete 
2015-07-21T15:02:04.010084+00:00 app[worker.1]:  self.client.delete(key) 
2015-07-21T15:02:04.010085+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 824, in delete 
2015-07-21T15:02:04.010087+00:00 app[worker.1]:  return self.execute_command('DEL', *names) 
2015-07-21T15:02:04.010088+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 565, in execute_command 
2015-07-21T15:02:04.010089+00:00 app[worker.1]:  return self.parse_response(connection, command_name, **options) 
2015-07-21T15:02:04.010090+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 579, in parse_response 
2015-07-21T15:02:04.010091+00:00 app[worker.1]:  return self.response_callbacks[command_name](response, **options) 
2015-07-21T15:02:04.010093+00:00 app[worker.1]: ValueError: invalid literal for int() with base 10: 'QUEUED' 

What I find strange is that I don't see any call to int in the last line of the traceback. QUEUED is probably showing up because it's a task state; I'm using it as a custom task state, like this:

import logging

from celery import current_app
from celery.signals import before_task_publish

@before_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):
    # The task may not exist if sent using `send_task`, which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
    logging.debug("Setting status for %s" % body["id"])

    backend.store_result(body['id'], None, "QUEUED")

What is going wrong here?


In case it's relevant, here is the code for my tasks. I only ever call fetch directly:

@app.task
def fetch(url_or_urls, subscribe=None):
    """This fetches a (list of) podcast(s) and stores it in the db. It assumes that it only gets called
    by Podcast.get_by_url, or some other method that knows whether a given podcast has
    already been fetched.

    If *subscribe* is given, it should be a User instance to be subscribed to the given podcasts."""
    if isinstance(url_or_urls, basestring):
        url_or_urls = [url_or_urls]
    body = _store_podcasts.s()
    if subscribe:
        body.link(_subscribe_user.s(user=subscribe))
    return chord([_fetch_podcast_data.s(url) for url in url_or_urls])(body)

@app.task
def _fetch_podcast_data(url):
    return do_fetch(url)  # This function returns a dict of podcast data.

@app.task
def _store_podcasts(podcasts_data):
    """Given a list of dictionaries representing podcasts, store them all in the database."""
    podcasts = [Podcast(**pdata) for pdata in podcasts_data]
    return Podcast.objects.insert(podcasts)

@app.task
def _subscribe_user(podcasts, user):
    """Subscribe the given user to all the podcasts in the list."""
    return user.subscribe_multi(podcasts)

Is there anything else that could be relevant here? Library versions, as shown by pip freeze:

redis==2.10.3 
celery==3.1.18 
Could you provide your celery and redis-py versions? I have some ideas to investigate, but your traceback line numbers don't match mine. – mrorno

@mrorno the versions shown by 'pip freeze': 'redis==2.10.3', 'celery==3.1.18' – bigblind

Answers

2

The redis Python package expects the response to a DEL command to always be an integer, which I assume is the number of deleted keys.

The call to int happens on the last line of the traceback (return self.response_callbacks[command_name](response, **options)), where self.response_callbacks['DEL'] is equal to int.

As a workaround, you could subclass redis.client.StrictRedis and set the DEL response callback to something other than int; just make sure you understand the implications of doing so.
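To illustrate: redis-py keeps a table of response callbacks and applies the matching one in parse_response; for DEL the default callback is simply int. Here is a pure-Python sketch of that dispatch and of a more tolerant replacement callback. The names and the dispatch function are illustrative stand-ins, not redis-py's actual code:

```python
# Illustrative sketch of redis-py's response-callback dispatch.
# For DEL, the default callback is simply `int`.
response_callbacks = {'DEL': int}

def parse_response(command_name, response, **options):
    # Mirrors the line in the traceback:
    # return self.response_callbacks[command_name](response, **options)
    return response_callbacks[command_name](response, **options)

def tolerant_del(response, **options):
    """Replacement callback: return an int when possible, otherwise
    pass the raw reply through instead of raising ValueError."""
    try:
        return int(response)
    except (TypeError, ValueError):
        return response

# With the default callback, a non-numeric reply crashes:
# parse_response('DEL', 'QUEUED')  ->  ValueError, as in the traceback.
response_callbacks['DEL'] = tolerant_del
print(parse_response('DEL', '2'))       # 2
print(parse_response('DEL', 'QUEUED'))  # QUEUED
```

In real code you would install such a callback on a StrictRedis instance with client.set_response_callback('DEL', tolerant_del). Note that this masks the symptom rather than explaining why a DEL reply is 'QUEUED' in the first place.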

Why wouldn't it get an int? In my case, I'm not the one talking to Redis, Celery is. – bigblind

If you call it asynchronously, it will return an AsyncResult (http://celery.readthedocs.org/en/latest/reference/celery.result.html). Once the task completes, the actual result (possibly an integer) is accessible in that result's result attribute. – garnertb

Sure, and that result is stored in redis. It looks like Celery is doing some cleanup, deleting intermediate group results, and somewhere in that process redis hands back my task status instead of a delete count. How does that happen? I'll add the code of the tasks I'm calling to the question, in case it's relevant. – bigblind

3

It's hard to debug an error like this without working code. Here is what I think it might be. Let's start here:

http://celery.readthedocs.org/en/latest/_modules/celery/backends/base.html#BaseBackend.store_result

def store_result(self, task_id, result, status, 
       traceback=None, request=None, **kwargs): 
    """Update task state and result.""" 
    result = self.encode_result(result, status) 
    self._store_result(task_id, result, status, traceback, 
         request=request, **kwargs) 
    return result 

It calls encode_result. Let's look at that:

def encode_result(self, result, status): 
    if status in self.EXCEPTION_STATES and isinstance(result, Exception): 
        return self.prepare_exception(result) 
    else: 
        return self.prepare_value(result) 
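The branching above can be sketched standalone. This is an illustrative stand-in, not Celery's code; EXCEPTION_STATES is written out here as the failure-like states it covers in celery.states:

```python
# Standalone sketch of encode_result's branching (illustrative).
EXCEPTION_STATES = frozenset(['FAILURE', 'RETRY', 'REVOKED'])

def encode_result(result, status):
    if status in EXCEPTION_STATES and isinstance(result, Exception):
        return 'serialized exception'  # stands in for prepare_exception
    return result                      # prepare_value: plain values pass through

# A custom status like "QUEUED" simply takes the prepare_value branch:
print(encode_result(None, 'QUEUED'))                 # None
print(encode_result(ValueError('boom'), 'FAILURE'))  # serialized exception
```

So encoding itself doesn't reject a custom status; the question is what the rest of the backend assumes about it.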

It looks like "status" is expected to be one of the predefined STATE constants.

The code for those is here:

http://celery.readthedocs.org/en/latest/_modules/celery/states.html#state

And the documentation is here:

http://celery.readthedocs.org/en/latest/reference/celery.states.html

It doesn't look like they expect to see something like "QUEUED" there. Try one of the predefined states.
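For reference, a quick sketch of the predefined states as listed in the celery.states documentation linked above (reproduced as plain sets here so the membership check runs without celery installed):

```python
# The predefined Celery states, per the celery.states documentation.
READY_STATES = frozenset(['SUCCESS', 'FAILURE', 'REVOKED'])
UNREADY_STATES = frozenset(['PENDING', 'RECEIVED', 'STARTED', 'RETRY'])
ALL_STATES = READY_STATES | UNREADY_STATES

print('QUEUED' in ALL_STATES)   # False - a custom, non-predefined state
print('PENDING' in ALL_STATES)  # True
```

PENDING would be the closest predefined equivalent of a "sent but not yet started" state.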

Celery tasks have an update_state() method where you can supply any string, so I expected the same to be true here. Docs: http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states – bigblind

Hmm, task.update_state seems to do the same thing I'm doing: self.backend.store_result(task_id, meta, state), where meta defaults to None. – bigblind

Could you try one of the predefined states, so we can be sure this is the problem we're looking at? – singer