[zeromq-dev] Single client multiple workers load balancing

joe meiring josephmeiring at gmail.com
Wed Oct 5 19:34:17 CEST 2016


yeah, I've seen that pattern before, I was just curious if there is a way
to async send the messages in batches to a load balancer, then wait till
all the replies come back or something.

cheers,
joe

On Wed, Oct 5, 2016 at 12:21 PM, Andrew Hume <andrew at humeweb.com> wrote:

> joe,
>
> i couldn’t see all your code, but the simple answer is you have an
> overseer process(or task) which
> has a bunch of work to do, and waits for workers to ask for work.
>
> each worker does a REQ/REP to teh overseer asking for a (few?) pieces of
> work; the reply contains the work.
> when the worker is done, it asks for more.
>
> in this way, each worker spends most of its time working, and the overseer
> spends most of
> its time waiting for someone to request work.
>
> is that what you wanted?
>
> andrew
>
> On Oct 5, 2016, at 8:06 AM, joe meiring <josephmeiring at gmail.com> wrote:
>
> Suppose I have one master process that divides up data to be processed in
> parallel. Lets say there are 1000 chunks of data and 100 nodes on which to
> run the computations. Say something simple, like an array of numbers and we
> want to square them.
>
> Is there some way to do REQ/REP to keep all the workers busy? I've tried
> to use the load balancer pattern in the guide but with a single client,
> sock.recv() is going to block until it receives its response from the
> worker.
>
> Is there a different pattern to use in these kind of situations?
>
> Here is the code, slightly modified from the zmq guide for a load
> balancer. Is starts up one client, 10 workers, and a load balancer/broker
> in the middle. How can I get all those workers working at the same time???
>
> from __future__ import print_functionfrom multiprocessing import Processimport zmqimport timeimport uuidimport random
> def client_task():
>     """Basic request-reply client using REQ socket."""
>     socket = zmq.Context().socket(zmq.REQ)
>     socket.identity = str(uuid.uuid4())
>     socket.connect("ipc://frontend.ipc")
>     # Send request, get reply
>     for i in range(100):
>         print("SENDING: ", i)
>         socket.send('WORK')
>         msg = socket.recv()
>         print(msg)
> def worker_task():
>     """Worker task, using a REQ socket to do load-balancing."""
>     socket = zmq.Context().socket(zmq.REQ)
>     socket.identity = str(uuid.uuid4())
>     socket.connect("ipc://backend.ipc")
>     # Tell broker we're ready for work
>     socket.send(b"READY")
>     while True:
>         address, empty, request = socket.recv_multipart()
>         time.sleep(random.randint(1, 4))
>         socket.send_multipart([address, b"", b"OK : " + str(socket.identity)])
>
> def broker():
>     context = zmq.Context()
>     frontend = context.socket(zmq.ROUTER)
>     frontend.bind("ipc://frontend.ipc")
>     backend = context.socket(zmq.ROUTER)
>     backend.bind("ipc://backend.ipc")
>     # Initialize main loop state
>     workers = []
>     poller = zmq.Poller()
>     # Only poll for requests from backend until workers are available
>     poller.register(backend, zmq.POLLIN)
>
>     while True:
>         sockets = dict(poller.poll())
>         if backend in sockets:
>             # Handle worker activity on the backend
>             request = backend.recv_multipart()
>             worker, empty, client = request[:3]
>             if not workers:
>                 # Poll for clients now that a worker is available
>                 poller.register(frontend, zmq.POLLIN)
>             workers.append(worker)
>             if client != b"READY" and len(request) > 3:
>                 # If client reply, send rest back to frontend
>                 empty, reply = request[3:]
>                 frontend.send_multipart([client, b"", reply])
>
>         if frontend in sockets:
>             # Get next client request, route to last-used worker
>             client, empty, request = frontend.recv_multipart()
>             worker = workers.pop(0)
>             backend.send_multipart([worker, b"", client, b"", request])
>             if not workers:
>                 # Don't poll clients if no workers are available
>                 poller.unregister(frontend)
>
>     # Clean up
>     backend.close()
>     frontend.close()
>     context.term()
> def main():
>     NUM_CLIENTS = 1
>     NUM_WORKERS = 10
>     # Start background tasks
>     def start(task, *args):
>         process = Process(target=task, args=args)
>         process.start()
>     start(broker)
>
>     for i in range(NUM_CLIENTS):
>         start(client_task)
>
>     for i in range(NUM_WORKERS):
>         start(worker_task)
>
>
>     # Process(target=broker).start()
>
>
>
> if __name__ == "__main__":
>     main()
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20161005/bcca0b35/attachment.htm>


More information about the zeromq-dev mailing list