2017-02-10 77 views
34

我有大量的文本文件,比如说50,我需要读入大量的数据框。目前,我正在使用以下步骤。如何加速读取多个文件并将数据放入数据框?

  1. 阅读每个文件,并检查标签是什么。我需要的信息通常包含在前几行中。相同的标签只是为文件的其余部分重复使用,每次都会针对它们列出不同类型的数据。
  2. 用这些标签创建一个数据框。
  3. 再次读取该文件,并用值填充数据帧。
  4. 将数据帧与主数据帧连接起来。

这对于100 KB大小的文件非常适用 - 几分钟,但在50 MB,它只需要几个小时,并且不实际。

如何优化我的代码?特别是 -

  1. 如何确定哪些功能需要最多时间,哪些需要优化?它是文件的阅读吗?这是写入数据框吗?我的计划在哪里花费时间?
  2. 我应该考虑多线程还是多处理?
  3. 我可以改进算法吗?
    • 或许读取整个文件在一个进入名单,而不是一行一行,在块/整个文件
    • 解析数据,而不是一行行,
    • 分配数据到数据帧块/一个去,而不是逐行。
  4. 有什么我可以做,让我的代码更快执行?

下面是一个示例代码。我自己的代码稍微复杂一点,因为文本文件比较复杂,所以我必须使用大约10个正则表达式和多个while循环来读取数据并将其分配到正确数组中的正确位置。为了保持MWE简单,我还没有在MWE的输入文件中使用重复标签,所以它会让我无故读取文件两次。我希望这是有道理的!

import re 
import pandas as pd 

df = pd.DataFrame() 
paths = ["../gitignore/test1.txt", "../gitignore/test2.txt"] 
reg_ex = re.compile('^(.+) (.+)\n') 
# read all files to determine what indices are available 
for path in paths: 
    file_obj = open(path, 'r') 
    print file_obj.readlines() 

['a 1\n', 'b 2\n', 'end'] 
['c 3\n', 'd 4\n', 'end'] 

indices = [] 
for path in paths: 
    index = [] 
    with open(path, 'r') as file_obj: 
     line = True 
     while line: 
      try: 
       line = file_obj.readline() 
       match = reg_ex.match(line) 
       index += match.group(1) 
      except AttributeError: 
       pass 
    indices.append(index) 
# read files again and put data into a master dataframe 
for path, index in zip(paths, indices): 
    subset_df = pd.DataFrame(index=index, columns=["Number"]) 
    with open(path, 'r') as file_obj: 
     line = True 
     while line: 
      try: 
       line = file_obj.readline() 
       match = reg_ex.match(line) 
       subset_df.loc[[match.group(1)]] = match.group(2) 
      except AttributeError: 
       pass 
    df = pd.concat([df, subset_df]).sort_index() 
print df 

    Number 
a  1 
b  2 
c  3 
d  4 

我的输入文件:

test1.txt的

a 1 
b 2 
end 

的test2.txt

c 3 
d 4 
end 
+2

可能会得到一个更快的磁盘:) –

+3

在此期间,查找一个好的Python分析器。这是一般工具类,它会告诉你程序的哪一部分是瓶颈。 –

+1

你不能读取数据框中的整个50个文件,然后运行基于正则表达式的操作?这将是快速的,因为对熊猫的过滤操作非常快...... – vks

回答

0

事实证明,首先创建一个空白的DataFrame,搜索索引以找到一行数据的正确位置,然后更新DataFrame的这一行是一个愚蠢的时间昂贵的过程。

这样做的更快的方法是将输入文件的内容读入基本数据结构(如列表列表或列表中的字典),然后将其转换为DataFrame。

当您读取的所有数据都在相同列中时使用列表。否则,使用dicts来明确地说出每一位数据应该到哪个列。

更新1月18日:这是链接到How to parse complex text files using Python?我也写了blog article explaining how to parse complex files to beginners

1

可以导入多模型和使用工作进程池同时打开多个文件作为文件对象,加快速度你的代码的加载部分。要测试的时间,无论是进口日期时间函数,并使用下面的代码:

import datetime 
start=datetime.datetime.now() 

#part of your code goes here 

execTime1=datetime.datetime.now() 
print(execTime1-start) 

#the next part of your code goes here 

execTime2=datetime.datetime.now() 
print(execTime2-execTime1) 

至于读取每个文件只有一次,请考虑使用其他多脚本来构建每个文件行的列表,这样你就可以检查用于没有文件I/O操作的匹配。

1

首先,使用一个分析器您的脚本(see this question)。准确地分析这部分消耗更多的时间。看看你能不能优化它。

其次,我觉得,I/O操作 - 读取文件最有可能是瓶颈,可以使用并发方式进行优化,我会建议同时读取文件并创建数据帧,每个线程可以将新创建​​的数据帧推送到队列中,主线程监控队列可以从队列中获取数据帧并合并与主数据帧。

希望这会有所帮助。

1

1为文件创建一个输出模板(如结果数据框应具有列A,BC)

2读取每个文件,将其转换为输出模板(即在步骤1中建立)并保存文件(如temp_idxx.csv)这可以并行完成:)

