[zeromq-dev] Publisher side filtering... (draft)

Martin Sustrik sustrik at 250bpm.com
Fri Dec 3 18:03:41 CET 2010

Hi Gerard,

> I've continued a bit with the code to see other complexities. So far, I
> managed to produce an implementation where, on a disconnect, the
> SUB socket re-sends the subscription list, one by one. This seems to be
> working. However, I did this by introducing a change in the socket_base.hpp,
> where 4 methods were introduced:
>          virtual void socket_connected( class reader_t *reader );
>          virtual void socket_connected( class writer_t *writer );
>          virtual void socket_disconnected( class reader_t *reader );
>          virtual void socket_disconnected( class writer_t *writer );
> And then in the "connect_session_t" add calls to
> "socket_base->socket_connected/disconnected", so that socket_base
> derivations can be notified of changes at the real socket level (which
> currently is not available?). Is there a better way to do this?

I would say the list of subscriptions should be stored in the I/O thread 
(session) rather than in the socket itself. That would allow immediate 
re-subscription after a reconnect, with no need to synchronise between 
the application thread the socket lives in and the I/O thread the 
session lives in.
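As a rough sketch of what such a session-owned store could look like (the class and method names here are hypothetical, not actual libzmq internals):

```cpp
#include <set>
#include <string>
#include <vector>

//  Hypothetical store living in the I/O-thread session, so that
//  re-subscription after a reconnect needs no synchronisation with
//  the application thread the socket lives in.
class subscription_store_t
{
public:
    void add (const std::string &topic_) { subscriptions.insert (topic_); }
    void remove (const std::string &topic_) { subscriptions.erase (topic_); }

    //  Called by the session after a reconnect: returns the topics to
    //  re-send upstream, one subscription message at a time.
    std::vector<std::string> replay () const
    {
        return std::vector<std::string> (subscriptions.begin (),
            subscriptions.end ());
    }

private:
    std::set<std::string> subscriptions;
};
```

The session would call replay () from its reconnect handler and push each returned topic upstream as an ordinary subscription message.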

> I found that the logical pipes (reader_t / writer_t) do not get events
> during a platform socket disconnect/connect, so there are no events
> available to detect disconnections at that level at this time.
> This is necessary for the SUB, to actively resend subscriptions when
> disconnects are detected, and also for the PUB, which should
> discard subscriptions.

See above. By keeping the subscription list in the I/O thread you don't 
need to send connection/disconnection events to the socket.

> Now, the PUB socket lives in a broker in my case, but there's not a very
> good mechanism to notify the broker daemon main process
> of subscription changes, such that it can, for example, forward its own
> pool of subscriptions somewhere else.
> I've temporarily used a very nasty method, which uses a callback
> function living in the broker that can be installed using the
> setsockopt functions. The PUB then calls this callback function when any
> updates to the subscriptions occur on any given socket.
> If there are better methods to do this, then I'm all ears.

What I would suggest is separating end-to-end functionality from 
hop-by-hop functionality in the same way as done with REQ/REP 
(end-to-end) and XREQ/REP (hop-by-hop).

Thus we would have XPUB and XSUB sockets that would treat subscriptions 
as standard messages; say, a "+A" message could mean subscribe to topic 
'A' and a "-A" message unsubscribe from topic 'A'.

Then we would have standard PUB/SUB sockets (derived from XPUB/XSUB) 
that would convert the standard ZMQ_SUBSCRIBE and ZMQ_UNSUBSCRIBE socket 
options into such messages.

That way, the intermediate devices (brokers) can use XPUB/XSUB and 
simply resend the subscriptions upstream.
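For illustration, the proposed "+"/"-" convention could be encoded and decoded like this (hypothetical helper functions; the framing eventually adopted by libzmq uses a leading \x01/\x00 byte instead of '+'/'-', but the idea is the same):

```cpp
#include <string>

//  Sketch of the proposed wire convention: '+' prefix means subscribe,
//  '-' prefix means unsubscribe, the rest of the message is the topic.

std::string encode_subscribe (const std::string &topic_)
{
    return "+" + topic_;
}

std::string encode_unsubscribe (const std::string &topic_)
{
    return "-" + topic_;
}

//  Returns true if msg_ is a subscription command and fills in the
//  topic and the subscribe/unsubscribe flag; returns false for an
//  ordinary data message.
bool decode (const std::string &msg_, std::string &topic_, bool &subscribe_)
{
    if (msg_.empty () || (msg_ [0] != '+' && msg_ [0] != '-'))
        return false;
    subscribe_ = (msg_ [0] == '+');
    topic_ = msg_.substr (1);
    return true;
}
```

With this framing, an intermediate device simply reads subscription messages from its XPUB side and re-sends them on its XSUB side, with no special-case API.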

> It's not very efficient to get the full list of subscriptions from the
> pub every time something changes. For one, there is still no event to
> indicate that a subscription has changed which propagates upward to the
> daemon. Second, there is no mechanism available to return the
> subscription list using any of the standard zmq_ method calls
> (other than the use of a getsockopt and a pointer to a list or something).
> So even with a "get_subscription_list" public method on the pub_t
> class, there is still no way to call it from a daemon, and I really
> think one shouldn't want to do that.

See above. XPUB/XSUB should solve the problem. Note that the 
subscriptions would always be sent one by one rather than in batches. 
When a reconnection happens, all the subscriptions are re-sent, but each 
is just a standard simple subscription message, nothing special.

> The trie_t structure in use by the SUB is fine for checking, but when a
> resend needs to be done it gets in the way. I don't understand the
> code too well, so I temporarily modified the trie_t to a
> std::set<std::string> structure. So maybe trie_t can be modified to
> return this list (or all topics one by one).

It can, but it's probably not needed. The trie (filter) is needed at the 
terminal subscriber, i.e. sub_t (to discard delayed non-matching 
messages), while the list of topics should be re-sent by the session. 
Thus, it looks like you need to store the list of subscriptions twice: 
once in sub_t (in trie_t) and once in the session (as a simple list).
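For readers unfamiliar with the structure, here is a toy prefix trie in the spirit of trie_t (a simplified illustration, not the actual libzmq implementation):

```cpp
#include <map>
#include <memory>
#include <string>

//  Toy prefix trie: stores subscription prefixes and answers whether a
//  message matches any of them, as the sub-side filter must.
class simple_trie_t
{
public:
    simple_trie_t () : terminal (false) {}

    //  Add a subscription prefix. The empty prefix matches everything.
    void add (const std::string &prefix_)
    {
        simple_trie_t *node = this;
        for (size_t i = 0; i != prefix_.size (); ++i) {
            std::unique_ptr<simple_trie_t> &child =
                node->children [prefix_ [i]];
            if (!child)
                child.reset (new simple_trie_t ());
            node = child.get ();
        }
        node->terminal = true;
    }

    //  Return true if the message matches any stored prefix.
    bool check (const std::string &msg_) const
    {
        const simple_trie_t *node = this;
        for (size_t i = 0; ; ++i) {
            if (node->terminal)
                return true;            //  Some subscribed prefix matched.
            if (i == msg_.size ())
                return false;           //  Message exhausted, no match.
            std::map<char, std::unique_ptr<simple_trie_t> >::const_iterator
                it = node->children.find (msg_ [i]);
            if (it == node->children.end ())
                return false;
            node = it->second.get ();
        }
    }

private:
    bool terminal;
    std::map<char, std::unique_ptr<simple_trie_t> > children;
};
```

Note how checking is a single walk down the tree, whereas enumerating all stored topics requires a full traversal, which is why keeping a flat list in the session alongside the trie is the simpler option.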

> On Fri, Dec 3, 2010 at 12:48 PM, Martin Sustrik <sustrik at 250bpm.com> wrote:
>     Hi Gerard,
>     Checked your patch. Pieces are missing, but it's a nice start...
>     Now the question is how can we proceed with incorporating
>     subscription-forwarding-related code into the mainline in a gradual
>     manner.
>     What about say starting with making the pipes between PUB/SUB socket
>     and the I/O thread bi-directional? At the moment the pipe in the
>     opposite direction won't be used for anything; however, it would
>     allow us to check whether there are any related problems (memleaks
>     or such).
> Sounds good, but if there's no code in place to interrogate the PUB
> reader, this will probably cause bytes to pile up somewhere, possibly
> eventually leading to a blocking write (kernel buffer full) or something
> of the sort. So just making the change only really makes sense once the
> reads are taking place.

What I meant was just to establish the pipe in the opposite direction 
without writing to or reading from it. Pushing that kind of patch to 
master would let all the people working with master get the change and 
report eventual problems.

>         The pub socket should periodically check if anything new was
>         written by subscribers, which indicates topic changes. At the
>         moment, it checks fq_t whenever any message is sent by the pub
>         socket. The "if (has_in())" should probably become a
>         "while (fq.has_in())" loop to process all subscription changes
>         first. fq_t only verifies all currently 'active' sockets, so
>         this should not be too expensive...?
>     First of all we should decide where exactly should the filters
>     reside. Keep in mind that there are 2 possible places for each
>     socket -- the socket itself and the associated session living in the
>     I/O thread. That gives 4 possible places in simple PUB/SUB setup. If
>     a device is placed in the middle, there are 8 possible places (1
>     socket in the publisher, 2 in the device and 1 in the subscriber).
> Yes, so the semantics here change somewhat significantly. The 0MQ API
> was all created on the assumption that you either send or receive
> messages over some type of transport mechanism / protocol. With
> subscriptions in the picture, there seems to be a need to communicate
> these subscription changes at the socket level to daemons as well, such
> that they can relay them onwards to other destinations.

Yes. This can be solved by "subscription is a message" paradigm as 
outlined above.

> Also, it will help a publisher process that creates data to have early
> knowledge about the presence (or absence) of listeners. It can then
> choose not to produce messages of a certain type, yielding better usage
> of CPU, memory and network.

Exactly. That's the main point: Push the subscription as far upstream as 
possible. Forwarding the subscription just a single hop upstream would 
be insufficient in large topologies.

> Basically, this means that I think there's a need to define a mechanism
> by which a list of all unique subscriptions on a PUB eventually finds its
> way to a process requiring such a socket. Ideally, this should be done
> on an "event" basis, with subscription removals when sockets disconnect and
> subscription additions when new ZMQ_SUBSCRIBEs come in (and keep a count
> of the number of those subscriptions currently active, because a single
> SUBSCRIBE already subscribes to something, but an unsubscribe on a
> single channel should not necessarily lead to a removal in the daemon,
> because there may still be other channels with that subscription active).
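The counting logic described in the parenthesis above could be sketched as follows (a hypothetical broker-side helper, not existing code):

```cpp
#include <map>
#include <string>

//  Hypothetical refcounted subscription table for a broker: a topic is
//  forwarded upstream only on the first subscribe, and withdrawn only
//  when the last subscriber for it goes away.
class subscription_counts_t
{
public:
    //  Returns true if this is the first subscription for the topic,
    //  i.e. the broker should forward a subscribe upstream.
    bool subscribe (const std::string &topic_)
    {
        return ++counts [topic_] == 1;
    }

    //  Returns true if this was the last subscription for the topic,
    //  i.e. the broker should forward an unsubscribe upstream.
    bool unsubscribe (const std::string &topic_)
    {
        std::map<std::string, int>::iterator it = counts.find (topic_);
        if (it == counts.end ())
            return false;               //  Unknown topic; nothing to do.
        if (--it->second > 0)
            return false;               //  Other subscribers remain.
        counts.erase (it);
        return true;
    }

private:
    std::map<std::string, int> counts;
};
```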
>     My thinking at the moment is that there should be a filter at the
>     ultimate publisher (not in the intermediary devices) to limit the
>     overall number of messages as soon as possible, and another filter
>     at the ultimate consumer (not in the intermediary devices) to filter
>     out any stray messages that may be received because of delayed
>     unsubscription.
> agreed.
>     Aside of that there should be a "dispatcher" (as opposed to the
>     filter) at PUB side of every intermediary device that would send a
>     message only to the relevant subscribers.
> Hmm... if a process gets notified of changes in subscriptions at the pub
> socket level, then it can use this knowledge for optimization.
> The naive approach is just to call "pub" anyway, since it will figure
> out if there are any transmissions necessary on a channel for that pub.
> But I suppose that having the full list of all known active
> subscriptions on a PUB socket in the daemon can be used proactively
> already for optimization.

What I meant is that when the subscription matching is moved to the 
publisher, the publisher has to do something more complex than simple 
true/false filtering.

Say it has 2 connected subscribers. One of them is subscribed to topics 
"A" and "B", the other is subscribed to "B" and "C". The publisher should 
take each message and decide whether to drop it, send it to the first 
subscriber, send it to the second subscriber, or send it to both.

In short, the result of the matching function is a list of peers to 
send the message to, rather than the simple true/false of 
subscriber-side filtering.
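A minimal sketch of such a matching function (integer peer ids and prefix-only matching are illustrative assumptions, not actual libzmq code):

```cpp
#include <map>
#include <set>
#include <string>
#include <vector>

//  Publisher-side dispatch: instead of a true/false filter, matching
//  yields the list of peers a message must be sent to.
typedef std::map<int, std::set<std::string> > peer_subs_t;

std::vector<int> match_peers (const peer_subs_t &subs_,
    const std::string &topic_)
{
    std::vector<int> result;
    for (peer_subs_t::const_iterator it = subs_.begin ();
          it != subs_.end (); ++it) {
        for (std::set<std::string>::const_iterator s = it->second.begin ();
              s != it->second.end (); ++s) {
            //  Prefix match, as with ZMQ_SUBSCRIBE.
            if (topic_.compare (0, s->size (), *s) == 0) {
                result.push_back (it->first);
                break;                  //  One match per peer is enough.
            }
        }
    }
    return result;
}
```

In the example from the text, a peer map of {1: {"A", "B"}, 2: {"B", "C"}} would send an "A" message to peer 1 only, a "B" message to both, and drop a "D" message entirely.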

>     The matter is complex. We should think about it in more detail, draw
>     some diagrams etc.
>         2. The current implementation relies on the reader_t object
>            from fq_t to find the identity, then find the writer socket
>            through the identity and update the subscription list
>            accordingly. There should be better ways to do this.
>     Yes, presumably.
> The session is probably the best place to keep things in:
> 1. It already exists and has access to reader, writer and socket_base
> structure.
> 2. It's only useful at the PUB side (SUB needs no changes).
> 3. It's about the association between reading a subscription change on
> the "in" channel and adjusting a different structure elsewhere.
> So maybe the PUB can be modified to use a list of sessions for
> publications, giving direct access to the reader, writer and
> subscription list for write actions. This is at the expense of having
> to go through a list of sessions when activated/terminated events occur.

Sorry, I don't follow here...

> Also, somewhere a bit of state per session should be recorded for
> multipart messages. The pub socket is used by sending messages one at a
> time.
> If a message should not be relayed, then any parts that follow
> should also be discarded until the last part, after which the state
> should be reset.

Yes, that bit should be co-located with the filtering engine.
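That state bit could look something like this (hypothetical names, not actual pub_t code):

```cpp
//  Per-peer multipart state bit: if the first part of a message is
//  dropped by the filter, all following parts are dropped too, and the
//  state resets after the last part.
class multipart_gate_t
{
public:
    multipart_gate_t () : dropping (false), in_message (false) {}

    //  matched_ is the filter's verdict (consulted only for the first
    //  part); more_ is the message's "more parts follow" flag.
    //  Returns whether this part should be sent to the peer.
    bool should_send (bool matched_, bool more_)
    {
        if (!in_message)
            dropping = !matched_;   //  Verdict fixed by the first part.
        in_message = more_;         //  Reset state after the last part.
        return !dropping;
    }

private:
    bool dropping;
    bool in_message;
};
```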

>         3. The pub socket now stores pipes three times, so there are
>            three lists. One is to keep track of readers, so that
>            subscription requests can be handled. This requires some
>            correlation between the reader list and a writer list, so
>            that the actual subscription list can be found and updated.
>            I think a separate structure which associates the reader and
>            writer together with the subscription list and whatever else
>            is needed for multipart functionality is a better way of
>            doing this. This means that the class is essentially
>            reimplemented. There's also a need for a separate list to
>            keep track of all readers that have data to be read.
>     Yes. Lot of work to do still.
> I think the session list approach will reduce this to one single list,
> at the slight expense of some extra work in the activated/terminated
> overloaded functions. Since xsend is probably the majority of calls
> here, that sounds like a much better approach than traversing 3 lists
> simultaneously.

I don't really follow. Are you suggesting the socket should access the 
session it owns directly? To make things clear:

1. The socket can't access the session directly because it lives in 
another thread and there's no locking in place.

2. The socket can, however, send a command to the session.

3. Still, the only entity to be passed AFAIU is a subscription. Thinking 
in terms of "subscription is a message" paradigm, you can pass the 
subscription via a pipe. No additional commands are needed.

>            4. For subscriptions to be sent upstream, the sub should connect
>               first. Subscribes that took place prior to connecting are not
>               propagated yet. This should probably change.
>     Yes. The connecting side should keep a list of subscriptions and
>     re-issue them after each reconnection.
> As stated first, I managed to cobble up an implementation that does
> this, but requires events to be propagated from the platform socket
> level up to the socket-base level.

I would store the subscription list in the session as explained earlier.

> A final complication is code to actually do the reading on the pub
> sockets, ideally in an event-based way by trapping events when data is
> ready to be read on the fq_t sockets. The current implementation relies
> on xsend being called on the pub to actually do something, which is bad
> for performance, and because fq_t will only process one message at a
> time in the "while" loop. The latter is best explained by an example:
> The latter is best explained by an example:
> 1. SUB x unsubscribes from topic A.
> 2. SUB x subscribes to topic A.
> 3. PUB daemon requests data to be published on topic A.
> 4. PUB socket processes subscriptions, finds unsubscribe.
> 5. PUB socket unsubscribes from topic A.
> 6. PUB socket writes data on A to all channels subscribed.
> 7. SUB x does not get data
> 8. On the next bit of data, the SUB x subscribe is found, the list is
> updated again.
> 9. End result: one message is lost, plus the extra overhead of
> processing subscriptions for every message published.

What's the problem with that? SUB x was unsubscribed for a while, so it 
should expect that some messages may be missing.

