[zeromq-dev] Time-reordering queue

Brett Viren brett.viren at gmail.com
Fri Nov 11 12:17:49 CET 2022


Hi Francesco,

It is up to the application using the zipper to satisfy the stream
"identity" and "ordering" requirements.  But no restriction is placed
on how that is done; in particular, you may have a pattern of N PUBs
sending to 1 SUB.  You may even allow each PUB to provide multiple
streams.

The only practical way I know to do this is to require the messages
sent from PUB to SUB to explicitly carry stream "identity" information
in some form.
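One possible encoding, sketched under the assumption that each message starts with a fixed 12-byte header: a 32-bit stream id followed by a 64-bit timestamp in nanoseconds, both big-endian, ahead of the payload. The layout and the names Header, encode_header and decode_header are hypothetical, not a ZeroMQ or zipper.hpp format; the identity could just as well travel in a separate frame of a multipart message.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical per-message header (an assumption, not a ZeroMQ format)
// carrying explicit stream identity alongside the event timestamp.
struct Header {
    std::uint32_t stream_id;     // which logical stream the message belongs to
    std::uint64_t timestamp_ns;  // sender-side event time
};

constexpr std::size_t kHeaderSize = 4 + 8;

// Prepend a big-endian header to the payload bytes before sending.
std::vector<std::uint8_t> encode_header(const Header& h,
                                        const std::vector<std::uint8_t>& payload) {
    std::vector<std::uint8_t> out;
    out.reserve(kHeaderSize + payload.size());
    for (int shift = 24; shift >= 0; shift -= 8)
        out.push_back(static_cast<std::uint8_t>(h.stream_id >> shift));
    for (int shift = 56; shift >= 0; shift -= 8)
        out.push_back(static_cast<std::uint8_t>(h.timestamp_ns >> shift));
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}

// Parse the header from received bytes (e.g. the buffer behind
// zmq_msg_data()/zmq_msg_size(), cast to const std::uint8_t*).
// Returns std::nullopt if the buffer is too short to hold a header.
std::optional<Header> decode_header(const std::uint8_t* data, std::size_t size) {
    if (size < kHeaderSize) return std::nullopt;
    Header h{0, 0};
    for (std::size_t i = 0; i < 4; ++i)
        h.stream_id = (h.stream_id << 8) | data[i];
    for (std::size_t i = 4; i < kHeaderSize; ++i)
        h.timestamp_ns = (h.timestamp_ns << 8) | data[i];
    return h;
}
```

Because the stream id is inside the message, one SUB connected to N endpoints can still tell the streams apart, and one PUB can carry several streams.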

If ZeroMQ provided sender identity for SUB messages, as it does for
ROUTER and SERVER, then we could exploit that, but I don't know of any
method that provides this info.  Even then, the system would be
limited to each PUB providing at most a single identified stream.

-Brett.

On Thu, Nov 10, 2022 at 6:12 PM Francesco <francesco.montorsi at gmail.com> wrote:
>
> Hi Brett,
> thanks, this is really very helpful!
> I went through the README and the code... just one question:
>
> As the very first paragraph indicates, "The streams need not be synchronized in (real) time but must be strictly ordered in each stream....". I guess that, translated into a ZMQ context, this means the data structure is designed to sort (by time) packets received over a set of k different SUB (or similar) sockets, each having 1 endpoint; in that context each ZMQ SUB socket would be 1 stream for the zipper. This is the only way to satisfy the ordering criterion inside each stream, right?
> I'm asking because in my case the application uses a single SUB socket that is connected with zmq_connect() to N TCP endpoints. As a result, the packets obtained from zmq_msg_recv() are not time-ordered...
>
> Thanks!!
>
> Francesco
>
>
> On Thu, Nov 10, 2022 at 11:12 AM Brett Viren <brett.viren at gmail.com> wrote:
>>
>> Hi Francesco,
>>
>> I implemented such an algorithm in C++ which I call "zipper".
>>
>> The idea is simply to maintain a min-heap priority queue keyed on the
>> timestamp and surround that with policy logic to decide when to push
>> and pop based on examining the system clock.  I've implemented two
>> policies.  Either a maximum latency bound is asserted at the cost of
>> possible message loss, or the merge is lossless at the risk of unbounded
>> latency.
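A minimal sketch of the min-heap-plus-policy idea, assuming the number of streams is known up front, each stream is individually ordered, and the caller passes in the current time (now_ns) rather than the code reading the system clock, which keeps it testable. The names Zipper, Item, push and pop_ready are hypothetical; this is not the zipper.hpp code. The heap top is released once every stream has something queued (no smaller timestamp can still arrive). With a latency bound, it is also released when the longest-waiting queued item exceeds the bound, and a later message older than something already emitted is dropped.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <queue>
#include <set>
#include <vector>

// Hypothetical item type: one timestamped message from one stream.
struct Item {
    std::uint64_t timestamp_ns;  // event time carried in the message
    std::uint32_t stream_id;     // 0 .. num_streams-1
    std::uint64_t arrival_ns;    // local time when pushed into the zipper
};

// Comparator that turns std::priority_queue into a min-heap on timestamp.
struct LaterFirst {
    bool operator()(const Item& a, const Item& b) const {
        return a.timestamp_ns > b.timestamp_ns;
    }
};

// Sketch only (not zipper.hpp): k-way time merge with two policies.
class Zipper {
public:
    // max_latency_ns == std::nullopt selects the lossless policy
    // (items wait until every stream has data, however long that takes).
    Zipper(std::size_t num_streams, std::optional<std::uint64_t> max_latency_ns)
        : pending_(num_streams, 0), max_latency_ns_(max_latency_ns) {}

    // Queue one item. Returns false if it is dropped because something with a
    // later timestamp was already emitted (only possible with a latency bound).
    bool push(std::uint64_t ts, std::uint32_t stream, std::uint64_t now_ns) {
        if (last_out_ && ts < *last_out_) return false;
        ++pending_.at(stream);  // throws on an unknown stream id
        heap_.push(Item{ts, stream, now_ns});
        arrivals_.insert(now_ns);
        return true;
    }

    // Release, in timestamp order, every item that is safe or overdue.
    std::vector<Item> pop_ready(std::uint64_t now_ns) {
        std::vector<Item> out;
        while (!heap_.empty()) {
            // Safe: every stream has something queued, so (given per-stream
            // ordering) no smaller timestamp can still arrive.
            bool complete = true;
            for (std::size_t n : pending_) complete = complete && n > 0;
            // Overdue: the longest-waiting queued item exceeded the bound.
            std::uint64_t oldest = *arrivals_.begin();
            bool overdue = max_latency_ns_ && now_ns >= oldest &&
                           now_ns - oldest >= *max_latency_ns_;
            if (!complete && !overdue) break;

            Item top = heap_.top();
            heap_.pop();
            arrivals_.erase(arrivals_.find(top.arrival_ns));
            --pending_[top.stream_id];
            last_out_ = top.timestamp_ns;
            out.push_back(top);
        }
        return out;
    }

private:
    std::priority_queue<Item, std::vector<Item>, LaterFirst> heap_;
    std::multiset<std::uint64_t> arrivals_;   // arrival times still queued
    std::vector<std::size_t> pending_;        // queued items per stream
    std::optional<std::uint64_t> max_latency_ns_;
    std::optional<std::uint64_t> last_out_;   // last emitted timestamp
};
```

In this sketch, pop_ready() should be called on a timer as well as after each receive; otherwise the latency bound cannot fire while a publisher is quiet.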
>>
>> It is a rather simple pattern, and this description alone may be enough
>> to implement it yourself, but you may also take a look at this repo
>> with code, performance results and other docs.
>>
>> https://github.com/brettviren/zipper
>>
>> Though I failed to make it explicit, this code may be considered
>> licensed under the LGPL.  Let me know if you wish to use the code and
>> I'll add proper license info.
>>
>> The zipper.hpp implementation is in terms of C++ data objects and
>> independent of ZeroMQ per se (it only needs the C++ standard library).
>> But it was written with the assumption that it would be sandwiched
>> between ZeroMQ input and output sockets.  Providing a layer to marshal
>> data into and out of the zipper is then the duty of the application.
>>
>> Note, my repo was for development purposes.  The zipper.hpp file was
>> then copied into a production repository and that copy may have some
>> bug fixes which I have not ported back to the stand-alone development
>> version.  The production version is here:
>>
>> https://github.com/DUNE-DAQ/trigger/blob/develop/plugins/zipper.hpp
>>
>> -Brett.
>>
>> On Wed, Nov 9, 2022 at 5:20 PM Francesco <francesco.montorsi at gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > I have written two applications using the ZMQ PUB-SUB pattern (over TCP transport).
>> > The subscriber application has its SUB socket connected to multiple PUBs (multiple TCP endpoints). Each message sent by a PUB encodes the timestamp of the event it describes, obtained on the TX side from the clock_gettime() syscall using a monotonically increasing clock.
>> >
>> > The subscriber needs to process the data stream _strictly_ in order. However, the publishers are not coordinated: they emit messages at different rates, each with its own timestamps. The only guarantee I have, according to the ZMQ docs, is that the SUB socket performs "fair dequeueing", but that is not enough to guarantee that every zmq_msg_t received from the SUB socket has a monotonically increasing timestamp: the order depends on the fill level of the TCP rx/tx kernel buffers, the ZMQ HWMs, etc.
>> >
>> > For this reason I'm looking for some algorithm that
>> > * allows me to push zmq_msg_t pulled out of the SUB socket (without strict time ordering)
>> > * allows me to pull out zmq_msg_t with monotonically increasing timestamps
>> > * introduces a latency of at most N msecs (configurable)
>> >
>> > Do you have any pointers for this kind of problem?
>> > Has anybody already hit a similar issue?
>> >
>> > Thanks for any help,
>> >
>> > Francesco Montorsi
>> >
>> > _______________________________________________
>> > zeromq-dev mailing list
>> > zeromq-dev at lists.zeromq.org
>> > https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>