[zeromq-dev] Publisher side filtering... (draft)

Martin Sustrik sustrik at 250bpm.com
Sat Dec 4 09:47:06 CET 2010

Hi Gerard,

> Shortening the mail a bit in this process, comments inline...
> On Fri, Dec 3, 2010 at 6:03 PM, Martin Sustrik <sustrik at 250bpm.com
> <mailto:sustrik at 250bpm.com>> wrote:
>         And then in the "connect_session_t" add calls to
>         "socket-base->socket_connected/disconnected", so that socket-base
>         derivations can
>         be notified of changes at the real socket level (which currently
>         is not
>         available?). Is there a better way to do this?
>     I would say the list of subscription should be stored in the I/O
>     thread (session) rather than in the socket as such. That would allow
>     immediate re-subscription after the reconnect with no need to
>     synchronise between the application thread the socket lives in and
>     the I/O thread the session lives in.
> I was not familiar enough with 0mq to understand that the session_t
> lives in I/O threads.  Pushing this towards the session makes sense.
> The downside is that the session has no clue about semantics like the
> socket_bases have. Would this require subclassing the connect_session_t
> and then
> creating that in socket_base::connect() instead of the regular
> connect_session ?   or rely on the fact that lists are empty have
> nothing to do anyway?

Dunno. Maybe we will have to have socket-type-specific session classes 
in the future. At the moment I would just rely on the 'options.type' to 
find out what socket type the particular session belongs to.

Also note that aside from connect_session_t there are transient_session_t 
and named_session_t, which should be taken care of as well. This can be 
done by modifying session_t instead, which is the base class of all three.

>         Now, the PUB socket lives in a broker in my case, but there's
>         not a very
>         good mechanism to notify the broker daemon main process
>         of subscription changes, ....<snip>
>     What I would suggest is separating end-to-end functionality from
>     hop-by-hop functionality in the same way as done with REQ/REP
>     (end-to-end) and XREQ/XREP (hop-by-hop).
>     Thus we would have XPUB and XSUB sockets that would treat
>     subscriptions as standard messages, say "+A" message can mean
>     subscribe to topic 'A' and "-A" message can mean unsubscribe from
>     topic 'A'.
>     Then we would have standard PUB/SUB sockets (derived from XPUB/XSUB)
>     that would convert standard sockopts ZMQ_SUBSCRIBE and
>     ZMQ_UNSUBSCRIBE to messages.
>     That way, the intermediate devices (brokers) can use XPUB/XSUB and
>     simply resend the subscriptions upstream.
> That sounds like the best way of handling things that avoids the
> ugliness of callback functions and modifications to the core API.


> I'm not sure about the need for XPUB/XSUB  and how these differ from
> PUB/SUB, why they are needed.

It is to separate functionality that happens on each node (XPUB/XSUB) 
and the additional functionality that happens only on edges of the 
network (PUB/SUB).

See how XREQ/XREP and REQ/REP work to get an idea.

Alternatively, think of IP (every node) vs. TCP (endpoints only).

Example: XPUB returns each received subscription to the user (aside from 
using it for its own purposes) so that it can be forwarded further, 
while PUB never returns a message, etc.
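
A minimal sketch of the "subscriptions as messages" idea, assuming the 
"+A" / "-A" format used as an example earlier in this thread. All names 
below are hypothetical; nothing like this exists in 0MQ yet.

```cpp
#include <cassert>
#include <string>

// Assumed wire format: '+' or '-' followed by the topic. This is only the
// example format from this thread, not an actual 0MQ protocol.
enum class sub_op_t { subscribe, unsubscribe, invalid };

struct sub_msg_t
{
    sub_op_t op;
    std::string topic;
};

// What a SUB layer might do when the application sets ZMQ_SUBSCRIBE.
inline std::string encode_subscribe (const std::string &topic)
{
    return "+" + topic;
}

// What a SUB layer might do when the application sets ZMQ_UNSUBSCRIBE.
inline std::string encode_unsubscribe (const std::string &topic)
{
    return "-" + topic;
}

// What an XPUB layer might do with a message arriving from downstream.
inline sub_msg_t decode_subscription (const std::string &msg)
{
    if (!msg.empty () && msg[0] == '+')
        return {sub_op_t::subscribe, msg.substr (1)};
    if (!msg.empty () && msg[0] == '-')
        return {sub_op_t::unsubscribe, msg.substr (1)};
    return {sub_op_t::invalid, ""};
}

// Usage example: a round trip through the assumed format.
inline void example_subscription_messages ()
{
    sub_msg_t m = decode_subscription (encode_subscribe ("A"));
    assert (m.op == sub_op_t::subscribe && m.topic == "A");
}
```

An intermediate node would pass such a message upstream unchanged; only 
the endpoints translate between sockopts and messages.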

> Let's see if I understand this correctly:
> Subscriptions are generated by the SUB through the setsockopt call.
> These subscriptions are sent upstream to the PUB. When the PUB receives
> any message, it knows it's a subscription change. A device implementing
> a standard SUB/PUB combination receives subscription change message from
> its own PUB and relay these to any SUB sockets (setsockopt) it is
> maintaining. This causes subscriptions to be sent further upstream.
> The device will have to do some counting to correctly send unsubscribes.


> 1. Modify sub_t to store a list of sessions and allow bidi traffic

You don't really need a list of sessions. You have to enable outgoing 
pipes. Each outgoing pipe (writer_t) would lead to a session.

> 2. Modify sub_t to create and send a message when ZMQ_SUBSCRIBE or
> ZMQ_UNSUBSCRIBE is set and
>     for all sessions in the list add/remove the topic.


> 3. Modify pub_t to allow incoming messages.


> 4. Modify pub_t to maintain an array of sessions instead of an array of
> writer_t's. adjust terminated/activated implementation to seek session
> in list by writer. (session->writer is private though)

No need for an array of sessions. What you need is an array of 
<reader,writer> pairs (see xrep_t which already does that). The reader 
will be used to get subscriptions/unsubscriptions, the writer will be 
used to send messages.

It would be good to abstract the storage of <reader,writer> pairs so 
that it can be used in both xrep_t and xpub_t.

> 5. The message (2) is sent upstream to the PUB

Yes. Standard way. No special code needed.

> 6. The PUB receives the message. Just before returning the message,
> update the session (available from array) with the subscription change.


What you presumably have in mind is that the subscription is stored in 
XPUB socket's matching engine before being returned to the user.

> 7. The daemon process (broker) receives the message and new subscribes
> are set on the SUB socket the broker implements (setsockopt).

The intermediate node (broker) would use XPUB/XSUB. Thus it doesn't have 
to mess with sockopt/message translations. It simply handles 
subscriptions as messages. I think we can copy the code from queue 
device to forwarder device literally to make it work.

>       Unsubscribes are only set (setsockopt on SUB) when a counter
> reaches 0 (no more subscribers).

This is an optimisation, but yes. The matching engine (in the XPUB socket) 
has to store a reference count with each subscription, so it can simply 
skip forwarding subscriptions that are already present. Conversely, it 
sends an unsubscription only when the reference count drops to zero.
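
The reference counting could look roughly like this. It is only a 
sketch: sub_refcount_t and its methods are made-up names, and a real 
matching engine would keep the counts inside its own data structure.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical reference-counted subscription table.
class sub_refcount_t
{
  public:
    // Returns true if this is the first subscriber for the topic, i.e.
    // the subscription should be forwarded upstream.
    bool add (const std::string &topic)
    {
        return ++refs[topic] == 1;
    }

    // Returns true if the last subscriber went away, i.e. the
    // unsubscription should be forwarded upstream.
    bool rm (const std::string &topic)
    {
        std::map<std::string, unsigned>::iterator it = refs.find (topic);
        if (it == refs.end ())
            return false; // Unknown topic, nothing to forward.
        if (--it->second > 0)
            return false; // Other subscribers remain.
        refs.erase (it);
        return true;
    }

  private:
    std::map<std::string, unsigned> refs;
};

// Usage example: two subscribers to the same topic.
inline void example_refcount ()
{
    sub_refcount_t table;
    assert (table.add ("A"));  // First subscriber: forward "+A".
    assert (!table.add ("A")); // Second subscriber: already present.
    assert (!table.rm ("A"));  // One subscriber is still left.
    assert (table.rm ("A"));   // Last one gone: forward "-A".
}
```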

> 8. Add session_t->filtered_write( ... )

For performance reasons there should be a single 'matching' engine in 
xpub_t rather than a filter in each session.

Jon Dyte has already done some work on implementing such a matching engine.

> When broker disconnects and reappears:
> 1. subscriber's SUB socket after reconnect resends subscription list
> from session_t (process_attached event).
>      ( can socket block on write? )

Yes, it can. What has to be done is to store the subscriptions to send 
in a special array and, when the protocol engine asks for data, return 
the next message from the array. When there are no more messages in the 
array, the standard behaviour is resumed, i.e. messages are retrieved 
from the pipe.
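
A rough sketch of that replay logic, assuming the "+topic" message 
format from earlier in the thread. replay_session_t is a made-up 
stand-in for the session, and the deque named 'pipe' stands in for the 
real pipe.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a session living in the I/O thread.
class replay_session_t
{
  public:
    // Remember a subscription so that it can be replayed after reconnect.
    void subscribe (const std::string &topic)
    {
        subscriptions.push_back (topic);
    }

    // Ordinary outbound message, queued in the stand-in pipe.
    void enqueue (const std::string &msg) { pipe.push_back (msg); }

    // Called when the connection is re-established: refill the array of
    // subscriptions that have to be sent before anything else.
    void reconnected ()
    {
        pending.clear ();
        for (std::size_t i = 0; i != subscriptions.size (); ++i)
            pending.push_back ("+" + subscriptions[i]);
    }

    // Called by the protocol engine whenever it can send more data.
    // Never blocks: returns false when there is nothing to send.
    bool read (std::string &msg)
    {
        std::deque<std::string> &src = pending.empty () ? pipe : pending;
        if (src.empty ())
            return false;
        msg = src.front ();
        src.pop_front ();
        return true;
    }

  private:
    std::vector<std::string> subscriptions;
    std::deque<std::string> pending;
    std::deque<std::string> pipe;
};

// Usage example: subscriptions go out before the queued message.
inline void example_replay ()
{
    replay_session_t s;
    s.subscribe ("A");
    s.enqueue ("data");
    s.reconnected ();
    std::string msg;
    assert (s.read (msg) && msg == "+A");
    assert (s.read (msg) && msg == "data");
    assert (!s.read (msg));
}
```

Since the engine pulls data when it is able to write, nothing on the 
socket side has to block while the subscriptions are being re-sent.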

> When sub disconnect happens:
> 1. PUB socket should generate unsubscribe messages from sublist in
> session for processing in daemon (broker).
>      How to get these messages from the session -> broker process?

What should happen is that the session terminates. On session 
termination (the <reader,writer> pair is being destroyed) xpub_t removes 
the corresponding subscriptions from the matching engine. When the 
refcount for a particular topic drops to zero, it sends an unsubscribe 
message to the user.
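
As a sketch only, with made-up names and an integer id standing in for 
the <reader,writer> pair, the cleanup on termination might be:

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical bookkeeping of subscriptions per peer.
class peer_subs_t
{
  public:
    void subscribe (int peer, const std::string &topic)
    {
        // Count the topic once per peer, even if subscribed repeatedly.
        if (by_peer[peer].insert (topic).second)
            ++refs[topic];
    }

    // Called when the peer's pipes are destroyed. Returns the topics for
    // which an unsubscribe message should be passed to the user.
    std::vector<std::string> terminated (int peer)
    {
        std::vector<std::string> dropped;
        std::map<int, std::set<std::string> >::iterator it =
          by_peer.find (peer);
        if (it == by_peer.end ())
            return dropped;
        for (const std::string &topic : it->second) {
            std::map<std::string, unsigned>::iterator r = refs.find (topic);
            assert (r != refs.end () && r->second > 0);
            if (--r->second == 0) {
                refs.erase (r);
                dropped.push_back (topic);
            }
        }
        by_peer.erase (it);
        return dropped;
    }

  private:
    std::map<int, std::set<std::string> > by_peer;
    std::map<std::string, unsigned> refs;
};

// Usage example: topic "A" is shared, topic "B" belongs to peer 1 only.
inline void example_termination ()
{
    peer_subs_t engine;
    engine.subscribe (1, "A");
    engine.subscribe (2, "A");
    engine.subscribe (1, "B");
    std::vector<std::string> gone = engine.terminated (1);
    assert (gone.size () == 1 && gone[0] == "B");
    gone = engine.terminated (2);
    assert (gone.size () == 1 && gone[0] == "A");
}
```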

> Does this work equally well for zmq_poll ?

Yes. At this level, (un)subscriptions are just messages. There's nothing 
special about them.

>      ( SUBS with identity never seem to generate disconnect event in the
> pub_t ).

That's what they are for. These are "long-lived sessions", i.e. sessions 
that can survive a reconnection. It's OK to store messages while this 
kind of session is disconnected. They will be sent after reconnection.

> When PUB must publish a message:
> 1. The process calls "zmq_send( pub, ... )"
> 2. pub_t->xsend( .. ) gets called.
> 3. the implementation calls "session_t->filtered_write() " for each
> session in the array it knows about.

As already said, there should be a single matching engine in xpub_t 
instead of a filter in each session.
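
To illustrate the difference: with a single engine the message is 
matched once and the result is the set of pipes to write to. The sketch 
below assumes prefix matching as done by ZMQ_SUBSCRIBE and uses a naive 
linear scan; a real engine would presumably use something like a trie. 
matcher_t and the integer pipe ids are made up for the example.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical single matching engine shared by all pipes of a socket.
class matcher_t
{
  public:
    void add (const std::string &prefix, int pipe)
    {
        subs[prefix].insert (pipe);
    }

    // Returns the ids of all pipes with at least one subscription that
    // is a prefix of the message.
    std::set<int> match (const std::string &msg) const
    {
        std::set<int> out;
        for (const auto &entry : subs) {
            const std::string &prefix = entry.first;
            if (msg.compare (0, prefix.size (), prefix) == 0)
                out.insert (entry.second.begin (), entry.second.end ());
        }
        return out;
    }

  private:
    std::map<std::string, std::set<int> > subs;
};

// Usage example: one lookup decides which pipes get the message.
inline void example_matching ()
{
    matcher_t engine;
    engine.add ("A", 1);
    engine.add ("AB", 2);
    engine.add ("C", 3);
    std::set<int> pipes = engine.match ("ABC");
    assert (pipes.size () == 2 && pipes.count (1) && pipes.count (2));
}
```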

> 4. The session_t->filtered_write( ... ) verifies the subscription list
> and actually performs the write if the subscription is found.
> 5. After last multipart write, each session is flushed.

I would love to help with this effort. I don't have enough free time to 
do all of it, but I can at least put some infrastructure pieces in 
place. Say I can separate XPUB/XSUB from PUB/SUB. What do you think?

Also, there were a couple of other people who already expressed their 
interest in helping with the publisher-side subscriptions implementation. 
Anyone out there still interested?


