2012-07-26 61 views
3

我可能会问一个非常基本的问题,但我真的不知道如何在python中创建一个简单的并行应用程序。 我正在16核心的机器上运行我的脚本,我想高效地使用它们。我有16个巨大的文件要读取,我希望每个CPU读取一个文件,然后合并结果。 这里,我给的,我想怎么做一个简单的例子:Python,阅读许多文件并合并结果

parameter1_glob=[] 
    parameter2_glob[] 


    do cpu in arange(0,16): 
     parameter1,parameter2=loadtxt('file'+str(cpu)+'.dat',unpack=True) 

     parameter1_glob.append(parameter1) 
     parameter2_glob.append(parameter2) 

我认为multiprocessing模块可以帮助,但我不知道如何将它应用到了我想做的事情。

+0

[您是否听说过Python的GIL](http://stackoverflow.com/questions/990102/python-global-interpreter-lock-gil-workaround-on-multi-core-systems-using-task) – tkone 2012-07-26 14:58:11

+2

有没有意义使用多个线程......您的应用程序将是磁盘绑定的,而不是CPU绑定的。 – 2012-07-26 14:59:16

+2

@tkone:'multiprocessing'通过使用单独的解释器来避免GIL,尽管它不会使磁盘更快。 – geoffspear 2012-07-26 15:00:10

回答

1

要逐行合并吗?有些协程对于I/O绑定的应用程序比传统的多任务更有趣。您可以链接生成器和协程以进行各种路由,合并和广播。用这个nice presentation by David Beazley吹出你的想法。

您可以使用协同程序作为一个水槽(未经测试,请参阅dabeaz例子):

# A sink that just prints the lines 
@coroutine 
def printer(): 
    while True: 
     line = (yield) 
     print line, 

sources = [ 
    open('file1'), 
    open('file2'), 
    open('file3'), 
    open('file4'), 
    open('file5'), 
    open('file6'), 
    open('file7'), 
] 

output = printer() 
while sources: 
    for source in sources: 
     line = source.next() 
     if not line: # EOF 
      sources.remove(source) 
      source.close() 
      continue 
     output.send(line) 
2

我同意Colin Dunklau在他的评论中所说的,这个过程会在读写这些文件时遇到瓶颈,对CPU的要求很少。即使你有17个专用驱动器,即使只有一个内核也不会超出。另外,虽然我认识到这与您的实际问题相切,但您可能会遇到这些“巨大”文件的内存限制 - 将16个文件作为数组加载到内存中,然后将它们组合到另一个文件中几乎肯定会占用更多的内存你有。

你可能会发现更好的结果,看看shell脚本这个问题。特别是,GNU sort使用内存有效的合并排序来快速排序一个或多个文件 - 比Python中或大多数其他语言中最精心编写的应用程序的速度要快得多。

我会建议避免任何类型的多线程工作,它将大大增加复杂性,并且利益最小。确保一次只保留内存中的少量文件,否则会很快耗尽。无论如何,您绝对想要在两个独立的磁盘上运行读写。与同时读取和写入同一磁盘相关的速度减慢非常痛苦。

0

假设每个文件的结果是短小的,你可以用我的包jug做到这一点:

from jug import TaskGenerator 
loadtxt = TaskGenerator(loadtxt) 

parameter1_glob=[] 
parameter2_glob[] 

@TaskGenerator 
def write_parameter(oname, ps): 
    with open(oname, 'w') as output: 
     for p in ps: 
      print >>output, p 

parameter1_glob = [] 
parameter2_glob = [] 

for cpu in arange(0,16): 
    ps = loadtxt('file'+str(cpu)+'.dat',unpack=True) 
    parameter1_glob.append(ps[0]) 
    parameter2_glob.append(ps[1]) 

write_parameter('output1.txt', parameter1_glob) 
write_parameter('output2.txt', parameter2_glob) 

现在,您可以执行多个jug execute职位。