我有两个线程。一个是Worker Thread
,另一个是Communication Thread
。ZMQ对(用于信令)由于连接不好而阻塞
Worker Thread
正在从串口读取数据,进行一些处理,然后将结果排队发送到服务器。
Communication Tthread
正在读取队列中的结果并发送它。我们面临的挑战是连接是无线的,虽然通常会出现,但它可能很缺乏(在几分钟内掉入和退出范围),如果我失去连接,我不想阻止Worker Thread
。
我选择这种情况的模式,如下所示:
Worker Thread
具有enqueue
方法,其将消息添加到一个Queue
,然后使用zmq.PAIR
发送信号到inproc://signal
。
Communication Thread
使用zmq.DEALER
传达给服务器(zmq.ROUTER
),但轮询inproc://signal
对上,以寄存器是否有新的消息发送需要与否。
下面是模式的一个简单的例子:
import Queue
import zmq
import time
import threading
import simplejson
class ZmqPattern():
def __init__(self):
self.q_out = Queue.Queue()
self.q_in = Queue.Queue()
self.signal = None
self.API_KEY = 'SOMETHINGCOMPLEX'
self.zmq_comm_thr = None
def start_zmq_signal(self):
self.context = zmq.Context()
# signal socket for waking the zmq thread to send messages to the relay
self.signal = self.context.socket(zmq.PAIR)
self.signal.bind("inproc://signal")
def enqueue(self, msg):
print("> pre-enqueue")
self.q_out.put(msg)
print("< post-enqueue")
print(") send sig")
self.signal.send(b"")
print("(sig sent")
def communication_thread(self, q_out):
poll = zmq.Poller()
self.endpoint_url = 'tcp://' + '127.0.0.1' + ':' + '9001'
wake = self.context.socket(zmq.PAIR)
wake.connect("inproc://signal")
poll.register(wake, zmq.POLLIN)
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.API_KEY)
self.socket.connect(self.endpoint_url)
poll.register(self.socket, zmq.POLLIN)
while True:
sockets = dict(poll.poll())
if self.socket in sockets:
message = self.socket.recv()
message = simplejson.loads(message)
# Incomming messages which need to be handled on the worker thread
self.q_in.put(message)
if wake in sockets:
wake.recv()
while not q_out.empty():
print(">> Popping off Queue")
message = q_out.get()
print(">>> Popped off Queue")
message = simplejson.dumps(message)
print("<<< About to be sent")
self.socket.send(message)
print("<< Sent")
def start(self):
self.start_zmq_signal()
# ZMQ Thread
self.zmq_comm_thr = threading.Thread(target=self.communication_thread, args=([self.q_out]))
self.zmq_comm_thr.daemon = True
self.zmq_comm_thr.name = "ZMQ Thread"
self.zmq_comm_thr.start()
if __name__ == '__main__':
test = ZmqPattern()
test.start()
print '###############################################'
print '############## Starting comms #################'
print "###############################################"
last_debug = time.time()
test_msg = {}
for c in xrange(1000):
key = 'something{}'.format(c)
val = 'important{}'.format(c)
test_msg[key] = val
while True:
test.enqueue(test_msg)
if time.time() - last_debug > 1:
last_debug = time.time()
print "Still alive..."
如果你运行它,你会看到庄家块,因为是在另一端没有路由器,不久,一对块因为Communication Thread
未收到
我应该如何最好地设置inproc zmq而不是阻止Worker Thread
。仅供参考,整个系统需要缓冲的最多的是大约200k个消息,每个消息大约为256个字节。
让你在使用一个ZMQ INPROC仲裁访问蟒蛇'Queue'。为什么不直接使用ZMQ的排队?想到PUSH-PULL。 – engineerC 2015-02-11 20:49:37
这不就是以同样的方式阻止吗? – Jono 2015-02-11 22:57:13
是的,这是更多的一般性评论。回答下面为什么你阻止。 – engineerC 2015-02-11 23:09:03