2017-06-03 76 views
2

我有一个程序,维护与定期心跳服务器的连接。每过一段时间,服务器都会停止响应心跳,我必须重新连接。我用一个定时器实现了这一点,如果在n秒后没有听到任何响应,将会调用重新连接。每次发生这种情况时,我都会泄漏一个线程,随着时间的推移,我最终会耗尽线程。Python的select.select泄漏线程

现在,为了便于重复使用,大量简化了这个插图,介绍了延迟后重新连接的方式以及总是会导致线程增加的方式。 如何杀死旧线程/套接字/选择(可能正在等待recv)?

import socket 
import select 
import threading 

class Connection(): 

    def tick(self): 
     print(threading.active_count()) # this increases every 1s! 
     # ... certain conditions not met/it's been too long, then: 
     self.reconnect() 

    def reconnect(self): 
     self.socket.shutdown(socket.SHUT_WR) 
     self.socket.close() 
     self.timer.cancel() 
     self.connect() 

    def connect(self): 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.connect((IP, TCP_PORT)) 
     self.timer = threading.Timer(1, self.tick) 
     self.timer.start() 
     r,_,_ = select.select([self.socket], [], []) 

if __name__ == '__main__': 
    Connection().connect() 

回答

2

我很确定,它不是select()泄漏任何线程。我们假设select()不会返回,即它永远阻止。

在这种情况下

  • .tick()从计时器线程调用。在计时器线程内调用.reconnect()
  • .reconnect()关闭现有的套接字。这导致select()调用失败,IOError“错误的文件描述符”(这也是您应该确实修复代码的原因)。
  • .reconnect()尝试取消当前计时器。 这不做任何事情,因为定时器已经触发(我们目前在定时器功能中!)。
  • .reconnect()来电.connect()那个人建立一个新的计时器,在这里我们再次去。

所以问题是:这种操作模式在哪里挂在现有的计时器对象上?那么,您的所有计时器线程都会被select()调用中的IOError终止。这存储了异常的每个线程引用。

我的猜测是,这可以防止CPython中的引用计数清理触发,因此计时器线程只会在垃圾回收期间清理。这是不可靠的,因为不能保证定时器线程能够及时清理。

如果您在.connect()的开头添加import gc; gc.collect(),问题(似乎)就会消失。但是,这是一个非解决方案。

为什么不使用timeout参数select()来实现类似的结果,而无需使用定时器线程?

r = [] 
while not r: 
    if self.socket: 
     self.socket.shutdown(socket.SHUT_WR) 
     self.socket.close() 

    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    self.socket.connect((IP, TCP_PORT)) 
    # select returns empty lists on timeout 
    r, _, _ = select.select([self.socket], [], [], 1) 

不要忘记设置self.socket = NoneConnection.__init__()这个工作。