[zeromq-dev] Time-reordering queue

Francesco francesco.montorsi at gmail.com
Fri Nov 11 00:10:13 CET 2022


Hi Brett,
Thanks, this is really helpful!
I went through the README and the code, and I have just one question:

As the very first paragraph says, "The streams need not be
synchronized in (real) time but must be strictly ordered in each
stream....". I guess that, translated into the ZMQ context, this means
the data structure is designed to sort (by time) packets received over
a set of k different SUB (or similar) sockets, each connected to a
single endpoint; in that setup each ZMQ SUB socket would be one stream
for the zipper. Is this the only way to satisfy the ordering
requirement within each stream?
I'm asking because my application uses a single SUB socket that is
connected with zmq_connect() to N TCP endpoints. As a result, the
packets obtained from zmq_msg_recv() are not time-ordered...
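
To make the per-stream ordering requirement concrete, here is a minimal
sketch of a lossless merge over k streams. It is not the zipper.hpp API:
the names Msg and LosslessMerge are hypothetical, and it uses a plain
linear scan over the stream heads instead of a heap to stay short. It
assumes every message can be attributed to exactly one stream and that
each stream delivers non-decreasing timestamps.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <optional>
#include <vector>

// Hypothetical message type: a timestamp plus an opaque payload id.
struct Msg {
    std::int64_t ts_ns;
    int payload;
};

// Hypothetical lossless merge of k individually ordered streams.
class LosslessMerge {
public:
    explicit LosslessMerge(std::size_t n_streams) : streams_(n_streams) {}

    // Each stream must be pushed in non-decreasing timestamp order
    // (checked here only against what is still buffered).
    void push(std::size_t stream, const Msg& m) {
        assert(stream < streams_.size());
        assert(streams_[stream].empty() ||
               streams_[stream].back().ts_ns <= m.ts_ns);
        streams_[stream].push_back(m);
    }

    // Release the globally oldest message, but only when every stream
    // has something buffered; an empty stream could still deliver an
    // older message, so the merge has to wait for it.
    std::optional<Msg> pop() {
        if (streams_.empty()) return std::nullopt;
        std::size_t best = 0;
        for (std::size_t i = 0; i < streams_.size(); ++i) {
            if (streams_[i].empty()) return std::nullopt;
            if (streams_[i].front().ts_ns < streams_[best].front().ts_ns)
                best = i;
        }
        Msg m = streams_[best].front();
        streams_[best].pop_front();
        return m;
    }

private:
    std::vector<std::deque<Msg>> streams_;
};
```

With one SUB socket connected to N endpoints, this only works if each
message carries something that identifies its publisher, so it can be
routed to the right stream before merging. A silent stream blocks the
output, which is the unbounded-latency risk of the lossless policy.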

Thanks!!

Francesco


On Thu, Nov 10, 2022 at 11:12 Brett Viren <brett.viren at gmail.com>
wrote:

> Hi Francesco,
>
> I implemented such an algorithm in C++ which I call "zipper".
>
> The idea is simply to maintain a min-heap priority queue keyed on the
> timestamp and surround that with policy logic to decide when to push
> and pop based on examining the system clock.  I've implemented two
> policies.  Either a maximum latency bound is asserted at the cost of
> possible message loss, or the merge is lossless at the risk of
> unbounded latency.
>
> It is a rather simple pattern and this description alone may be enough
> to implement it yourself but you may also take a look at this repo
> with code, performance results and other docs.
>
> https://github.com/brettviren/zipper
>
> Though I failed to make it explicit, this code may be considered
> licensed under the LGPL.  Let me know if you wish to use the code and
> I'll add proper license info.
>
> The zipper.hpp implementation is in terms of C++ data objects and
> independent from zeromq per se (only needs C++ standard library).
> But, it was written with the assumption that it would be sandwiched
> between ZeroMQ input and output sockets.  Providing a layer to marshal
> data in to / out from the zipper is then the duty of the application.
>
> Note, my repo was for development purposes.  The zipper.hpp file was
> then copied into a production repository and that copy may have some
> bug fixes which I have not ported back to the stand-alone development
> version.  The production version is here:
>
> https://github.com/DUNE-DAQ/trigger/blob/develop/plugins/zipper.hpp
>
> -Brett.
>
> On Wed, Nov 9, 2022 at 5:20 PM Francesco <francesco.montorsi at gmail.com>
> wrote:
> >
> > Hi all,
> >
> > I have written two applications using ZMQ PUB-SUB pattern (over TCP
> > transport).
> > The subscriber application has its SUB socket connected to multiple
> > PUBs (multiple tcp endpoints). Each message sent by the PUB encodes
> > the timestamp (as obtained from clock_gettime() syscall at TX side
> > using monotonically increasing clock) of the event described by the
> > ZMQ message.
> >
> > The subscriber needs to process the data stream _strictly_ in order.
> > However the multiple publishers have no coordination and they will
> > emit messages at different rates, each with its own timestamp. The
> > only guarantee that I have, according to ZMQ docs, is that the SUB
> > socket will perform "fair dequeueing", but that's not enough to
> > guarantee that every zmq_msg_t received from the SUB socket will have
> > a monotonically increasing timestamp: it depends on the filling level
> > of the TCP rx/tx kernel buffers, the zmq HWMs, etc.
> >
> > For this reason I'm looking for some algorithm that
> > * allows me to push zmq_msg_t pulled out of the SUB socket (without
> >   strict time ordering)
> > * allows me to pull out zmq_msg_t that have a timestamp monotonically
> >   increasing
> > * introduces a fixed max latency of N msecs (configurable)
> >
> > Do you have any pointer for such kind of problem?
> > Anybody already hit a similar issue?
> >
> > Thanks for any help,
> >
> > Francesco Montorsi
> >
> > _______________________________________________
> > zeromq-dev mailing list
> > zeromq-dev at lists.zeromq.org
> > https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
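
The bounded-latency policy described in the quoted reply (a min-heap
keyed on the timestamp, popped according to the clock) can be sketched
roughly as follows. This is an illustration only, not the zipper.hpp
implementation: Msg, LaterFirst and ReorderBuffer are hypothetical
names, and the sketch assumes that message timestamps and the now_ns
value passed to pop() are on a comparable time base, which does not
hold automatically for monotonic clocks on different hosts.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <optional>
#include <queue>
#include <vector>

// Hypothetical message type: a timestamp plus an opaque payload id.
struct Msg {
    std::int64_t ts_ns;
    int payload;
};

// Comparator that puts the smallest timestamp on top of the heap.
struct LaterFirst {
    bool operator()(const Msg& a, const Msg& b) const {
        return a.ts_ns > b.ts_ns;
    }
};

// Hypothetical reorder buffer with a fixed maximum latency.
class ReorderBuffer {
public:
    explicit ReorderBuffer(std::int64_t max_latency_ns)
        : max_latency_ns_(max_latency_ns) {
        assert(max_latency_ns >= 0);
    }

    // Returns false (message dropped) when the message is older than
    // one already released; accepting it would break output ordering.
    bool push(const Msg& m) {
        if (m.ts_ns < last_released_ns_) return false;
        heap_.push(m);
        return true;
    }

    // Releases the oldest buffered message once its age, measured
    // against now_ns, has reached the latency bound.
    std::optional<Msg> pop(std::int64_t now_ns) {
        if (heap_.empty()) return std::nullopt;
        if (now_ns - heap_.top().ts_ns < max_latency_ns_) return std::nullopt;
        Msg m = heap_.top();
        heap_.pop();
        last_released_ns_ = m.ts_ns;
        return m;
    }

private:
    std::int64_t max_latency_ns_;
    std::int64_t last_released_ns_ = std::numeric_limits<std::int64_t>::min();
    std::priority_queue<Msg, std::vector<Msg>, LaterFirst> heap_;
};
```

An application would call push() for every message received from the
SUB socket and call pop() in a loop until it returns nothing. Output
timestamps are non-decreasing; the price is that a message arriving
after a newer one has already been released is dropped.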