2013-03-20 127 views
6

我知道Stack Exchange上有很多帖子与将多处理结果写入单个文件相关,并且在阅读完这些帖子后我开发了自己的代码。我试图实现的是并行运行“RevMapCoord”函数,并使用multiprocess.queue将其结果写入单个文件中。但是我排队工作时遇到问题。我的代码:使用队列写入同一文件的Python多重处理

def RevMapCoord(list): 
    "Read a file, Find String and Do something" 

def feed(queue, parlist): 
    for par in parlist: 
     print ('Echo from Feeder: %s' % (par)) 
     queue.put(par) 
    print ('**Feeder finished queing**') 

def calc(queueIn, queueOut): 
    print ('Worker function started') 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      res = RevMapCoord(final_res) 
      queueOut.put((par,res)) 
     except: 
      break 

def write(queue, fname): 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      par, res = queue.get(block = False) 
      print >>fhandle, par, res 
     except: 
      break 
    fhandle.close() 


feedProc = Process(target = feed , args = (workerQueue, final_res)) 
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nproc)] 
writProc = Process(target = write, args = (writerQueue, sco_inp_extend_geno)) 

feedProc.start() 
print ('Feeder is joining') 
feedProc.join() 
for p in calcProc: 
    p.start() 
for p in calcProc: 
    p.join() 
writProc.start() 
writProc.join() 

当我运行这个代码脚本在“feedProc.start()”步骤时,从屏幕上的最后几行输出显示打印语句从结束“feedProc.start()”:

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne 
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine 
**Feeder finished queing** 

但在执行下一行“feedProc.join()”前挂起。代码没有错误并继续运行,但什么都不做(挂起)。请告诉我我正在犯什么错误。

回答

0

我实现写作成绩。从uing在Python3“map_async”功能多处理单个文件下面是我写的函数:

def PPResults(module,alist):##Parallel processing 
    npool = Pool(int(nproc))  
    res = npool.map_async(module, alist) 
    results = (res.get())###results returned in form of a list 
    return results 

所以,我提供的“的a_list”和“模块”参数列表该功能是在函数进行处理并返回结果。上述函数继续以列表的形式收集结果,并在所有段落返回时返回来自'a_list'的米已经被处理。结果可能不是正确的顺序,但顺序对我来说并不重要,这很好。 “结果”列表中可以重复和单独的结果写在文件中,如:

fh_out = open('./TestResults', 'w') 
for i in results:##Write Results from list to file 
    fh_out.write(i) 

为了保持到我在我的问题(上)中提到,我们可能需要使用“队列”类似的结果的顺序。虽然我可以修复代码,但我相信这里不需要提及。

感谢

AK

9

我认为你应该将你的例子缩减为基础。例如:

from multiprocessing import Process, Queue 

def f(q): 
    q.put('Hello') 
    q.put('Bye') 
    q.put(None) 

if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    with open('file.txt', 'w') as fp: 
     while True: 
      item = q.get() 
      print(item) 
      if item is None: 
       break 
      fp.write(item) 
    p.join() 

这里我有两个进程(主进程,一个p)。 p将字符串放入由主进程检索的队列中。当主进程仍然没有找到(我使用指示哨兵:“我做了”它打破了环

扩展这一许多进程(或线程)是微不足道的

+2

你应该尝试运行您的示例(它给出了一个错误)。这种方式不能在队列中放置多个项目。实际上你只是把一个项目放在一个列表上。 – Gerrat 2013-05-08 17:27:22

+0

'TypeError:期望一个字符缓冲区对象'我有错误:| – nk9 2013-08-05 18:16:11

+1

@ b1- * new *(和正确的,谢谢Gerrat)版本与python 2.7.5和3.2.3一起使用。试一试! – Hernan 2013-08-07 02:08:05