2017-02-21 65 views
1

我试图让多处理ServerApp在Windows上工作。我想这个问题是缺少os.fork()功能,所以我必须通过socket不知何故这是不可pickleable(?!)。Python3 Windows多处理传递套接字来处理

我已经看到这可能是reduce_handlerebuild_handlemultiprocessing.reductionhere所示,但这些方法在Python 3(?!)中不可用。虽然我有可用的duplicatesteal_handle可用我找不到一个示例如何使用它们,或者我是否需要它们。

此外,我想知道logging是否会在创建新流程时出现问题?

这里是我的ServerApp样本:

import logging 
import socket 

from select import select 
from threading import Thread 
from multiprocessing import Queue 
from multiprocessing import Process 
from sys import stdout 
from time import sleep 


class ServerApp(object): 

    logger = logging.getLogger(__name__) 
    logger.setLevel(logging.DEBUG) 
    handler = logging.StreamHandler(stdout) 
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    handler.setFormatter(formatter) 
    logger.addHandler(handler) 


    def conn_handler(self, connection, address, buffer): 

     self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1]) 

     try: 
      while True: 

       command = None 
       received_data = b'' 
       readable, writable, exceptional = select([connection], [], [], 0) # Check for client commands 

       if readable: 
        # Get Command ... There is more code here 
        command = 'Something' 


       if command == 'Something': 
        connection.sendall(command_response) 
       else: 
        print(':(') 

     except Exception as e: 
      print(e) 
     finally: 
      connection.close() 
      self.client_buffers.remove(buffer) 
      self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1]) 


    def join(self): 

     while self.listener.is_alive(): 
      self.listener.join(0.5) 


    def acceptor(self): 

     while True: 
      self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port) 

      # Accept a connection on the bound socket and fork a child process to handle it. 
      conn, address = self.socket.accept() 

      # Create Queue which will represent buffer for specific client and add it o list of all client buffers 
      buffer = Queue() 
      self.client_buffers.append(buffer) 

      process = Process(target=self.conn_handler, args=(conn, address, buffer)) 
      process.daemon = True 
      process.start() 
      self.clients.append(process) 

      # Close the connection fd in the parent, since the child process has its own reference. 
      conn.close() 


    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048): 

     self.id = id 
     self.port = port 
     self.ip = ip 

     self.socket = None 
     self.listener = None 
     self.buffer_size = buffer_size 

     # Additional attributes here.... 

     self.clients = [] 
     self.client_buffers = [] 


    def run(self): 

     # Create TCP socket, bind port and listen for incoming connections 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.bind((self.ip, self.port)) 
     self.socket.listen(5) 

     self.listener = Thread(target=self.acceptor) # Run acceptor thread to handle new connection 
     self.listener.daemon = True 
     self.listener.start() 
+0

你写了一些代码,但看不到任何'协议'定义。如果已经接受,则无法定义任何接受规则(什么是过滤器?)。 – dsgdfg

+0

@dsgdfg不知道我是否正确,但每个连接都应该被单独的进程接受和处理。 – sstevan

回答

3

允许连接酸洗(包括插座)为python3,你应该使用mulitprocessing.allow_connection_pickling。它在ForkingPickler中注册套接字减速器。例如:

import socket 
import multiprocessing as mp 
mp.allow_connection_pickling() 


def _test_connection(conn): 
    msg = conn.recv(2) 
    conn.send(msg) 
    conn.close() 
    print("ok") 

if __name__ == '__main__': 
    server, client = socket.socketpair() 

    p = mp.Process(target=_test_connection, args=(server,)) 
    p.start() 

    client.settimeout(5) 

    msg = b'42' 
    client.send(msg) 
    assert client.recv(2) == msg 

    p.join() 
    assert p.exitcode == 0 

    client.close() 
    server.close() 

我还注意到,您有unrealted到socket酸洗其他一些问题。

  • 使用时self.conn_handler作为目标时,多会尝试以酸洗整个对象self。这是一个问题,因为您的对象包含一些不能被酸洗的Thread。因此,您应该从关闭目标函数中删除self。可以通过使用@staticmethod修饰器并删除函数中所有提及的self来完成。

  • 此外,logging模块不处理多个进程。基本上,启动的Process的所有日志都将丢失您的当前代码。要解决这个问题,您可以在启动第二个Process(在conn_handler的开头)或使用multiprocessing日志工具启动新的logging

