2017-06-21 130 views
0

我想在处理超过2GB的csv文件时使用多处理器。问题在于输入只是在一个进程中被消耗,而其他进程似乎是空闲的。迭代器上的Python多处理器

以下重新创建我遇到的问题。是否有可能使用迭代器使用多进程?将内存全部输入到内存中是不理想的。

import csv 
import multiprocessing 
import time 

def something(row): 
    # print row[0] 
    # pass 
    return row 

def main(): 
    start = time.time() 
    i = open("input.csv") 
    reader = csv.reader(i, delimiter='\t') 

    print reader.next() 

    p = multiprocessing.Pool(16) 
    print "Starting processes" 
    j = p.imap(something, reader, chunksize=10000) 

    count= 1 
    while j: 
     print j.next() 

    print time.time() - start 


if __name__ == '__main__': 
    main() 

回答

1

我认为你很困惑“进程”与“处理器”。

您的程序肯定是同时产生多个进程,您可以在程序运行时在系统或资源监视器中进行验证。主要使用的处理器或CPU内核数量主要取决于操作系统,并且与委派给每个进程的任务密集程度有关。

做一点点修改你的something功能,引入睡眠时间,模拟工作正在该函数来完成:

def something(row): 
    time.sleep(.4) 
    return row 

现在,先在你的文件中运行功能顺序您的每一行,并注意到每个结果都会以每400毫秒一个一个的速度出现。

def main(): 
    with open("input.csv") as i: 
     reader = csv.reader(i) 
     print (next(reader)) 

     # SEQUENTIALLY: 
     for row in reader: 
      result = something(row) 
      print (result) 

现在尝试与工人的池。保持在一个较低的数字,说4名工人,你会看到的结果是每年400毫秒,但是在4(或大致工人池中的数量)的群体:

def main(): 
    with open("input.csv") as i: 
     reader = csv.reader(i) 
     print (next(reader)) 

     # IN PARALLEL 
     print ("Starting processes") 
     p = multiprocessing.Pool(4) 
     results = p.imap(something, reader) 
     for result in results: 
      print(result) # one result is the processing of 4 rows... 

虽然并行运行,检查系统监视器并查找正在执行多少“python”进程。应该是一个加上工人的数量。

我希望这个解释很有用。

+0

你的回答非常有帮助,但它似乎提出了几个问题。当我运行代码而没有在()中加入睡眠时,一个进程会消耗大量内存。当我将睡眠(0.4)添加到某物中时,此问题不存在。这有什么理由吗? – BHa

+0

我能说什么?这些过程确实并行运行。但是如果不知道关于你的数据和你的功能的其他信息,我不能告诉你其他任何事情。如果您的进程比CPU密集型的更多,那么多处理可能无济于事。 – chapelo