2017-02-03 79 views
1

我正在研究从本地驱动器读取DBF文件并将数据加载到sql服务器表的程序。我对Python很绿,我发现了一些关于多线程的细节,其中大部分都是令人困惑的。读取和插入的性能很慢,看我的CPU使用率,我有足够的容量。我也在运行SSD。Python - 多线程帮助 - 读取多个文件 - ETL到SQL服务器

此代码将被扩展到大约400个拉链之间的大约20个DBF文件中。所以我们总共讨论了8000个DBF文件。

我很难做到这一点。你能提供指针吗?

这里是我的代码(这是一个有点混乱,但以后我会清理),

import os, pyodbc, datetime, shutil 
from dbfread import DBF 
from zipfile import ZipFile 

# SQL Server Connection Test 
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost\test;DATABASE=TEST_DBFIMPORT;UID=test;PWD=test') 
cursor = cnxn.cursor() 

dr = 'e:\\Backups\\dbf\\' 
work = 'e:\\Backups\\work\\' 
archive = 'e:\\Backups\\archive\\' 


for r in os.listdir(dr): 

    curdate = datetime.datetime.now() 
    filepath = dr + r 
    process = work + r 
    arc = archive + r 

    pth = r.replace(".sss","") 
    zipfolder = work + pth 
    filedateunix = os.path.getctime(filepath) 
    filedateconverted=datetime.datetime.fromtimestamp(int(filedateunix) 
               ).strftime('%Y-%m-%d %H:%M:%S') 
    shutil.move(filepath,process) 
    with ZipFile(process) as zf: 
     zf.extractall(zipfolder) 


    cursor.execute(
     "insert into tblBackups(backupname, filedate, dateadded) values(?,?,?)", 
    pth, filedateconverted, curdate) 
    cnxn.commit() 

    for dirpath, subdirs, files in os.walk (zipfolder): 

     for file in files: 
      dateadded = datetime.datetime.now() 

      if file.endswith(('.dbf','.DBF')): 
       dbflocation = os.path.abspath(os.path.join(dirpath,file)).lower() 

       if dbflocation.__contains__("\\bk.dbf"): 
        table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore') 
        for record in table.records: 
         rec1 = str(record['code']) 
         rec2 = str(record['name']) 
         rec3 = str(record['addr1']) 
         rec4 = str(record['addr2']) 
         rec5 = str(record['city']) 
         rec6 = str(record['state']) 
         rec7 = str(record['zip']) 
         rec8 = str(record['tel']) 
         rec9 = str(record['fax']) 
         cursor.execute(
         "insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) values(?,?,?,?,?,?,?,?,?)", 
         rec1, rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9, rec10, rec11, rec12, rec13) 
       cnxn.commit() 


       if dbflocation.__contains__("\\cr.dbf"): 
        table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore') 
        for record in table.records: 
         rec2 = str(record['cal_desc']) 
         rec3 = str(record['b_date']) 
         rec4 = str(record['b_time']) 
         rec5 = str(record['e_time']) 
         rec6 = str(record['with_desc']) 
         rec7 = str(record['recuruntil']) 
         rec8 = record['notes'] 
         rec9 = dateadded 
         cursor.execute(
         "insert into tblcalendar(cal_desc,b_date,b_time,e_time,with_desc,recuruntil,notes,dateadded) values(?,?,?,?,?,?,?,?)", 
         rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9) 
       cnxn.commit() 

    shutil.move(process, archive) 
    shutil.rmtree(zipfolder) 
+0

我想要的另一个选择是可能更简单的多处理。 – HMan06

回答

1

TL;博士:测量的,后来修复!


注意,在最常用的Python实现(CPython的)只有一个线程在同一时间可以执行Python字节码。 因此,线程不是提高CPU限制性能的好方法。如果工作是I/O限制的,他们可以很好地工作。

但你应该首先做的是措施。这不能够强调。如果你不知道是什么导致缺乏表现,你不能修复它!

编写完成该任务的单线程代码,然后在分析器下运行该代码。首先尝试内置cProfile。如果这不能为您提供足够的信息,请尝试一个line profiler

分析应该告诉你哪些步骤消耗的时间最多。一旦你知道了,你可以开始改进。

例如,使用multiprocessing来读取DBF文件是没有意义的,如果这是将数据填充到SQL服务器中的操作花费最多的时间!这甚至可能会减慢速度,因为有几个进程正在争夺SQL服务器的注意力。

如果 SQL服务器不是瓶颈,它可以处理多个连接,我会用multiprocessing,大概Pool.map()阅读DBF的并行和数据填充到SQL服务器。在这种情况下,您应该在DBF文件名列表上覆盖Pool.map,以便在工作进程中打开这些文件。

+0

谢谢罗兰,你对这个完全正确。除了我的代码外,没有任何瓶颈。我能够使用多处理模块,并以1秒的延迟一次加载15个进程。我这样做是因为我注意到有时这些进程试图获取相同的文件。现在我遇到了SQL Server CPU瓶颈,这正是我想看到的。 SQL Server CPU使用率从大约8%上升到50%,处理8k文件的时间从10多个小时增加到45分钟! – HMan06

+0

@ HMan06速度超过10倍?太好了! –