2016-11-23

python3 multiprocessing.Process method fails to complete

I saw somewhere a hint on how to process a large data set (lines of text, say) faster with the multiprocessing module, roughly like this:

... (form batch_set = nump batches [= lists of lines to process], batch_set 
    is a list of lists of strings (batches)) 
nump = len(batch_set) 
output = mp.Queue() 
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)] 

for p in processes: 
    p.start() 
for p in processes: 
    p.join() 

results = sorted([output.get() for p in processes]) 
... (do something with the processed outputs, ex print them in order, 
    given that each proc_lines function returns a couple (i, out_batch)) 

However, while the code works fine with a small number of lines/batches [e.g. './code.py -x 4:10' for nump = 4 and numb = 10 (lines/batch)], beyond a certain number of lines/batches it hangs [e.g. './code.py -x 4:4000'], and when I interrupt it I see a traceback mentioning _wait_for_tstate_lock and the system threading library. It appears the code never reaches the final line shown above...

I provide the code below, in case someone needs it to answer why this happens and how to fix it.

#!/usr/bin/env python3

import sys
import multiprocessing as mp


def fabl(numb, nump):
    '''
    Form And Batch Lines: form nump[roc] groups of numb[atch] indexed lines
    '<idx> my line here' with indexes from 1 to (nump x numb).
    '''
    ret = []
    idx = 1
    for _ in range(nump):
        cb = []
        for _ in range(numb):
            cb.append('%07d my line here' % idx)
            idx += 1
        ret.append(cb)
    return ret


def proc_lines(i, output, rows_in):
    ret = []
    for row in rows_in:
        row = row[0:8] + 'some other stuff\n'  # replacement for the post-idx part
        ret.append(row)

    output.put((i, ret))
    return


def mp_proc(batch_set):
    'given the batch, disperse it to the number of processes and ret the results'
    nump = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i]))
                 for i in range(nump)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print('waiting for procs to complete...')
    results = sorted([output.get() for p in processes])
    return results


def write_set(proc_batch_set, fout):
    'write p[rocessed] batch_set'
    for _, out_batch in proc_batch_set:
        for row in out_batch:
            fout.write(row)
    return


def main():
    args = sys.argv
    if len(args) < 2:
        print('''
    run with args: -x [ NumProc:BatchSize ]
        (ex: '-x' | '-x 4:10' (default values) | '-x 4:4000' (hangs...))
        ''')
        sys.exit(0)

    numb = 10  # suppose we need this number of lines/batch : BatchSize
    nump = 4   # number of processes to use                 : NumProcs
    if len(args) > 2 and ':' in args[2]:  # use another np:bs
        nump, numb = map(int, args[2].split(':'))

    batch_set = fabl(numb, nump)  # proc-batch made in here: nump (groups) x numb (lines)
    proc_batch_set = mp_proc(batch_set)

    with open('out-min', 'wt') as fout:
        write_set(proc_batch_set, fout)

    return


if __name__ == '__main__':
    main()

Answer

The Queue has a certain capacity and can get full if you do not empty it while the Process is running. This does not block the execution of your processes, but you won't be able to join a Process whose put has not completed.
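A minimal, self-contained sketch of this safe ordering (the `worker`/`run` names are illustrative, not from the question): each worker puts a result much larger than the pipe buffer, so draining the queue before joining is what lets the workers' feeder threads finish.

```python
import multiprocessing as mp


def worker(i, out):
    # Put a result far larger than the underlying pipe buffer (~64 KiB),
    # so the queue's feeder thread cannot finish until someone reads it.
    out.put((i, 'x' * 1_000_000))


def run(nump=4):
    out = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, out)) for i in range(nump)]
    for p in procs:
        p.start()
    # Drain FIRST: each get() unblocks a worker's feeder thread...
    results = sorted(out.get() for _ in procs)
    # ...and only then join, which now returns promptly.
    for p in procs:
        p.join()
    return results
```

Swapping the drain and the join in this sketch reproduces the hang from the question once the payloads exceed the pipe buffer.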

So I would simply modify the mp_proc function so that:

def mp_proc(batch_set): 
    'given the batch, disperse it to the number of processes and ret the results' 
    n_process = len(batch_set) 
    output = mp.Queue() 
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) 
                 for i in range(n_process)] 

    for p in processes: 
        p.start() 

    # Empty the queue while the processes are running so there is no 
    # issue with incomplete `put` operations. 
    results = sorted([output.get() for p in processes]) 

    # Join the processes to make sure everything finished correctly 
    for p in processes: 
        p.join() 

    return results 
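An alternative not discussed in the thread: multiprocessing.Pool handles the result plumbing internally, so there is no queue to drain and no join-ordering pitfall. A sketch under that assumption (the single-argument `proc_lines` variant and `mp_proc_pool` name are hypothetical adaptations of the question's code):

```python
import multiprocessing as mp


def proc_lines(args):
    # Pool.map passes one argument, so take (index, batch) as a tuple.
    i, rows_in = args
    return i, [row[0:8] + 'some other stuff\n' for row in rows_in]


def mp_proc_pool(batch_set):
    # The pool collects results for us; no manual Queue management needed.
    with mp.Pool(len(batch_set)) as pool:
        results = pool.map(proc_lines, list(enumerate(batch_set)))
    # Pool.map already preserves input order; sorted() is belt-and-braces.
    return sorted(results)
```

This trades the explicit Process/Queue control of the original for a simpler API that cannot deadlock on unread results.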

So, basically, this means I was abusing the queue by trying to push too much into it, thereby blocking the system. I found this post saying the same: http://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em, so that could well be the problem. Thanks! – vuvu


I replaced the p.join() statements with a slightly larger block, placed right after p.start(), that empties the queue as soon as enough items are stored in it; with all subprocesses finished by the end, the run succeeds even for very large batches. – vuvu
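The drain-while-running idea in this comment can be generalized with a per-worker sentinel, which also covers the case where each worker emits a variable number of results (the `worker`/`collect` names and sentinel scheme are illustrative, not from the thread):

```python
import multiprocessing as mp


def worker(i, out):
    # Each worker may put any number of results...
    for k in range(i + 1):
        out.put((i, k))
    out.put(None)  # ...then a sentinel marking that it is done putting


def collect(nump=3):
    out = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, out)) for i in range(nump)]
    for p in procs:
        p.start()
    results, done = [], 0
    while done < nump:          # drain until every worker's sentinel arrives
        item = out.get()
        if item is None:
            done += 1
        else:
            results.append(item)
    for p in procs:             # the queue is drained, so join cannot hang
        p.join()
    return sorted(results)
```

With a fixed result count per worker, as in the question, the simpler counted drain from the answer is enough; the sentinel only pays off when that count is unknown in advance.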