2013-02-27 59 views
8

使用Python RQ,我们试图动态管理工作进程。我们使用一个定制的工人脚本,该脚本(以简化形式)如下:如何正确关闭Python RQ工作进程动态?

from rq import Connection, Worker 

queues_to_listen_on = get_queues_to_listen_on() 

with Connection(connection = get_worker_connection()): 
    w = Worker(queues_to_listen_on) 
    w.work() 

我们是在工人的关闭特别感兴趣。我们主要关心的是如何正常关闭工作人员,以便在关闭之前完成当前工作。在适当的Worker对象上的request_stop(...)信号处理程序似乎正在做我们需要的东西,但似乎没有办法(至少据我所知)发射它,除非它是通过在终端中运行的工作进程上按CTRL+C

在我看来,有两个可能的解决方案(也可以肯定是更多) - 按优先顺序排列:

  1. 编程,使用rq库,将信号发送到request_stop,从而触发正常关闭。
  2. 以某种方式获取正确进程的PID(不确定主进程或工作进程侦听器进程)并使用其他方法将适当的信号发送到该进程。我们有一些方法可以完成,但它很可能需要更多的工作,并引入其他变量来解决我宁愿被忽略的问题(例如,使用Fabric来运行远程命令或沿着这些命令行的东西)。

如果有更好的方法来解决这个问题或者一个不同的替代方案来实现相同的目标,我将不胜感激您的建议。

+0

如果你需要PID,你实际上可以从w.pid中得到它 – Borys 2013-02-27 19:11:21

回答

4

选项1在设计方面绝对更好。

但是解决不必使用CTRL + C退出过程(我讨厌太)的您的特定问题,您可以使用下面的策略为你的员工:

# WORKER_NAME.py 
import os 

PID = os.getpid() 

@atexit.register 
def clean_shut(): 
    print "Clean shut performed" 

    try: 
     os.unlink("WORKER_NAME.%d" % PID) 
    except: 
     pass 

# Worker main 
def main(): 
    f = open("WORKER_NAME.%d" % PID, "w") 
    f.write("Delete this to end WORKER_NAME gracefully") 
    f.close() 

    while os.path.exists("WORKER_NAME.%d" % PID): 
     # Worker working 

而在你的主脚本,得到工作人员PID作为@Borys建议,发送热烈的停止请求,并os.unlink("path/to/WORKER_NAME.%d" % worker_PID)以确保正常关机:)

这只适用于运行无限循环的工人。如果工作进程调用阻塞连续的一次性工作的事情,则必须进一步追踪可能的阻塞例程以从那里解析,例如应用某种超时策略。