2017-07-28 291 views
0

我正在创建一个自定义Celery任务类,以覆盖在任务达到最大重试次数(on_failure)时发生的情况。如果任务失败,我需要更新用户模型的状态。将参数传递给Celery任务的on_failure方法

下面是我的自定义任务类:

class ReadyTask(Task): 

    def run(self, user): 
     try: 
      user.get_results() 
     except Exception as exc: 
      raise self.retry(exc=exc, max_retries=3) 

    def on_failure(self, exc, task_id, *args, **kwargs): 
     user.status = Status.READY 
     user.save() 

如何传递用户对象的on_failure()方法来更新其状态?

回答

0

我相信你可以检查你的argskwargs作为用户的id,如果你把它作为参数发送给你的任务。如果你在kwargs中制作它,那么你就不需要进行arg位置检查。那么只需从id中获取用户并进行更改?

因此,不要把它变成了run功能,而是作为一个参数/关键字参数是您呼叫的任务功能,通过function.apply(kwargs),或function.apply_async(kwargs=kwargs)function.delay(kwargs)

所以:

user_id = kwargs.get('user_id')

# then resolve to user object, then update object

0

您也可以将对象绑定到你的自定义任务类。通过使用bind=True

class ReadyTask(Task): 

    def run(self, user): 

     self.user_object = user 

     try: 
      self.user_object.get_results() 
     except Exception as exc: 
      raise self.retry(exc=exc, max_retries=3) 

    def on_failure(self, exc, task_id, *args, **kwargs): 
     self.user_object.status = Status.READY 
     self.user_object.save() 

@app.task(base=ReadyTask, bind=True) 
def do_stuff(self, *args, **kwargs): 
    self.user_object.do_stuff()