2013-11-14 433 views
5

我正在尝试创建一个程序使用多个进程,我想干净地终止所有派生进程,如果发生错误。下面我写了一些伪类型代码,我认为我需要做的事情,但我不知道什么是最好的方法是与所有进程通信,发生错误,并且应该终止。干净的Python多进程终止依赖于退出标志

我认为我应该为这类事情使用类,但是我对Python很陌生,所以我只是试图让我的头在基础知识第一。

#imports 

exitFlag = True 

# Function for threads to process 
def url_thread_worker(): 
# while exitFlag: 
    try: 
     # do something 
    except: 
     # we've ran into a problem, we need to kill all the spawned processes and cleanly exit the program 
     exitFlag = False 

def processStarter(): 

    process_1 = multiprocessing.Process(name="Process-1", target=url_thread_worker, args=()) 
    process_2 = multiprocessing.Process(name="Process-2", target=url_thread_worker, args=()) 

    process_1.start() 
    process_2.start() 


if __name__ == '__main__': 
    processStarter() 

在此先感谢

回答

4

这里是我的建议:

import multiprocessing 
import threading 
import time 

def good_worker(): 
    print "[GoodWorker] Starting" 
    time.sleep(4) 
    print "[GoodWorker] all good" 

def bad_worker(): 
    print "[BadWorker] Starting" 
    time.sleep(2) 
    raise Exception("ups!") 

class MyProcManager(object): 
    def __init__(self): 
     self.procs = [] 
     self.errors_flag = False 
     self._threads = [] 
     self._lock = threading.Lock() 

    def terminate_all(self): 
     with self._lock: 
      for p in self.procs: 
       if p.is_alive(): 
        print "Terminating %s" % p 
        p.terminate() 

    def launch_proc(self, func, args=(), kwargs= {}): 
     t = threading.Thread(target=self._proc_thread_runner, 
          args=(func, args, kwargs)) 
     self._threads.append(t) 
     t.start() 

    def _proc_thread_runner(self, func, args, kwargs): 
     p = multiprocessing.Process(target=func, args=args, kwargs=kwargs) 
     self.procs.append(p) 
     p.start() 
     while p.exitcode is None: 
      p.join() 
     if p.exitcode > 0: 
      self.errors_flag = True 
      self.terminate_all() 

    def wait(self): 
     for t in self._threads: 
      t.join() 

if __name__ == '__main__': 
    proc_manager = MyProcManager() 
    proc_manager.launch_proc(good_worker) 
    proc_manager.launch_proc(good_worker) 
    proc_manager.launch_proc(bad_worker) 
    proc_manager.wait() 
    if proc_manager.errors_flag: 
     print "Errors flag is set: some process crashed" 
    else: 
     print "Everything closed cleanly" 

你需要为每个进程运行的包装线,即等待其结束。 当一个进程结束时,检查exitcode:if> 0,意味着它引发了一些未处理的异常。现在调用terminate_all()来关闭所有剩余的活动进程。 包装线程也将完成,因为它们依赖于进程运行。

此外,在您的代码中,您可以随时根据需要调用proc_manager.terminate_all()。你可以检查一些不同的线程或类似的标志..

希望它对你的情况很好。在你的原始代码中,你做了一些像全局的exit_flag:在多处理中你永远不会有一个“全局的”exit_flag,因为它不是全局的,因为你正在使用分离的内存空间分离进程。这只适用于可以共享状态的线程环境。如果你在多处理中需要它,那么你必须在进程之间有明确的通信(Pipe and Queue accomplish that)或者类似shared memory objects

+0

这是一个很好的例子,帮助了很多。非常感谢你。 –

+1

你选择了处理“容易的部分” - 当孩子死亡。但是,如果父母死了呢?比你会留下孤立的进程(工人)。至少对我而言,这是一个大问题。 –

+0

这是一个非常不同的情况下解决,而不是真正常见的,恕我直言。您应该对系统进行编码以确保父代对错误具有适应性,并且至少可以在出现问题时终止子进程。 – abranches

1

如果你希望子进程在父进程退出时自动终止;你可以让它们为守护神(在.start()之前设置.daemon=True),即,如果父母检测到错误;它可能会放弃 - 孩子们会得到照顾。

如果你想要孩子自己清理;你可以use multiprocessing.Event() as a global flag

import multiprocessing 

def event_func(event): 
    print '\t%r is waiting' % multiprocessing.current_process() 
    event.wait() 
    print '\t%r has woken up' % multiprocessing.current_process() 

if __name__ == '__main__': 
    event = multiprocessing.Event() 

    processes = [multiprocessing.Process(target=event_func, args=(event,)) 
       for i in range(5)] 

    for p in processes: 
     p.start() 

    print 'main is sleeping' 
    time.sleep(2) 

    print 'main is setting event' 
    event.set() 

    for p in processes: 
     p.join()