3连击这些temp_idxx.csv文件转换成一个巨大的文件,并删除临时工

此过程中

优点是,它可以并行运行,并且它不会吃掉所有的内存 缺点是创建输出格式并坚持它,以及磁盘空间使用率

1

使用pd.read_csv将文件直接读入熊猫数据框。创建你的subset_df。使用诸如skipfooter之类的方法可以跳过文件末尾的行,这是你不需要的。还有更多可用的方法可以替代您正在使用的一些正则表达式循环函数,如error_bad_lines和skip_blank_lines。

然后使用熊猫提供的工具清理不需要的数据。

这将允许您读取打开并只读取一次文件。

12

在拔出多重加工锤之前,您的第一步应该是做一些分析。使用cProfile快速浏览以确定哪些功能需要很长时间。不幸的是,如果你的线路全部在一个函数调用中,它们将显示为库调用。 line_profiler更好,但需要更多的安装时间。

注意。如果使用ipython,则可以使用%timeit(timeit模块的magic命令)和%prun(profile模块的magic命令)来为语句和函数定时。谷歌搜索将显示一些指南。

熊猫是一个美好的图书馆,但我一直是一个偶然的受害者,糟糕的使用它与残酷的结果。尤其要注意append()/ concat()操作。这可能是你的瓶颈,但你应该确定。通常,如果您不需要执行索引/列对齐,则numpy.vstack()和numpy.hstack()操作会更快。在你的情况下,它看起来像你可能能够通过系列或一维numpy ndarrays可以节省时间。

顺便说一句,Python中的try块比检查无效条件慢得多,速度往往要比检查无效条件慢10倍或更多,因此确保在将它粘贴到每条线的循环中时绝对需要它。这可能是时间的另一个掠夺者;我想象一下,在match.group(1)失败的情况下,您会尝试使用try块来检查AttributeError。我会先检查一下有效的比赛。

即使这些小小的修改应该足以让程序在尝试任何像多处理一样激烈的事情之前运行得更快。那些Python库很棒,但带来了一系列新的挑战。

+1

看到他的脚本逐行读取50MB文件是瓶颈发生的地方,这是相当明显的。即使在50MB文件上做pandas.read_excel也需要几分钟的时间。 – Nemo

2

一般蟒蛇考虑:

首先对时间测量你可以使用这样一个片段:

from time import time, sleep 


class Timer(object): 
    def __init__(self): 
     self.last = time() 


    def __call__(self): 
     old = self.last 
     self.last = time() 
     return self.last - old 

    @property 
    def elapsed(self): 
     return time() - self.last 



timer = Timer() 

sleep(2) 
print timer.elapsed 
print timer() 
sleep(1) 
print timer() 

然后,你可以基准运行的代码很多次,检查是否有差异。

关于这一点,我的评论在线:

with open(path, 'r') as file_obj: 
    line = True 
    while line: #iterate on realdines instead. 
     try: 
      line = file_obj.readline() 
      match = reg_ex.match(line) 
      index += match.group(1) 
      #if match: 
      # index.extend(match.group(1)) # or extend 

     except AttributeError: 
      pass 

你以前的代码笏不是真的Python的,你可能会想尝试/除外。 然后尝试只在最小可能的行上做。

相同的通知适用于第二个代码块。

如果您需要多次读取相同的文件。您可以使用StringIO将它们存储在RAM中,或者更轻松地保存只读一次的{path:content}字典。

Python正则表达式很慢,你的数据看起来很简单,你可以考虑在你的输入行上使用split和strip方法。

striped=[l.split() for l in [c.strip() for c in file_desc.readlines()] if l] 

我建议你阅读本:https://gist.github.com/JeffPaine/6213790的correspondig视频是在这里https://www.youtube.com/watch?v=OSGv2VnC0go

7

我用过很多次,因为它是一个特别容易实现多的。

import pandas as pd 
from multiprocessing import Pool 

def reader(filename): 
    return pd.read_excel(filename) 

def main(): 
    pool = Pool(4) # number of cores you want to use 
    file_list = [file1.xlsx, file2.xlsx, file3.xlsx, ...] 
    df_list = pool.map(reader, file_list) #creates a list of the loaded df's 
    df = pd.concat(df_list) # concatenates all the df's into a single df 

if __name__ == '__main__': 
    main() 

使用这个,你应该能够在没有太多工作的情况下大幅提高程序的速度。为了使这个运行速度更快,考虑你的文件更改为CSV并在使用熊猫功能pandas.read_csv

:如果你不知道你有多少个处理器有,你可以拉起你的shell并输入

echo %NUMBER_OF_PROCESSORS% 

编辑检查

+0

