[zeromq-dev] Thinking out loud ...

Henry Baragar Henry.Baragar at instantiated.ca
Fri Jun 24 18:11:13 CEST 2011

I have followed the 0MQ mailing list for about a year, experimented with 0MQ 
and contributed to the 0MQ adaptor for plack.  I like many of the features of 
0MQ, including asynchronous I/O, multi-language support, fan-out/fan-in 
connections and end-point connection syntax.  But there are a number of things 
that I find frustrating and that hinder my use of 0MQ for more applications:

- the "slow joiner" syndrome, which limits the effectiveness of dynamically 
  adding new workers when it's observed that the current set of workers needs 
  help handling the load (The Guide: Getting the Message Out)
- PUSH sockets that block when there are no connections, preventing a 
  ventilator from doing useful work while waiting for connections
- devices (e.g. routers) that ruin the fair-queuing properties, resulting in 
  the possibility that workers connected to a device may be idle while workers 
  directly connected to the original source are over-burdened (i.e. have 
  messages sitting in their queues)
- High Water Mark (HWM) configuration that offers little protection from 
  memory exhaustion when the number of anticipated connections is unknown 
  (The Guide: (Semi-)Durable Subscribers and High-Water Marks)
- message loss (due to network I/O buffers) even when using durable sockets 
  (The Guide: Transient vs. Durable Sockets)

It seems to me that this non-determinism arises from 0MQ using a "push" model 
at its core, where 0MQ tries to push messages out as quickly as possible.

I wonder how things would change if 0MQ used a "pull" model instead?  Here is 
what I mean by a pull model:

- Every socket has a queue (SEND, RECV, or both) of configurable length.
- A producer application continues to generate messages as long as its SEND 
  socket queue is not full: the producer blocks when the queue is full and 
  does not unblock until a message is delivered to a consumer.
- When a RECV socket starts up, it asks its connected producers for exactly 
  as many messages as would fill up its RECV queue.

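To make the pull model concrete, here is a minimal single-process sketch of credit-based flow control, under the assumption that a "request" is simply a count of messages the consumer is willing to accept. `SendQueue`, `RecvQueue`, `request()` and `deliver()` are hypothetical names, not 0MQ API, and the blocking of a real producer is simulated by `put()` returning `False`.

```python
from collections import deque


class SendQueue:
    """Producer side of the proposed pull model (hypothetical, not a 0MQ API)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = deque()
        self.credits = {}  # consumer name -> messages it has asked for

    def put(self, msg):
        # A real socket would block the producer here; the sketch returns False.
        if len(self.queue) >= self.capacity:
            return False
        self.queue.append(msg)
        return True

    def request(self, consumer, count):
        # Consumers pull: each request grants credit for `count` more messages.
        self.credits[consumer] = self.credits.get(consumer, 0) + count

    def deliver(self):
        """Release queued messages only to consumers holding unused credit."""
        sent = []
        while self.queue:
            ready = [c for c, n in self.credits.items() if n > 0]
            if not ready:
                break  # nobody asked: messages stay with the producer
            target = max(ready, key=lambda c: self.credits[c])
            self.credits[target] -= 1
            sent.append((target, self.queue.popleft()))
        return sent


class RecvQueue:
    """Consumer side: asks for exactly as many messages as its queue can hold."""

    def __init__(self, name, capacity, producer):
        self.name = name
        self.queue = deque()
        self.producer = producer
        producer.request(name, capacity)  # initial request fills the RECV queue

    def accept(self, msg):
        self.queue.append(msg)

    def take(self):
        msg = self.queue.popleft()
        self.producer.request(self.name, 1)  # a slot freed up: ask for one more
        return msg


# Usage: a producer with room for 3 messages and one consumer with room for 2.
send = SendQueue(capacity=3)
accepted = [send.put(i) for i in range(4)]  # the 4th put would block
recv = RecvQueue("worker", capacity=2, producer=send)
for _, msg in send.deliver():
    recv.accept(msg)
first = recv.take()  # frees a slot, which requests one more message
second_round = send.deliver()
```

Nothing leaves the producer until a consumer has asked for it, so a late-joining consumer only ever receives what fits in its own queue.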
I don't know enough about 0MQ to determine if this idea could be incorporated 
into 0MQ, or if it describes something else.  Please read on to see how this 
change could alleviate a lot of frustrations, both for me and for others.

The PUB/SUB scenario is probably the easiest one to describe:

- A publisher application sets up a PUB socket with a queue long enough to 
  cover subscriber start-up times and acceptable subscriber outages.  The 
  publisher continuously writes messages to the PUB socket:  when the queue 
  gets full, the socket removes the oldest message to make room for a new 
  message.  The publisher is never blocked and never faces memory exhaustion 
  (unless too large a queue was specified).
- A subscriber application connects to the publisher (passing in its filters) 
  and the SUB socket asks the publisher for as many messages as its queue can 
  hold.  Later, when space becomes available on its queue, the SUB socket 
  sends the number of the most recent message on its queue to the PUB socket 
  and asks for the next (filtered) message with a higher number.  If the 
  message number sent by the SUB socket refers to a message that is no longer 
  in the PUB socket's queue, then the SUB socket can be notified that messages 
  (may) have been missed:  the PUB queue may need to be lengthened if this 
  happens too often.
- A proxy (a SUB/PUB device) is a very simple device: it has a SUB socket with 
  a very short queue (possibly of length 0) connected to a PUB socket whose 
  queue length is set as desired (probably the same length used for the 
  publisher's PUB socket).  From the subscriber's point of view, it can't tell 
  the difference between the proxy and the publisher (other than latency).  In 
  fact, the subscriber's SUB socket could be connected to both the publisher 
  and the proxy, and alternate getting messages from one and the other 
  (provided both the publisher and proxy have the same session ID).
- If a subscriber dies or is restarted, there are two possibilities.  If the 
  subscriber "remembered" the number of the last message it processed (e.g. 
  if it was written to disk), then it can pass this number to the SUB socket 
  at creation time, which would use it as a starting point for building its 
  queue.  If the subscriber is restarted within the time period covered by the 
  PUB socket's queue, then no messages would have been missed.  If the 
  subscriber did not remember the message number, then the SUB socket would 
  build its queue starting from the beginning.  This means that the subscriber 
  could end up processing the same message twice.
- If the publisher dies or is restarted, then the PUB socket queue would be 
  lost (or not - see the next point) and there would be two possibilities.  If 
  the publisher "remembered" the number of the last message it pushed onto the 
  PUB socket, then it could continue publishing from where it left off (by 
  passing the session ID and this number to the PUB socket at creation time).  
  If it did not remember the number of the last message, then the PUB socket 
  would generate a new session ID and restart message numbering from 0.  When 
  a SUB socket receives a message with a new session ID, a configuration 
  setting would specify whether it should keep the messages associated with 
  the previous session ID, or discard them.
- It is very simple to preserve a publisher's PUB socket queue across 
  publisher restarts:  simply put a proxy in front of the publisher.  If the 
  architecture must be resilient to a proxy device dying or being restarted, 
  then put a second proxy in front of the publisher and configure subscribers 
  to connect to both proxies.

I don't know how these ideas will work over (E)PGM.
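The PUB/SUB behaviour above (numbered messages, oldest-first eviction, resuming from a remembered number, gap notification and session IDs) can be sketched as follows. This is only an illustration under assumptions: `PubQueue`, `SubQueue`, `fetch()`, `refill()` and `discard_on_new_session` are hypothetical names, topics are modelled as a separate string matched by prefix, and the network round trip is a direct method call. Because of filtering, the gap flag can only say that messages *may* have been missed.

```python
import uuid
from collections import deque


class PubQueue:
    """Hypothetical PUB-side queue: numbered messages, oldest evicted when full."""

    def __init__(self, capacity, session_id=None, next_number=0):
        # deque(maxlen=...) silently drops the oldest entry, so publish never blocks.
        self.buffer = deque(maxlen=capacity)
        # A publisher that forgot its state gets a fresh session and restarts at 0.
        self.session_id = session_id or uuid.uuid4().hex
        self.next_number = next_number

    def publish(self, topic, body):
        self.buffer.append((self.next_number, topic, body))
        self.next_number += 1

    def fetch(self, after, prefixes, limit):
        """Up to `limit` messages numbered above `after` whose topic matches a prefix.

        `after=None` means "start from the oldest message still queued".  The
        flag is True when the message right after `after` was already evicted.
        """
        oldest = self.buffer[0][0] if self.buffer else self.next_number
        gap = after is not None and after + 1 < oldest
        floor = -1 if after is None else after
        matches = [m for m in self.buffer
                   if m[0] > floor and any(m[1].startswith(p) for p in prefixes)]
        return self.session_id, matches[:limit], gap


class SubQueue:
    """Hypothetical SUB-side queue that pulls by message number."""

    def __init__(self, capacity, prefixes, last_seen=None, session_id=None,
                 discard_on_new_session=False):
        self.capacity = capacity
        self.prefixes = prefixes
        self.last_seen = last_seen  # may be restored from disk after a restart
        self.session_id = session_id
        self.discard_on_new_session = discard_on_new_session
        self.queue = deque()
        self.missed = False

    def refill(self, pub):
        session, msgs, gap = pub.fetch(self.last_seen, self.prefixes,
                                       self.capacity - len(self.queue))
        if self.session_id is not None and session != self.session_id:
            # The publisher restarted with new numbering: old numbers mean nothing.
            if self.discard_on_new_session:
                self.queue.clear()
            self.last_seen = None
            session, msgs, gap = pub.fetch(None, self.prefixes,
                                           self.capacity - len(self.queue))
        self.session_id = session
        self.missed = self.missed or gap
        for number, _topic, body in msgs:
            self.queue.append(body)
            self.last_seen = number


# Usage: a 3-message PUB queue that has already evicted m0 and m1.
pub = PubQueue(capacity=3)
for i in range(5):
    pub.publish("weather.", f"m{i}")
sub = SubQueue(capacity=10, prefixes=["weather."])
sub.refill(pub)  # fresh subscriber: gets m2, m3, m4

restarted = SubQueue(capacity=10, prefixes=["weather."], last_seen=0,
                     session_id=pub.session_id)
restarted.refill(pub)  # m1 was evicted, so a possible gap is flagged

pub2 = PubQueue(capacity=3)  # publisher restarted without remembering state
pub2.publish("weather.", "n0")
sub.refill(pub2)  # new session ID: numbering restarts, old messages are kept
```

A proxy would amount to a `SubQueue` of length (close to) 0 feeding a `PubQueue` that reuses the upstream session ID and numbering, which is why a subscriber could not tell the two apart.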

The PUSH/PULL scenario is a little more complicated than the PUB/SUB scenario:

- A ventilator application sets up a PUSH socket with an application-specific 
  queue size.  The ventilator pushes messages onto the socket until the queue 
  is full, at which point it blocks, and then unblocks once a message is sent 
  to a worker (or, if messages have shelf lives, once a message expires).
- A worker application sets up a PULL socket with a very short queue length, 
  typically of length 1 or 0.  There is no waiting to co-ordinate with other 
  workers, and because there is at most one message in the worker's queue, 
  other workers can pick up other messages when they come on line, effectively 
  preventing the "slow joiner" syndrome.
- A worker application typically also sets up a PUSH socket with a queue 
  length that is application specific.  If it is important to keep all the 
  downstream workers busy (across all pipelines), then the PUSH queue should 
  be kept short so that the worker application blocks and forces the 
  ventilator to distribute more work to other pipelines.  If this does not 
  matter, then the queue length could be considerably longer.
- A sink application sets up a PULL socket with an application-specific queue 
  size:  a short queue will regulate the message flow through the system 
  (similar to the effect of setting a short queue on a worker's PUSH socket), 
  whereas a long queue lets the upstream workers operate as quickly as 
  possible.
- A streamer (a PUSH/PULL device) is interesting because it has a PUSH socket 
  with a queue length of 0 and a PULL socket with a queue that varies in 
  length depending on the number of outstanding upstream requests, which is 
  the number of messages it requests from the ventilator (or upstream 
  streamer).  Now, the ventilator's PUSH socket has the information necessary 
  (the number of requests from each connection) to make intelligent decisions 
  about how to distribute its messages, resulting in all workers getting their 
  fair share of work (even when they are connected to the ventilator through a 
  streamer or two).  In other words, a streamer device does not perturb the 
  end-to-end fair-queuing policy.
- Preserving message queues when components die or are restarted in this 
  scenario is more complicated than it is for PUB/SUB.  It can be done by 
  having sockets "shadow" the queues of sockets at the other end of their 
  connections.  For example, instead of discarding a message after it has 
  been delivered downstream, a PUSH socket could move the message to a 
  "pending delivery" queue (a shadow queue) which would hold the message until 
  there is a confirmation that the message was removed from the PULL queue of 
  a worker application (streamer devices are not considered workers).  If the 
  worker (or streamer) dies or is restarted, then the ventilator can resend 
  the messages in the pending delivery shadow queue; if the worker is not 
  restarted quickly enough (configurable to the application), then the PUSH 
  socket will move the message out of the shadow queue back onto the PUSH 
  queue for delivery to another worker.  On the flip side, if there is no 
  worker (nor streamer) ready to receive a message on the ventilator's PUSH 
  queue, then the PUSH socket selects the PULL socket most likely to request 
  that message and sends it to that socket to be put on its "future delivery" 
  shadow queue while leaving it on its PUSH queue (which could cause the 
  ventilator to block):  if the PULL socket does not request a message before 
  another PULL socket is ready, then the PUSH socket could send the message to 
  another PULL socket and tell the first PULL socket to remove it from its 
  future delivery queue; otherwise, if it is the first PULL socket to request 
  another message, then the PUSH socket could tell it to take the message from 
  its future delivery queue.  Now if a ventilator is restarted, and it has 
  remembered the last message it sent to its PUSH socket, then the PUSH socket 
  could recreate its queue based on information sent back from the PULL 
  sockets that re-establish connections:  the messages from a connected PULL 
  socket's queue would go onto the pending delivery queue and the messages 
  from the PULL socket's future delivery queue would go onto the PUSH socket's 
  (primary) queue.  If a streamer device is restarted, then it would recreate 
  its state from re-established connections on both sides.
- Even with the shadow queues, it is possible to lose the message being worked 
  on by a worker when it dies or is restarted.  If this situation is not 
  tolerable, then messages could be kept on the pending delivery queues of the 
  ventilator (and of intermediary streamer devices) until the worker has 
  explicitly confirmed that it has processed the message (for example, after 
  it sends something on to the sink application).  This implies that the 
  worker's PULL socket also has a "pending confirmation" queue (of length 1) 
  to keep track of where to send the confirmation.
- Note that even though a ventilator might have a small queue, its shadow 
  queue grows with the number of workers.  However, it is a linear growth that 
  should easily be handled by the ventilator.  If not, then the "pending 
  delivery" queue could be configured to have a maximum size, in which case 
  the PUSH socket would block when the queue reaches the maximum length.
- Also note that this approach makes it easy for a PUSH socket to reassign a 
  "pending delivery" message to another PULL socket if there is another PULL 
  socket waiting for messages and the original PULL socket has not delivered 
  the message to a worker within a specified period of time (as is done when a 
  worker dies).

Fundamentally, I am suggesting that all the PUSH/PULL sockets between the 
ventilator and the workers cooperatively maintain a single distributed queue 
that distributes work evenly across all workers and is resilient to component 
failures.  Similarly, there would be another distributed queue between the 
workers and the sink (and between workers and workers in a pipeline).
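A rough sketch of the ventilator's side of this distributed queue: messages go to the peer with the most outstanding requests (a streamer's count being the sum of its workers' requests), a copy stays on a "pending delivery" shadow queue until confirmed, and unconfirmed messages move back to the front of the PUSH queue after a timeout. `PushQueue`, `dispatch()`, `confirm()`, `disconnect()` and `expire()` are hypothetical names, time is passed in explicitly, and the "future delivery" queue is left out for brevity.

```python
import itertools
from collections import deque


class PushQueue:
    """Hypothetical PUSH socket with a primary queue and a 'pending delivery' shadow queue."""

    def __init__(self, capacity, timeout):
        self.capacity = capacity
        self.timeout = timeout
        self.queue = deque()  # primary queue: (msg_id, msg) not yet sent
        self.pending = {}     # shadow queue: msg_id -> (peer, msg, sent_at)
        self.credits = {}     # peer -> outstanding requests
        self.ids = itertools.count()

    def can_push(self):
        # The ventilator would block while this is False.
        return len(self.queue) < self.capacity

    def push(self, msg):
        if not self.can_push():
            raise BlockingIOError("PUSH queue full")
        self.queue.append((next(self.ids), msg))

    def request(self, peer, count=1):
        self.credits[peer] = self.credits.get(peer, 0) + count

    def disconnect(self, peer):
        # Outstanding requests vanish; unconfirmed messages wait for expire().
        self.credits.pop(peer, None)

    def dispatch(self, now):
        """Send each queued message to the peer with the most outstanding requests."""
        sent = []
        while self.queue:
            ready = {p: n for p, n in self.credits.items() if n > 0}
            if not ready:
                break
            peer = max(ready, key=ready.get)
            self.credits[peer] -= 1
            msg_id, msg = self.queue.popleft()
            self.pending[msg_id] = (peer, msg, now)  # keep a copy until confirmed
            sent.append((peer, msg_id, msg))
        return sent

    def confirm(self, msg_id):
        self.pending.pop(msg_id, None)

    def expire(self, now):
        """Move messages unconfirmed for `timeout` back to the front of the queue."""
        stale = [i for i, (_, _, t) in self.pending.items() if now - t >= self.timeout]
        for msg_id in sorted(stale, reverse=True):  # oldest ends up first
            _, msg, _ = self.pending.pop(msg_id)
            self.queue.appendleft((msg_id, msg))
        return len(stale)


# Usage: a streamer fronting two idle workers, plus one directly connected worker.
vent = PushQueue(capacity=4, timeout=5.0)
for task in ["a", "b", "c"]:
    vent.push(task)
vent.request("streamer", 2)  # the streamer relays requests from two idle workers
vent.request("w1", 1)
first = vent.dispatch(now=0.0)
vent.confirm(0)
vent.confirm(2)
vent.disconnect("streamer")  # message 1 ("b") is still unconfirmed
requeued = vent.expire(now=5.0)
vent.request("w1", 1)
second = vent.dispatch(now=6.0)  # "b" is reassigned to w1
```

The shadow queue grows with the number of messages in flight (roughly one per connected worker), matching the linear growth noted above; capping `pending` would make the ventilator block sooner.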

The REQ/REP scenario is similar to the PUSH/PULL scenario, but more 
complicated:

- A client application sets up a REQ socket that has an output queue of 
  length 0, and an input queue (called the "pending delivery" queue in the 
  PUSH/PULL scenario) of length 1.  This implies that the client blocks until 
  there is a server ready to take the request message.
- A server application sets up a REP socket that has an input queue of medium 
  length (application configurable) and a "future delivery" queue of length 0.  
  If the client is connected to two or more servers, then the client will be 
  informed of how many requests each server is prepared to handle, enabling 
  the client to direct request messages to the server with the most available 
  capacity.  It also means that request messages could be lost if the client 
  dies, because there are no "future delivery" queues holding copies of 
  request messages.
- The difference between this scenario and the PUSH/PULL scenario is that the 
  server attaches its response to the confirmation message sent back to the 
  client's input queue.  This implies that the "pending confirmation" queues 
  must be set up more like a PUSH queue (complete with confirmation pending 
  shadow queues) to ensure that the response message does not get lost in its 
  journey back to the client application.
- A router (a REQ/REP device) behaves just like a streamer device that is 
  configured to pass confirmation messages back upstream (because a 
  confirmation message contains the response).  Also like the streamer, a 
  router does not perturb the end-to-end fair-queuing policy.
- Notice that there is no need for separate XREQ and XREP sockets because 
  their functionality is already included in these REQ and REP sockets.  This 
  suggests that the "single outstanding request" limitation for client 
  applications could be removed (by increasing the size of the REP socket's 
  output queue).

As discussed for the PUSH/PULL scenario, I am suggesting that all the REQ/REP 
sockets between the client and the server cooperatively maintain a single 
distributed queue that distributes work evenly across all servers and is 
resilient to component failures.
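A small sketch of the REQ/REP flow: the client sends each request to the server advertising the most free slots, keeps it on its pending queue, and clears it when the confirmation (carrying the response) comes back. `Server`, `Client`, `free_slots()` and `on_confirmation()` are hypothetical names; `max_outstanding` is a hypothetical knob standing in for the larger queue that would lift the "single outstanding request" limit; and servers are queried directly rather than advertising capacity over the wire.

```python
import itertools


class Server:
    """Hypothetical REP side: bounded input queue that advertises its free slots."""

    def __init__(self, name, queue_len, handler):
        self.name = name
        self.queue_len = queue_len
        self.inbox = []
        self.handler = handler

    def free_slots(self):
        return self.queue_len - len(self.inbox)

    def accept(self, req_id, request):
        if self.free_slots() <= 0:
            raise RuntimeError(f"{self.name} has no free slots")
        self.inbox.append((req_id, request))

    def process_one(self):
        # The response rides back on the confirmation for this request id.
        req_id, request = self.inbox.pop(0)
        return {"confirm": req_id, "response": self.handler(request)}


class Client:
    """Hypothetical REQ side: 'pending delivery' holds requests until confirmed."""

    def __init__(self, servers, max_outstanding=1):
        self.servers = servers
        self.max_outstanding = max_outstanding  # 1 mimics today's strict REQ socket
        self.pending = {}  # req_id -> (server name, request)
        self.ids = itertools.count()

    def send(self, request):
        if len(self.pending) >= self.max_outstanding:
            return None  # a real client would block here
        server = max(self.servers, key=lambda s: s.free_slots())
        if server.free_slots() == 0:
            return None  # no server ready: block
        req_id = next(self.ids)
        server.accept(req_id, request)
        self.pending[req_id] = (server.name, request)
        return req_id

    def on_confirmation(self, msg):
        # The confirmation clears the pending entry and delivers the reply.
        self.pending.pop(msg["confirm"])
        return msg["response"]


# Usage: two servers with different capacities and a client allowing two requests.
fast = Server("fast", queue_len=3, handler=str.upper)
slow = Server("slow", queue_len=1, handler=str.upper)
client = Client([slow, fast], max_outstanding=2)
r0 = client.send("hello")  # fast advertises 3 free slots, slow only 1
r1 = client.send("world")
r2 = client.send("again")  # None: two requests are already outstanding
reply = client.on_confirmation(fast.process_one())
```

If a server died, the client's `pending` entries for it would be the natural place to resend from, which is the role the shadow queues play in the PUSH/PULL case.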

I have not analyzed the PAIR pattern because it is "a low-level pattern for 
specific, advanced use-cases" (The Guide: Core Messaging Patterns).

I realize that moving to a "pull" model is a major change to the internal 
architecture of 0MQ and the suggestions here mean a change to the interface.  
But, I think that 0MQ would be easier to explain and more attractive to a 
wider audience with these changes.  My question is:  Is there any possibility 
of these suggestions making it into 0MQ?


Henry Baragar
Instantiated Software