2017-12-18 93 views
1

我玩弄了解多线程处理一个客户端的消息,所以我写了下面的客户机/服务器应用程序,其中,服务器发送命令给客户端,客户端检查此命令,如果它等于到'a'它发送一个回复到服务器。Python的多线程服务器可以在同一时间

在我创建了两个插槽和一个线程的服务器代码;第一个套接字将命令发送(发布)到所有连接(订阅)的客户端。在线程第二插座等待来自客户的任何答复,但因为线程执行一些阻塞的操作(如存储客户端在数据库中发送的信息),它可以在同一时间,即使插座(REQ-REP处理一个客户端套接字)可以同时接收多条消息。

server.py

import zmq 
import logging 
import threading 
import time 

logging.basicConfig(level=logging.DEBUG) 


class Server(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.pub_port = 7777 
     self.rep_port = 7778 

     self.pub_socket = None 
     self.rep_socket = None 
     self.interface = "*" 

    def bind_ports(self): 
     logging.debug("[bind_ports] binding the ports....") 
     self.pub_socket = self.context.socket(zmq.PUB) 
     pub_bind_str = "tcp://{}:{}".format(self.interface, self.pub_port) 
     self.pub_socket.bind(pub_bind_str) 

     self.rep_socket = self.context.socket(zmq.REP) 
     rep_bind_str = "tcp://{}:{}".format(self.interface, self.rep_port) 
     self.rep_socket.bind(rep_bind_str) 

    def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      logging.info("[received_data] data <{}>".format(flow)) 
      self.rep_socket.send(b"\x00") 
      self.blocking_op(cl_data) 

    def blocking_op(self, data): 
     time.sleep(1) # simulating some blocking operations e.g. storing info in a database 

    def push_instruction(self, cmd): 
     logging.debug("[push_inst] Sending the instruction <%s> to the clients...", 
     # logging.debug("[push_inst] Sending the instruction <%s> to the agents ...", 
     cmd) 
     instruction = {"cmd": cmd} 
     self.pub_socket.send_json(instruction) 

    def create_thread(self): 
     thread = threading.Thread(target=self.received_info) 
     thread.daemon = True 
     thread.start() 
     logging.debug("[create_thread] Thread created <{}>".format(
                 thread.is_alive())) 

    def start_main_loop(self): 
     logging.debug("[start_main_loop] Loop started....") 
     self.bind_ports() 
     self.create_thread() 

     while True: 
      cmd = input("Enter your command: ") 
      self.push_instruction(cmd) 

if __name__ == "__main__": 
    Server().start_main_loop() 

client.py

import zmq 
import logging 
import random 
import time 

logging.basicConfig(level=logging.DEBUG) 

class Client(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.sub_socket = None 
     self.req_socket = None 

     self.pub_port = 7777 
     self.req_port = 7778 
     self.server_ip = 'localhost' 

     self.client_id = "" 

    def connect_to_server(self): 
     logging.debug("[conn_to_serv] Connecting to the server ....") 
     self.sub_socket = self.context.socket(zmq.SUB) 
     self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") 
     conn_str = "tcp://{}:{}".format(self.server_ip, self.pub_port) 
     self.sub_socket.connect(conn_str) 

     self.req_socket = self.context.socket(zmq.REQ) 
     req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port) 
     self.req_socket.connect(req_conn_str) 

    def get_instruction(self): 
     inst = self.sub_socket.recv_json() 
     logging.debug("[get_inst] Server sent inst") 
     cmd = inst["cmd"] 
     return cmd 
    def send_flow(self, x, y): 
     flow = { 
      "client_id": self.client_id, 
      "x": x, 
      "y": y 
     } 
     self.req_socket.send_json(flow) 

    def start_main_loop(self): 
     logging.debug("starting the main loop ....") 
     self.client_id = input("What is your id: ") 
     self.connect_to_server() 

     while True: 
      inst = self.get_instruction() 
      logging.info("[Main_loop] inst<{}>".format(inst)) 
      if inst == "a": 
       # time.sleep(random.uniform(.6, 1.5)) 
       self.send_flow("xxx", "yyy") 
       self.req_socket.recv() 
       logging.debug("[main_loop] server received the flow") 

if __name__ == "__main__": 
    Client().start_main_loop() 

我将不胜感激,如果有人能帮助我提高了服务器,以便它可以在为多个客户的信息同时。

+0

如果你的反应处理块或需要很长的时间,那么一个好办法是在你的'receive_info()'应对读取,然后启动一个线程来完成实际的处理。执行此线程需要花费的时间,但它不会阻止您的主循环。 – Hannu

回答

1

我是不是能够运行您的代码和测试,但如果你的问题是receive_info()拦截,你会绕过通过启动一个线程来处理实际的响应。像这样的东西(可能包含错别字,我不能与你的代码来测试 - 例如不知道什么flow是)

def handle_response(self, data): 
    logging.info("[received_data] data <{}>".format(flow)) 
    self.rep_socket.send(b"\x00") 
    self.blocking_op(data) 

def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      _t = threading.Thread(target=self.handle_response, args=(cl_data,)) 
      _t.start() 

这有你的received_info()循环,因为它是,而是做着处理在那里,启动一个新线程来处理响应。它需要花费什么才能完成,然后线程死亡,但是您的received_info()将立即准备好等待新的响应。

+0

非常感谢Hannu,它工作。顺便说一句,在args =(cl_data,)cl_data之后为什么会有昏迷? 一个额外的问题:你想,如果我要处理的说1000个是客户更好使用线程或使用GEVENT(或ASYNCIO)? – Corey

+0

逗号在那里,因为你只传递一个参数,'args'必须是一个元组。如果你传递了多个参数,你可以声明args =(a,b,c)而不用尾随的逗号,但它是从一个项目中创建一个元组的最简单的方法。 – Hannu

+0

我不是asyncio的专家,所以无法评论性能。由于GIL,Python并不是最有效的并行处理语言。尝试线程,如果有问题,请调查。线程可能是绝对好的。 – Hannu