2015-04-02 749 views
1

我试图修改一个zeromq example以处理后台任务并使其工作。特别是,我有一个xpub/xsub套接字设置,客户端将订阅发布者以接收工作人员的进度更新和结果。zmq/zeromq recv_multipart在大数据上挂起

worker_server.py

proxy = zmq.devices.ThreadDevice(zmq.QUEUE, zmq.XSUB, zmq.XPUB) 
proxy.bind_in('tcp://127.0.0.1:5002') 
proxy.bind_out('tcp://127.0.0.1:5003') 
proxy.start() 

client.py

ctx = zmq.Context() 
socket = server.create_socket(ctx, 'sub') 
socket.setsockopt(zmq.SUBSCRIBE, '') 
poller = zmq.Poller() 
print 'polling' 
poller.register(socket, zmq.POLLIN) 
ready = dict(poller.poll()) 
print 'polling done' 
if ready and ready.has_key(socket): 
    job_id, code, result = socket.recv_multipart() 

return {'status': code, 'data': result} 

到目前为止,代码工作的小消息,但是当工作人员试图发布任务结果是大,35393030字节,客户端没有收到消息,代码挂在ready = dict(poller.poll())现在,我刚开始学习使用zmq,但不是send_multipart应该chu消息?是什么原因造成的客户端没有收到效果

worker.py

def worker(logger_name, method, **task_kwargs): 
    job_id = os.getpid() 

    ctx = zmq.Context() 
    socket = create_socket(ctx, 'pub') 
    time.sleep(1) 

    logger = logging.getLogger(logger_name) 
    logger.setLevel(logging.DEBUG) 

    ch = logging.StreamHandler() 
    sh = WSLoggingHandler(socket, job_id) 
    fh = logging.FileHandler(filename=os.path.join(tmp_folder, 'classifier.log.txt'), encoding='utf-8') 

    logger.addHandler(ch) 
    logger.addHandler(sh) 
    logger.addHandler(fh) 

    modules_arr = method.split('.') 
    m = __import__(".".join(modules_arr[:-1]), globals(), locals(), -1) 
    fn = getattr(m, modules_arr[-1]) 

    try: 
     results = fn(**task_kwargs) 
     print 'size of data file %s' %len(results) 
     data = [ 
      str(job_id), 
      SUCCESS_CODE, 
      results 
     ] 
     tracker = socket.send_multipart(data) 

     print 'sent!!!' 
    except Exception, e: 
     print traceback.format_exc() 
     socket.send_multipart((
      str(job_id), 
      ERROR_CODE, 
      str(e) 
     )) 
    finally: 
     socket.close() 

编辑: 尝试过手动分手了,结果成小块,但避风港取得了成功。

results = fn(**task_kwargs) 
    print 'size of data file %s' %len(results) 
    data = [ 
     str(job_id), 
     SUCCESS_CODE, 
    ] + [results[i: i + 20] for i in xrange(0, len(results), 20)] 
    print 'list size %s' %len(data) 
    tracker = socket.send_multipart(data) 

    print 'sent!!!' 

回答

0

从pyzmq文档: https://zeromq.github.io/pyzmq/api/zmq.html#zmq.Socket.send_multipart

msg_parts:迭代

对象序列来发送作为多部分消息。每个元素可以是任何可发送对象(帧,字节,缓冲区提供者)

消息不会自动分块,您传递的迭代中的每个元素都是块。所以你设置它的方式,你的所有结果数据将是一个块。您需要使用将您的结果分块为适当大小的迭代器。

+0

尝试了你的建议,但没有改进。查看更新。 – goh 2015-04-06 03:01:18