2013-03-01 170 views
1

我有两个进程('发送者'和'接收者')需要通过瞬态单向FIFO通信管道进行通信,本地在一台机器上进行通信。下面是我想(用语言更接近于Unix域套接字)来发生的事情:ZeroMQ和本地FIFO

  • 发件人“创建”管道在著名的地址,并立即发送邮件,降低IT
  • 在某些时候(之前或后发送“创建”管),接收器连接到管
  • 阅读器读取消息关闭管的
  • 发件人“关闭”所有的消息都被读出管
  • 阅读器通知(可能的是管道关闭)

我的问题是:我如何使用ZeroMQ实现这一点? “PUB/SUB”,“推/拉”?在ZMQ套接字中检测“数据结束”的机制是什么?是否可以同时允许上述前两项的排序:即发送者或接收者是否首先尝试连接?如果是这样,怎么样?

谢谢。

回答

2

观光了解zeromq:

  1. 结合/连接顺序通常并不重要
  2. 推/拉用于当一个对等应接收的每个消息和/或消息不应当被丢弃
  3. PUB/SUB用于所有对等方都应接收消息和/或发送无人听到时发送的消息。
  4. ZeroMQ故意隐藏从应用程序代码中按设计连接/断开打开/关闭事件,因此您无法检测到实际关闭事件。

您需要知道的一件事情是,您不应该:套接字连接时,它会创建一个管道(对等端不需要存在)。当套接字绑定时,它只在对等体连接时创建管道。这些管道管理套接字的HWM行为。这意味着没有对等连接套接字和没有对等连接套接字的行为是不同的。如果您尝试使用绑定套接字发送邮件,则没有对等方的绑定套接字将被阻止,而连接套接字将愉快地将邮件排队等待到对方到达并开始使用邮件。

基于这几点,你想要做的是:

  1. 使用推/拉
  2. 接收机应该绑定
  3. 发送一个特殊的“关闭”消息,指示队列完成,而不是检测到tcp/ipc级别的关闭事件。

以下是Python中的一个工作示例,它使用IPC套接字(文件)进行通信,接收方在发送方之后开始一段时间。

公共信息双方需要知道:

import time 

import zmq 

# the file used for IPC communication 
PIPE = '/tmp/fifo-pipe' 

# command flags for our tiny message protocol 
DONE = b'\x00' 
MSG = b'\x01' 

接收器(PULL)结合,并占用直到DONE

def receiver(): 
    ctx = zmq.Context() 
    s = ctx.socket(zmq.PULL) 
    s.bind("ipc://%s" % PIPE) 
    while True: 
     parts = s.recv_multipart() 
     cmd = parts[0] 
     if cmd == DONE: 
      print "[R] received DONE" 
      break 
     msg = parts[1] 
     # handle the message 
     print "[R] %.1f consuming %s" % (time.time() - t0, msg) 
    s.close() 
    ctx.term() 
    print "[R] done" 

发送者(PUSH)连接,并发送,发送完成对应信号完成

def sender(): 
    ctx = zmq.Context() 
    s = ctx.socket(zmq.PUSH) 
    s.connect("ipc://%s" % PIPE) 

    for i in range(10): 
     msg = b'msg %i' % i 
     print "[S] %.1f sending %s" % (time.time() - t0, msg) 
     s.send_multipart([MSG, msg]) 
     time.sleep(1) 
    print "[S] sending DONE" 
    s.send(DONE) 
    s.close() 
    ctx.term() 
    print "[S] done" 

还有一个演示脚本,它们一起运行,并且发送者启动荷兰国际集团第一,和接收器启动后发送者已经发送了几条消息:

from threading import Thread 

# global t0, just for keeping times relative to start, rather than 1970 
t0 = time.time() 

# start the sender 
s = Thread(target=sender) 
s.start() 

# start the receiver after a delay 
time.sleep(5) 
r = Thread(target=receiver) 
r.start() 

# wait for them both to finish 
s.join() 
r.join() 

从而可以看出一起here运行。