[zeromq-dev] Single client multiple workers load balancing

joe meiring josephmeiring at gmail.com
Wed Oct 5 17:06:39 CEST 2016


Suppose I have one master process that divides up data to be processed in
parallel. Let's say there are 1000 chunks of data and 100 nodes on which to
run the computations. Say it's something simple, like an array of numbers
that we want to square.

Is there some way to use REQ/REP and keep all the workers busy? I've tried
the load balancer pattern from the guide, but with a single client,
socket.recv() blocks until it receives the reply from the worker, so only
one request is ever outstanding at a time.
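Would something like the following work instead? The idea is to swap the
client's REQ socket for a DEALER so requests can be pipelined rather than
waiting for each reply. This is only a rough, untested sketch
(dealer_client_task is my own name), and it assumes the same frontend
framing the broker below expects:

import zmq

def dealer_client_task(num_requests=100):
    # DEALER is not locked into REQ's strict send/recv cycle, so all
    # requests can be pushed out before any replies are collected.
    socket = zmq.Context().socket(zmq.DEALER)
    socket.connect("ipc://frontend.ipc")
    for i in range(num_requests):
        # DEALER does not add the empty delimiter frame that REQ adds,
        # so send it explicitly to match the broker's framing.
        socket.send_multipart([b"", b"WORK"])
    for i in range(num_requests):
        empty, reply = socket.recv_multipart()
        print(reply)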

Is there a different pattern to use in this kind of situation?
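
Or would the divide-and-conquer pipeline (PUSH/PULL) from the guide be a
better fit for this? Roughly something like the sketch below, where the
endpoints and function names (ventilator, pipeline_worker, result_sink) are
made up:

import zmq

def ventilator(chunks):
    # Push work items out; PUSH load-balances messages across connected workers.
    sender = zmq.Context().socket(zmq.PUSH)
    sender.bind("tcp://*:5557")
    for chunk in chunks:
        sender.send_pyobj(chunk)

def pipeline_worker():
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")
    sink = ctx.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")
    while True:
        number = receiver.recv_pyobj()
        sink.send_pyobj(number ** 2)  # the actual computation

def result_sink(expected):
    # Collect one result per work item that was sent out.
    receiver = zmq.Context().socket(zmq.PULL)
    receiver.bind("tcp://*:5558")
    return [receiver.recv_pyobj() for _ in range(expected)]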

Here is the code, slightly modified from the zmq guide's load balancer
example. It starts up one client, 10 workers, and a load balancer/broker in
the middle. How can I get all those workers working at the same time???

from __future__ import print_function
from multiprocessing import Process
import zmq
import time
import uuid
import random

def client_task():
    """Basic request-reply client using REQ socket."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = str(uuid.uuid4()).encode("ascii")  # identity must be bytes
    socket.connect("ipc://frontend.ipc")
    # Send request, get reply
    for i in range(100):
        print("SENDING: ", i)
        socket.send(b"WORK")
        msg = socket.recv()
        print(msg)

def worker_task():
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = str(uuid.uuid4()).encode("ascii")  # identity must be bytes
    socket.connect("ipc://backend.ipc")
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        address, empty, request = socket.recv_multipart()
        time.sleep(random.randint(1, 4))
        socket.send_multipart([address, b"", b"OK : " + socket.identity])

def broker():
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")
    # Initialize main loop state
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
        sockets = dict(poller.poll())
        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])

        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up
    backend.close()
    frontend.close()
    context.term()

def main():
    NUM_CLIENTS = 1
    NUM_WORKERS = 10
    # Start background tasks
    def start(task, *args):
        process = Process(target=task, args=args)
        process.start()
    start(broker)

    for i in range(NUM_CLIENTS):
        start(client_task)

    for i in range(NUM_WORKERS):
        start(worker_task)


    # Process(target=broker).start()



if __name__ == "__main__":
    main()