2015-02-11 62 views
1

我有两个线程。一个是Worker Thread,另一个是Communication ThreadZMQ对(用于信令)由于连接不好而阻塞

Worker Thread正在从串口读取数据,进行一些处理,然后将结果排队发送到服务器。

Communication Tthread正在读取队列中的结果并发送它。我们面临的挑战是连接是无线的,虽然通常会出现,但它可能很缺乏(在几分钟内掉入和退出范围),如果我失去连接,我不想阻止Worker Thread

我选择这种情况的模式,如下所示:

Worker Thread具有enqueue方法,其将消息添加到一个Queue,然后使用zmq.PAIR发送信号到inproc://signal

Communication Thread使用zmq.DEALER传达给服务器(zmq.ROUTER),但轮询inproc://signal对上,以寄存器是否有新的消息发送需要与否。

下面是模式的一个简单的例子:

import Queue 
import zmq 
import time 
import threading 
import simplejson 


class ZmqPattern(): 
    def __init__(self): 
     self.q_out = Queue.Queue() 
     self.q_in = Queue.Queue() 
     self.signal = None 
     self.API_KEY = 'SOMETHINGCOMPLEX' 
     self.zmq_comm_thr = None 

    def start_zmq_signal(self): 
     self.context = zmq.Context() 

     # signal socket for waking the zmq thread to send messages to the relay 
     self.signal = self.context.socket(zmq.PAIR) 
     self.signal.bind("inproc://signal") 

    def enqueue(self, msg): 
     print("> pre-enqueue") 
     self.q_out.put(msg) 
     print("< post-enqueue") 

     print(") send sig") 
     self.signal.send(b"") 
     print("(sig sent") 

    def communication_thread(self, q_out): 
     poll = zmq.Poller() 

     self.endpoint_url = 'tcp://' + '127.0.0.1' + ':' + '9001' 

     wake = self.context.socket(zmq.PAIR) 
     wake.connect("inproc://signal") 
     poll.register(wake, zmq.POLLIN) 

     self.socket = self.context.socket(zmq.DEALER) 
     self.socket.setsockopt(zmq.IDENTITY, self.API_KEY) 
     self.socket.connect(self.endpoint_url) 
     poll.register(self.socket, zmq.POLLIN) 

     while True: 
      sockets = dict(poll.poll()) 

      if self.socket in sockets: 
       message = self.socket.recv() 
       message = simplejson.loads(message) 

       # Incomming messages which need to be handled on the worker thread 
       self.q_in.put(message) 

      if wake in sockets: 
       wake.recv() 
       while not q_out.empty(): 
        print(">> Popping off Queue") 
        message = q_out.get() 
        print(">>> Popped off Queue") 
        message = simplejson.dumps(message) 
        print("<<< About to be sent") 
        self.socket.send(message) 
        print("<< Sent") 

    def start(self): 
     self.start_zmq_signal() 
     # ZMQ Thread 
     self.zmq_comm_thr = threading.Thread(target=self.communication_thread, args=([self.q_out])) 
     self.zmq_comm_thr.daemon = True 
     self.zmq_comm_thr.name = "ZMQ Thread" 
     self.zmq_comm_thr.start() 


if __name__ == '__main__': 
    test = ZmqPattern() 
    test.start() 

    print '###############################################' 
    print '############## Starting comms #################' 
    print "###############################################" 

    last_debug = time.time() 
    test_msg = {} 
    for c in xrange(1000): 
     key = 'something{}'.format(c) 
     val = 'important{}'.format(c) 
     test_msg[key] = val 

    while True: 
     test.enqueue(test_msg) 
     if time.time() - last_debug > 1: 
      last_debug = time.time() 
      print "Still alive..." 

如果你运行它,你会看到庄家块,因为是在另一端没有路由器,不久,一对块因为Communication Thread未收到

我应该如何最好地设置inproc zmq而不是阻止Worker Thread。仅供参考,整个系统需要缓冲的最多的是大约200k个消息,每个消息大约为256个字节。

+0

让你在使用一个ZMQ INPROC仲裁访问蟒蛇'Queue'。为什么不直接使用ZMQ的排队?想到PUSH-PULL。 – engineerC 2015-02-11 20:49:37

+0

这不就是以同样的方式阻止吗? – Jono 2015-02-11 22:57:13

+0

是的,这是更多的一般性评论。回答下面为什么你阻止。 – engineerC 2015-02-11 23:09:03

回答

1

经销商插座对其将存储的消息数量有限制,称为高水位标记。正好在您的经销商插座创建的下方,请尝试:

self.socket = self.context.socket(zmq.DEALER) 
    self.socket.setsockopt(zmq.SNDHWM, 200000) 

并将该数字设置得和您一样高;限制是你机器的内存。

编辑:

高水痕在这个问题上有一些很好的讨论:

Majordomo broker: handling large number of connections

+0

感谢您的建议。我想在采取行动之前提出问题,因为我从来没有完全确定这种影响。如果我在经销商套接字上设置HWM,它仍然会从套接字接收所有inproc信号,并且一切都会按预期继续进行吗?你觉得更好的解决方案是将它缓存在发送者队列/内核缓冲区中,而不是在Worker Thread和Communication Thread之间使用信号量 - 将消息缓存到队列中,但不发送inproc信号? – Jono 2015-02-12 18:06:18

+0

假设你永远不想丢弃消息,并假设你的消费者可以随时消失,你必须要么1)有一个无限的队列来存储它们,或2)当你达到极限时暂停正在创建它们的进程你的队列。有一件事情尚不清楚:工作人员是否需要为了某种其他原因而工作,而不是产生这些信息;也就是说,无论有没有人在倾听,它是否会执行一些必要的功能? – engineerC 2015-02-12 18:13:25

+0

关于信号,我觉得这是完全没有必要的,因为如果你在排队,你总是可以在队列本身做一个阻塞等待,当新项目被添加到队列时它将立即解除阻塞。 – engineerC 2015-02-12 18:14:37

相关问题