[zeromq-dev] Publisher side filtering... (draft)

Gerard Toonstra gtoonstra at gmail.com
Fri Dec 3 14:18:43 CET 2010

Hi Martin,

I've continued a bit with the code to explore other complexities. So far, I
managed to produce an implementation where, on a disconnect, the
SUB socket re-sends the subscription list, one by one. This seems to be
working. However, I did this by introducing a change in socket_base.hpp,
adding four methods:

        virtual void socket_connected( class reader_t *reader );
        virtual void socket_connected( class writer_t *writer );
        virtual void socket_disconnected( class reader_t *reader );
        virtual void socket_disconnected( class writer_t *writer );

Then, in connect_session_t, I added calls to
socket_base->socket_connected()/socket_disconnected(), so that socket_base
derivations can be notified of changes at the real socket level (which is
currently not available?). Is there a better way to do this?

I found that the logical pipes (reader_t / writer_t) do not get events
during a platform socket disconnect/connect, so there are no events
available to detect disconnections at that level at this time.

This is necessary for the SUB, which should actively resend subscriptions
when disconnects are detected, and also for the PUB, which should
discard subscriptions.
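The SUB-side part of this can be sketched as a small subscription cache that is replayed after a reconnect. All names here (sub_state_t, the idea that a socket_connected hook triggers the replay) are hypothetical and only illustrate the shape, not actual 0MQ internals:

```cpp
#include <set>
#include <string>
#include <vector>

//  Hypothetical sketch: a SUB-side cache of active subscriptions that can
//  be replayed after a reconnect. Not real 0MQ code.
class sub_state_t
{
public:
    void subscribe (const std::string &topic) { topics.insert (topic); }
    void unsubscribe (const std::string &topic) { topics.erase (topic); }

    //  Called from a (hypothetical) socket_connected hook: returns the
    //  full list of subscriptions to re-send upstream, one by one.
    std::vector<std::string> resend_list () const
    {
        return std::vector<std::string> (topics.begin (), topics.end ());
    }

private:
    std::set<std::string> topics;
};
```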

Now, the PUB socket lives in a broker in my case, but there's no very
good mechanism to notify the broker daemon's main process
of subscription changes, so that it can, for example, forward its own pool
of subscriptions somewhere else.
As a temporary measure I used a rather nasty method: a callback function
living in the broker, installed through the
setsockopt functions. The PUB then calls this callback whenever the
subscriptions on any given socket change.
If there are better methods to do this, I'm all ears.
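To make the "nasty method" concrete, here is a sketch of its shape. None of these names (subscription_cb_t, pub_sketch_t, set_callback) exist in 0MQ; they only stand in for a function pointer installed via a setsockopt-style call and invoked by the PUB on every subscription change:

```cpp
#include <string>

//  Signature of the hypothetical broker-side callback.
typedef void (*subscription_cb_t) (const std::string &topic, bool added,
    void *hint);

class pub_sketch_t
{
public:
    pub_sketch_t () : cb (0), hint (0) {}

    //  Stands in for a setsockopt-style installation of the callback.
    void set_callback (subscription_cb_t cb_, void *hint_)
    {
        cb = cb_;
        hint = hint_;
    }

    //  Invoked by the PUB whenever a subscription is added or removed.
    void on_subscription (const std::string &topic, bool added)
    {
        if (cb)
            cb (topic, added, hint);
    }

private:
    subscription_cb_t cb;
    void *hint;
};

//  Example broker-side callback that just counts changes.
static int g_changes = 0;
static void count_changes (const std::string &, bool, void *)
{
    ++g_changes;
}
```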

It's not very effective to get the full list of subscriptions from the pub
every time something changes. For one, there is still no event, propagated
upward to the daemon, to indicate that a subscription has changed. Second,
there is no mechanism to return the subscription list through any of the
standard zmq_ calls (other than a getsockopt with a pointer to a list, or
something similar).
So even if a "get_subscription_list" public method were added to the pub_t
class, there would still be no way to call it from a daemon, and I really
think one shouldn't want to do that.

The trie_t structure used by the SUB is fine for checking, but it gets in
the way when a resend needs to be done. I don't understand the
code too well, so I temporarily replaced trie_t with a std::set<std::string>.
So maybe trie_t can be modified to return this list
(or all topics one by one).
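To illustrate what such a modification could look like: a minimal prefix trie, like trie_t in spirit, extended with a list() method that enumerates every stored topic so they can be re-sent or forwarded one by one. This is a sketch under that assumption, not the real trie_t:

```cpp
#include <map>
#include <string>
#include <vector>

//  Minimal prefix trie with an extra list() accessor. Illustrative only;
//  the real trie_t has no such method.
class listing_trie_t
{
public:
    void add (const std::string &topic) { root.add (topic, 0); }

    //  True if data matches any stored prefix (what trie_t checking does).
    bool check (const std::string &data) const { return root.check (data, 0); }

    //  Collect all stored topics, e.g. for resending after a reconnect.
    std::vector<std::string> list () const
    {
        std::vector<std::string> out;
        root.list ("", out);
        return out;
    }

private:
    struct node_t
    {
        node_t () : terminal (false) {}
        bool terminal;
        std::map<char, node_t> next;

        void add (const std::string &t, size_t i)
        {
            if (i == t.size ()) { terminal = true; return; }
            next[t[i]].add (t, i + 1);
        }
        bool check (const std::string &d, size_t i) const
        {
            if (terminal)
                return true;
            if (i == d.size ())
                return false;
            std::map<char, node_t>::const_iterator it = next.find (d[i]);
            return it != next.end () && it->second.check (d, i + 1);
        }
        void list (const std::string &prefix,
            std::vector<std::string> &out) const
        {
            if (terminal)
                out.push_back (prefix);
            for (std::map<char, node_t>::const_iterator it = next.begin ();
                  it != next.end (); ++it)
                it->second.list (prefix + it->first, out);
        }
    };
    node_t root;
};
```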

On Fri, Dec 3, 2010 at 12:48 PM, Martin Sustrik <sustrik at 250bpm.com> wrote:

> Hi Gerard,
> Checked your patch. Pieces are missing, but it's a nice start...
> Now the question is how can we proceed with incorporating
> subscription-forwarding-related code into the mainline in a gradual manner.
> What about say starting with making the pipes between PUB/SUB socket and
> the I/O thread bi-directional? At the moment the pipe in the opposite
> direction won't be used for anything, however, it would allow us to check
> whether there are no related problems (memleaks or such).

Sounds good, but if there's no code in place to read from the PUB side,
this will probably cause bytes to pile up somewhere, possibly eventually
leading to a blocking write (kernel buffer full) or something of the
sort. So making the change only really makes sense once the
reads are taking place.

>  The pub socket should periodically check if
>>      anything new was written by subscribers, which indicate topic
>>      changes. At the moment, it checks fq_t whenever any message is
>>      sent by the pub socket. The "if (has_in())" should probably become
>>      a "while (fq.has_in())" call to process all subscription changes
>>      first. fq_t only verifies all currently 'active' sockets, so this
>>      should not be too expensive...?
> First of all we should decide where exactly should the filters reside. Keep
> in mind that there are 2 possible places for each socket -- the socket
> itself and the associated session living in the I/O thread. That gives 4
> possible places in simple PUB/SUB setup. If a device is placed in the
> middle, there are 8 possible places (1 socket in the publisher, 2 in the
> device and 1 in the subscriber).

Yes, so the semantics here change quite significantly. The 0MQ API was
built on the assumption that you either send or receive
messages over some type of transport mechanism / protocol. With
subscriptions in the picture, there seems to be a need to communicate
subscription changes at the socket level to daemons as well, so that they
can relay them onwards to other destinations.

Also, it helps a publisher process to have early knowledge of whether
listeners are present. It can then choose not to produce messages of a
certain type, making better use of CPU, memory and network.

Basically, I think there's a need for a mechanism by which the list of all
unique subscriptions on a PUB eventually finds its way to a process that
needs it. Ideally, this should happen on an "event" basis, with
subscription removals when sockets disconnect and subscription additions
when new ZMQ_SUBSCRIBEs come in. (It should also keep a count of how many
of those subscriptions are currently active: a single SUBSCRIBE already
subscribes to something, but an unsubscribe on a single channel should not
necessarily lead to a removal in the daemon, since other channels may
still have that subscription active.)
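The reference-counting part of that argument can be sketched as follows; sub_counter_t is a hypothetical name, and the booleans mark the only two transitions the daemon would need to hear about:

```cpp
#include <map>
#include <string>

//  Per-topic reference counting: the daemon only sees a topic appear on
//  the first subscribe and disappear on the last unsubscribe. Sketch only,
//  not part of the 0MQ API.
class sub_counter_t
{
public:
    //  Returns true if this is a *new* topic the daemon must be told about.
    bool subscribe (const std::string &topic)
    {
        return ++counts[topic] == 1;
    }

    //  Returns true if the *last* subscriber left and the daemon should
    //  drop the topic.
    bool unsubscribe (const std::string &topic)
    {
        std::map<std::string, int>::iterator it = counts.find (topic);
        if (it == counts.end ())
            return false;
        if (--it->second > 0)
            return false;
        counts.erase (it);
        return true;
    }

private:
    std::map<std::string, int> counts;
};
```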

> My thinking at the moment is that there's should be a  filter at the
> ultimate publisher (not in the intermediary devices) to limit the overall
> number of messages as soon as possible and another filter at ultimate
> consumer (not in the intermediary devices) to filter out any stray messages
> that may be received because of delayed unsubscription.


> Aside of that there should be a "dispatcher" (as opposed to the filter) at
> PUB side of every intermediary device that would send a message only to the
> relevant subscribers.
Hmm... if a process gets notified of subscription changes at the pub
socket level, it can use this knowledge for optimization.
The naive approach is to just call "pub" anyway, since it will figure out
whether any transmissions are necessary on a channel for that pub.
But I suppose that having the full list of known active subscriptions on
a PUB socket in the daemon could already be used proactively for
optimization.

> The matter is complex. We should think about it in more detail, draw some
> diagrams etc.
>    2. The current implementation relies on the reader_t object from fq_t
>>      to find the identity, then find the writer socket through the
>>      identity and update the subscription list accordingly. There
>>      should be better ways to do this.
> Yes, presumably.

The session is probably the best place to keep things in:

1. It already exists and has access to reader, writer and socket_base
2. It's only useful at the PUB side (SUB needs no changes).
3. It's about the association between reading a subscription change on the
"in" channel and adjusting a different structure elsewhere.

So maybe the PUB can be modified to use a list of sessions for
publications, giving direct access to the reader, the writer and the
subscription list for write actions. This comes at the
expense of having to traverse the list of sessions when
activated/terminated events occur.
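A minimal sketch of that session-list idea, with hypothetical names (session_rec_t, pub_sessions_t) and plain integers standing in for the real reader_t*/writer_t* plumbing:

```cpp
#include <list>
#include <set>
#include <string>

//  One record per connected peer, bundling everything xsend needs,
//  instead of three parallel pipe lists. Illustrative only.
struct session_rec_t
{
    int reader_id;   //  Would be reader_t* in the real code.
    int writer_id;   //  Would be writer_t* in the real code.
    std::set<std::string> subscriptions;
};

class pub_sessions_t
{
public:
    //  Called from the activated/terminated overloads.
    void add_session (int reader_id, int writer_id)
    {
        session_rec_t rec;
        rec.reader_id = reader_id;
        rec.writer_id = writer_id;
        sessions.push_back (rec);
    }

    //  A subscription read on a reader updates that session's list directly.
    void subscribe (int reader_id, const std::string &topic)
    {
        for (std::list<session_rec_t>::iterator it = sessions.begin ();
              it != sessions.end (); ++it)
            if (it->reader_id == reader_id)
                it->subscriptions.insert (topic);
    }

    //  xsend walks a single list with direct access to each peer's
    //  subscription set.
    int match_count (const std::string &topic) const
    {
        int n = 0;
        for (std::list<session_rec_t>::const_iterator it = sessions.begin ();
              it != sessions.end (); ++it)
            if (it->subscriptions.count (topic))
                ++n;
        return n;
    }

private:
    std::list<session_rec_t> sessions;
};
```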

Also, a bit of state per session should be recorded somewhere for multipart
messages. The pub socket sends messages one part at a time.
If a message should not be relayed, then any parts that follow should
also be discarded, up to the last part, at which point the state should be
reset.
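That per-session multipart state could look roughly like this; multipart_gate_t is a made-up name, and 'more' stands for a ZMQ_SNDMORE-style flag:

```cpp
//  Sketch: once the first part of a message is dropped, all following
//  parts are dropped too, and the decision is re-evaluated at the next
//  message boundary. Illustrative, not real 0MQ code.
class multipart_gate_t
{
public:
    multipart_gate_t () : dropping (false), in_message (false) {}

    //  Decide whether to relay one part. 'matches' is the result of the
    //  subscription check; 'more' says whether further parts follow.
    bool relay (bool matches, bool more)
    {
        if (!in_message)
            //  First part: the check decides for the whole message.
            dropping = !matches;
        bool send = !dropping;
        in_message = more;
        if (!more)
            dropping = false;   //  Reset at the message boundary.
        return send;
    }

private:
    bool dropping;
    bool in_message;
};
```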

>    3. The pub socket now stores pipes three times, so there are three
>>      lists. One is to keep track of readers, so that subscription
>>      requests can be handled. This requires some correlation between
>>      the reader list and a writer list, so that the actual subscription
>>      list can be found and updated. I think a separate structure which
>>      associates the reader and writer in a single structure together
>>      with the subscription list and whatever else is needed for
>>      multipart functionality is a better way of doing this. This means
>>      that the class is essentially reimplemented. There's also a need
>>      for a separate list to keep track of all readers that have data to
>>      be read.
> Yes. Lot of work to do still.

I think the session-list approach will reduce this to a single list, at
the slight expense of some extra work in the activated/terminated
overloads. Since xsend is probably the majority of calls here, that
sounds like a much better approach than traversing three lists
simultaneously.

>    4. For subscriptions to be sent upstream, the sub should connect
>>      first. Subscribes that took place prior to connecting are not
>>      propagated yet. This should probably change.
> Yes. The connecting side should keep a list of subscriptions are re-issue
> them after each reconnection.

As stated above, I managed to cobble together an implementation that does
this, but it requires events to be propagated from the platform socket
level up to the socket_base level.

The final complication is the code that actually does the reading on the
pub sockets, ideally in an event-based way,
by trapping events when data is ready to be read on the fq_t sockets. The
current implementation relies on
xsend being called on the pub to do anything, which is bad both for
performance and because fq_t
will only process one message at a time in the "while" loop.

The latter is best explained by an example:

1. SUB x unsubscribes from topic A.
2. SUB x subscribes to topic A.
3. PUB daemon requests data to be published on topic A.
4. PUB socket processes subscriptions, finds unsubscribe.
5. PUB socket unsubscribes from topic A.
6. PUB socket writes data on A to all channels subscribed.
7. SUB x does not get the data.
8. On the next bit of data, the SUB x subscribe is found and the list is
updated again.
9. End result: one message is lost, plus the extra overhead of processing
subscriptions for every message published.
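The fix the example points at, draining *all* pending subscription changes before deciding whether to publish (a "while" instead of an "if"), can be sketched like this; the names are made up and a plain queue stands in for the fq_t fan-in:

```cpp
#include <queue>
#include <set>
#include <string>
#include <utility>

//  Sketch: apply every queued subscription change before publishing,
//  instead of one change per xsend call. Illustrative only.
class pub_drain_sketch_t
{
public:
    //  A pending update read from the fan-in queue (stand-in for fq_t).
    void push_update (const std::string &topic, bool subscribe)
    {
        pending.push (std::make_pair (topic, subscribe));
    }

    //  "while (fq.has_in ())" rather than "if (has_in ())": drain the
    //  whole queue, then consult the up-to-date subscription set.
    bool should_publish (const std::string &topic)
    {
        while (!pending.empty ()) {
            if (pending.front ().second)
                subs.insert (pending.front ().first);
            else
                subs.erase (pending.front ().first);
            pending.pop ();
        }
        return subs.count (topic) != 0;
    }

private:
    std::queue<std::pair<std::string, bool> > pending;
    std::set<std::string> subs;
};
```

With this, the unsubscribe/re-subscribe pair from the example above is fully applied before step 6, so no message is lost.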

Is there a better way to start executing code when data is ready to be
read by fq_t?


Gerard Toonstra
