2010-01-25 85 views
16

我有一个Django项目,我正在尝试使用Celery提交后台处理任务(http://ask.github.com/celery/introduction.html)。 Celery与Django完美集成,我已经能够提交自定义任务并获得结果。在运行我的任务之前,如何设置Celery调用自定义初始化函数?

唯一的问题是我无法找到在守护进程中执行自定义初始化的理智方式。我需要调用一个昂贵的函数,在我开始处理任务之前加载大量内存,并且我无法每次调用该函数。

有没有人有过这个问题?任何想法如何解决它而无需修改芹菜源代码?

谢谢

+0

你需要运行什么样的自定义初始化? – diegueus9 2010-01-25 03:37:41

+0

我需要加载处理每个任务所需的〜10MB数据结构(所有任务的结构都是相同的)。 – xelk 2010-01-25 04:12:38

回答

15

您可以编写自定义加载程序或使用信号。

装载机具有on_task_init方法,在任务即将执行时调用, 和on_worker_init由celery + celerybeat主进程调用。

使用的信号可能是最简单的,可用的信号是:

0.8.4:

  • task_prerun(task_id, task, args, kwargs)

    当任务即将由工人来执行调度(或本地 如果使用apply /或者如果CELERY_ALWAYS_EAGER已设置)。

  • task_postrun(task_id, task, args, kwargs, retval) 在与上面相同的条件下执行任务后分派。

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    在0.9.x版本(在github当前主分支)当施加一个任务被调用(不适合长时间运行的操作)可用

附加信号:

  • worker_init()

    当芹菜已启动时调用(在任务初始化之前,因此如果在 系统支持fork,则任何内存更改将被复制到子工作进程 )。

  • worker_ready()

    时调用celeryd能够接收任务。

  • worker_shutdown()

    时celeryd正在关闭调用。

下面是一个例子预先计算的东西第一次任务的过程中运行:

from celery.task import Task 
from celery.registry import tasks 
from celery.signals import task_prerun 

_precalc_table = {} 

class PowersOfTwo(Task): 

    def run(self, x): 
     if x in _precalc_table: 
      return _precalc_table[x] 
     else: 
      return x ** 2 
tasks.register(PowersOfTwo) 


def _precalc_numbers(**kwargs): 
    if not _precalc_table: # it's empty, so haven't been generated yet 
     for i in range(1024): 
      _precalc_table[i] = i ** 2 


# need to use registered instance for sender argument. 
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name]) 

如果你想对所有任务运行功能,只跳过sender说法。