2016-12-06 48 views
1

我试图创建一些芹菜任务作为类,但我有一些困难。这些类是:芹菜:自定义基类/子类基础的任务不显示在app.tasks

class BaseCeleryTask(app.Task): 

def is_complete(self): 
    """ default method for checking if celery task has completed. """ 
    # simply return result (since by default tasks return boolean indicating completion) 

    try: 
     return self.result 
    except AttributeError: 
     logger.error('Result not defined. Make sure task has run!') 
     return False 


class MacroReportTask(BaseCeleryTask): 

def run(self, params): 
    """ Override the default run method with signal factory run""" 
    # hold on to the factory 
    process = MacroCountryReport(params) 
    self.result = process.run() 
    return self.result 

但是当我初始化程序,并检查app.tasks(或运行工),应用程序似乎并没有在它的注册表上述这些任务。其他基于功能的任务(using app.task() decorator)似乎已被记录正常。

我运行上面的任务为:

process = SignalFactoryTask() 
process.delay(params) 

芹菜工人错误以下消息: Received unregistered task of type None

我认为我遇到的问题是:如何将自定义类添加到任务注册表中,就像我使用常规基于函数的任务一样?

回答

2

跑到完全相同的问题,花了几个小时找到解决方案,因为我90%肯定这是一个错误。在你的类的任务,请尝试以下

class BaseCeleryTask(app.Task): 

    def __init__(self): 
     self.name = "[modulename].BaseCeleryTask" 

class MacroReportTask(app.Task): 

    def __init__(self): 
     self.name = "[modulename].MacroReportTask" 

它似乎与应用程序注册它仍然具有这样的名称不会自动配置的错误。让我知道这是否有效。

+0

感谢您的帮助!我必须测试,但会让你知道它是否有效! – kri