1
我试图创建一些芹菜任务作为类,但我有一些困难。这些类是:芹菜:自定义基类/子类基础的任务不显示在app.tasks
class BaseCeleryTask(app.Task):
def is_complete(self):
""" default method for checking if celery task has completed. """
# simply return result (since by default tasks return boolean indicating completion)
try:
return self.result
except AttributeError:
logger.error('Result not defined. Make sure task has run!')
return False
class MacroReportTask(BaseCeleryTask):
def run(self, params):
""" Override the default run method with signal factory run"""
# hold on to the factory
process = MacroCountryReport(params)
self.result = process.run()
return self.result
但是当我初始化程序,并检查app.tasks(或运行工),应用程序似乎并没有在它的注册表上述这些任务。其他基于功能的任务(using app.task() decorator
)似乎已被记录正常。
我运行上面的任务为:
process = SignalFactoryTask()
process.delay(params)
芹菜工人错误以下消息: Received unregistered task of type None
。
我认为我遇到的问题是:如何将自定义类添加到任务注册表中,就像我使用常规基于函数的任务一样?
感谢您的帮助!我必须测试,但会让你知道它是否有效! – kri