[zeromq-dev] Single client multiple workers load balancing

Andrew Hume andrew at humeweb.com
Wed Oct 5 19:21:35 CEST 2016


joe,

	i couldn’t see all your code, but the simple answer is to have an overseer process (or task) which
has a bunch of work to do and waits for workers to ask for work.

	each worker sends a REQ to the overseer's REP socket asking for one (or a few) pieces of work; the reply contains the work.
when the worker is done, it asks for more.

	in this way, each worker spends most of its time working, and the overseer spends most of
its time waiting for someone to request work.
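
	a minimal sketch of that pattern in Python with pyzmq, under some assumptions that are not in
the original post: the endpoint name, the function names, and the tiny wire format (a number as
ASCII text, an empty frame meaning "no more work") are all made up for illustration, and worker
threads over inproc stand in for the remote nodes. each worker piggybacks its previous result on
its next request, so the overseer's single REP socket both collects results and hands out work.

```python
import threading
import zmq

ENDPOINT = "inproc://overseer"  # hypothetical endpoint name


def worker(context):
    """Ask the overseer for work, square it, report the result with the next request."""
    sock = context.socket(zmq.REQ)
    sock.connect(ENDPOINT)
    result = b""  # nothing to report on the first request
    while True:
        sock.send(result)
        work = sock.recv()
        if not work:  # an empty reply means no work is left
            break
        n = int(work)
        result = ("%d %d" % (n, n * n)).encode("ascii")
    sock.close()


def overseer(sock, numbers, num_workers):
    """Reply to each request with one number until every worker has been told to stop."""
    pending = list(numbers)
    results = {}
    stopped = 0
    while stopped < num_workers:
        msg = sock.recv()  # blocks until some worker asks for work
        if msg:  # a non-empty request carries that worker's previous result
            n, squared = msg.split()
            results[int(n)] = int(squared)
        if pending:
            sock.send(str(pending.pop()).encode("ascii"))
        else:
            sock.send(b"")  # tell this worker to stop
            stopped += 1
    return results


def run(numbers, num_workers=4):
    context = zmq.Context()
    sock = context.socket(zmq.REP)
    sock.bind(ENDPOINT)  # bind before the workers connect
    threads = [threading.Thread(target=worker, args=(context,))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    results = overseer(sock, numbers, num_workers)
    for t in threads:
        t.join()
    sock.close()
    context.term()
    return results


if __name__ == "__main__":
    print(run(range(10), num_workers=3))
```

	with real nodes, the workers would presumably be separate processes connecting over tcp://
instead; the request/reply loop itself would stay the same.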

	is that what you wanted?

			andrew

> On Oct 5, 2016, at 8:06 AM, joe meiring <josephmeiring at gmail.com> wrote:
> 
> Suppose I have one master process that divides up data to be processed in parallel. Let's say there are 1000 chunks of data and 100 nodes on which to run the computations. Say something simple, like an array of numbers that we want to square.
> 
> Is there some way to do REQ/REP to keep all the workers busy? I've tried to use the load balancer pattern in the guide but with a single client, sock.recv() is going to block until it receives its response from the worker. 
> 
> Is there a different pattern to use in this kind of situation?
> 
> Here is the code, slightly modified from the zmq guide for a load balancer. It starts up one client, 10 workers, and a load balancer/broker in the middle. How can I get all those workers working at the same time?
> 
> from __future__ import print_function
> from multiprocessing import Process
> import zmq
> import time
> import uuid
> import random
> 
> def client_task():
>     """Basic request-reply client using REQ socket."""
>     socket = zmq.Context().socket(zmq.REQ)
>     socket.identity = str(uuid.uuid4()).encode("ascii")  # identity must be bytes
>     socket.connect("ipc://frontend.ipc")
>     # Send request, get reply
>     for i in range(100):
>         print("SENDING: ", i)
>         socket.send(b'WORK')
>         msg = socket.recv()
>         print(msg)
> 
> def worker_task():
>     """Worker task, using a REQ socket to do load-balancing."""
>     socket = zmq.Context().socket(zmq.REQ)
>     socket.identity = str(uuid.uuid4()).encode("ascii")  # identity must be bytes
>     socket.connect("ipc://backend.ipc")
>     # Tell broker we're ready for work
>     socket.send(b"READY")
>     while True:
>         address, empty, request = socket.recv_multipart()
>         time.sleep(random.randint(1, 4))
>         socket.send_multipart([address, b"", b"OK : " + socket.identity])
> 
> 
> def broker():
>     context = zmq.Context()
>     frontend = context.socket(zmq.ROUTER)
>     frontend.bind("ipc://frontend.ipc")
>     backend = context.socket(zmq.ROUTER)
>     backend.bind("ipc://backend.ipc")
>     # Initialize main loop state
>     workers = []
>     poller = zmq.Poller()
>     # Only poll for requests from backend until workers are available
>     poller.register(backend, zmq.POLLIN)
> 
>     while True:
>         sockets = dict(poller.poll())
>         if backend in sockets:
>             # Handle worker activity on the backend
>             request = backend.recv_multipart()
>             worker, empty, client = request[:3]
>             if not workers:
>                 # Poll for clients now that a worker is available
>                 poller.register(frontend, zmq.POLLIN)
>             workers.append(worker)
>             if client != b"READY" and len(request) > 3:
>                 # If client reply, send rest back to frontend
>                 empty, reply = request[3:]
>                 frontend.send_multipart([client, b"", reply])
> 
>         if frontend in sockets:
>             # Get next client request, route to least recently used worker
>             client, empty, request = frontend.recv_multipart()
>             worker = workers.pop(0)
>             backend.send_multipart([worker, b"", client, b"", request])
>             if not workers:
>                 # Don't poll clients if no workers are available
>                 poller.unregister(frontend)
> 
>     # Clean up
>     backend.close()
>     frontend.close()
>     context.term()
> 
> def main():
>     NUM_CLIENTS = 1
>     NUM_WORKERS = 10
>     # Start background tasks
>     def start(task, *args):
>         process = Process(target=task, args=args)
>         process.start()
>     start(broker)
> 
>     for i in range(NUM_CLIENTS):
>         start(client_task)
> 
>     for i in range(NUM_WORKERS):
>         start(worker_task)
> 
> 
>     # Process(target=broker).start()
> 
> 
> 
> 
> if __name__ == "__main__":
>     main()
