2012-12-17 36 views
2

我想跳过从map_async返回的结果。他们在记忆中增长,但我不需要他们。Python多处理map_async

下面是一些代码:

def processLine(line): 
    #process something 
    print "result" 
pool = Pool(processes = 8) 
for line in sys.stdin: 
    lines.append(line) 
    if len(lines) >= 100000: 
     pool.map_async(processLine, lines, 2000) 
pool.close() 
pool.join() 

当我必须处理文件,数亿行的,蟒蛇生长过程中的内存数千兆字节。我该如何解决这个问题?

感谢您的帮助:)

回答

3

你的代码有缺陷:

for line in sys.stdin: 
    lines.append(line) 
    if len(lines) >= 100000: 
     pool.map_async(processLine, lines, 2000) 

这要等到lines累计超过10万线。之后,pool.map_async正在100000+行的整个列表中调用,每增加一行

这不正是你真的想要做清楚,但 如果你不想返回值,使用pool.apply_async,不pool.map_async。也许像这样:

import multiprocessing as mp 

def processLine(line): 
    #process something 
    print "result" 

if __name__ == '__main__': 
    pool = mp.Pool(processes = 8) 
    for line in sys.stdin: 
     pool.apply_async(processLine, args = (line,)) 
    pool.close() 
    pool.join() 
0

是的,你是对的。有一些bug

我的意思是:

def processLine(line): 
    #process something 
    print "result" 
    pool = Pool(processes = 8) 

if __name__ == '__main__': 
    for line in sys.stdin: 
    lines.append(line) 
    if len(lines) >= 100000: 
     pool.map_async(processLine, lines, 2000) 
     lines = [] #to clear buffer 
    pool.map_async(processLine, lines, 2000) 
    pool.close() 
    pool.join() 

我用map_async,因为它具有可配置CHUNK_SIZE所以它是更有效的,如果有很多,其处理时间相当短线。