2011-08-24 62 views
3

什么是使用0mq建立进程之间的双向通信的最正确的方法是什么?我需要创建几个后台进程,它们将等待来自主进程的命令,执行一些计算并将结果返回给主进程。0mq一个一对多的连接

回答

5

有几个方法可以做到这一点。最直接的方法可能是使用REQ/REP套接字。每一个后台进程/工人将有一个REP插座,你就可以使用一个插座REQ与他们沟通:

import zmq 

def worker(addr): 
    context = zmq.Context() 
    socket = context.socket(zmq.REP) 
    socket.bind(addr) 
    while True: 
     # get message from boss 
     msg = socket.recv() 
     # ...do smth 
     # send back results 
     socket.send(msg) 

if __name__ == '__main__': 
    # spawn 5 workers 
    from multiprocessing import Process 
    for i in range(5): 
     Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start() 

你必须连接到每个工人向他们发送消息,并取回结果:

context = zmq.Context() 
socket = context.socket(zmq.REQ) 
socket.connect(worker_addr) 
socket.send('message') 
msg = socket.recv() 

另一种方法是使用PUB/SUB的消息断火工人和PUSH/PULL收获结果:

import zmq 

def worker(worker_id, publisher_addr, results_addr): 
    context = zmq.Context() 
    sub = context.socket(zmq.SUB) 
    sub.connect(publisher_addr) 
    sub.setsockopt(zmq.SUBSCRIBE, worker_id) 
    push = context.socket(zmq.PUSH) 
    push.connect(results_addr) 

    while True: 
     msg = sub.recv_multipart()[1] 
     # do smth, send off results 
     push.send_multipart([worker_id, msg]) 

if __name__ == '__main__': 
    publisher_addr = 'tcp://127.0.0.1:5000' 
    results_addr = 'tcp://127.0.0.1:5001' 

    # launch some workers into space 
    from multiprocessing import Process 
    for i in range(5): 
     Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start() 

广播命令到具体工作人员,你会做这样的事情:

context = zmq.Context() 
pub = context.socket(zmq.PUB) 
pub.bind(publisher_addr) 
# send message to worker-1 
pub.send_multipart(['worker-1', 'hello']) 

拉的结果:

context = zmq.Context() 
pull = context.socket(zmq.PULL) 
pull.bind(results_addr) 

while True: 
    worker_id, result = pull.recv_multipart() 
    print worker_id, result 
3

考虑使用Request Reply Broker但交换REQ插座到经销商。 DEALER不会阻止发送,并会自动为您的工作人员平衡流量。

在图片Client将是你的main processService A/B/C是你的background processes (workers)Main process应该绑定到一个端点。 Workers应连接到主进程的端点以接收工作项目。

在工作项目main process保持列表和发送时间。如果一段时间没有答案,只需重新发送工作项目,因为worker可能已经死亡。

Request Reply Broker