2017-03-07 75 views
2

我有一个应用程序依靠超时信号来执行一些阻塞操作。Python线程中超时信号的替代方案

例如:

def wait_timeout(signum, frame): 
    raise Exception("timeout") 

signal.signal(signal.SIGALRM, wait_timeout) 
signal.setitimer(signal.ITIMER_REAL, 5) 

try: 
    while true: 
     print("zzz") 
     sleep(1) 
except Exception as e: 
    # timeout 
    print("Time's up") 

使用相同的方法现在,我已经实现了多线程,但所有的线程我得到ValueError: signal only works in main thread

我假设信号超时的方法不适用于线程。

不幸的是我不能使用这样的事情:

timeout = 5 
start = time.time() 

while true: 
    print("zzz") 
    sleep(1) 
    if time.time() <= start+timeout: 
     print("Time's up) 
     break 

由于操作while循环可能会阻止并可能永远持续下去,因此循环可能永远达不到的,如果条款。

问:我该如何在线程中实现一个超时,就像我曾经用信号做过的那样?

编辑:我碰到过this blog post,在python中显示了类似的解决方案,用于JavaScript中的setTimeout()。我认为这可能是一个可能的解决方案,但我真的不知道如何使用它。

EDIT2:我开始在主线程如下:

p = Popen(["tool", "--param", arg], stdin=PIPE, stdout=PIPE, stderr=STDOUT) 
t = Thread(target=process_thread, daemon=True, args=(p,arg1,arg2)) 
t.start() 

process_thread函数处理tool的标准输出,通过执行以下操作:

for line in p.stdout: 
    # process line of the processes stdout 

该过程可以永远拿走,例如一旦tool不产生任何输出。我只想要tool的输出,比方说5秒,所以for循环需要在特定超时后中断。

这就是我使用的信号,但显然他们不能在线程中工作。

edit3:我已经创建了一个更精细和准确的示例,说明如何在线程中使用信号。 See the gist here

+0

你是如何开始你的线程?当你定义它们时,你是否设置了'daemon = True'?如果是这样,那么当主线程死亡时,这些线程将被终止。那是你想要做什么? – Billy

+0

是的,我以deamons开头,我会在一分钟内编辑OP。不,那不是我正在尝试的,我会尽力在OP中更好地解释它。 – SaAtomic

+0

我已经更新了OP @Billy – SaAtomic

回答

2

您要寻找的是看门狗

def watchdog(queue): 
    while True: 
     watch = queue.get() 
     time.sleep(watch.seconds) 

     try: 
      watch = queue.get_nowait() 
      # No except, got queue message, 
      # do noting wait for next watch 

     except queue.Empty: 
      os.kill(watch.pid, signal.SIGKILL) 

def workload_thread(queue): 
    pid = os.getpid() 
    queue.put({'pid':pid, 'seconds':5}) 

    # do your work 
    # Test Watchdog 
    # time.sleep(6) 

    queue.put({'pid':pid, 'done':True}) 

注:代码没有测试过,可能有语法错误!

+0

我觉得ike我以前见过/使用过这个或类似的东西,但我无法包裹我的现在就围着它走。在研究看门狗主题时,我也发现[this](http://liveincode.blogspot.de/2012/11/watchdog-timer-in-python.html),但我仍然不知道从哪里开始。你会如此善良,以我发布的代码创建一个例子吗? – SaAtomic

1

这实现了class Terminator, ,它发送一个给定的timeout=5Threads Popen processsignal.SIG...。可能有多个不同的pid

class Terminator(object): 
    class WObj(): 
     def __init__(self, process, timeout=0, sig=signal.SIGABRT): 
      self.process = process 
      self.timeout = timeout 
      self.sig = sig 

    def __init__(self): 
     self.__queue = queue.Queue() 
     self.__t = Thread(target=self.__sigterm_thread, args=(self.__queue,)) 
     self.__t.start() 
     time.sleep(0.1) 

    def __sigterm_thread(self, q): 
     w = {} 
     t = 0 
     while True: 
      time.sleep(0.1); 
      t += 1 
      try: 
       p = q.get_nowait() 
       if p.process == 0 and p.sig == signal.SIGTERM: 
        # Terminate sigterm_thread 
        return 1 

       if p.process.pid not in w: 
        if p.timeout > 0 and p.sig != signal.SIGABRT: 
         w[p.process.pid] = p 
       else: 
        if p.sig == signal.SIGABRT: 
         del (w[p.process.pid]) 
        else: 
         w[p.process.pid].timeout = p.timeout 

      except queue.Empty: 
       pass 

      if t == 10: 
       for key in list(w.keys()): 
        p = w[key] 
        p.timeout -= 1 
        if p.timeout == 0: 
         """ A None value indicates that the process hasn't terminated yet. """ 
         if p.process.poll() == None: 
          p.process.send_signal(p.sig) 
         del (w[p.process.pid]) 
       t = 0 
      # end if t == 10 
     # end while True 

    def signal(self, process, timeout=0, sig=signal.SIGABRT): 
     self.__queue.put(self.WObj(process, timeout, sig)) 
     time.sleep(0.1) 

    def close(self, process): 
     self.__queue.put(self.WObj(process, 0, signal.SIGABRT)) 
     time.sleep(0.1) 

    def terminate(self): 
     while not self.__queue.empty(): 
      trash = self.__queue.get() 

     if self.__t.is_alive(): 
      self.__queue.put(self.WObj(0, 0, signal.SIGTERM)) 

    def __enter__(self): 
     return self 

    def __exit__(self, exc_type, exc_val, exc_tb): 
     self.__del__() 

    def __del__(self): 
     self.terminate() 

例如,这是工作量:

def workload(n, sigterm): 
    print('Start workload(%s)' % n) 
    arg = str(n) 
    p = Popen(["tool", "--param", arg], stdin=PIPE, stdout=PIPE, stderr=STDOUT) 

    sigterm.signal(p, timeout=4, sig=signal.SIGTERM) 
    while True: 
     for line in p.stdout: 
      # process line of the processes stdout 
      print(line.strip()) 
      time.sleep(1) 

     if p.poll() != None: 
      break 

    sigterm.close(p) 
    time.sleep(0.1) 
    print('Exit workload(%s)' % n) 

if __name__ == '__main__': 
    with Terminator() as sigterm: 
     p1 = Thread(target=workload, args=(1, sigterm)); p1.start(); time.sleep(0.1) 
     p2 = Thread(target=workload, args=(2, sigterm)); p2.start(); time.sleep(0.1) 
     p3 = Thread(target=workload, args=(3, sigterm)); p3.start(); time.sleep(0.1) 
     p1.join(); p2.join(); p3.join() 

     time.sleep(0.5) 
    print('EXIT __main__') 

测试使用Python 3.4.2和Python:2.7.9

+0

谢谢您的反馈和示例,但我认为这不适用于多线程,如OP中所述。我在OP中添加了一个更详细的代码片段作为链接。 – SaAtomic

+0

此外,如果我尝试将该解决方案扩展到多个线程,则会遇到问题,即我的所有线程都报告相同的PID。 – SaAtomic

+1

是的,它不可扩展。在阅读更详细的代码片段后,将回来一个scalabel解决方案。 – stovfl