2016-11-24 78 views
1

我有一个multiprocessing脚本与pool.map工作。问题是并非所有进程都需要很长时间才能完成,因此有些进程会因为等待所有进程完成而睡着(与this question中的问题相同)。一些文件在不到一秒钟内完成,其他文件需要几分钟(或几小时)。Python多处理池地图和imap

如果我正确理解了手册(and this post),则pool.imap并未等待所有进程完成,如果完成,它将提供一个新文件进行处理。当我尝试这些时,脚本正在加速处理文件,小文件按预期处理,大文件(需要更多时间处理)直到最后才会结束(在没有通知的情况下死亡?)。这是pool.imap的正常行为,还是我需要添加更多命令/参数?当我在else部分中添加time.sleep(100)作为测试时,它正在处理更大的文件,但其他进程睡着了。有什么建议么 ?谢谢

def process_file(infile): 
    #read infile 
    #compare things in infile 
    #acquire Lock, save things in outfile, release Lock 
    #delete infile 

def main(): 
    #nprocesses = 8 
    global filename 
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9'] 
    for d in pathlist: 
     os.chdir(d)  
     todolist = [] 
     for infile in os.listdir(): 
      todolist.append(infile) 
     try: 
      p = Pool(processes=nprocesses) 
      p.imap(process_file, todolist) 
     except KeyboardInterrupt:     
      print("Shutting processes down") 
      # Optionally try to gracefully shut down the worker processes here.  
      p.close() 
      p.terminate() 
      p.join() 
     except StopIteration: 
      continue  
     else: 
      time.sleep(100) 
      os.chdir('..') 
     p.close() 
     p.join() 

if __name__ == '__main__': 
    main()  
+0

我一直在思考的'imap'问题。 'Map'正在等待所有进程完成以返回结果。 Imap'在第一个过程完成后立即返回结果,并可能终止其他过程并给出所有新的工作。这是正确的吗? – avierstr

回答

1

由于您已将所有文件放入列表中,因此可以将它们直接放入队列中。然后,队列将与您的子进程共享,从队列中获取文件名并执行其操作。不需要两次(首先进入列表,然后通过Pool.imap进入pickle列表)。 Pool.imap正在做同样的事情,但没有你知道它。

todolist = [] 
for infile in os.listdir(): 
    todolist.append(infile) 

可以被替换为:

todolist = Queue() 
for infile in os.listdir(): 
    todolist.put(infile) 

然后将完整的解决方案看起来像:

def process_file(inqueue): 
    for infile in iter(inqueue.get, "STOP"): 
     #do stuff until inqueue.get returns "STOP" 
    #read infile 
    #compare things in infile 
    #acquire Lock, save things in outfile, release Lock 
    #delete infile 

def main(): 
    nprocesses = 8 
    global filename 
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9'] 
    for d in pathlist: 
     os.chdir(d)  
     todolist = Queue() 
     for infile in os.listdir(): 
      todolist.put(infile) 
     process = [Process(target=process_file, 
         args=(todolist) for x in range(nprocesses)] 
     for p in process: 
      #task the processes to stop when all files are handled 
      #"STOP" is at the very end of queue 
      todolist.put("STOP") 
     for p in process: 
      p.start() 
     for p in process: 
      p.join()  
if __name__ == '__main__': 
    main() 
+0

非常感谢RaJa!现在它按我的意图工作。为了完整性:'范围(nprocesses)]中的'args =(todolist)]'必须是'范围(nprocesses)中的x'的'args =(todolist,))'''。那天晚上我一直在Queue尝试,但到目前为止发生了很多错误。现在我很清楚它是如何工作的! – avierstr