2015-12-03 41 views
2

我想并行化一个文件过滤操作,其中每个过滤器是一个大的正则表达式,因此整个事情需要时间来运行。该文件本身大约100GB。单进程的版本是这样的:python使用多进程来过滤海量文件

def func(line): 
    # simple function as an example 
    for i in range(10**7): 
     pass 
    return len(line) % 2 == 0 


with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr: 
    for line in input: 
     if func(line): 
      out_sr.write(line) 

我尝试使用multiprocessingimap但给人ValueError: I/O operation on closed file.我认为迭代被复制到每一个过程,但不是所有的进程有处理打开。

有没有办法做到这一点使用multiprocessing,最好是利用池?

+0

如果__name__ =='__main __':'?你必须保持线条的秩序? – eph

+0

@eph是的行必须与输入文件的顺序相同。在我的真实代码中,'with'是函数中的某个地方。 – simonzack

+0

什么是你的文件和正则表达式?在命令行或其他文件处理工具上使用awk会更容易吗? – DainDwarf

回答

1

我没有错误运行下面的代码。确保您不要在with声明之外拨打in_srout_sr

from multiprocessing import Pool 

def func(line): 
    # simple function as an example 
    for i in xrange(10**7): 
     pass 
    return len(line) % 2 == 0, line 

def main(): 
    with open('input.txt','r') as in_sr, open('output.txt', 'w') as out_sr: 
     pool = Pool(processes=4) 
     for ret,line in pool.imap(func, in_sr, chunksize=4): 
      if ret: 
       out_sr.write(line) 
     pool.close() 

if __name__ == '__main__': 
    main() 
+0

奇怪,我只是试图安装python 3.5.0,它确实在那里工作,我认为我以前的python版本是buggy(它是3.4.x)。感谢您的答案,它绝对帮助我诊断问题! – simonzack

+0

Btw'contextlib。关闭'可以在这里用作替代风格。 – simonzack

1

的代码与此类似:

def func(line): 
    ... 

if __name__ == '__main__': 

    from multiprocessing import Pool 
    from itertools import tee, izip 

    pool = Pool(processes=4) 

    with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr: 
     lines1, lines2 = tee(in_sr) 
     for line, flag in izip(lines1, pool.imap(func, lines2)): 
      if flag: 
       out_sr.write(line) 
+0

由于ValueError,imap没有工作,请参阅我的问题 – simonzack

+0

@simonzack我不认为'ValueError'是由于'imap',如果只有行字符串作为参数传递。 – eph