我试图修改一个zeromq example以处理后台任务并使其工作。特别是,我有一个xpub/xsub套接字设置,客户端将订阅发布者以接收工作人员的进度更新和结果。zmq/zeromq recv_multipart在大数据上挂起
worker_server.py
proxy = zmq.devices.ThreadDevice(zmq.QUEUE, zmq.XSUB, zmq.XPUB)
proxy.bind_in('tcp://127.0.0.1:5002')
proxy.bind_out('tcp://127.0.0.1:5003')
proxy.start()
client.py
ctx = zmq.Context()
socket = server.create_socket(ctx, 'sub')
socket.setsockopt(zmq.SUBSCRIBE, '')
poller = zmq.Poller()
print 'polling'
poller.register(socket, zmq.POLLIN)
ready = dict(poller.poll())
print 'polling done'
if ready and ready.has_key(socket):
job_id, code, result = socket.recv_multipart()
return {'status': code, 'data': result}
到目前为止,代码工作的小消息,但是当工作人员试图发布任务结果是大,35393030字节,客户端没有收到消息,代码挂在ready = dict(poller.poll())
现在,我刚开始学习使用zmq,但不是send_multipart应该chu消息?是什么原因造成的客户端没有收到效果
worker.py
def worker(logger_name, method, **task_kwargs):
job_id = os.getpid()
ctx = zmq.Context()
socket = create_socket(ctx, 'pub')
time.sleep(1)
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
sh = WSLoggingHandler(socket, job_id)
fh = logging.FileHandler(filename=os.path.join(tmp_folder, 'classifier.log.txt'), encoding='utf-8')
logger.addHandler(ch)
logger.addHandler(sh)
logger.addHandler(fh)
modules_arr = method.split('.')
m = __import__(".".join(modules_arr[:-1]), globals(), locals(), -1)
fn = getattr(m, modules_arr[-1])
try:
results = fn(**task_kwargs)
print 'size of data file %s' %len(results)
data = [
str(job_id),
SUCCESS_CODE,
results
]
tracker = socket.send_multipart(data)
print 'sent!!!'
except Exception, e:
print traceback.format_exc()
socket.send_multipart((
str(job_id),
ERROR_CODE,
str(e)
))
finally:
socket.close()
编辑: 尝试过手动分手了,结果成小块,但避风港取得了成功。
results = fn(**task_kwargs)
print 'size of data file %s' %len(results)
data = [
str(job_id),
SUCCESS_CODE,
] + [results[i: i + 20] for i in xrange(0, len(results), 20)]
print 'list size %s' %len(data)
tracker = socket.send_multipart(data)
print 'sent!!!'
尝试了你的建议,但没有改进。查看更新。 – goh 2015-04-06 03:01:18