Multiprocessing: passing a set of bytes through shared memory (2014-09-02)

The following code works, but it is very slow because a large data set is passed to each process. In the actual implementation, creating the processes and sending the data takes almost as long as the calculation itself, so by the time the second process is created the first one is nearly done computing, and the parallelization is pointless.

The code is the same as in this question, Multiprocessing has cutoff at 992 integers being joined as result; the change suggested there works and is implemented below. However, I assume this is a common problem others have run into: pickling large amounts of data takes a long time.
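
For a sense of scale, a minimal timing sketch of that pickling cost, assuming data shaped like the example below (this measurement is illustrative, not part of the original code):

# Hypothetical timing sketch: measure how long pickling the question's
# data shape takes (Python 2, hence cPickle and print statements).
import cPickle
import time

total = [dict((str(i), i) for i in range(200)) for _ in range(4000)]
t0 = time.time()
blob = cPickle.dumps(total, protocol=2)
print 'pickled %d bytes in %.3f seconds' % (len(blob), time.time() - t0)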

I have seen answers that use multiprocessing.Array to pass a shared-memory array. I have an array of ~4000 indices, but each index holds a dictionary with 200 key/value pairs. The data is only read by each process; some calculation is done, and then a matrix (4000x3) is returned (without the dictionaries).

Answers such as Is shared readonly data copied to different processes for Python multiprocessing? use map. Is it possible to keep the structure below and still use shared memory? Is there an efficient way to send the data to each process given an array of dictionaries, for example by wrapping the dictionaries in some manager and then putting that into a multiprocessing.Array?

import multiprocessing

def main():
    total = []
    for j in range(0, 3000):
        # build a separate dictionary for each index
        data = {}
        for i in range(0, 200):
            data[str(i)] = i
        total.append(data)

    CalcManager(total, start=0, end=3000)

def CalcManager(myData, start, end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.
    nprocs = 3
    tasks = multiprocessing.JoinableQueue()  # created but currently unused
    #Initialize the multiprocessing queue so we can get the values returned to us
    result_q = multiprocessing.Queue()
    #Set up an empty list to store our processes
    procs = []
    #Divide up the data for the set number of processes
    interval = (end - start) / nprocs
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we don't go past the size of the data
        if new_end > end:
            new_end = end
        #Slice out this process's share of the data
        data = myData[new_start:new_end]
        #Create the process and pass it the data and the result queue
        p = multiprocessing.Process(target=multiProcess, args=(data, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #The next slice starts where this one ended
        new_start = new_end
    print 'finished starting'

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data, start, end, result_q, proc_num):
    print 'started process'
    results = []
    for i in range(0, 22):
        #dummy work: build a 22x3 result matrix
        temp = []
        for j in range(0, 3):
            temp.append(j)
        results.append(temp)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()

Solved by simply handing the list of dictionaries to a Manager:

manager = Manager()
d = manager.list(myData)

The manager holding the list appears to manage the dictionaries contained in that list as well. The startup time is a bit slow, so it seems the data is still being copied, but the copy happens once at the beginning, and afterwards the data is sliced inside the processes.

import multiprocessing
from multiprocessing import Manager

def main():
    total = []
    for j in range(0, 3000):
        # build a separate dictionary for each index
        data = {}
        for i in range(0, 100):
            data[str(i)] = i
        total.append(data)

    CalcManager(total, start=0, end=500)

def CalcManager(myData, start, end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.
    nprocs = 3
    tasks = multiprocessing.JoinableQueue()  # created but currently unused
    #Initialize the multiprocessing queue so we can get the values returned to us
    result_q = multiprocessing.Queue()
    #Set up an empty list to store our processes
    procs = []
    #Divide up the data for the set number of processes
    interval = (end - start) / nprocs
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure we don't go past the size of the data
        if new_end > end:
            new_end = end
        #Create the process and pass it the shared list proxy and the result queue
        p = multiprocessing.Process(target=multiProcess, args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #The next slice starts where this one ended
        new_start = new_end
    print 'finished starting'

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data, start, end, result_q, proc_num):
    #print 'started process'
    results = []
    #one bulk read of this process's slice from the manager
    data = data[start:end]
    for i in range(0, 22):
        #dummy work: build a 22x3 result matrix
        temp = []
        for j in range(0, 3):
            temp.append(j)
        results.append(temp)
    print len(data)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()
Your code seems to indicate that for one input range, you only produce one output. Is that what you want? – 2014-09-02 15:02:44

To use shared memory, you would have to convert your array of dicts into ctypes objects and then use multiprocessing.sharedctypes (https://docs.python.org/2.7/library/multiprocessing.html#module-multiprocessing.sharedctypes). I'm not sure whether that is feasible for your use case. – dano 2014-09-02 17:03:03
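
A minimal sketch of the kind of conversion this suggests, under the (possibly unrealistic) assumption that each dictionary can be flattened into a fixed-length row of integers; fill and worker are hypothetical names:

# Hypothetical sketch: flatten each dict into one fixed-length row of a
# shared ctypes array (assumes integer values and a known, fixed key set).
import multiprocessing
from multiprocessing.sharedctypes import RawArray
from ctypes import c_int

NROWS, NCOLS = 4000, 200
shared = RawArray(c_int, NROWS * NCOLS)  # zero-initialized, lock-free

def fill(row, d):
    # store the dict's values in key order so workers can index rows directly
    for col in range(NCOLS):
        shared[row * NCOLS + col] = d[str(col)]

def worker(start, end, result_q):
    # on fork, children inherit 'shared' in place: nothing is pickled
    total = 0
    for row in range(start, end):
        total += shared[row * NCOLS]  # read-only access
    result_q.put(total)

if __name__ == '__main__':
    for row in range(NROWS):
        fill(row, dict((str(i), i) for i in range(NCOLS)))
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(0, NROWS, q))
    p.start()
    print q.get()
    p.join()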

@HaiVu It returns an array of floats for each process. results = range(0,(992)) was just a sample list. – user1938107 2014-09-02 23:43:46

Answers


You may see some improvement by using multiprocessing.Manager to store your list in a manager server, and having each child process access items by pulling them from that one shared list, rather than copying a slice to each child process:

def CalcManager(myData, start, end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    nprocs = 3
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end - start) / nprocs
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        if new_end > end:
            new_end = end
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #The next slice starts where this one ended
        new_start = new_end
    print 'finished starting'

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

This copies your entire data list to the Manager process prior to creating any of your workers. The Manager returns a Proxy object that allows shared access to the list; you then just pass the Proxy to the workers, which means their startup time is greatly reduced, since there is no longer any need to copy slices of the data list. The downside is that accessing the list will be slower in the children, because each access has to go to the manager process via IPC. Whether or not this actually helps performance depends heavily on what work you do on the list in your worker processes, but it is worth a try, since it requires very few code changes.
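
One practical note on that IPC cost: every indexing operation on the proxy is a separate round-trip to the manager process, so a worker that needs a contiguous slice should fetch it in a single operation, as the updated code in the question does. A small sketch of the difference, with hypothetical worker names:

# Hypothetical comparison: per-item proxy access vs. one bulk read.
def worker_slow(d, start, end):
    for i in range(start, end):
        item = d[i]           # one manager round-trip per index

def worker_fast(d, start, end):
    local = d[start:end]      # a single round-trip; 'local' is a plain list
    for item in local:
        pass                  # further work touches only the local copy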


Looking at your question, I assume the following:

  • For each item in myData, you want to return an output (a matrix of some kind)
  • You created a JoinableQueue (tasks), probably to hold the input, but were not sure how to use it

Code

import logging
import multiprocessing


def create_logger(logger_name):
    ''' Create a logger that logs to the console '''
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # create console handler and set appropriate level
    ch = logging.StreamHandler()
    formatter = logging.Formatter("%(processName)s %(funcName)s() %(levelname)s: %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

def main():
    global logger
    logger = create_logger(__name__)
    logger.info('Main started')
    data = []
    for i in range(0, 100):
        data.append({str(i): i})

    CalcManager(data, start=0, end=50)
    logger.info('Main ended')

def CalcManager(myData, start, end):
    logger.info('CalcManager started')
    #Initialize the multiprocessing queues so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Add tasks
    for i in range(start, end):
        tasks.put(myData[i])

    # Create processes to do the work
    nprocs = 3
    for i in range(nprocs):
        logger.info('starting processes')
        p = multiprocessing.Process(target=worker, args=(tasks, results))
        p.daemon = True
        p.start()

    # Wait for task completion, i.e. until the tasks queue is empty
    try:
        tasks.join()
    except KeyboardInterrupt:
        logger.info('Cancel tasks')

    # Print out the results
    print 'RESULTS'
    while not results.empty():
        result = results.get()
        print result

    logger.info('CalcManager ended')

def worker(tasks, results):
    while True:
        try:
            task = tasks.get()   # one row of input
            task['done'] = True  # simulate work being done
            results.put(task)    # save the result to the output queue
        finally:
            # JoinableQueue: for every get(), we need a task_done()
            tasks.task_done()


if __name__ == '__main__':
    main()

Discussion

  • For a multiprocessing scenario, I recommend using the logging module, since it offers several advantages:
    • It is thread- and process-safe, which means you will not get situations where the output of one process is jumbled together with another's
    • You can configure logging to display the process name and the function name, which is very handy for debugging
  • CalcManager is essentially a task manager which does the following:
    1. Creates three processes
    2. Populates the input queue, tasks
    3. Waits for the tasks to complete
    4. Prints out the results
  • Note that when creating the processes, I mark them as daemons, which means they will be killed when the main program exits; you do not have to worry about killing them
  • worker is where the work gets done
    • Each worker runs forever (the while True loop)
    • Each time through the loop, it gets one unit of input, does some processing, and puts the result into the output queue
    • After a task is done, it calls task_done() so the main process knows when all the jobs are finished. I put task_done in the finally clause to ensure it runs even if an error occurs during processing (a sentinel-based shutdown variant is sketched below)
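
If the daemon-plus-tasks.join() arrangement proves fragile for large inputs (see the follow-up comment below), a common alternative is to queue one sentinel per worker so each process leaves its loop on its own and can be join()ed. This is a sketch of that idea rather than a tested fix for the problem reported below; SENTINEL and the commented CalcManager changes are assumptions:

# Hypothetical variant: shut the workers down with sentinels instead of
# daemonizing them and relying solely on tasks.join().
SENTINEL = None

def worker(tasks, results):
    while True:
        task = tasks.get()
        if task is SENTINEL:       # no more work: leave the loop
            tasks.task_done()
            break
        task['done'] = True        # simulate work being done
        results.put(task)
        tasks.task_done()          # JoinableQueue: one task_done() per get()

# In CalcManager, after the real tasks are queued:
#     for _ in range(nprocs):
#         tasks.put(SENTINEL)      # one sentinel per worker
#     for p in procs:
#         p.join()                 # each worker exits on its own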
Thank you for your answer. I think you didn't see my update, which is why you kept asking about the return values. Does it change this strategy? – user1938107 2014-09-03 01:10:40

Also, for large data this doesn't seem to work. It appears to have the same problem as my original code: it gets stuck switching between the tasks and the join, and the data gets stuck. ... not that I really understand what is going on. – user1938107 2014-09-03 01:22:23