这可以让这样的事情:

import logging 
import socket 

from select import select 
from threading import Thread 
from multiprocessing import util, get_context 
from sys import stdout 
from time import sleep 

util.log_to_stderr(20) 
ctx = get_context("spawn") 


class ServerApp(object): 

    logger = logging.getLogger(__name__) 
    logger.setLevel(logging.DEBUG) 
    handler = logging.StreamHandler(stdout) 
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    handler.setFormatter(formatter) 
    logger.addHandler(handler) 

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', 
       buffer_size=2048): 

     self.id = id 
     self.port = port 
     self.ip = ip 

     self.socket = None 
     self.listener = None 
     self.buffer_size = buffer_size 

     # Additional attributes here.... 

     self.clients = [] 
     self.client_buffers = [] 

    @staticmethod 
    def conn_handler(id, connection, address, buffer): 

     print("test") 
     util.info("[%d] - Connection from %s:%d", id, address[0], address[1]) 

     try: 
      while True: 

       command = None 
       received_data = b'' 
       # Check for client commands 
       readable, writable, exceptional = select([connection], [], [], 
                 0) 

       if readable: 
        # Get Command ... There is more code here 
        command = 'Something' 

       if command == 'Something': 
        connection.sendall(b"Coucouc") 
        break 
       else: 
        print(':(') 
       sleep(.1) 

     except Exception as e: 
      print(e) 
     finally: 
      connection.close() 
      util.info("[%d] - Connection from %s:%d has been closed.", id, 
        address[0], address[1]) 
      print("Close") 

    def join(self): 

     while self.listener.is_alive(): 
      self.listener.join(0.5) 

    def acceptor(self): 

     while True: 
      self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, 
          self.ip, self.port) 

      # Accept a connection on the bound socket and fork a child process 
      # to handle it. 
      conn, address = self.socket.accept() 

      # Create Queue which will represent buffer for specific client and 
      # add it o list of all client buffers 
      buffer = ctx.Queue() 
      self.client_buffers.append(buffer) 

      process = ctx.Process(target=self.conn_handler, 
           args=(self.id, conn, address, buffer)) 
      process.daemon = True 
      process.start() 
      self.clients.append(process) 

      # Close the connection fd in the parent, since the child process 
      # has its own reference. 
      conn.close() 

    def run(self): 

     # Create TCP socket, bind port and listen for incoming connections 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.bind((self.ip, self.port)) 
     self.socket.listen(5) 

     # Run acceptor thread to handle new connection 
     self.listener = Thread(target=self.acceptor) 
     self.listener.daemon = True 
     self.listener.start() 

     self.listener.join() 


def main(): 
    app = ServerApp(0) 
    app.run() 


if __name__ == '__main__': 
    main() 

我只测试了它在Unix和python3.6但正如我在Windows中使用的菌种方面, which should behave like the Process`它不应该有太多的行为不同。

+0

抱歉,延迟。我已经测试了'allow_connection_pickling',但它没有效果。但是,我注意到我收到两个不同的错误(运行相同的代码两次)。这是[第一个](http://pastebin.com/zyS2Sbtm),这里是[第二个](http://pastebin.com/7FQ1nvxN)。当我没有将侦听器分配为ServerApp属性(只是'listener'而不是'self.listener')时,我没有错误,但处理程序进程也不会执行。 – sstevan

+0

这不是'套接字酸洗问题。如果你阅读这些错误,它不会腌制一个'_thread.Lock'和一些'io'对象。我会说这与整个'ServerApp'对象的酸洗有关,当你使用实例方法来启动新的'Process'时需要。您应该为'conn_handler'使用'@ staticmethod'装饰器,或者将其从类中移除。这也是一个很好的做法,因为腌制整个对象是不安全的(例如,如果它处理一些密码)。另外,请尝试提供一些重现错误的基本脚本以允许测试。 –

+0

另外,作为一个很好的例子,你应该使用'multiprocessing.utils.log_to_stderr'和'multiprocessing.utils.debug/info'来获得与你的'Process'相一致的日志记录。如果您需要使用自定义日志记录,则应该在目标函数的开始处启动它,因为'logging'只能用于一个'Process'。 –