ZeroMQ: load balancing many workers and one master

Suppose I have a master process that divides up data to be processed in parallel. Say there are 1000 chunks of data and 100 nodes on which to run the computations.

Is there some way to use REQ/REP so that all of the workers are kept busy? I've tried the load balancer pattern from the guide, but with a single client, sock.recv() blocks until it receives a response from a worker.
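To make the blocking concrete, here is a minimal illustrative sketch (not part of the program below) of the REQ lockstep:

import zmq

socket = zmq.Context().socket(zmq.REQ)
socket.connect("ipc://frontend.ipc")

socket.send(b"WORK 1")
msg = socket.recv()     # blocks until some worker replies...
socket.send(b"WORK 2")  # ...so the next request cannot go out earlier,
msg = socket.recv()     # and only one worker is ever busy at a time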

Below is the code, slightly modified from the load balancer in the zmq guide. It starts one client, 10 workers, and a load-balancing broker in between. How can I get all of those workers working at the same time?

from __future__ import print_function 
from multiprocessing import Process 
import zmq 
import time 
import uuid 
import random 

def client_task():
    """Basic request-reply client using a REQ socket."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = uuid.uuid4().hex.encode()
    socket.connect("ipc://frontend.ipc")
    # Send each request, then block until the reply arrives
    for i in range(100):
        print("SENDING: ", i)
        socket.send(b"WORK")
        msg = socket.recv()
        print(msg)

def worker_task():
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = uuid.uuid4().hex.encode()
    socket.connect("ipc://backend.ipc")
    # Tell the broker we're ready for work
    socket.send(b"READY")
    while True:
        address, empty, request = socket.recv_multipart()
        time.sleep(random.randint(1, 4))  # simulate a slow computation
        socket.send_multipart([address, b"", b"OK : " + socket.identity])


def broker():
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")
    # Initialize main loop state
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
        sockets = dict(poller.poll())
        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If this is a client reply (not a READY), relay it to the frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])

        if frontend in sockets:
            # Get the next client request, route it to the least recently used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up (not reached in this example)
    backend.close()
    frontend.close()
    context.term()

def main():
    NUM_CLIENTS = 1
    NUM_WORKERS = 10

    # Start the background tasks
    def start(task, *args):
        process = Process(target=task, args=args)
        process.start()

    start(broker)

    for i in range(NUM_CLIENTS):
        start(client_task)

    for i in range(NUM_WORKERS):
        start(worker_task)


if __name__ == "__main__":
    main()

Answer


I can think of a few different ways to do this:

- You could, for example, use the threading module to launch all your requests from a single client, with something like:

import threading
import zmq

result_list = []  # Collect the results in a list for the example
rlock = threading.RLock()

def client_thread(client_url, request, i):
    context = zmq.Context.instance()
    socket = context.socket(zmq.REQ)

    socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    socket.connect(client_url)

    socket.send(request.encode())
    reply = socket.recv()

    with rlock:
        result_list.append((i, reply))
    return

def client_task():
    # tasks = list with all your tasks
    url_client = "ipc://frontend.ipc"
    threads = []
    for i in range(len(tasks)):
        thread = threading.Thread(target=client_thread,
                                  args=(url_client, tasks[i], i))
        thread.start()
        threads.append(thread)

    # Wait for all of the requests to complete
    for thread in threads:
        thread.join()

- You could take advantage of an evented library such as asyncio (there is a submodule zmq.asyncio, and another library, aiozmq, the latter offering a higher level of abstraction). In that case you send your requests to the workers sequentially, but without blocking on each response (and so without keeping the main loop busy), and you get the results when they come back to the main loop. It could look like this:

import asyncio
import zmq
import zmq.asyncio

async def client_async(request, context, i, client_url):
    """Basic client sending a request (REQ) to a ROUTER (the broker)."""
    socket = context.socket(zmq.REQ)
    socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
    socket.connect(client_url)
    await socket.send(request.encode())
    reply = await socket.recv()
    socket.close()
    return reply


async def run(loop):
    # tasks = list full of tasks
    url_client = "ipc://frontend.ipc"
    asyncio_tasks = []
    ctx = zmq.asyncio.Context()
    for i in range(len(tasks)):
        task = asyncio.ensure_future(client_async(tasks[i], ctx, i, url_client))
        asyncio_tasks.append(task)

    responses = await asyncio.gather(*asyncio_tasks)
    return responses

zmq.asyncio.install()
loop = asyncio.get_event_loop()
results = loop.run_until_complete(run(loop))
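Note that zmq.asyncio requires Python 3.5+ for the async/await syntax, and the zmq.asyncio.install() call above was only needed in older pyzmq releases; it is deprecated since pyzmq 17, where zmq.asyncio.Context() works with the default asyncio event loop directly.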

I haven't tested these two snippets, but they both come from code (with modifications to fit the question) that I already use with ZMQ in a configuration similar to yours.


I ended up using the "asynchronous" req/rep pattern from the zmq guide, which solved the problem. Polling at the broker in the middle would be ideal, but this is good enough for my use case. – reptilicus
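For reference, a minimal sketch of the idea behind that asynchronous pattern, assuming the same broker and frontend address as in the question (untested here): a DEALER socket is not bound to REQ's strict send/recv lockstep, so a single client can keep many requests in flight and all the workers busy at once.

import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = b"client-0"
socket.connect("ipc://frontend.ipc")

# Fire off all requests without waiting for any reply; the empty
# delimiter frame makes each message look like a REQ message to the
# ROUTER frontend of the broker above.
for i in range(100):
    socket.send_multipart([b"", b"WORK"])

# Collect the replies in whatever order the workers finish.
for i in range(100):
    empty, reply = socket.recv_multipart()
    print(reply)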