[zeromq-dev] Implementing persistent queues

Brian Candler B.Candler at pobox.com
Thu Apr 15 17:52:31 CEST 2010

On Wed, Apr 14, 2010 at 05:21:22PM +0200, Pieter Hintjens wrote:
> Right now our simplest conceptual design is for point-to-point
> reliability, with store/forward handled in the sender as a separate
> thread or process.  You'll see the sketch on that blog page.

Sounds sensible to me.

> Our goal is to build this initially as a 0MQ application (so a
> different API) and later integrate that into the 0MQ core once it's
> clear what patterns work best.
> Thoughts?

Also sensible. Taking my RADIUS server example again, here are a few thoughts:

* In the API, I want to be able to send a message and then either recv() or
get a callback when the message has been successfully delivered or queued. 
This is so that I can hold off sending my RADIUS Acct-Response until the
request has been safely processed or stored.

(RADIUS Accounting runs over UDP, and has a very simple assured delivery
mechanism; the NAS sends an Accounting-Request packet with an 8-bit sequence
number, and resends periodically until it gets the corresponding
Accounting-Response.  See RFC 2865/2866)
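As a rough illustration of that retry scheme (names here are mine, not from any RADIUS library), the NAS side boils down to: tag each Accounting-Request with an 8-bit identifier, keep resending on a timer, stop when the matching response arrives:

```python
import itertools

class ToyNAS:
    """Toy sketch of the RADIUS accounting retry loop: each
    Accounting-Request carries an 8-bit identifier and is resent
    periodically until the matching Accounting-Response arrives."""

    def __init__(self):
        self._ids = itertools.cycle(range(256))  # 8-bit identifier space
        self.pending = {}                        # identifier -> request payload

    def send_request(self, payload):
        ident = next(self._ids)
        self.pending[ident] = payload            # would also go on the wire here
        return ident

    def tick(self):
        # Called periodically: everything still unacknowledged is resent.
        return list(self.pending.items())

    def on_response(self, ident):
        # Matching Accounting-Response seen: stop retransmitting.
        self.pending.pop(ident, None)
```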

(A good way to build this is OpenRADIUS, which lets you plug in an external
process over stdin/stdout to receive and respond to RADIUS messages)

* I think it should be possible to plug in queues where needed, without
changing the code at either end.  Consider a remote SQL writer process which
returns an ACK after committing to the database.  In the simplest case,
these ACKs could be used to return the RADIUS Acct-Responses to the NAS:

         +------------+           data            +----------+
NAS -----|-> RADIUS --|---------------------------|-> SQL    |--> DB
    <----|-- server <-|---------------------------|-- writer |
         +------------+           ack             +----------+

Then I would like to be able to decouple them by inserting a persistent
queue process, with no code changes.

         +----------------------+                 +----------+
NAS -----|-> RADIUS --> Queue --|-----------------|-> SQL    |
    <----|-- server <-- (disk)<-|-----------------|-- writer |
         +----------------------+                 +----------+

This ensures the NAS gets its Acct-Response quickly, even if the remote
SQL writer is down or unreachable.

I might also want to insert an inbound Queue at the SQL writer side too -
for example, to be able to batch up a number of RADIUS packets into a single
SQL update.
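To make the batching point concrete: the win on the SQL writer side is turning N per-packet commits into one multi-row transaction. A minimal sketch with sqlite3 standing in for the real database (table and column names are invented for illustration):

```python
import sqlite3

def flush_batch(conn, records):
    """Write a whole batch of accounting records in one transaction,
    instead of one commit per RADIUS packet."""
    with conn:  # the context manager commits once for the whole batch
        conn.executemany(
            "INSERT INTO acct (username, octets) VALUES (?, ?)", records)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE acct (username TEXT, octets INTEGER)")
flush_batch(conn, [("alice", 1200), ("bob", 3400), ("carol", 560)])
print(conn.execute("SELECT COUNT(*) FROM acct").fetchone()[0])  # prints 3
```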

* So basically, adding a queue means that the protocol is unchanged but you
get your ACKs back quicker.

          message              message
       --------------> QUEUE ----------->
       <--------------       <-----------
           ack(1)                ack(2)

ack(1) can be returned once the message is queued, and ack(2) will cause it
to be removed from the queue.
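In pseudocode-ish Python (not the 0MQ API, just the semantics I mean): ack(1) is returned as soon as the message is stored, and ack(2) deletes it from the store:

```python
class AckQueue:
    """Sketch of the two-ack scheme: enqueue() returning is ack(1)
    (safe to stop resending upstream); on_downstream_ack() is ack(2)
    (fully delivered, so the queued copy can be discarded)."""

    def __init__(self):
        self.store = {}    # seq -> message; stand-in for persistent storage
        self.next_seq = 0

    def enqueue(self, msg):
        seq = self.next_seq
        self.next_seq += 1
        self.store[seq] = msg
        return seq         # ack(1): the message is safely queued

    def on_downstream_ack(self, seq):
        self.store.pop(seq, None)  # ack(2): remove from the queue
```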

* I would like the queue process to be able to perform batching: appending
multiple messages to a file, fsyncing it after a configured time (say
100ms), and then returning acks after the fsync.  This gives a potential
throughput of tens of thousands of messages per second even on a single
commodity hard drive, depending on what latency tradeoff you're prepared to
accept.
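A sketch of that batching loop (file format and names invented for illustration): appends go straight to the log, but acks are held back until one fsync covers the whole batch:

```python
import os
import time

class BatchLog:
    """Sketch of batched persistence: append messages freely, fsync at
    most once per `interval` seconds, and release the acks for the
    whole batch only after the fsync."""

    def __init__(self, path, interval=0.1):
        self.f = open(path, "ab")
        self.interval = interval
        self.pending = []                    # seqs awaiting the next fsync
        self.last_sync = time.monotonic()

    def append(self, seq, msg):
        self.f.write(msg + b"\n")            # buffered; not yet durable
        self.pending.append(seq)

    def maybe_sync(self):
        """Return the seqs whose acks may now be sent (possibly none)."""
        if not self.pending or time.monotonic() - self.last_sync < self.interval:
            return []
        self.f.flush()
        os.fsync(self.f.fileno())            # one fsync covers the batch
        self.last_sync = time.monotonic()
        acked, self.pending = self.pending, []
        return acked
```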

* There is an even bigger potential benefit from this batching. If the
message is forwarded before it is written to disk, and the ack comes back
before the timeout expires, then there is no need to write it to disk at
all! You thus get the benefit of persistent storage without any performance
penalty in the usual case.
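The write-avoidance idea, sketched: forward immediately, keep the message only in memory until the fsync deadline, and persist just what is still unacknowledged when the deadline fires:

```python
class LazyQueue:
    """Sketch of write avoidance: a message that is acked downstream
    before the fsync deadline never touches the disk at all."""

    def __init__(self):
        self.unacked = {}   # seq -> message, held in memory only
        self.disk = []      # stand-in for the on-disk log

    def enqueue_and_forward(self, seq, msg):
        self.unacked[seq] = msg      # forwarded downstream right away

    def on_ack(self, seq):
        self.unacked.pop(seq, None)  # acked in time: no disk write needed

    def deadline(self):
        # Timeout expired: persist whatever is still in flight.
        self.disk.extend(self.unacked.items())
```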

* Ideally I'd like to get this benefit if the queues are chained too. The
message should propagate as far as it can and only get written to disk at
the last point.  This may be awkward to arrange if each queue has its own
independent timeout.

      ----------> QUEUE ----------> QUEUE ---------> receiver ---> SQL
      <----------       <----------       <--------- process
          ack               ack               ack

* In this application I don't actually care about in-order delivery, but
it may be simplest to implement it that way. Then all you need is a TCP-like
message sequence number and ack number, and you get duplicate prevention as
well.

However, this becomes complicated if there are multiple concurrent
connections between the same client-server pair, as I believe 0MQ
allows/encourages.  In that case perhaps a globally-unique message ID would
work better?
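Either way, the receiver-side duplicate suppression is trivial: with one ordered connection a TCP-style expected-sequence check suffices; with concurrent connections, a set of globally-unique IDs (sketched here as opaque strings) is one alternative:

```python
class Deduper:
    """Sketch of receiver-side duplicate suppression keyed on a
    globally-unique message ID rather than a per-connection sequence
    number, so concurrent connections don't confuse it."""

    def __init__(self):
        self.seen = set()

    def accept(self, msg_id):
        """True the first time an ID is seen, False for duplicates."""
        if msg_id in self.seen:
            return False
        self.seen.add(msg_id)
        return True
```

(A real implementation would need to bound `seen`, e.g. by only remembering IDs newer than the last cumulative ack.)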

* If you have a queue feeding multiple recipients in a round-robin way, you
want to have some decent flow control - or be able to configure a window
size of 1 message.

                   ,------> rx1
    -------> QUEUE -------> rx2
                   `------> rx3

Consider that if you commit a large burst of messages to rx1, rx2 and rx3,
it might be that rx2 and rx3 become idle while rx1 is still working through
its backlog.
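A window size of 1 is just credit-based dispatch: each receiver holds one credit, a send consumes it, an ack restores it, so no receiver ever accumulates a backlog while its siblings sit idle. A toy sketch:

```python
from collections import deque

class CreditDispatcher:
    """Sketch of windowed round-robin fan-out: each receiver may have
    at most `window` outstanding (unacked) messages; the backlog stays
    in the queue until some receiver frees a credit."""

    def __init__(self, receivers, window=1):
        self.backlog = deque()
        self.credit = {r: window for r in receivers}
        self.sent = []   # (receiver, msg) pairs; stand-in for the wire

    def submit(self, msg):
        self.backlog.append(msg)
        self._pump()

    def on_ack(self, receiver):
        self.credit[receiver] += 1
        self._pump()

    def _pump(self):
        for r, c in self.credit.items():
            if c and self.backlog:
                self.credit[r] -= 1
                self.sent.append((r, self.backlog.popleft()))
```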

* Do we want an API to poll for messages in a queue, or should a queue
always "push" messages to the recipient(s)?

* We may want to attach a number of queues to a pub/sub stream, each with
its own filter. Similarly, the output of a queue may be a publish stream.
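The per-queue filter idea amounts to each queue tapping the stream through its own predicate; a minimal sketch (all names hypothetical):

```python
class FilteredTap:
    """Sketch of a queue attached to a pub/sub stream with its own
    filter: it only retains messages its predicate accepts."""

    def __init__(self, predicate):
        self.predicate = predicate
        self.queue = []

    def offer(self, msg):
        if self.predicate(msg):
            self.queue.append(msg)

def publish(taps, msg):
    # Fan the published message out to every attached tap.
    for t in taps:
        t.offer(msg)
```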

Sorry, that was more of a braindump than I was intending :-)



More information about the zeromq-dev mailing list