2017-10-12 82 views
0

我试图使用pyzmq的multiprocessing.Process一个领域内PUB/SUB插座里面的原型没有收到消息:pub/sub模型一个multiprocessing.Process

我有一个用户:

import time 
import collections  
import zmq 

context = zmq.Context() 
socket = context.socket(zmq.SUB) 
socket.setsockopt(zmq.SUBSCRIBE, b"") 
socket.connect("tcp://localhost:5000") 

nb_recv = 0 
begin = time.time() 
counter = collections.defaultdict(int) 
while True: 
    msg = socket.recv_json() 
    print(msg) 

和两个不同的发布者实现。

有了这一个,订户接收消息:

import zmq 
from multiprocessing import Process 

class Sender(object): 

    def __init__(self): 
     self._context = zmq.Context() 
     pass 

    def run(self): 
     self._socket = self._context.socket(zmq.PUB) 
     self._socket.bind("tcp://127.0.0.1:5000") 
     seq_num = 0 
     while True: 
      msg = { "sequence": seq_num } 
      self._socket.send_json(msg) 
      seq_num += 1 

if __name__ == "__main__": 
    s = Sender() 
    p = Process(target=s.run) 
    p.start() 
    p.join() 

但与这一个(其中,唯一的区别是的socket的创建是在构造,而不是被所述run()类方法),订户不接收任何消息:

import zmq 
from multiprocessing import Process 

class Sender(object): 

    def __init__(self): 
     self._context = zmq.Context() 
     self._socket = self._context.socket(zmq.PUB) # <--------- 
     pass 

    def run(self): 
     self._socket.bind("tcp://127.0.0.1:5000") 
     seq_num = 0 
     while True: 
      msg = { "sequence": seq_num } 
      self._socket.send_json(msg) 
      seq_num += 1 

if __name__ == "__main__": 
    s = Sender() 
    p = Process(target=s.run) 
    p.start() 
    p.join() 

当我用threading.Thread替换multiprocessing.Process时,这两个类都工作正常,但我没有在文档中找到任何解释。

回答

0

您正在一个进程中创建对象,并试图在另一个进程中执行该方法。

可能createmultiprocessing进程之间甚至share对象与multiprocessing.Manager但因为这个对象持有不可共享的资源(网络接口),你最好的工作进程中创建它,除非你想走走酸洗雷区,并用不可拣拾的田地来使用物体。