2011-11-22 764 views
7

通过继承PeriodicTask,我设法在django-celery中获得定期任务。我试图创建一个测试任务,并设置它运行一些无用的东西。有用。停止/清除Django-Celery中的定期任务

现在我无法阻止它。我读过文档,我找不到如何从执行队列中删除任务。我尝试过使用celeryctl并使用shell,但registry.tasks()是空的,所以我看不到如何删除它。

我已经看到了我应该“撤销”它的建议,但是为此我需要一个任务ID,而且我看不到如何找到任务ID。

谢谢。

回答

21

任务是一条消息,“周期性任务”定期发送任务消息。每个发送的任务都会分配一个唯一的ID。

revoke将只取消单个任务消息。要获取任务的ID,您必须保留发送ID的 跟踪,但您也可以在发送任务时指定自定义ID。

我不确定是否要取消单个任务消息,或者如果您想停止发送更多消息的定期任务,那么我会列出两个答案。

没有内置的方式来保持与周期性任务, 但你可以设置ID为每个任务周期性任务的名义发送任务的ID,这样 的ID将指任何与周期性任务一起发送的任务(通常是最后一个任务)。 您可以指定一个自定义的ID这样,

或者与@periodic_task装饰:

@periodic_task(options={"task_id": "my_periodic_task"}) 
def my_periodic_task(): 
    pass 

或与CELERYBEAT_SCHEDULE设置:

CELERYBEAT_SCHEDULE = {name: {"task": task_name, 
           "options": {"task_id": name}}} 

如果你想简单地删除您周期性任务从代码库中删除@periodic_task,或从CELERYBEAT_SCHEDULE删除条目。 如果您使用的是Django数据库调度程序,则必须从Django Admin界面中删除定期任务 。

PS1:revoke不会停止已经启动的任务。它只取消尚未开始的 任务。您可以使用 revoke(task_id, terminate=True)来终止正在运行的任务。默认情况下,如果您想发送另一个信号(例如KILL),使用 revoke(task_id, terminate=True, signal="KILL"),则会将TERM信号发送到 的过程。

PS2:撤销是一个远程控制命令,所以它只支持RabbitMQ 和Redis broker传输。 如果你希望你的任务通过存储在数据库中的cancelled 标志支持取消,你应该这样做,有任务检查标志启动时:

from celery.task import Task 

class RevokeableTask(Task): 
    """Task that can be revoked. 

    Example usage: 

     @task(base=RevokeableTask) 
     def mytask(): 
      pass 
    """ 

    def __call__(self, *args, **kwargs): 
     if revoke_flag_set_in_db_for(self.request.id): 
      return 
     super(RevokeableTask, self).__call__(*args, **kwargs) 
+0

谢谢你这个非常透彻的答案。 –

3

以防万一这可能帮助别人...我们在工作中遇到了同样的问题,并且竭力寻找某种管理命令来删除周期性任务,但我们做不到。所以这里有一些指针。

您应该首先仔细检查您正在使用的scheduler class

默认调度程序是celery.beat.PersistentScheduler,它只是跟踪本地数据库文件(搁置)中的上次运行时间。

在我们的案例中,我们使用的是djcelery.schedulers.DatabaseScheduler类。

Django的芹菜还附带在Django的数据库

存储日程虽然文件没有提到一个方法,以消除周期性任务调度程序:

使用django-celery的调度程序,您可以从Django Admin添加,修改和删除定期任务。

我们希望以编程方式或通过shell中的(celery/management)命令执行清除。

由于我们无法找到一个命令行中,我们使用Django的/ Python的外壳:

$ python manage.py shell 
>>> from djcelery.models import PeriodicTask 
>>> pt = PeriodicTask.objects.get(name='the_task_name') 
>>> pt.delete() 

我希望这有助于!