2017-06-02

I want to send data to multiple processes. Each process performs a different operation on the same data and then waits for the next item. I have something like this:

from multiprocessing import Process, Manager

def do_work1(in_queue):
    while True:
        item = in_queue.get()
        # exit signal
        if item is None:
            return
        print "worker 1 : {}".format(item)

def do_work2(in_queue):
    while True:
        item = in_queue.get()
        # exit signal
        if item is None:
            return
        print "worker 2 : {}".format(item)


if __name__ == "__main__":
    num_workers = 2

    manager = Manager()
    work = manager.Queue(num_workers)

    # start the workers
    pool = []
    p = Process(target=do_work1, args=(work,))
    p.start()
    pool.append(p)

    p2 = Process(target=do_work2, args=(work,))
    p2.start()
    pool.append(p2)

    work.put("1")
    work.put("2")

    # send one exit signal per worker so the join() calls can return
    work.put(None)
    work.put(None)

    for p in pool:
        p.join()

But after running this code, I get:

worker 1 : 1 

worker 1 : 2 

or:

worker 2 : 1 

worker 1 : 2 

What I expect is something like this:

worker 1 : 1 

worker 2 : 1 

worker 1 : 2 

worker 2 : 2 

What should I change to get the result shown above?

Answers


You can use a multiprocessing Pipe to send the data to each process:

https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Pipe

assuming the data can be pickled:

https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled

You can then have each process wait for data on its end of the pipe, and iterate over a list of all the pipes when you send data, so every process receives every item.

Hmm, that gets rid of the manual queue handling, but numpy.ndarray doesn't seem to be picklable – John

It may not be listed in the docs because it isn't part of the standard library in Python. It can be pickled. – njoosse

It works with ndarray. Thanks a lot – John


You can do this by using multiprocessing.Pool:

import multiprocessing 


def do_work1(item): 
    print "worker 1: {}".format(item) 


def do_work2(item): 
    print "worker 2: {}".format(item) 


if __name__ == '__main__': 
    pool = multiprocessing.Pool(2) 

    pool.apply_async(do_work1, (1,)) 
    pool.apply_async(do_work2, (1,)) 
    pool.apply_async(do_work1, (2,)) 
    pool.apply_async(do_work2, (2,)) 

    pool.close() 
    pool.join()