[zeromq-dev] Not receiving unsubscription messages in XPUB socket with ZMQ_XPUB_VERBOSE when using a proxy

Pieter Hintjens ph at imatix.com
Mon Jul 20 20:43:55 CEST 2015


Hi Ricardo,

Thanks for this analysis and proposal. In general we welcome patches
(pull requests). There are some rules, which you can read. Above all,
don't break existing test cases, and if necessary, add new tests for
new features.

http://zeromq.org/docs:contributing

-Pieter

On Mon, Jul 20, 2015 at 2:31 PM, Ricardo Catalinas Jiménez
<jimenezrick at gmail.com> wrote:
> Hi,
>
> I wanted to propose a solution to this issue I just open:
>
> https://github.com/zeromq/libzmq/issues/1478
>
> Apologies because it's a very long description of the problem but I was
> a bit hard to explain without giving enough context and also I propose a
> solution that I could implement if people agree is the way to go.
>
> Basically it seems there is a bug when using ZMQ_XPUB_VERBOSE with XPUB
> sockets and it's specifically triggered when using a proxy in front of
> the producers, which is our use case.
>
> We're using ZMQ_XPUB_VERBOSE flag with XPUB sockets because we want to
> be notified whenever there is new consumers on certain topic. And we
> expect to receive a unsubscribe message when there aren't consumers of
> that topic any more.
>
> - Our set up like this:
>
>         :                               /--> SUB :
> App     : XPUB ---> XSUB/proxy/XPUB --> +--> SUB :     App
>         :                               \--> SUB :
>
> The bug is manifested in the next way: with the ZMQ_XPUB_VERBOSE you
> are expected to receive N subscription messages for N consumers
> listening certain topic, and to receive a single unsubscription message
> when the last of those consumers disconnects. But what really happens is
> that when using the proxy (without the proxy it works fine) is that that
> unsubscription message never arrives to the producer application.
>
> The problem lies in that we filter (un)subscription messages in both,
> xpub_t and xsub_t and this doesn't play well with the verbose flag and a
> proxy.
>
> The next diagram explains what happens with the verbose flag but without
> the proxy:
>
> - Here we have a direct connection between one publisher and three
> consumers. Each consumer will send a subscription message when they join
> the same topic and a unsubscription message when they stop following
> that topic. The application using the XPUB socket in verbose mode will
> see three subscription messages (1st frame \0x01) and only one
> unsubscription message (1st frame \0x00) for the last disconnect. This
> works as intended:
>
>         :                           /--> SUB :
> App     : XPUB -------------------> +--> SUB :     App
>         :                           \--> SUB :
>
> <-sub   : <---------------------------sub    :
> <-sub   : <---------------------------sub    :
> <-sub   : <---------------------------sub    :
>
>         : <---------------------------unsub  :
>         : <---------------------------unsub  :
> <-unsub : <---------------------------unsub  :
>
> (xpub_t filters all unsubcription msgs except the last one to the app)
>
> - The problematic scenario is when we put a proxy in the middle where we
> we also enable the verbose flag in its XPUB socket.
>
>         :                              /--> SUB :
> App     : XPUB ---> XSUB/proxy/XPUB -->+--> SUB :    App
>         :                              \--> SUB :
>
> <-sub   :     <-sub     <---sub    <-----sub    :
> <-sub   :     <-sub     <---sub    <-----sub    :
> <-sub   :     <-sub     <---sub    <-----sub    :
>
>         :                          <-----unsub  :
>         :                          <-----unsub  :
>         :               <---unsub  <-----unsub  :
>
> (xpub_t filters all unsubcription msgs except the last one in the proxy
> and also in the app, but the app never gets to see that unsubscription
> message)
>
> - The cause of this bug is:
>
> Basically, the XSUB socket in the proxy doesn't see enough
> unsubscription messages to consider that the last subscriber has been
> disconnected and propagate this to the XPUB in the publisher
> application. It's expecting to see three unsubscribe messages before
> passing the last one to the XPUB in the producer application.
>
> - Two possible solution for his bug:
>
> (a) Stop filtering in XSUB socket at all and rely on this in XPUB
>     sockets.  So that we modify the trie_t to not to have a refcnt field
>     and xsub_t forwards all the (un)subscription messages it received.
>
>     This is my preferred solution although it could break applications
>     that subscribe multiple times to the same topic and expect to stop
>     receiving messages only when they unsubscribe the same number of
>     times.  Although I'm not aware that this behavior is documented
>     which could mean it isn't really a problem.
>
> (b) Add a ZMQ_XSUB_VERBOSE flag specific flag to the XSUB socket to
>     forwards all the unsubscribe messages that should be enabled when
>     XSUB is used in a proxy.
>
>     I don't like the idea of adding a new flag, but if keeping the old
>     semantics is necessary, this might be the only way.
>
> - A bit of backgroun of the current logic implemented in libzmq 4.x:
>
> Both xpub_t and xsub_t contain a trie to keep track of the subscribers.
> And xpub_t only sends to the application a subscription message when the
> first subscriber connects to certain topic and an unsubscription message
> when the last subscriber disconnects from that topic. When the
> ZMQ_XPUB_VERBOSE is enabled, all subscription messages are passed to the
> application unfiltered.
>
> The funny bit is that xsub_t keeps track of the same on its own with
> another trie, but doesn't filter subscription messages at all, and
> sending only a unsubscription message when the last subscriber
> disconnects.
>
> The code snippets below shows the described logic of XSUB and XPUB:
>
> ---8< xsub.cpp ---------------------------------------------------------
> int zmq::xsub_t::xsend (msg_t *msg_)
> {
>     size_t size = msg_->size ();
>     unsigned char *data = (unsigned char *) msg_->data ();
>
>     if (size > 0 && *data == 1) {
>         //  Process subscribe message
>         //  This used to filter out duplicate subscriptions,
>         //  however this is alread done on the XPUB side and
>         //  doing it here as well breaks ZMQ_XPUB_VERBOSE
>         //  when there are forwarding devices involved.
>         subscriptions.add (data + 1, size - 1);
>         return dist.send_to_all (msg_);
>     }
>     else
>     if (size > 0 && *data == 0) {
>         //  Process unsubscribe message
>         if (subscriptions.rm (data + 1, size - 1))
>             return dist.send_to_all (msg_);
>     }
>     else
>         //  User message sent upstream to XPUB socket
>         return dist.send_to_all (msg_);
>
>     [...]
> }
> ------------------------------------------------------------------------
>
> ---8< xpub.cpp ---------------------------------------------------------
> void zmq::xpub_t::xread_activated (pipe_t *pipe_)
> {
>     //  There are some subscriptions waiting. Let's process them.
>     msg_t sub;
>     while (pipe_->read (&sub)) {
>         //  Apply the subscription to the trie
>         unsigned char *const data = (unsigned char *) sub.data ();
>         const size_t size = sub.size ();
>         if (size > 0 && (*data == 0 || *data == 1)) {
>
>                         [...]
>
>                         else
>                         {
>                                 bool unique;
>                                 if (*data == 0)
>                                         unique = subscriptions.rm(data + 1, size - 1, pipe_);
>                                 else
>                                         unique = subscriptions.add(data + 1, size - 1, pipe_);
>
>                                 //  If the subscription is not a duplicate store it so that it can be
>                                 //  passed to used on next recv call. (Unsubscribe is not verbose.)
>                                 if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
>                                         pending_data.push_back(blob_t(data, size));
>                                         pending_flags.push_back(0);
>                                 }
>                         }
>         }
>         else {
>
>                 [...]
>
>         }
>         sub.close ();
>     }
> }
> ------------------------------------------------------------------------
>
> /Ricardo
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev



More information about the zeromq-dev mailing list