2012-04-08 35 views
1

我有一个提醒类型的应用程序,它使用“eta”参数在芹菜中安排任务。如果提醒对象中的参数改变(例如提醒时间),则我撤销先前发送的任务并排队新的任务。如何在多个芹菜进程中跟踪撤销的任务

我想知道在芹菜重启过程中是否有任何好方法跟踪撤销的任务。我希望能够迅速地向上/向下缩放celeryd进程,并且看起来在revoke命令发送后启动的任何celeryd进程仍将执行该任务。

这样做的一种方法是保留已撤销任务ID的列表,但此方法将导致列表随意增大。修剪此列表需要保证该任务不再处于RabbitMQ队列中,这似乎不可能。

我也试过对每个芹菜工人使用共享--statedb文件,但似乎只是更新了terminateb上的工作,因此不适合我想要完成的语句。

在此先感谢!

回答

0

我不得不在我的项目中做类似的事情,并使用celerycamdjango-admin-monitor。监视器会对任务进行快照并定期将其保存到数据库中。并且有一个很好的用户界面来浏览和检查所有任务的状态。你甚至可以使用它even if your project is not Django based

0

我前段时间实现了类似于此的解决方案,而我提出的解决方案与您的解决方案非常相似。

我解决这个问题的方法是让作业在作业运行时(通过传递主键,如文档建议)从数据库中获取Task对象。就你而言,在发送提醒之前,工作人员应该执行检查以确保任务已准备好运行。如果没有,它应该简单地返回而不做任何工作(假设ETA已经改变,另一名工人将接受新工作)。

+0

这是不是仍然在理论上要求我保留在数据库中所有以前的任务的结果,因为任何修剪会导致在重新启动新的工作进程担保损失不要运行以前撤销的任务? – 2012-04-08 13:26:09

+0

我假设您已经设置了某种数据库模型,您还使用它来存储任务ID,以便在必要时撤销该任务?如果是这样,你可以给这个模型添加一个'completed'标志。 – 2012-04-08 13:29:05

+0

我只想出了一个选择:保留一个撤销任务ID的列表,并且每次celeryd进程启动或重新启动后,脚本都会遍历整个列表并重新发送撤销命令。这样我们只需保留自上次脚本运行以来已被撤销的任务ID。你能看到这个实现中的任何缺陷吗? – 2012-04-08 13:30:13

1

有趣的问题,我认为应该很容易解决使用广播命令。 如果新员工启动,它会请求所有其他员工将其撤销的 任务转储给新员工。添加了两个新的远程控制命令, 您可以轻松地通过使用@Panel.register添加新的命令,

模块control.py:

from celery.worker import state 
from celery.worker.control import Panel 

@Panel.register 
def bulk_revoke(panel, ids): 
    state.revoked.update(ids) 

@Panel.register 
def broadcast_revokes(panel, destination): 
    panel.app.control.broadcast("bulk_revoke", arguments={ 
     "ids": list(state.revoked)}, 
     destination=destination) 

将它添加到CELERY_IMPORTS:

CELERY_IMPORTS = ("control",) 

唯一缺少的问题现在是连接它,以便新员工在启动时触发broadcast_revokes。我想你可以使用这个worker_ready 信号:

from celery import current_app as celery 
from celery.signals import worker_ready 

def request_revokes_at_startup(sender=None, **kwargs): 
    celery.control.broadcast("broadcast_revokes", 
          destination=sender.hostname)