2013-04-04 189 views
5

这不是很重要,只是一个愚蠢的实验。我想创建自己的消息传递。 我想有一个队列字典,其中每个键是进程的PID。 因为我想让流程(由Process()创建)交换将消息插入他们想要发送给它的进程的队列中(知道它的pid)。 这是一个愚蠢的代码:python字典之间的进程队列

from multiprocessing import Process, Manager, Queue 
from os import getpid 
from time import sleep 

def begin(dic, manager, parentQ): 
    parentQ.put(getpid()) 
    dic[getpid()] = manager.Queue() 
    dic[getpid()].put("Something...") 

if __name__== '__main__': 
    manager = Manager() 
    dic = manager.dict() 
    parentQ = Queue() 

    p = Process(target = begin, args=(dic, manager, parentQ)) 
    p.start() 
    son = parentQ.get() 
    print son 
    sleep(2) 
    print dic[son].get() 

dic[getpid()] = manager.Queue(),这工作正常。但是,当我执行 dic[son].put()/get()我得到这个消息:

Process Process-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap 
    self.run() 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run 
    self._target(*self._args, **self._kwargs) 
    File "mps.py", line 8, in begin 
    dic[getpid()].put("Something...") 
    File "<string>", line 2, in __getitem__ 
    File "/usr/lib/python2.7/multiprocessing/managers.py", line 773, in _callmethod 
    raise convert_to_error(kind, result) 
RemoteError: 
--------------------------------------------------------------------------- 
Unserializable message: ('#RETURN', <Queue.Queue instance at 0x8a92d0c>) 
--------------------------------------------------------------------------- 

你知道什么是应该做的正确方法?

回答

1

我相信你的代码失败了,因为队列不可序列化,就像回溯说的那样。 multiprocessing.Manager()对象可以为你创建一个共享的字典,就像你在这里完成的那样,但是存储在字典中的值仍然需要可序列化(或picklable in Pythonese)。如果没有问题与不具有访问对方的队列的子过程,那么这应该为你工作:

from multiprocessing import Process, Manager, Queue 
from os import getpid 

number_of_subprocesses_i_want = 5 

def begin(myQ): 
    myQ.put("Something sentimental from your friend, PID {0}".format(getpid())) 
    return 

if __name__== '__main__': 
    queue_dic = {} 
    queue_manager = Manager() 

    process_list = [] 

    for i in xrange(number_of_subprocesses_i_want): 
     child_queue = queue_manager.Queue() 

     p = Process(target = begin, args=(child_queue,)) 
     p.start() 
     queue_dic[p.pid] = child_queue 
     process_list.append(p) 

    for p in process_list: 
     print(queue_dic[p.pid].get()) 
     p.join() 

这给你留下一本字典的键是子进程,和值是其各自的队列,可以从主进程中使用。

我不认为你的原始目标是可以通过队列实现的,因为你希望子进程使用的队列在创建时必须传递给进程,所以当你启动更多的进程时,你没有办法给出现有进程访问新队列。

一种可能的方式有进程间通信将让每个人都共享同一个队列回传递消息给你用某种头的,比如在一个元组捆绑的主要过程:

(destination_pid, sender_pid, message) 

..并且主要读取destination_pid并将(sender_pid,消息)指向该子进程的队列。当然,这意味着当需要与新进程通信时,您需要一种通知现有进程的方法。