0

我有多个线程处理数据并将其放在一个队列中,并有一个线程从队列中获取数据,然后将其保存到数据库中。如何在守护程序线程中关闭sqlite连接?

我认为以下原因会导致内存泄漏:

class DBThread(threading.Thread): 
    def __init__(self, myqueue): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 

    def run(self): 
     conn = sqlite3.connect("test.db") 
     c = conn.cursor() 

     while True: 
      data = myqueue.get() 
      if data: 
       c.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       conn.commit() 

      self.myqueue.task_done() 

     #conn.close() <--- never reaches this point 

q = Queue.Queue() 

# Create other threads 
.... 

# Create DB thread 
t = DBThread(q) 
t.setDaemon(True) 
t.start() 

q.join() 

我不能把conn.close() while循环,因为我认为,将关闭在第一回路的连接。我不能将它放在if data:语句中,因为它不会保存稍后可能放入队列的数据。

我在哪里关闭数据库连接?如果我不关闭它,这会不会导致内存泄漏?

回答

0

如果您可以使用不会出现在正常数据中的标记值,例如None,您可以通知线程停止并关闭在finally条款数据库连接:

import threading 
import Queue 
import sqlite3 

class DBThread(threading.Thread): 
    def __init__(self, myqueue, db_path): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 
     self.db_path = db_path 

    def run(self): 
     conn = sqlite3.connect(self.db_path) 

     try: 
      while True: 
       data = self.myqueue.get()  
       if data is None: # check for sentinel value 
        break 

       with conn: 
        conn.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       self.myqueue.task_done() 
     finally: 
      conn.close() 


q = Queue.Queue() 
for i in range(100): 
    q.put(str(i)) 

conn = sqlite3.connect('test.db') 
conn.execute('create table if not exists test (data text)') 
conn.close() 

t = DBThread(q, 'test.db') 
t.start() 

q.join() 
q.put(None) # tell database thread to terminate 

如果您不能使用哨兵值,你可以一个标志添加到在while循环检查得到的类别。还要将stop()方法添加到设置该标志的线程类。您将需要使用非阻塞Queue.get()

class DBThread(threading.Thread): 
    def __init__(self, myqueue, db_path): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 
     self.db_path = db_path 
     self._terminate = False 

    def terminate(self): 
     self._terminate = True 

    def run(self): 
     conn = sqlite3.connect(self.db_path) 

     try: 
      while not self._terminate: 
       try: 
        data = self.myqueue.get(timeout=1) 
       except Queue.Empty: 
        continue 

       with conn: 
        conn.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       self.myqueue.task_done() 
     finally: 
      conn.close() 

.... 
q.join() 
t.terminate() # tell database thread to terminate 

最后,值得一提的是,如果分贝线程管理排排队,即如果q.join()回报你的程序可能会终止。这是因为数据库线程是一个守护进程线程,并不会阻止主线程退出。您需要确保您的工作线程产生足够的数据以保持db线程繁忙,否则将返回q.join()并且主线程将退出。

+0

在这种情况下'conn:'做什么?通常它会清理资源(在这种情况下是连接),这不是我们现在想要的吗? – Caramiriel

+1

@ Caramiriel:没错,我们不想在那个时候清理。然而,它不会做你的想法;它实现了事务的自动提交/回滚。请参阅[使用连接作为上下文管理器](https://docs.python.org/2/library/sqlite3.html#using-the-connection-as-a-context-manager)。 – mhawke