[zeromq-dev] trying to understand zeromq high water mark behaviour

Shaobo Hou shaobohou at gmail.com
Sun Jan 26 13:28:49 CET 2014


There is probably a very simple explanation, but I have been playing around 
with pyzmq and simple load balancing using HWM, and I don't quite 
understand some of the behaviour I am seeing.

(I previously posted this on stackoverflow late at night then realised it 
is probably more suited to this group)

I have set up a simple multithreading test, where clients connect to two 
workers via a ROUTER to DEALER pattern with HWM set to 1 on the backend. 
One of the worker is very fast and the other is very slow. There is only 
one client and all it does is spam 100 messages to the server using a 
DEALER socket. This generally seems to work and the faster worker processes 
many more messages than the slow worker.

However, even if I set the slow worker to be so slow, such that the fast 
worker should be able to process 99 messages before the slow worker 
finished even one, the slow worker still seems to receive at least 2 or 3 
messages.

Are high water mark behaviour inexact or am I missing something?


The server code is as follows:

import re, sys, time, string, zmq, threading, signal
>
> def worker_routine(worker_url, worker_id, context=None):
>     # socket to talk to dispatcher
>     context = context or zmq.Context.instance()
>     socket = context.socket(zmq.REP)
>     socket.set_hwm(1)
>     socket.connect(worker_url)
>     print "worker ", worker_id, " ready ..."
>     while True:
>         x = socket.recv()
>         if worker_id==1:
>             time.sleep(3)
>         print worker_id, x
>         sys.stdout.flush()
>         socket.send(b'world')
>
> context = zmq.Context().instance()
> # socket facing clients
> frontend = context.socket(zmq.ROUTER)
> frontend.bind("tcp://*:5559")
> # socket facing services
> backend  = context.socket(zmq.DEALER)
> url_worker = "inproc://workers"
> backend.set_hwm(1)
> backend.bind(url_worker)
> # launch pool of worker threads
> for i in range(2):
>     thread = threading.Thread(target=worker_routine, args=(url_worker,i,))
>     thread.start()
>     time.sleep(0.1)
> try:
>     zmq.device(zmq.QUEUE, frontend, backend)
> except:
>     print "terminating!"
> # we never get here
> frontend.close()
> backend.close()
> context.term()


 
The client code is as follows:

import zmq, random, string, time, threading, signal
>  
> # prepare our context and sockets
> context = zmq.Context()
> socket = context.socket(zmq.DEALER)
> socket.connect("tcp://localhost:5559")
>  
> for x in xrange(100):
>    socket.send_multipart([b'', str(x)])
>  
> print "finished!"




Example output:

...
> worker: 0    message: 80
> worker: 0    message: 81
> worker: 0    message: 82
> worker: 0    message: 83
> worker: 0    message: 84
> worker: 0    message: 85
> worker: 0    message: 86
> worker: 0    message: 87
> worker: 0    message: 88
> worker: 0    message: 89
> worker: 0    message: 90
> worker: 0    message: 91
> worker: 0    message: 92
> worker: 0    message: 93
> worker: 0    message: 94
> worker: 0    message: 95
> worker: 0    message: 96
> worker: 0    message: 97
> worker: 0    message: 98
> worker: 0    message: 99
> worker: 1    message: 1
> worker: 1    message: 3
> worker: 1    message: 6



Any help would be appreciated.

Shaobo
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20140126/4f783daf/attachment.htm>


More information about the zeromq-dev mailing list