How to dynamically create a per-process queue in Python multiprocessing

2011-08-16

8

I want to dynamically create multiple Processes, where each instance has a queue for incoming messages from the other instances, and where each instance can also create new instances. So we end up with a network of processes all sending to each other; every instance can send to every other.

The code below does what I want: it uses a Manager.dict() to store the queues, so that updates are propagated, and a Lock() to protect write access to the queues. However, when a new queue is added it throws "RuntimeError: Queue objects should only be shared between processes through inheritance".

The problem is that at startup we don't know how many queues we will eventually need, so we have to create them dynamically. But since we can't share queues except at construction time, I don't know how to do that.

I know that one possibility would be to make queues a global variable instead of a managed one passed in to __init__: the problem then, as I understand it, is that additions to the queues variable wouldn't be propagated to the other processes.

EDIT I'm working on evolutionary algorithms (EAs), a machine-learning technique. An EA simulates a "population" which evolves by survival of the fittest, crossover, and mutation. In a parallel EA, as here, we also have migration between populations, which corresponds to interprocess communication. Islands can also spawn new islands, so we need a way to send messages between dynamically-created processes.

import random, time 
from multiprocessing import Process, Queue, Lock, Manager, current_process 
try: 
    from queue import Empty as EmptyQueueException 
except ImportError: 
    from Queue import Empty as EmptyQueueException 

class MyProcess(Process):
    def __init__(self, queues, lock):
        # run() is overridden below, so no target needs to be passed to Process
        super(MyProcess, self).__init__()
        self.queues = queues
        self.lock = lock
        # acquire lock and add a new queue for this process
        with self.lock:
            self.id = len(list(self.queues.keys()))
            self.queues[self.id] = Queue()

    def run(self):
        while len(list(self.queues.keys())) < 10:

            # make a new process
            new = MyProcess(self.queues, self.lock)
            new.start()

            # send a message to a random process
            dest_key = random.choice(list(self.queues.keys()))
            dest = self.queues[dest_key]
            dest.put("hello to %s from %s" % (dest_key, self.id))

            # receive messages
            message = True
            while message:
                try:
                    message = self.queues[self.id].get(False) # don't block
                    print("%s received: %s" % (self.id, message))
                except EmptyQueueException:
                    break

            # what queues does this process know about?
            print("%d: I know of %s" %
                  (self.id, " ".join([str(id) for id in self.queues.keys()])))

            time.sleep(1)

if __name__ == "__main__":
    # Construct MyProcess with a Manager.dict for storing the queues
    # and a lock to protect write access. Start.
    MyProcess(Manager().dict(), Lock()).start()

Answers

3

I'm not entirely sure what your use case actually is here. Perhaps if you elaborate on why you want each process to dynamically spawn a child with a connected queue, it will be clearer what the right solution is in this situation.

In any case, the problem as it stands seems to be that there is no really good way to dynamically create pipes or queues with multiprocessing.

I think that if you're willing to spawn a thread inside each of your processes, you may be able to use those for back-and-forth communication. Rather than spawning threads, though, I took an approach using network sockets and select to communicate between the processes.

Dynamic process spawning and network sockets may still be flaky, depending on how multiprocessing cleans up file descriptors when spawning/forking a new process, and your solution will most likely be easier to get working on *nix derivatives. If you're worried about socket overhead, you could use Unix domain sockets to make things more lightweight, at the cost of some added complexity if you later want to run nodes on multiple worker machines.
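
As a side note, here is a minimal sketch (not part of the answer; the socket path is made up) of what the Unix domain socket variant would look like: the only substantive change is binding and connecting to a filesystem path with AF_UNIX instead of a (host, port) pair.

import os, socket

SOCK_PATH = "/tmp/mpp-node-0.sock"   # hypothetical per-process path

# server side (in the process that owns the "queue")
if os.path.exists(SOCK_PATH):
    os.unlink(SOCK_PATH)             # stale socket files must be removed first
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(SOCK_PATH)
server.listen(5)

# client side (in any other process that wants to send a message)
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(SOCK_PATH)
client.sendall(b"hello over a unix domain socket")
client.close()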

Anyway, here's an example that uses network sockets and a global process list to accomplish this, since I couldn't find a good way to make multiprocessing do it.

import collections 
import multiprocessing 
import random 
import select 
import socket 
import time 


class MessagePassingProcess(multiprocessing.Process): 
    def __init__(self, id_, processes): 
     self.id = id_ 
     self.processes = processes 
     self.queue = collections.deque() 
     super(MessagePassingProcess, self).__init__() 

    def run(self): 
     print "Running" 
     inputs = [] 
     outputs = [] 
     server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     address = self.processes[self.id]["address"] 
     print "Process %s binding to %s"%(self.id, address) 
     server.bind(address) 
     server.listen(5) 
     inputs.append(server) 
     process = self.processes[self.id] 
     process["listening"] = True 
     self.processes[self.id] = process 
     print "Process %s now listening!(%s)"%(self.id, process) 
     while inputs: 
      readable, writable, exceptional = select.select(inputs, 
                  outputs, 
                  inputs, 
                  0.1) 
      for sock in readable: 
       print "Process %s has a readable scoket: %s"%(self.id, 
                   sock) 
       if sock is server: 
        print "Process %s has a readable server scoket: %s"%(self.id, 
                   sock) 
        conn, addr = sock.accept() 
        conn.setblocking(0) 
        inputs.append(conn) 
       else: 
        data = sock.recv(1024) 
        if data: 
         self.queue.append(data) 
         print "non server readable socket with data" 
        else: 
         inputs.remove(sock) 
         sock.close() 
         print "non server readable socket with no data" 

      for sock in exceptional: 
       print "exception occured on socket %s"%(sock) 
       inputs.remove(sock) 
       sock.close() 

      while len(self.queue) >= 1: 
       print "Received:", self.queue.pop() 

      # send a message to a random process: 
      random_id = random.choice(list(self.processes.keys())) 
      print "%s Attempting to send message to %s"%(self.id, random_id) 
      random_process = self.processes[random_id] 
      print "random_process:", random_process 
      if random_process["listening"]: 
       random_address = random_process["address"] 
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
       try: 
        s.connect(random_address) 
       except socket.error: 
        print "%s failed to send to %s"%(self.id, random_id) 
       else: 
        s.send("Hello World!")      
       finally: 
        s.close() 

      time.sleep(1) 

if __name__=="__main__": 
    print "hostname:", socket.getfqdn() 
    print dir(multiprocessing) 
    manager = multiprocessing.Manager() 
    processes = manager.dict() 
    joinable = [] 
    for n in xrange(multiprocessing.cpu_count()): 
     mpp = MessagePassingProcess(n, processes) 
     processes[n] = {"id":n, 
         "address":("127.0.0.1",7000+n), 
         "listening":False, 
         } 
     print "processes[%s] = %s"%(n, processes[n]) 
     mpp.start() 
     joinable.append(mpp) 
    for process in joinable: 
     process.join() 

With some polish and testing, this seems like something a few people would use if it were available in the standard library, so it could be a logical extension to multiprocessing.Process and/or multiprocessing.Pool. It would also be reasonable to create a DynamicQueue class that uses sockets to make itself discoverable by the other queues.
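
As a rough illustration of that last suggestion, here is a minimal sketch (mine, not the answer's; DynamicQueue and all other names are hypothetical) of what a socket-backed, discoverable queue might look like: each instance advertises its address in a shared registry such as a Manager().dict(), drains incoming connections on a daemon thread, and senders look up a destination by key and connect to it directly.

import pickle
import queue
import socket
import socketserver
import threading

class DynamicQueue:
    """Hypothetical socket-backed queue, discoverable via a shared registry."""

    def __init__(self, key, address, registry):
        self.key = key
        self.address = address        # (host, port) this queue listens on
        self.registry = registry      # shared dict, e.g. a Manager().dict()
        self._local = queue.Queue()   # received, unpickled items end up here

        local = self._local
        class _Handler(socketserver.BaseRequestHandler):
            def handle(self):
                chunks = b""
                while True:           # read until the sender closes
                    data = self.request.recv(4096)
                    if not data:
                        break
                    chunks += data
                if chunks:
                    local.put(pickle.loads(chunks))

        self._server = socketserver.ThreadingTCPServer(address, _Handler)
        threading.Thread(target=self._server.serve_forever, daemon=True).start()
        self.registry[key] = address  # advertise: now discoverable by others

    def get(self, block=True, timeout=None):
        # pop the next message received for this process
        return self._local.get(block, timeout)

    def put_to(self, key, obj):
        # send obj to whichever process registered a queue under `key`
        with socket.create_connection(self.registry[key]) as s:
            s.sendall(pickle.dumps(obj))

A process would create one of these for itself and then call put_to() with another process's key; the registry lookup takes the place of the Manager-proxied Queue objects that trigger the RuntimeError in the question.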

Anyway, I hope it helps. Please post an update if you come up with a better way to make this work.


Wow, this is really useful, thanks. I've added some information on the use case to the original question. The advantage of queues is not having to think about buffers, pickling, incomplete recv()s, and so on. But I can see now that connected queues aren't essential: sockets work fine with just a bit of extra work. I took your code and changed it: objects can now be sent via pickle, new processes can be created, and each new process now adds *itself* to the dictionary. I'll add my code as a separate answer. – jmmcd


Really glad it helped. – stderr

3

This code is based on the accepted answer. It's in Python 3, because OS X Snow Leopard segfaults on some uses of the multiprocessing stuff.

#!/usr/bin/env python3 

import collections 
from multiprocessing import Process, Manager, Lock, cpu_count 
import random 
import select 
import socket 
import time 
import pickle 

class Message: 
    def __init__(self, origin): 
     self.type = "long_msg" 
     self.data = "X" * 3000 
     self.origin = origin 
    def __str__(self): 
     return "%s %d" % (self.type, self.origin) 

class MessagePassingProcess(Process): 
    def __init__(self, processes, lock): 
     self.lock = lock 
     self.processes = processes 
     with self.lock: 
      self.id = len(list(processes.keys())) 
      process_dict = {"id": self.id, 
          "address": ("127.0.0.1", 7000 + self.id), 
          "listening": False 
          } 
      self.processes[self.id] = process_dict 
     print("new process: processes[%s] = %s" % (self.id, processes[self.id])) 
     self.queue = collections.deque() 
     super(MessagePassingProcess, self).__init__() 

    def run(self): 
     print("Running") 
     self.processes[self.id]["joinable"] = True 
     inputs = [] 
     outputs = [] 
     server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     address = self.processes[self.id]["address"] 
     print("Process %s binding to %s" % (self.id, address)) 
     server.bind(address) 
     server.listen(5) 
     inputs.append(server) 
     process = self.processes[self.id] 
     process["listening"] = True 
     self.processes[self.id] = process 
     print("Process %s now listening!(%s)" % (self.id, process)) 
     while inputs and len(list(self.processes.keys())) < 10: 
      readable, writable, exceptional = select.select(inputs, 
                  outputs, 
                  inputs, 
                  0.1) 
      # read incoming messages 
      for sock in readable: 
       print("Process %s has a readable socket: %s" % (self.id, sock)) 
       if sock is server: 
        print("Process %s has a readable server socket: %s" % 
          (self.id, sock)) 
        conn, addr = sock.accept() 
        conn.setblocking(0) 
        inputs.append(conn) 
       else: 
        data = True 
        item = bytes() # empty bytes object, to be added to 
        recvs = 0 
        while data: 
         data = sock.recv(1024) 
         item += data 
         recvs += 1 
        if len(item): 
         self.queue.append(item) 
         print("non server readable socket: recvd %d bytes in %d parts" 
           % (len(item), recvs)) 
        else: 
         inputs.remove(sock) 
         sock.close() 
         print("non server readable socket: nothing to read") 

      for sock in exceptional: 
       print("exception occured on socket %s" % (sock)) 
       inputs.remove(sock) 
       sock.close() 

      while len(self.queue): 
       msg = pickle.loads(self.queue.pop()) 
       print("received:" + str(msg)) 

      # send a message to a random process: 
      random_id = random.choice(list(self.processes.keys())) 
      print("%s attempting to send message to %s" % (self.id, random_id)) 
      random_process = self.processes[random_id] 
      if random_process["listening"]: 
       random_address = random_process["address"] 
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
       try: 
        s.connect(random_address) 
       except socket.error: 
        print("%s failed to send to %s"%(self.id, random_id)) 
       else: 
        item = pickle.dumps(Message(self.id)) 
        print("sending a total of %d bytes" % len(item)) 
        s.sendall(item) 
       finally: 
        s.close() 

      # make a new process 
      if random.random() < 0.1: 
       mpp = MessagePassingProcess(self.processes, self.lock) 
       mpp.start() 
      else: 
       time.sleep(1.0) 
     print("process %d finished looping" % self.id) 


if __name__=="__main__": 
    manager = Manager() 
    processes = manager.dict() 
    lock = Lock() 
    # make just one process: it will make more 
    mpp = MessagePassingProcess(processes, lock) 
    mpp.start() 
    # this doesn't join on all the other processes created 
    # subsequently 
    mpp.join() 

1

The standard library provides socketserver, which avoids programming select() by hand. In this version, we start a socket server in a separate thread so that each process can carry out computation (well, pretend to) in its main loop.

#!/usr/bin/env python3 

# Each Node is an mp.Process. It opens a client-side socket to send a 
# message to another Node. Each Node listens using a separate thread 
# running a socketserver (so avoiding manual programming of select()), 
# which itself starts a new thread to handle each incoming connection. 
# The socketserver puts received messages on an mp.Queue, where they 
# are picked up by the Node for processing once per loop. This setup 
# allows the Node to do computation in its main loop. 

import multiprocessing as mp 
import threading, random, socket, socketserver, time, pickle, queue 

class Message: 
    def __init__(self, origin): 
     self.type = "long_message" 
     self.data = "X" * random.randint(0, 2000) 
     self.origin = origin 
    def __str__(self): 
     return "Message of type %s, length %d from %d" % (
      self.type, len(self.data), self.origin) 

class Node(mp.Process): 
    def __init__(self, nodes, lock): 
     super().__init__() 

     # Add this node to the Manager.dict of node descriptors. 
     # Write-access is protected by a Lock. 
     self.nodes = nodes 
     self.lock = lock 
     with self.lock: 
      self.id = len(list(nodes.keys())) 
      host = "127.0.0.1" 
      port = 7022 + self.id 
      node = {"id": self.id, "address": (host, port), "listening": False} 
      self.nodes[self.id] = node 
     print("new node: nodes[%s] = %s" % (self.id, nodes[self.id])) 

     # Set up socketserver. 

     # don't know why collections.deque or queue.Queue don't work here. 
     self.queue = mp.Queue() 

     # This MixIn usage is directly from the python.org 
     # socketserver docs 
     class ThreadedTCPServer(socketserver.ThreadingMixIn, 
           socketserver.TCPServer): 
      pass 
     class HandlerWithQueue(socketserver.BaseRequestHandler): 
      # Something of a hack: using class variables to give the 
      # Handler access to this Node-specific data 
      handler_queue = self.queue 
      handler_id = self.id 
      def handle(self): 
       # could receive data in multiple chunks, so loop and 
       # concatenate 
       item = bytes() 
       recvs = 0 
       data = True 
       while data: 
        data = self.request.recv(4096) 
        item += data 
        recvs += 1 
       if len(item): 
        # Receive a pickle here and put it straight on 
        # queue. Will be unpickled when taken off queue. 
        print("%d: socketserver received %d bytes in %d recv()s" 
          % (self.handler_id, len(item), recvs)) 
        self.handler_queue.put(item) 

     self.server = ThreadedTCPServer((host, port), HandlerWithQueue) 
     self.server_thread = threading.Thread(target=self.server.serve_forever) 
     self.server_thread.setDaemon(True) # Tell it to exit when Node exits. 
     self.server_thread.start() 
     print("%d: server loop running in thread %s" % 
       (self.id, self.server_thread.getName())) 

     # Now ready to receive 
     with self.lock: 
      # Careful: if we assign directly to 
      # self.nodes[self.id]["listening"], the new value *won't* 
      # be propagated to other Nodes by the Manager.dict. Have 
      # to use this hack to re-assign the Manager.dict key. 
      node = self.nodes[self.id] 
      node["listening"] = True 
      self.nodes[self.id] = node 

    def send(self): 
     # Find a destination. All listening nodes are eligible except self. 
     dests = [node for node in self.nodes.values() 
       if node["id"] != self.id and node["listening"]] 
     if len(dests) < 1: 
      print("%d: no node to send to" % self.id) 
      return 
     dest = random.choice(dests) 
     print("%d: sending to %s" % (self.id, dest["id"])) 

     # send 
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      s.connect(dest["address"]) 
     except socket.error: 
      print("%s: failed to send to %s" % (self.id, dest["id"])) 
     else: 
      item = pickle.dumps(Message(self.id)) 
      s.sendall(item) 
     finally: 
      s.close() 

    # Check our queue for incoming messages. 
    def receive(self): 
     while True: 
      try: 
       message = pickle.loads(self.queue.get(False)) 
       print("%d: received %s" % (self.id, str(message))) 
      except queue.Empty: 
       break 

    def run(self): 
     print("%d: in run()" % self.id) 
     # Main loop. Loop until at least 10 Nodes exist. Because of 
     # parallel processing we might get a few more 
     while len(list(self.nodes.keys())) < 10: 
      time.sleep(random.random() * 0.5) # simulate heavy computation 
      self.send() 
      time.sleep(random.random() * 0.5) # simulate heavy computation 
      self.receive() 
      # maybe make a new node 
      if random.random() < 0.1: 
       new = Node(self.nodes, self.lock) 
       new.start() 
     # Seems natural to call server_thread.shutdown() here, but it 
     # hangs. But since we've set the thread to be a daemon, it 
     # will exit when this process does. 
     print("%d: finished" % self.id) 

if __name__=="__main__": 
    manager = mp.Manager() 
    nodes = manager.dict() 
    lock = mp.Lock() 
    # make just one node: it will make more 
    node0 = Node(nodes, lock) 
    node0.start() 
    # This doesn't join on all the other nodes created subsequently. 
    # But everything seems to work out ok. 
    node0.join() 

Note that select and socketserver are very different models: socketserver uses threads/forking and blocking sockets, whereas select uses a single process with non-blocking sockets. – stderr


@Mike OK, thanks. I hope my understanding is right: from the listener's point of view, using non-blocking sockets within the process has much the same effect as using blocking sockets in a dedicated process/thread: either way, messages end up being processed once per iteration of the main loop. From the sender's point of view, though, it's easier to do a fire-and-forget send to a *blocking* socket, because it's "guaranteed" (in practice, not really guaranteed) to be listening rather than off computing in some other part of its loop. – jmmcd
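
To make the contrast concrete, here is a minimal sketch (mine, not from either commenter) of the two receive models being discussed: a single-threaded select() loop over non-blocking sockets, versus blocking sockets each serviced by a dedicated thread.

import select
import socket
import threading

def select_style(server):
    # Single thread, non-blocking sockets: select() reports which sockets
    # are ready, so the loop never blocks on any one of them.
    server.setblocking(False)
    inputs = [server]
    while inputs:
        readable, _, _ = select.select(inputs, [], [], 0.1)
        for sock in readable:
            if sock is server:
                conn, _ = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
            else:
                data = sock.recv(4096)
                if data:
                    print("received %d bytes" % len(data))
                else:
                    inputs.remove(sock)
                    sock.close()

def threaded_style(server):
    # Blocking sockets, one thread per connection: each recv() simply waits,
    # which is essentially what the socketserver version above does for us.
    def serve(conn):
        while True:
            data = conn.recv(4096)   # blocks until data arrives or the peer closes
            if not data:
                break
            print("received %d bytes" % len(data))
        conn.close()
    while True:
        conn, _ = server.accept()    # blocks until a client connects
        threading.Thread(target=serve, args=(conn,), daemon=True).start()

Either function expects an already-bound, listening TCP socket; the sending side is the same in both cases.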