您可以编写自定义加载程序或使用信号。
装载机具有on_task_init
方法,在任务即将执行时调用, 和on_worker_init
由celery + celerybeat主进程调用。
使用的信号可能是最简单的,可用的信号是:
0.8.4:
task_prerun(task_id, task, args, kwargs)
当任务即将由工人来执行调度(或本地 如果使用apply
/或者如果CELERY_ALWAYS_EAGER
已设置)。
task_postrun(task_id, task, args, kwargs, retval)
在与上面相同的条件下执行任务后分派。
task_sent(task_id, task, args, kwargs, eta, taskset)
在0.9.x版本(在github当前主分支)当施加一个任务被调用(不适合长时间运行的操作)可用
附加信号:
下面是一个例子预先计算的东西第一次任务的过程中运行:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
如果你想对所有任务运行功能,只跳过sender
说法。
你需要运行什么样的自定义初始化? – diegueus9 2010-01-25 03:37:41
我需要加载处理每个任务所需的〜10MB数据结构(所有任务的结构都是相同的)。 – xelk 2010-01-25 04:12:38