Python原生CSV模块允许指定''''作为分隔符。 –

1

你的代码不会做你所描述的。

问题:1.阅读每个文件,并检查标签是什么。我需要的信息通常包含在前几行中。

但是看了你的整个文件,而不是只有几行。 这导致在读取文件两次

问题:2.再次读取文件,并用数值填充数据帧。

您在循环覆盖df['a'|'b'|'c'|'d']连连,
我相信这不是你想要的东西是无用的。
这适用于问题中给出的数据,但如果您必须处理n个值,则不适用。


预算与不同的逻辑:

data = {} 
for path in paths: 
    with open(path, 'r') as file_obj: 
     line = True 
     while line: 
      try: 
       line = file_obj.readline() 
       match = reg_ex.match(line) 
       if match.group(1) not in data: 
        data[ match.group(1) ] = [] 

       data[match.group(1)].append(match.group(2)) 
      except AttributeError: 
       pass 

print('data=%s' % data) 
df = pd.DataFrame.from_dict(data, orient='index').sort_index() 
df.rename(index=str, columns={0: "Number"}, inplace=True) 

输出

data={'b': ['2'], 'a': ['1'], 'd': ['4'], 'c': ['3']} 
<class 'pandas.core.frame.DataFrame'> 
Index: 4 entries, a to d 
Data columns (total 1 columns): 
Number 4 non-null object 
dtypes: object(1) 
memory usage: 32.0+ bytes 
    Number 
a  1 
b  2 
c  3 
d  4 

时间表

   Code from Q: to_dict_from_dict 
    4 values 0:00:00.033071 0:00:00.022146 
1000 values 0:00:08.267750 0:00:05.536500 
10000 values 0:01:22.677500 0:00:55.365000 

与Python测试:3.4.2 - 熊猫:0.19.2 - 回复:2.2.1

+0

是的,你是对的。我的MWE并不好。 – bluprince13

+0

请扩大** MWE ** – stovfl

+0

当我开始修改它时,它开始变得非常复杂。我想我会保持原样,但在我的解释中,我会更加清楚地表明,我试图让MWE保持简单。 – bluprince13

2

首先,如果你正在阅读多次的文件,好像这将是瓶颈。尝试将文件读入1个字符串对象,然后多次使用cStringIO

其次,在读入所有文件之前,您并没有真正显示出构建索引的任何理由。即使你这样做,你为什么使用Pandas进行IO?看起来你可以使用常规的Python数据结构(可能使用__slots__)来构建它,然后将它放在主数据框中。如果你在读取文件Y之前不需要文件X索引(正如你第二次循环似乎建议的那样),你只需要循环一次文件。

第三,您可以使用简单的split/strip在弦上拉出空间分隔的记号,或者如果它是更复杂(有串引号和等)使用来自Python的标准库中的CSV模块。在展示您如何真正构建数据之前,很难提出与此相关的修复方案。

到目前为止,你已经证明什么可以用简单的

for path in paths: 
    data = [] 
    with open(path, 'r') as file_obj: 
     for line in file_obj: 
      try: 
       d1, d2 = line.strip().split() 
      except ValueError: 
       pass 
      data.append(d1, int(d2))) 
    index, values = zip(*data) 
    subset_df = pd.DataFrame({"Number": pd.Series(values, index=index)}) 

这里相当迅速完成在时刻的差当我不预先分配(生成的文件是磁盘空间的虚拟机上运行在大小大致24MB):

import pandas as pd 
from random import randint 
from itertools import combinations 
from posix import fsync 


outfile = "indexValueInput" 

for suffix in ('1', '2'): 
    with open(outfile+"_" + suffix, 'w') as f: 
     for i, label in enumerate(combinations([chr(i) for i in range(ord('a'), ord('z')+1)], 8)) : 
      val = randint(1, 1000000) 
      print >>f, "%s %d" % (''.join(label), val) 
      if i > 3999999: 
       break 
     print >>f, "end" 
     fsync(f.fileno()) 

def readWithPandas(): 
    data = [] 
    with open(outfile + "_2", 'r') as file_obj: 
     for line in file_obj: 
      try: 
       d1, d2 = str.split(line.strip()) 
      except ValueError: 
       pass 
      data.append((d1, int(d2))) 
    index, values = zip(*data) 
    subset_df = pd.DataFrame({"Numbers": pd.Series(values, index=index)}) 

def readWithoutPandas(): 
    data = [] 
    with open(outfile+"_1", 'r') as file_obj: 
     for line in file_obj: 
      try: 
       d1, d2 = str.split(line.strip()) 
      except ValueError: 
       pass 
      data.append((d1, int(d2))) 
    index, values = zip(*data) 

def time_func(func, *args): 
    import time 
    print "timing function", str(func.func_name) 
    tStart = time.clock() 
    func(*args) 
    tEnd = time.clock() 
    print "%f seconds " % (tEnd - tStart) 

time_func(readWithoutPandas) 
time_func(readWithPandas) 

所得时间为:

timing function readWithoutPandas 
4.616853 seconds 
timing function readWithPandas 
4.931765 seconds 

您可以尝试使用索引编制的这些功能,查看时间差异。几乎可以肯定的是,减速来自多个磁盘读取。由于Pandas没有时间从字典中构建数据框,因此在将数据传递给Pandas之前,最好先考虑如何在纯Python中构建索引。但是,在读取1个磁盘的同时读取数据和建立索引。

我想另外一个警告是,如果从代码内部打印,预计需要花费大量的时间。将纯文本写入tty需要花费时间读取/写入磁盘。

相关问题