2013-05-14 67 views
5

异步子我有产卵与超时异步子流程在Python 3与超时

我想达到什么问题:我想产卵多个进程异步无需等待结果,但我想也可以放心,每个产生的进程将在给定的超时内结束。

我在这里发现了类似的问题:Using module 'subprocess' with timeoutAsynchronous background processes in Python?但他们没有解决我的问题。

我的代码看起来像这样。我有一个命令类的建议在Using module 'subprocess' with timeout

class Command(object): 
    def __init__(self, cmd): 
    self.cmd = cmd 
    self.process = None 

    def run(self, timeout): 
    def target(): 
     print('Thread started') 
     args = shlex.split(self.cmd) 
     self.process = subprocess.Popen(args, shell=True) 
     self.process.communicate() 
     print('Thread finished') 

    thread = threading.Thread(target=target) 
    thread.start() 

    thread.join(timeout) 
    if thread.is_alive(): 
     print('Terminating process') 
     self.process.terminate() 
     thread.join() 

,然后当我想产卵子流程:当我运行此输出似乎等待每个命令产卵和结束

for system in systems: 
    for service in to_spawn_system_info: 
    command_str = "cd {0} && python proc_ip.py {1} {2} 0 2>>{3}".format(home_dir, 
     service, system, service_log_dir) 
    command = Command(command_str) 
    command.run(timeout=60) 

。我得到

Thread started 
Thread finished 
Thread started 
Thread finished 
Thread started 
Thread finished 
Thread started 
Thread finished 

所以我的问题是我做错了什么?现在我开始怀疑是否有可能产生一个进程并通过超时限制它的执行。

为什么我需要这个? spawner脚本将在cron中运行。它将每隔10分钟执行一次,它必须产生大约20个子进程。我想保证每个子进程都会在脚本从cron重新运行之前结束。

回答

3

如前所述,对process.communicate()的调用使您的代码等待子进程的完成。但是,如果你只是删除了communications()调用,那么线程在产生进程后会立即退出,导致你的线程。join()调用过早退出,你会过早地关闭子进程。要做到你想要什么,而不轮询或忙等待,您可以设置超时后,计时器,将终止该进程(和亚军线程)如果进程尚未完成:

class Command(object): 
    def __init__(self, cmd): 
    self.cmd = cmd 
    self.process = None 

    def run(self, timeout): 
    def target(): 
     print('Thread started') 
     # May want/need to skip the shlex.split() when using shell=True 
     # See Popen() constructor docs on 'shell' argument for more detail. 
     args = shlex.split(self.cmd) 
     self.process = subprocess.Popen(args, shell=True) 
     self.timer.start() 
     self.process.wait() 
     self.timer.cancel() 

    def timer_callback(): 
     print('Terminating process (timed out)') 
     self.process.terminate() 

    thread = threading.Thread(target=target) 
    self.timer = threading.Timer(timeout, timer_callback) 
    thread.start() 
+0

当我尝试这个解决方案时,它不会在超时后终止我的线程。我将超时设置为1秒,并在目标函数中添加time.sleep(1)。没有线程被终止。 – sebast26 2013-05-14 13:20:11

+0

Hrmm。当target()退出时,线程应该终止。请记住,如上所述,如果过程正常退出而不超时,则不会得到打印输出。我会仔细观察一下,我可能忽略了一些东西。 – mshildt 2013-05-14 13:29:15

+0

因此,如果线程在子进程完成之前终止,子进程将终止?这与unutbu说的相反。他说,“那么,即使你的线程完成后,你产生的每个子进程都将继续存在”。我还有一个印象,就是子进程会继续下去。 – b10hazard 2013-05-14 13:36:29

0
from threading import * 
from time import time 
import shlex 
import subprocess 
from random import randint 
class Worker(Thread): 
    def __init__(self, param, cmd, timeout=10): 
     self.cmd = cmd 
     self.timeout = timeout 

     Thread.__init__(self) 
     self.name = param 
    def run(self): 
     startup = time() 
     print(self.name + ' is starting') 

     args = shlex.split(self.cmd) 
     #Shell should be false when given a list (True for strings) 
     process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) 

     while time()-startup <= self.timeout: 
      if process.poll() != None: 
       break 

     process.stdout.close() 
     process.stdin.close() 
     process.stderr.close() 

     print(self.name + ' is dead') 

for i in range(0, 100): 
    x = Worker('Name-'+str(i), 'ping -n ' + str(randint(0,5)) + ' www.google.se') 
    x.start() 

while len(enumerate()) > 1: 
    pass # Wait for the threads to die 

这可以简化您的工作方法吗? 特别是考虑到你不需要等待一个结果,这只会启动一个类对象进入外层空间,执行超时工作。

还要注意:

  • 不关闭标准输出,标准输入和标准错误会导致“对于许多文件句柄开放”几乎所有的系统
  • 作为另一个答案中指出,.communicate()等待处理退出(使用.poll()代替)
1

使用相互独立开始和结束的线程。如果您知道所有要提前运行的命令,此方法将非常有用。下面是一个例子...

from threading import Thread 
import subprocess 
import Queue 
import multiprocessing 

class Command(object): 
    def __init__(self, cmds): 
     self.cmds = cmds 

    def run_cmds(self): 
     cmd_queue = Queue.Queue() 
     for cmd in self.cmds: 
      cmd_queue.put(cmd) 

     available_threads = multiprocessing.cpu_count() 
     for x in range(0,available_threads): 
      t = Thread(target=self.run_cmd,args=(cmd_queue,)) 
      t.setDaemon(True) 
      t.start() 

     cmd_queue.join() 


    def run_cmd(self, cmd_queue): 
     while True: 
      try: cmd = cmd_queue.get() 
      except: break 
      print 'Thread started' 
      process = subprocess.Popen(cmd, shell=True) 
      process.communicate() 
      print 'Thread finished' 
      cmd_queue.task_done() 


# create list of commands you want to run 
cmds = ['cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop'] 
# create class 
c = Command(cmds) 
# run them... 
c.run_cmds() 

这将打印....

Thread started 
Thread started 
Thread started 
Thread startedThread finished 

Thread started 
Thread finishedThread finished 

Thread finished 
Thread finished 

正如你可以从输出的子过程开始看到彼此独立的结束,并没有子等待另一个子进程完成,因为它们都在不同的线程中调用。当然,你可以添加超时时间和其他任何你想要的,这只是一个简单的例子。这假定你知道你想要运行的所有命令。如果你想添加线程超时,请参阅epicbrews答案。如果你愿意的话,你可以把他的线程超时例子加入到这个例子中。

+0

正如我在我的例子? :P Altho我没有把它描述得像你一样干净。 – Torxed 2013-05-14 12:17:46

+0

实际上,当我编写我的答案时,您的示例中有process.communicate()。否则我不会回答。我在编辑历史记录中看到您删除了它。 – b10hazard 2013-05-14 12:30:55

+0

Yepp,但我尽可能早地删除了它,因为我之前只是在他的代码中粘贴了他的代码,以便在我的连接断开之前(在火车上,所以它的DC每2分钟):) – Torxed 2013-05-14 12:48:27