观光了解zeromq:
- 结合/连接顺序通常并不重要
- 推/拉用于当一个对等应接收的每个消息和/或消息不应当被丢弃
- PUB/SUB用于所有对等方都应接收消息和/或发送无人听到时发送的消息。
- ZeroMQ故意隐藏从应用程序代码中按设计连接/断开打开/关闭事件,因此您无法检测到实际关闭事件。
您需要知道的一件事情是,您不应该:套接字连接时,它会创建一个管道(对等端不需要存在)。当套接字绑定时,它只在对等体连接时创建管道。这些管道管理套接字的HWM行为。这意味着没有对等连接套接字和没有对等连接套接字的行为是不同的。如果您尝试使用绑定套接字发送邮件,则没有对等方的绑定套接字将被阻止,而连接套接字将愉快地将邮件排队等待到对方到达并开始使用邮件。
基于这几点,你想要做的是:
- 使用推/拉
- 接收机应该绑定
- 发送一个特殊的“关闭”消息,指示队列完成,而不是检测到tcp/ipc级别的关闭事件。
以下是Python中的一个工作示例,它使用IPC套接字(文件)进行通信,接收方在发送方之后开始一段时间。
公共信息双方需要知道:
import time
import zmq
# the file used for IPC communication
PIPE = '/tmp/fifo-pipe'
# command flags for our tiny message protocol
DONE = b'\x00'
MSG = b'\x01'
接收器(PULL)结合,并占用直到DONE
def receiver():
ctx = zmq.Context()
s = ctx.socket(zmq.PULL)
s.bind("ipc://%s" % PIPE)
while True:
parts = s.recv_multipart()
cmd = parts[0]
if cmd == DONE:
print "[R] received DONE"
break
msg = parts[1]
# handle the message
print "[R] %.1f consuming %s" % (time.time() - t0, msg)
s.close()
ctx.term()
print "[R] done"
发送者(PUSH)连接,并发送,发送完成对应信号完成
def sender():
ctx = zmq.Context()
s = ctx.socket(zmq.PUSH)
s.connect("ipc://%s" % PIPE)
for i in range(10):
msg = b'msg %i' % i
print "[S] %.1f sending %s" % (time.time() - t0, msg)
s.send_multipart([MSG, msg])
time.sleep(1)
print "[S] sending DONE"
s.send(DONE)
s.close()
ctx.term()
print "[S] done"
还有一个演示脚本,它们一起运行,并且发送者启动荷兰国际集团第一,和接收器启动后发送者已经发送了几条消息:
from threading import Thread
# global t0, just for keeping times relative to start, rather than 1970
t0 = time.time()
# start the sender
s = Thread(target=sender)
s.start()
# start the receiver after a delay
time.sleep(5)
r = Thread(target=receiver)
r.start()
# wait for them both to finish
s.join()
r.join()
从而可以看出一起here运行。