有趣的问题,我认为应该很容易解决使用广播命令。 如果新员工启动,它会请求所有其他员工将其撤销的 任务转储给新员工。添加了两个新的远程控制命令, 您可以轻松地通过使用@Panel.register
添加新的命令,
模块control.py:
from celery.worker import state
from celery.worker.control import Panel
@Panel.register
def bulk_revoke(panel, ids):
state.revoked.update(ids)
@Panel.register
def broadcast_revokes(panel, destination):
panel.app.control.broadcast("bulk_revoke", arguments={
"ids": list(state.revoked)},
destination=destination)
将它添加到CELERY_IMPORTS:
CELERY_IMPORTS = ("control",)
唯一缺少的问题现在是连接它,以便新员工在启动时触发broadcast_revokes
。我想你可以使用这个worker_ready
信号:
from celery import current_app as celery
from celery.signals import worker_ready
def request_revokes_at_startup(sender=None, **kwargs):
celery.control.broadcast("broadcast_revokes",
destination=sender.hostname)
这是不是仍然在理论上要求我保留在数据库中所有以前的任务的结果,因为任何修剪会导致在重新启动新的工作进程担保损失不要运行以前撤销的任务? – 2012-04-08 13:26:09
我假设您已经设置了某种数据库模型,您还使用它来存储任务ID,以便在必要时撤销该任务?如果是这样,你可以给这个模型添加一个'completed'标志。 – 2012-04-08 13:29:05
我只想出了一个选择:保留一个撤销任务ID的列表,并且每次celeryd进程启动或重新启动后,脚本都会遍历整个列表并重新发送撤销命令。这样我们只需保留自上次脚本运行以来已被撤销的任务ID。你能看到这个实现中的任何缺陷吗? – 2012-04-08 13:30:13