2015-01-12 36 views
6

我有一个主要的Python脚本连接到MySQL数据库,并从中剔除少量记录。基于返回的结果,它将启动尽可能多的线程(类实例)和抓取许多记录。每个线程应该返回到数据库并通过将一个状态标志设置为不同的状态(“进程已启动”)来更新另一个表。如何使用Python多线程处理MySQL连接(s)

要做到这一点我想:

1)通过了数据库连接的所有主题 2)打开从每个线程

一个新的数据库连接,但他们都不是工作。

我可以运行我的更新没有任何问题在这两种情况下通过使用try/except,但MySQL表尚未更新,并且没有生成错误。我在这两种情况下都使用了commit。

我的问题是如何在这种情况下处理MySQL连接?

更新基于第一几点意见:

MAIN SCRIPT 
----------- 

#Connecting to DB 
db = MySQLdb.connect(host = db_host, 
         db = db_db, 
         port = db_port, 
         user = db_user, 
         passwd = db_password, 
         charset='utf8') 

# Initiating database cursor 
cur = db.cursor() 

# Fetching records for which I need to initiate a class instance 

cur.execute('SELECT ...') 

for row in cur.fetchall() : 
    # Initiating new instance, appending it to a list and 
    # starting all of them 



CLASS WHICH IS INSTANTIATED 
--------------------------- 

# Connecting to DB again. I also tried to pass connection 
# which has been opened in the main script but it did not 
# work either. 

db = MySQLdb.connect(host = db_host, 
         db = db_db, 
         port = db_port, 
         user = db_user, 
         passwd = db_password, 
         charset='utf8') 

# Initiating database cursor 
cur_class = db.cursor() 
cur.execute('UPDATE ...') 
db.commit() 
+0

很难说,不知道如何连接到你的数据库,你如何实现更新任何东西。 – Ashalynd

+0

我不完全理解你的问题。是最简单的情况下工作,像单线程python连接到MySQL和更新表? – qqibrow

+0

@Ashalynd感谢您花时间!抱歉,我是AFK。我用我的主代码和类代码中的代码snipet更新了我的问题。这是我如何启动实例并打开数据库连接的方式。我试图捕捉错误,当打开连接和执行查询从尝试/除了没有运气的情况下。 – g0m3z

回答

3

看来我的代码没有问题,但与我的MySQL版本。我正在使用MySQL标准社区版,并根据发现的官方文档here

线程池插件是一个商业功能。它不包含在MySQL社区发行版中。

我即将升级到MariaDB来解决此问题。

9

下面是使用Python中的多线程处理MySQL的一个例子,我不知道你 表和数据,因此,只需更改代码可能帮助:

import threading 
import time 
import MySQLdb 

Num_Of_threads = 5 

class myThread(threading.Thread): 

    def __init__(self, conn, cur, data_to_deal): 
     threading.Thread.__init__(self) 
     self.threadID = threadID 
     self.conn = conn 
     self.cur = cur 
     self.data_to_deal 

    def run(self): 

     # add your sql 
     sql = 'insert into table id values ({0});' 
     for i in self.data_to_deal: 
      self.cur.execute(sql.format(i)) 
      self.conn.commit() 

threads = [] 
data_list = [1,2,3,4,5] 

for i in range(Num_Of_threads): 
    conn = MySQLdb.connect(host='localhost',user='root',passwd='',db='') 
    cur = conn.cursor() 
    new_thread = myThread(conn, cur, data_list[i]) 

for th in threads: 
    th.start() 

for t in threads: 
    t.join() 
+0

对不起,我迟到的回复。参考你上面的例子,我有两个不同的文件,我的课程和我的主要脚本。我猜这应该不是问题。另一个我做不同的事情是我不把我的data_list传递给我的线程,因为我需要我的线程即时从我的数据库查询数据。所以我做的是:1)打开一个数据库连接(主脚本)2)查询记录(主脚本)3)启动尽可能多的类实例和我拥有的许多记录(主脚本)4。)尝试从每个实例(类实例)更新数据库中的表记录 – g0m3z

1

看起来像mysql 5.7确实支持多线程。

正如你以前试过的 - 绝对确保在def worker()中传递连接。全局定义的连接是我的错

下面是通过5个线程打印10个记录示例代码,5次

import MySQLdb 
import threading 


def write_good_proxies():  
    local_db = MySQLdb.connect("localhost","username","PassW","DB", port=3306) 
    local_cursor = local_db.cursor (MySQLdb.cursors.DictCursor) 
    sql_select = 'select http from zproxies where update_time is null order by rand() limit 10' 
    local_cursor.execute(sql_select) 
    records = local_cursor.fetchall() 
    id_list = [f['http'] for f in records] 
    print id_list 
def worker(): 
    x=0 
    while x< 5: 
     x = x+1 
     write_good_proxies() 

threads = [] 


for i in range(5): 
    print i 
    t = threading.Thread(target=worker) 
    threads.append(t) 
    t.start()