[zeromq-dev] Publisher side filtering... (draft)

Gerard Toonstra gtoonstra at gmail.com
Sat Dec 4 00:02:30 CET 2010

Hi Martin,

Shortening the mail a bit in this process, comments inline...

On Fri, Dec 3, 2010 at 6:03 PM, Martin Sustrik <sustrik at 250bpm.com> wrote:

>> And then in the "connect_session_t" add calls to
>> "socket-base->socket_connected/disconnected", so that socket-base
>> derivations can
>> be notified of changes at the real socket level (which currently is not
>> available?). Is there a better way to do this?
> I would say the list of subscription should be stored in the I/O thread
> (session) rather than in the socket as such. That would allow immediate
> re-subscription after the reconnect with no need to synchronise between the
> application thread the socket lives in and the I/O thread the session lives
> in.

I was not familiar enough with 0mq to realise that session_t lives in the
I/O threads. Pushing this towards the session makes sense.
The downside is that the session has no clue about socket semantics the way
the socket_base derivations do. Would this require subclassing
connect_session_t and creating the subclass in socket_base::connect() instead
of the regular connect_session_t? Or can we rely on the fact that sessions
whose subscription lists are empty have nothing to do anyway?

>> Now, the PUB socket lives in a broker in my case, but there's not a very
>> good mechanism to notify the broker daemon main process
>> of subscription changes, ....<snip>
> What I would suggest is separating end-to-end functionality from hop-by-hop
> functionality in the same way as done with REQ/REP (end-to-end) and XREQ/REP
> (hop-by-hop).
> Thus we would have XPUB and XSUB sockets that would treat subscriptions as
> standard messages, say "+A" message can mean subscribe to topic 'A' and "-A"
> message can mean unsubscribe from topic 'A'.
> Then we would have standard PUB/SUB sockets (derived from XPUB/XSUB) that
> would convert standard sockopts ZMQ_SUBSCRIBE and ZMQ_UNSUBSCRIBE to
> messages.
> That way, the intermediate devices (brokers) can use XPUB/XSUB and simply
> resend the subscriptions upstream.

That sounds like the best way of handling things that avoids the ugliness of
callback functions and modifications to the core API.

I'm not sure about the need for XPUB/XSUB, though: how do these differ from
PUB/SUB, and why are they needed?

Let's see if I understand this correctly:

Subscriptions are generated by the SUB through the setsockopt call. These
subscriptions are sent upstream to the PUB. When the PUB receives
any message, it knows it is a subscription change. A device implementing a
standard SUB/PUB combination receives subscription change messages from
its own PUB and relays these to any SUB sockets (setsockopt) it is
maintaining. This causes subscriptions to be sent further upstream.
The device will have to do some counting to send unsubscribes correctly.
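
A rough sketch of the counting such a device would need, to make the idea
concrete. This is not 0mq code: subscription_counter_t and its methods are
made-up names, and the assumption is that the device only touches its
upstream SUB socket when the first subscriber for a topic appears or the
last one goes away.

```cpp
#include <map>
#include <string>

// Hypothetical helper, not part of 0mq: tracks how many downstream
// subscribers want each topic, so the device knows when to touch its
// upstream SUB socket.
class subscription_counter_t
{
public:
    // Returns true if this is the first subscriber for the topic, i.e.
    // the device should now set ZMQ_SUBSCRIBE on its upstream SUB.
    bool add (const std::string &topic)
    {
        return ++counts_ [topic] == 1;
    }

    // Returns true if the last subscriber for the topic went away, i.e.
    // the device should now set ZMQ_UNSUBSCRIBE on its upstream SUB.
    // Unsubscribes for topics nobody subscribed to are ignored.
    bool remove (const std::string &topic)
    {
        std::map <std::string, int>::iterator it = counts_.find (topic);
        if (it == counts_.end ())
            return false;
        if (--it->second > 0)
            return false;
        counts_.erase (it);
        return true;
    }

private:
    std::map <std::string, int> counts_;
};
```

With two downstream subscribers on topic "A", only the first add() and the
second remove() would result in a setsockopt call upstream.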

1. Modify sub_t to store a list of sessions and allow bidi traffic
2. Modify sub_t to create and send a message when ZMQ_SUBSCRIBE or
   ZMQ_UNSUBSCRIBE is set, and for all sessions in the list add/remove the
   topic.
3. Modify pub_t to allow incoming messages.
4. Modify pub_t to maintain an array of sessions instead of an array of
   writer_t's. Adjust the terminated/activated implementation to look up the
   session in the list by writer. (session->writer is private, though.)
5. The message (2) is sent upstream to the PUB
6. The PUB receives the message. Just before returning the message, update
   the session (available from the array) with the subscription change.
7. The daemon process (broker) receives the message, and new subscribes are
   set on the SUB socket the broker implements (setsockopt).
   Unsubscribes are only set (setsockopt on SUB) when a counter reaches 0
   (no more subscribers).
8. Add session_t->filtered_write( ... )
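
Steps 5 and 6 could look roughly like the sketch below. It assumes the
"+A" / "-A" message format suggested above (first byte is the command, the
rest is the topic); session_subscriptions_t is a made-up stand-in for
whatever state session_t would actually carry, not an existing 0mq class.

```cpp
#include <set>
#include <string>

// Hypothetical stand-in for the per-session subscription state.
struct session_subscriptions_t
{
    std::set <std::string> topics;

    // Applies one subscription-change message to this session.
    // Assumed format: '+' or '-' followed by the topic bytes.
    // Returns false if the message is empty or has an unknown command.
    bool apply (const std::string &msg)
    {
        if (msg.empty ())
            return false;
        const std::string topic = msg.substr (1);
        if (msg [0] == '+') {
            topics.insert (topic);
            return true;
        }
        if (msg [0] == '-') {
            topics.erase (topic);
            return true;
        }
        return false;
    }
};
```

The PUB would call apply() on the session the message arrived from, and
then hand the unchanged message to the broker process as in step 7.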

When the broker disconnects and reappears:

1. The subscriber's SUB socket resends the subscription list from session_t
   after the reconnect (process_attached event).
   (Can the socket block on write?)

When a SUB disconnects:

1. The PUB socket should generate unsubscribe messages from the subscription
   list in the session, for processing in the daemon (broker).
   How do these messages get from the session to the broker process? Does
   this work equally well with zmq_poll?
   (SUBs with an identity never seem to generate a disconnect event in
   pub_t.)
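
Generating those messages from a session's list is the easy part. A minimal
sketch, again assuming the "+topic" / "-topic" format;
make_subscription_messages is a made-up helper, and how the resulting
messages reach the broker process is exactly the open question above.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical helper: turns a session's topic list into one message per
// topic, each consisting of the command byte ('+' or '-') and the topic.
std::vector <std::string> make_subscription_messages (
    char command, const std::set <std::string> &topics)
{
    std::vector <std::string> msgs;
    for (std::set <std::string>::const_iterator it = topics.begin ();
          it != topics.end (); ++it)
        msgs.push_back (std::string (1, command) + *it);
    return msgs;
}
```

Called with '-' this would give the unsubscribes for a disconnected SUB;
called with '+' it would give the list to resend after a reconnect.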

When the PUB must publish a message:

1. The process calls "zmq_send( pub, ... )".
2. pub_t->xsend( .. ) gets called.
3. The implementation calls "session_t->filtered_write()" for each session
   in the array it knows about.
4. The session_t->filtered_write( ... ) verifies the subscription list and
   actually performs the write if the subscription is found.
5. After the last part of a multipart message is written, each session is
   flushed.
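
The publish path above, sketched under some assumptions: session_sketch_t
and publish() are made-up stand-ins for session_t and pub_t::xsend, messages
are plain strings, and matching is by prefix, as with ZMQ_SUBSCRIBE today
(an empty subscription matches everything). Multipart handling and the
flush in step 5 are left out.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for session_t. The real class would write to a
// pipe; here accepted messages are just collected in 'written'.
class session_sketch_t
{
public:
    void subscribe (const std::string &prefix)
    {
        prefixes_.push_back (prefix);
    }

    // Performs the write only if the message starts with one of the
    // subscribed prefixes. Returns true if the message was written.
    bool filtered_write (const std::string &msg)
    {
        for (std::size_t i = 0; i != prefixes_.size (); ++i) {
            const std::string &p = prefixes_ [i];
            if (msg.compare (0, p.size (), p) == 0) {
                written.push_back (msg);
                return true;
            }
        }
        return false;
    }

    std::vector <std::string> written;

private:
    std::vector <std::string> prefixes_;
};

// Roughly what pub_t::xsend would do in this scheme: offer the message to
// every session and let each one decide. Returns the number of sessions
// that accepted it.
std::size_t publish (std::vector <session_sketch_t> &sessions,
    const std::string &msg)
{
    std::size_t delivered = 0;
    for (std::size_t i = 0; i != sessions.size (); ++i)
        if (sessions [i].filtered_write (msg))
            ++delivered;
    return delivered;
}
```

The point of the design is that a message nobody subscribed to never leaves
the publisher, at the cost of a subscription lookup per session per message.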

