[zeromq-dev] Time-reordering queue

Jeff Shanab jshanab at jfs-tech.com
Fri Nov 11 00:23:02 CET 2022


This topic reminds me of how RTSP/RTP streams operate.
In that case it is inefficient to try to reorder on the send side, so the
client(s) reorder and discard whatever cannot be recovered within a window.
Each packet carries a monotonically increasing sequence number. The reorder
buffer lives in the client.

This is especially important in the multicast case, where a single sender
serves multiple subscribers: you cannot have one bad subscriber bring down
everyone else.
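
For illustration, a client-side reorder buffer of the kind described above
could be sketched in C++ roughly as follows. This is only a minimal sketch
under stated assumptions, not how any particular RTP stack does it: the
Packet and ReorderBuffer names are hypothetical, the window is counted in
buffered packets rather than in time, and sequence numbers are 64-bit so
that wraparound (which the 16-bit RTP sequence number requires handling) is
ignored.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical packet type: a sequence number plus an opaque payload.
struct Packet {
    std::uint64_t seq;
    std::string payload;
};

// Hypothetical client-side reorder buffer. Packets are released strictly in
// sequence order. If a gap is still open once more than `window` packets are
// waiting, the missing packets are declared lost and delivery skips ahead.
class ReorderBuffer {
public:
    explicit ReorderBuffer(std::size_t window, std::uint64_t first_seq = 0)
        : window_(window), next_seq_(first_seq) {}

    // Accept one packet and return every packet that is now deliverable.
    std::vector<Packet> push(Packet p) {
        std::vector<Packet> out;
        if (p.seq < next_seq_) {
            return out;  // too late (or duplicate of a delivered packet): toss
        }
        const std::uint64_t seq = p.seq;
        pending_.emplace(seq, std::move(p));  // a duplicate key is ignored
        drain(out);
        // Too many packets waiting behind a gap: give up on the gap.
        while (pending_.size() > window_) {
            lost_ += pending_.begin()->first - next_seq_;
            next_seq_ = pending_.begin()->first;
            drain(out);
        }
        return out;
    }

    // Number of sequence numbers skipped as unrecoverable so far.
    std::uint64_t lost() const { return lost_; }

private:
    // Move the contiguous run starting at next_seq_ into `out`.
    void drain(std::vector<Packet>& out) {
        while (!pending_.empty() && pending_.begin()->first == next_seq_) {
            out.push_back(std::move(pending_.begin()->second));
            pending_.erase(pending_.begin());
            ++next_seq_;
        }
    }

    std::size_t window_;
    std::uint64_t next_seq_;
    std::uint64_t lost_ = 0;
    std::map<std::uint64_t, Packet> pending_;  // ordered by sequence number
};
```

With a window of 2, pushing sequence numbers 0, 2, 1 returns packet 0 on the
first push, nothing on the second, and packets 1 and 2 on the third. Each
subscriber owns its own buffer, so a slow or lossy client only affects
itself.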

On Thu, Nov 10, 2022 at 6:11 PM Francesco <francesco.montorsi at gmail.com>
wrote:

> Hi Brett,
> thanks, this is really very helpful!
> I went through the README and the code... just one question:
>
> As the very first paragraph indicates, "The streams need not be
> synchronized in (real) time but must be strictly ordered in each
> stream...." I guess that, translated into a ZMQ context, this means the
> data structure is designed to sort (by time) packets received over a set of
> k different SUB (or similar) sockets, each having 1 endpoint; in such a
> context each ZMQ SUB socket would be 1 stream for the zipper. This is the
> only way to satisfy the ordering criterion inside each stream, right?
> I'm asking because in my case the application uses just 1 single SUB
> that is connected with zmq_connect() to a number N of TCP endpoints. That
> means the packets obtained from zmq_msg_recv() are not
> time-ordered...
>
> Thanks!!
>
> Francesco
>
>
> On Thu, Nov 10, 2022 at 11:12 AM Brett Viren <
> brett.viren at gmail.com> wrote:
>
>> Hi Francesco,
>>
>> I implemented such an algorithm in C++ which I call "zipper".
>>
>> The idea is simply to maintain a min-heap priority queue keyed on the
>> timestamp and surround that with policy logic to decide when to push
>> and pop based on examining the system clock.  I've implemented two
>> policies.  Either a maximum latency bound is asserted at the cost of
>> possible message loss or the merge is lossless at the risk of unbounded
>> latency.
>>
>> It is a rather simple pattern and this description alone may be enough
>> to implement it yourself but you may also take a look at this repo
>> with code, performance results and other docs.
>>
>> https://github.com/brettviren/zipper
>>
>> Though I failed to make it explicit, this code may be considered
>> licensed under the LGPL.  Let me know if you wish to use the code and
>> I'll add proper license info.
>>
>> The zipper.hpp implementation is in terms of C++ data objects and
>> independent from zeromq per se (only needs C++ standard library).
>> But, it was written with the assumption that it would be sandwiched
>> between ZeroMQ input and output sockets.  Providing a layer to marshal
>> data in to / out from the zipper is then the duty of the application.
>>
>> Note, my repo was for development purposes.  The zipper.hpp file was
>> then copied into a production repository and that copy may have some
>> bug fixes which I have not ported back to the stand-alone development
>> version.  The production version is here:
>>
>> https://github.com/DUNE-DAQ/trigger/blob/develop/plugins/zipper.hpp
>>
>> -Brett.
>>
>> On Wed, Nov 9, 2022 at 5:20 PM Francesco <francesco.montorsi at gmail.com>
>> wrote:
>> >
>> > Hi all,
>> >
>> > I have written two applications using the ZMQ PUB-SUB pattern (over TCP
>> > transport).
>> > The subscriber application has its SUB socket connected to multiple
>> > PUBs (multiple TCP endpoints). Each message sent by a PUB encodes the
>> > timestamp (as obtained from the clock_gettime() syscall at the TX side
>> > using a monotonically increasing clock) of the event described by the
>> > ZMQ message.
>> >
>> > The subscriber needs to process the data stream _strictly_ in order.
>> > However the multiple publishers have no coordination and they will emit
>> > messages at different rates, each with its own timestamp. The only
>> > guarantee that I have, according to ZMQ docs, is that the SUB socket will
>> > perform "fair dequeueing", but that's not enough to guarantee that every
>> > zmq_msg_t received from the SUB socket will have a monotonically
>> > increasing timestamp: it depends on the filling level of the TCP rx/tx
>> > kernel buffers, the zmq HWMs, etc.
>> >
>> > For this reason I'm looking for some algorithm that
>> > * allows me to push zmq_msg_t pulled out of the SUB socket (without
>> >   strict time ordering)
>> > * allows me to pull out zmq_msg_t that have a timestamp monotonically
>> >   increasing
>> > * introduces a fixed max latency of N msecs (configurable)
>> >
>> > Do you have any pointers for this kind of problem?
>> > Has anybody already hit a similar issue?
>> >
>> > Thanks for any help,
>> >
>> > Francesco Montorsi
>> >
>> > _______________________________________________
>> > zeromq-dev mailing list
>> > zeromq-dev at lists.zeromq.org
>> > https://lists.zeromq.org/mailman/listinfo/zeromq-dev