[zeromq-dev] Single client multiple workers load balancing

Michal Vyskocil michal.vyskocil at gmail.com
Wed Oct 5 19:50:51 CEST 2016


Hi,

Take a look at the Malamute broker. The service pattern looks exactly like what
you need. It's built on top of CZMQ, so it's fully compatible with ZeroMQ.

There is only one limitation: the broker doesn't handle the case where a worker
disappears. We know how to fix it, just no one has done it yet.

See the selftest in
https://github.com/zeromq/malamute/blob/master/src/mlm_client.c

On Oct 5, 2016, 7:22 PM, "Andrew Hume" <andrew at humeweb.com> wrote:

> joe,
>
> i couldn’t see all your code, but the simple answer is to have an
> overseer process (or task) which has a bunch of work to do and waits
> for workers to ask for work.
>
> each worker does a REQ/REP to the overseer asking for a piece (or a few
> pieces) of work; the reply contains the work.
> when the worker is done, it asks for more.
>
> in this way, each worker spends most of its time working, and the overseer
> spends most of
> its time waiting for someone to request work.
>
> is that what you wanted?
>
> andrew
>
> On Oct 5, 2016, at 8:06 AM, joe meiring <josephmeiring at gmail.com> wrote:
>
> Suppose I have one master process that divides up data to be processed in
> parallel. Let's say there are 1000 chunks of data and 100 nodes on which to
> run the computations. Say it's something simple, like an array of numbers
> that we want to square.
>
> Is there some way to do REQ/REP to keep all the workers busy? I've tried
> to use the load balancer pattern in the guide, but with a single client,
> sock.recv() is going to block until it receives its response from the
> worker.
>
> Is there a different pattern to use in this kind of situation?
>
> Here is the code, slightly modified from the zmq guide for a load
> balancer. It starts up one client, 10 workers, and a load balancer/broker
> in the middle. How can I get all those workers working at the same time???
>
> from __future__ import print_function
> from multiprocessing import Process
> import zmq
> import time
> import uuid
> import random
>
> def client_task():
>     """Basic request-reply client using REQ socket."""
>     socket = zmq.Context().socket(zmq.REQ)
>     socket.identity = str(uuid.uuid4()).encode("ascii")
>     socket.connect("ipc://frontend.ipc")
>     # Send request, get reply
>     for i in range(100):
>         print("SENDING: ", i)
>         socket.send(b"WORK")
>         msg = socket.recv()
>         print(msg)
> def worker_task():
>     """Worker task, using a REQ socket to do load-balancing."""
>     socket = zmq.Context().socket(zmq.REQ)
>     socket.identity = str(uuid.uuid4()).encode("ascii")
>     socket.connect("ipc://backend.ipc")
>     # Tell broker we're ready for work
>     socket.send(b"READY")
>     while True:
>         address, empty, request = socket.recv_multipart()
>         time.sleep(random.randint(1, 4))
>         socket.send_multipart([address, b"", b"OK : " + socket.identity])
>
> def broker():
>     context = zmq.Context()
>     frontend = context.socket(zmq.ROUTER)
>     frontend.bind("ipc://frontend.ipc")
>     backend = context.socket(zmq.ROUTER)
>     backend.bind("ipc://backend.ipc")
>     # Initialize main loop state
>     workers = []
>     poller = zmq.Poller()
>     # Only poll for requests from backend until workers are available
>     poller.register(backend, zmq.POLLIN)
>
>     while True:
>         sockets = dict(poller.poll())
>         if backend in sockets:
>             # Handle worker activity on the backend
>             request = backend.recv_multipart()
>             worker, empty, client = request[:3]
>             if not workers:
>                 # Poll for clients now that a worker is available
>                 poller.register(frontend, zmq.POLLIN)
>             workers.append(worker)
>             if client != b"READY" and len(request) > 3:
>                 # If client reply, send rest back to frontend
>                 empty, reply = request[3:]
>                 frontend.send_multipart([client, b"", reply])
>
>         if frontend in sockets:
>             # Get next client request, route to last-used worker
>             client, empty, request = frontend.recv_multipart()
>             worker = workers.pop(0)
>             backend.send_multipart([worker, b"", client, b"", request])
>             if not workers:
>                 # Don't poll clients if no workers are available
>                 poller.unregister(frontend)
>
>     # Clean up
>     backend.close()
>     frontend.close()
>     context.term()
> def main():
>     NUM_CLIENTS = 1
>     NUM_WORKERS = 10
>     # Start background tasks
>     def start(task, *args):
>         process = Process(target=task, args=args)
>         process.start()
>     start(broker)
>
>     for i in range(NUM_CLIENTS):
>         start(client_task)
>
>     for i in range(NUM_WORKERS):
>         start(worker_task)
>
>
>     # Process(target=broker).start()
>
>
>
> if __name__ == "__main__":
>     main()
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>

