[zeromq-dev] Single client multiple workers load balancing
joe meiring
josephmeiring at gmail.com
Wed Oct 5 19:34:17 CEST 2016
yeah, I've seen that pattern before, I was just curious if there is a way
to async send the messages in batches to a load balancer, then wait till
all the replies come back or something.
cheers,
joe
On Wed, Oct 5, 2016 at 12:21 PM, Andrew Hume <andrew at humeweb.com> wrote:
> joe,
>
> i couldn’t see all your code, but the simple answer is you have an
> overseer process(or task) which
> has a bunch of work to do, and waits for workers to ask for work.
>
> each worker does a REQ/REP to teh overseer asking for a (few?) pieces of
> work; the reply contains the work.
> when the worker is done, it asks for more.
>
> in this way, each worker spends most of its time working, and the overseer
> spends most of
> its time waiting for someone to request work.
>
> is that what you wanted?
>
> andrew
>
> On Oct 5, 2016, at 8:06 AM, joe meiring <josephmeiring at gmail.com> wrote:
>
> Suppose I have one master process that divides up data to be processed in
> parallel. Lets say there are 1000 chunks of data and 100 nodes on which to
> run the computations. Say something simple, like an array of numbers and we
> want to square them.
>
> Is there some way to do REQ/REP to keep all the workers busy? I've tried
> to use the load balancer pattern in the guide but with a single client,
> sock.recv() is going to block until it receives its response from the
> worker.
>
> Is there a different pattern to use in these kind of situations?
>
> Here is the code, slightly modified from the zmq guide for a load
> balancer. Is starts up one client, 10 workers, and a load balancer/broker
> in the middle. How can I get all those workers working at the same time???
>
> from __future__ import print_functionfrom multiprocessing import Processimport zmqimport timeimport uuidimport random
> def client_task():
> """Basic request-reply client using REQ socket."""
> socket = zmq.Context().socket(zmq.REQ)
> socket.identity = str(uuid.uuid4())
> socket.connect("ipc://frontend.ipc")
> # Send request, get reply
> for i in range(100):
> print("SENDING: ", i)
> socket.send('WORK')
> msg = socket.recv()
> print(msg)
> def worker_task():
> """Worker task, using a REQ socket to do load-balancing."""
> socket = zmq.Context().socket(zmq.REQ)
> socket.identity = str(uuid.uuid4())
> socket.connect("ipc://backend.ipc")
> # Tell broker we're ready for work
> socket.send(b"READY")
> while True:
> address, empty, request = socket.recv_multipart()
> time.sleep(random.randint(1, 4))
> socket.send_multipart([address, b"", b"OK : " + str(socket.identity)])
>
> def broker():
> context = zmq.Context()
> frontend = context.socket(zmq.ROUTER)
> frontend.bind("ipc://frontend.ipc")
> backend = context.socket(zmq.ROUTER)
> backend.bind("ipc://backend.ipc")
> # Initialize main loop state
> workers = []
> poller = zmq.Poller()
> # Only poll for requests from backend until workers are available
> poller.register(backend, zmq.POLLIN)
>
> while True:
> sockets = dict(poller.poll())
> if backend in sockets:
> # Handle worker activity on the backend
> request = backend.recv_multipart()
> worker, empty, client = request[:3]
> if not workers:
> # Poll for clients now that a worker is available
> poller.register(frontend, zmq.POLLIN)
> workers.append(worker)
> if client != b"READY" and len(request) > 3:
> # If client reply, send rest back to frontend
> empty, reply = request[3:]
> frontend.send_multipart([client, b"", reply])
>
> if frontend in sockets:
> # Get next client request, route to last-used worker
> client, empty, request = frontend.recv_multipart()
> worker = workers.pop(0)
> backend.send_multipart([worker, b"", client, b"", request])
> if not workers:
> # Don't poll clients if no workers are available
> poller.unregister(frontend)
>
> # Clean up
> backend.close()
> frontend.close()
> context.term()
> def main():
> NUM_CLIENTS = 1
> NUM_WORKERS = 10
> # Start background tasks
> def start(task, *args):
> process = Process(target=task, args=args)
> process.start()
> start(broker)
>
> for i in range(NUM_CLIENTS):
> start(client_task)
>
> for i in range(NUM_WORKERS):
> start(worker_task)
>
>
> # Process(target=broker).start()
>
>
>
> if __name__ == "__main__":
> main()
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20161005/bcca0b35/attachment.htm>
More information about the zeromq-dev
mailing list