2017-09-01 38 views

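Multiprocessing with the "fork" context blocks on Linux/Intel Xeon with Python 3.6.1?

Problem description
I slightly adapted the code from this answer (see below). However, when I run this script on Linux (command line: python script_name.py), it prints jobs running: x for all jobs but then appears to hang. When I use the spawn start method instead (mp.set_start_method('spawn')), it works fine and immediately starts printing the value of the counter variable (see the listener function).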

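Questions

  • Why does it only work when the processes are spawned?
  • How can I adjust the code so that it works with fork (since fork is probably faster)?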
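For reference, the two start methods being compared can also be selected per object rather than globally. The following is a minimal sketch, not taken from the original post: it uses mp.get_context to obtain a context bound to one start method, and the helper names report_pid and run_child are hypothetical.

```python
import multiprocessing as mp
import os


def report_pid(q):
    # Runs in the child process: send this process's PID back to the parent.
    q.put(os.getpid())


def run_child(method):
    """Start one child with the given start method; return (method, child_pid)."""
    # get_context returns a context bound to a single start method, so the
    # global default used elsewhere in the program is left unchanged.
    ctx = mp.get_context(method)
    q = ctx.Queue()
    p = ctx.Process(target=report_pid, args=(q,))
    p.start()
    child_pid = q.get()  # read before join so the child can flush its queue
    p.join()
    return ctx.get_start_method(), child_pid


if __name__ == "__main__":
    # Pass "spawn" instead of "fork" to compare the two start methods.
    print(run_child("fork"))
```

Unlike mp.set_start_method, which may be called only once per program, a context object can be created for each start method side by side, which makes it easier to compare behavior under fork and spawn in the same script.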

Code

import io
import csv
import multiprocessing as mp

NEWLINE = '\n'

def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

def worker(genome_row, q):
    complete_data = []
    # data processing
    # ftp connection to retrieve data
    # etc.
    q.put(complete_data)
    return complete_data

def listener(q):
    '''listens for messages on the q, writes to file. '''
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while 1:
        m = q.get()
        counter += 1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)
        f.flush()
    f.close()

if __name__ == "__main__":
    file_searcher('path_to_some_tab_del_file.txt')

Processor information

Architecture:   x86_64 
CPU op-mode(s):  32-bit, 64-bit 
Byte Order:   Little Endian 
CPU(s):    20 
On-line CPU(s) list: 0-19 
Thread(s) per core: 1 
Core(s) per socket: 1 
Socket(s):    20 
NUMA node(s):   2 
Vendor ID:    GenuineIntel 
CPU family:   6 
Model:     45 
Model name:   Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz 
Stepping:    2 
CPU MHz:    2596.501 
BogoMIPS:    5193.98 
Hypervisor vendor:  VMware 
Virtualization type: full 
L1d cache:    32K 
L1i cache:    32K 
L2 cache:    256K 
L3 cache:    25600K 
NUMA node0 CPU(s):  0-19 

Linux kernel version

3.10.0-514.26.2.el7.x86_64 

Python version

Python 3.6.1 :: Continuum Analytics, Inc. 

Log
I added the logging code suggested by @yacc, which produces the following log:

[server scripts]$ python main_v3.py 
[INFO/SyncManager-1] child process calling self.run() 
[INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6 
[INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw' 
[DEBUG/MainProcess] requesting creation of a shared 'Queue' object 
[DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0' 
[DEBUG/MainProcess] INCREF '7f0842da56a0' 
[DEBUG/MainProcess] created semlock with handle 139673691570176 
[DEBUG/MainProcess] created semlock with handle 139673691566080 
[DEBUG/MainProcess] created semlock with handle 139673691561984 
[DEBUG/MainProcess] created semlock with handle 139673691557888 
[DEBUG/MainProcess] added worker 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0' 
[DEBUG/MainProcess] added worker 
[INFO/ForkPoolWorker-2] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-4] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-3] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-6] child process calling self.run() 
[INFO/ForkPoolWorker-5] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-7] child process calling self.run() 
[INFO/ForkPoolWorker-8] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-9] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-10] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-11] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-12] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-13] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-14] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-15] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-16] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-17] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-18] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-19] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-20] child process calling self.run() 
jobs running: 1 
jobs running: 2 
jobs running: 3 
jobs running: 4 
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-21] child process calling self.run() 
jobs running: 5 
jobs running: 6 
jobs running: 7 
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0' 
jobs running: 8 
written to file 
jobs running: 9 
jobs running: 10 
[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection 
[DEBUG/ForkPoolWorker-2] making connection to manager 
jobs running: 11 
jobs running: 12 
jobs running: 13 
jobs running: 14 
jobs running: 15 
[DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2' 
jobs running: 16 
jobs running: 17 
jobs running: 18 
jobs running: 19 
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0' 
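
The lines above have the [LEVEL/ProcessName] shape that multiprocessing's own logger emits. The exact code @yacc suggested is not shown in this thread, so the following is only a sketch of one way such output can be produced, assuming the standard mp.log_to_stderr helper was used; enable_mp_debug_logging is a hypothetical name.

```python
import logging
import multiprocessing as mp


def enable_mp_debug_logging():
    """Send multiprocessing's internal log messages to stderr at DEBUG level."""
    # log_to_stderr attaches a stderr handler to the 'multiprocessing' logger,
    # using the format '[%(levelname)s/%(processName)s] %(message)s'.
    logger = mp.log_to_stderr()
    logger.setLevel(logging.DEBUG)
    return logger


if __name__ == "__main__":
    logger = enable_mp_debug_logging()
    logger.info("multiprocessing debug logging enabled")
```

Calling this once at the top of the main block, before any Manager or Pool is created, should make process start-up and proxy activity visible in the same way as in the log above.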
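
Can you provide details on the versions of Linux, Python, and the MP package, as well as the hardware/processor? – yacc

I added the information you asked for (see the edit), @yacc. I couldn't figure out how to get the MP package version. I hope you can find the problem. – CodeNoob

multiprocessing is part of the standard library, so its version is the same as the Python version. For me (Python 3.4.3) the code works fine (all I changed was removing the csv reader and reading a plain file instead). Have you tried reproducing it elsewhere? – MacHala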

Answer

I'm not sure why you are having this problem, but isn't this code a simpler way to do the same thing?

import csv
import multiprocessing as mp

NEWLINE = '\n'

def file_searcher(file_path):
    # Backslashes continue the multi-line with statement (needed on Python 3.6).
    with open(file_path, 'r', encoding='utf-8') as input_file, \
            open('output.txt', 'w', encoding='utf-8') as f, \
            mp.Pool() as pool:
        parsed_file = csv.DictReader(input_file, delimiter='\t')
        # imap yields the workers' results in input order.
        for result in pool.imap(worker, parsed_file):
            # Each result is a list of lines, as in the question's listener.
            for line in result:
                f.write(line + NEWLINE)

def worker(genome_row):
    complete_data = []
    # data processing
    # ftp connection to retrieve data
    # etc.
    return complete_data

if __name__ == "__main__":
    file_searcher('path_to_some_tab_del_file.txt')
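
If the queue-and-listener design from the question is preferred, one possible variation is to run the listener as its own process instead of as a pool task, so it does not occupy a pool slot. The thread does not establish what causes the hang under fork, so this is a sketch of an alternative structure under an explicit fork context, not a confirmed fix. SENTINEL, run, and the placeholder body of worker are assumptions made for illustration.

```python
import multiprocessing as mp

NEWLINE = '\n'
SENTINEL = None  # hypothetical stop marker, used here instead of the 'kill' string


def listener(q, out_path):
    """Drain the queue and write every received list of lines to out_path."""
    with open(out_path, 'w', encoding='utf-8') as f:
        while True:
            m = q.get()
            if m is SENTINEL:
                break
            for line in m:
                f.write(line + NEWLINE)
            f.flush()


def worker(row):
    # Placeholder for the real processing: returns a list of output lines.
    return [str(row)]


def run(rows, out_path, method='fork'):
    ctx = mp.get_context(method)
    q = ctx.Queue()
    # Start the dedicated writer before the pool, while the parent process
    # has not yet started the pool's helper threads.
    writer = ctx.Process(target=listener, args=(q, out_path))
    writer.start()
    with ctx.Pool() as pool:
        # The parent forwards each result to the writer in input order.
        for result in pool.imap(worker, rows):
            q.put(result)
    q.put(SENTINEL)
    writer.join()


if __name__ == "__main__":
    run(["row-1", "row-2"], "output.txt")
```

Here only the parent process puts items on the queue, so a plain context queue is used rather than a Manager queue; the workers return their results through the pool as in the answer above.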