2017-08-01 135 views
0

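Python multiprocessing - terminate/restart worker process

I have a bunch of long-running tasks that I want to split across multiple processes. That part I can do without any problem. The problem I run into is that these processes sometimes hang. To deal with this, I would like to set a time threshold for each task a process is working on. When the threshold is exceeded, I want to restart or terminate the task.

Originally my code was very simple and used a process pool, but with a pool I could not figure out how to get at the processes inside it, let alone how to restart or terminate one of them.

I have since switched to using queues and Process objects, as shown in this example (https://pymotw.com/2/multiprocessing/communication.html#passing-messages-to-processes), with some changes.

My attempt at working this out is in the code below. In its current state the hung process does not actually get terminated. Beyond that, I cannot figure out how to get the process to move on to the next task once the current one has been killed. Any suggestions or help are appreciated; perhaps I am going about this the wrong way.

Thanks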

import multiprocess
import time

class Consumer(multiprocess.Process):
    def __init__(self, task_queue, result_queue, startTimes, name=None):
        multiprocess.Process.__init__(self)
        if name:
            self.name = name
        print 'created process: {0}'.format(self.name)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.startTimes = startTimes

    def stopProcess(self):
        elapseTime = time.time() - self.startTimes[self.name]
        print 'killing process {0} {1}'.format(self.name, elapseTime)
        self.task_queue.cancel_join_thread()
        self.terminate()
        # now want to get the process to start processing another job

    def run(self):
        '''
        The process subclass calls this on a separate process.
        '''
        proc_name = self.name
        print proc_name
        while True:
            # pulling the next task off the queue and starting it
            # on the current process.
            task = self.task_queue.get()
            self.task_queue.cancel_join_thread()

            if task is None:
                # Poison pill means shutdown
                #print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            self.startTimes[proc_name] = time.time()
            answer = task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b, startTimes):
        self.a = a
        self.b = b
        self.startTimes = startTimes
        self.taskName = 'taskName_{0}_{1}'.format(self.a, self.b)

    def __call__(self):
        import time
        import os

        print 'new job in process pid:', os.getpid(), self.taskName

        if self.a == 2:
            time.sleep(20000) # simulate a hung process
        else:
            time.sleep(3) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    # Establish communication queues
    # tasks = this is the work queue and results is for results or completed work
    tasks = multiprocess.JoinableQueue()
    results = multiprocess.Queue()

    #parentPipe, childPipe = multiprocess.Pipe(duplex=True)
    mgr = multiprocess.Manager()
    startTimes = mgr.dict()

    # Start consumers
    numberOfProcesses = 4
    processObjs = []
    for processNumber in range(numberOfProcesses):
        processObj = Consumer(tasks, results, startTimes)
        processObjs.append(processObj)

    for process in processObjs:
        process.start()

    # Enqueue jobs
    num_jobs = 30
    for i in range(num_jobs):
        tasks.put(Task(i, i + 1, startTimes))

    # Add a poison pill for each process object
    for i in range(numberOfProcesses):
        tasks.put(None)

    # process monitor loop,
    killProcesses = {}
    executing = True
    while executing:
        allDead = True
        for process in processObjs:
            name = process.name
            #status = consumer.status.getStatusString()
            status = process.is_alive()
            pid = process.ident
            elapsedTime = 0
            if name in startTimes:
                elapsedTime = time.time() - startTimes[name]
            if elapsedTime > 10:
                process.stopProcess()

            print "{0} - {1} - {2} - {3}".format(name, status, pid, elapsedTime)
            if allDead and status:
                allDead = False
        if allDead:
            executing = False
        time.sleep(3)

    # Wait for all of the tasks to finish
    #tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1
+0

Could you reduce your code to a [mcve]? It is too complex at the moment. –

+1

Read about [multiprocessing.html#synchronization-primitives](https://docs.python.org/3.6/library/multiprocessing.html#synchronization-primitives), section **multiprocessing.Event**. – stovfl

Answers

1

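I generally advise against subclassing multiprocessing.Process, as it leads to code that is hard to read.

I would rather encapsulate your logic in a function and run that function in a separate process. This keeps the code cleaner and more intuitive. That said, rather than reinventing the wheel, I would suggest using a library that already solves this problem for you, such as Pebble or billiard.

For example, the Pebble library lets you set a timeout on a process, whether it runs on its own or inside a Pool.

Running your function in a separate process with a timeout: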

from pebble import concurrent 
from concurrent.futures import TimeoutError 

@concurrent.process(timeout=10) 
def function(foo, bar=0): 
    return foo + bar 

future = function(1, bar=2) 

try: 
    result = future.result() # blocks until results are ready 
except TimeoutError as error: 
    print("Function took longer than %d seconds" % error.args[1]) 

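The same example, but with a process pool:

from pebble import ProcessPool
from concurrent.futures import TimeoutError

def function(foo, bar=0):
    return foo + bar

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    future = pool.schedule(function, args=[1], timeout=10)

    try:
        result = future.result() # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])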

In both cases, the process that timed out is terminated automatically.
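For comparison, similar behaviour can be approximated with the standard library alone. The following is only a minimal sketch and not how Pebble is implemented: it assumes one fresh process per task, and the names run_with_deadline and _call are made up for this illustration. The parent waits on a queue for the result and terminates the child if nothing arrives in time, after which it is free to start the next task.

```python
import multiprocessing as mp
import queue
import time


def _call(func, args, out):
    # Runs in the child process: send the return value back to the parent.
    out.put(func(*args))


def run_with_deadline(func, args=(), timeout=10.0):
    """Run func(*args) in a fresh process; return (finished, value)."""
    out = mp.Queue()
    proc = mp.Process(target=_call, args=(func, args, out))
    proc.start()
    try:
        # Wait for a result for at most `timeout` seconds.
        value = out.get(timeout=timeout)
        finished = True
    except queue.Empty:
        # No result in time: presume the task is hung and kill its process.
        value, finished = None, False
        proc.terminate()
    proc.join()
    return finished, value


if __name__ == "__main__":
    # pow(2, 10) stands in for a normal task, time.sleep(30) for a hung one.
    for func, args in [(pow, (2, 10)), (time.sleep, (30,))]:
        print(func.__name__, run_with_deadline(func, args, timeout=1.0))
```

Starting a new process per task costs more than reusing pool workers, and in this sketch a task that raises in the child is reported the same way as a timeout, so real code would also need to pass exceptions back to the parent.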

2

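A simpler solution would be to keep using Pool rather than reimplementing it, and to design a mechanism that times out the function you are running. For example: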

from time import sleep
import signal

class TimeoutError(Exception):
    pass

def handler(signum, frame):
    raise TimeoutError()

def run_with_timeout(func, *args, timeout=10, **kwargs):
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout)
    try:
        res = func(*args, **kwargs)
    except TimeoutError as exc:
        print("Timeout")
        res = exc
    finally:
        signal.alarm(0)
    return res


def test():
    sleep(4)
    print("ok")

if __name__ == "__main__":
    import multiprocessing as mp

    p = mp.Pool()
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout":1}).get())

signal.alarm sets the timeout. When it expires, the handler runs and raises TimeoutError, which interrupts the execution of your function.
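The same alarm mechanism can also be wrapped in a context manager. The sketch below is a variation on the code above rather than part of it: the names time_limit, TaskTimeout and guarded_sleep are hypothetical, it uses signal.setitimer so that fractional seconds are accepted (signal.alarm takes whole seconds), and it restores the previous SIGALRM handler on exit. Like the original, it assumes a Unix system and that the code runs in the main thread of its process.

```python
import signal
import time
from contextlib import contextmanager


class TaskTimeout(Exception):
    """Raised inside the running code when the time limit expires."""


@contextmanager
def time_limit(seconds):
    def handler(signum, frame):
        raise TaskTimeout()

    # Install our handler and remember the one that was there before.
    previous = signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)  # restore the old handler


def guarded_sleep(duration, limit):
    # Stand-in for a real task: sleeps, but gives up after `limit` seconds.
    try:
        with time_limit(limit):
            time.sleep(duration)
        return "done"
    except TaskTimeout:
        return "timed out"


if __name__ == "__main__":
    print(guarded_sleep(0.1, 1.0))
    print(guarded_sleep(5.0, 0.5))
```

Because the worker process itself survives the timeout, a Pool worker running a function wrapped this way can go straight on to its next task.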

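Edit: If you are on Windows, things seem a bit more complicated, because signal does not implement SIGALRM there. Another solution is to use the C-level Python API. This code was adapted from this SO answer, with a small change so that it works on 64-bit systems. I have only tested it on Linux, but it should work on Windows.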

import threading
import ctypes
from time import sleep


class TimeoutError(Exception):
    pass


def run_with_timeout(func, *args, timeout=10, **kwargs):
    interupt_tid = int(threading.get_ident())

    def interupt_thread():
        # Call the low level C python api using ctypes. tid must be converted
        # to c_long to be valid.
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(interupt_tid), ctypes.py_object(TimeoutError))
        if res == 0:
            print(threading.enumerate())
            print(interupt_tid)
            raise ValueError("invalid thread id")
        elif res != 1:
            # "if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"
            ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_long(interupt_tid), 0)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    timer = threading.Timer(timeout, interupt_thread)
    try:
        timer.start()
        res = func(*args, **kwargs)
    except TimeoutError as exc:
        print("Timeout")
        res = exc
    finally:
        # Always cancel, so the timer cannot fire after func fails otherwise.
        timer.cancel()
    return res


def test():
    sleep(4)
    print("ok")


if __name__ == "__main__":
    import multiprocessing as mp

    p = mp.Pool()
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout": 1}).get())
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout": 5}).get())
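One caveat that is worth checking against your own workload: in CPython, an exception set with PyThreadState_SetAsyncExc is only raised once the target thread is back to executing Python bytecode, so a task blocked inside a single long C call (a long sleep, a blocking read) is not cut short until that call returns. The sketch below shows the case where the interruption does arrive promptly, a pure-Python loop. The names LoopTimeout, interrupt_after and spin are hypothetical, and it assumes CPython 3.7 or later, where the thread id parameter is an unsigned long.

```python
import ctypes
import threading
import time


class LoopTimeout(Exception):
    pass


def interrupt_after(delay, exc_type):
    """Schedule exc_type to be raised in the calling thread after `delay` s."""
    tid = threading.get_ident()

    def fire():
        # Ask the interpreter to raise exc_type in the thread with id `tid`.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_ulong(tid), ctypes.py_object(exc_type))

    timer = threading.Timer(delay, fire)
    timer.start()
    return timer


def spin(limit, max_run=5.0):
    """Busy-loop in pure Python for up to max_run s, or until interrupted."""
    timer = interrupt_after(limit, LoopTimeout)
    start = time.monotonic()
    try:
        while time.monotonic() - start < max_run:
            pass
        return "finished"
    except LoopTimeout:
        return "interrupted"
    finally:
        timer.cancel()


if __name__ == "__main__":
    print(spin(0.5))
```

If the hung tasks spend their time inside blocking C calls, terminating the whole worker process is likely the more dependable option.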
+0

I like this solution, but unfortunately it will not work for me, because I have to be able to run this code on windoze! Any other suggestions? – Lafleur

+0

See my edit for a second solution, which should work on Windows. Let me know whether it works! –