[zeromq-dev] Not receiving unsubscription messages in XPUB socket with ZMQ_XPUB_VERBOSE when using a proxy
Ricardo Catalinas Jiménez
jimenezrick at gmail.com
Mon Jul 20 14:31:17 CEST 2015
Hi,
I wanted to propose a solution to this issue I just open:
https://github.com/zeromq/libzmq/issues/1478
Apologies because it's a very long description of the problem but I was
a bit hard to explain without giving enough context and also I propose a
solution that I could implement if people agree is the way to go.
Basically it seems there is a bug when using ZMQ_XPUB_VERBOSE with XPUB
sockets and it's specifically triggered when using a proxy in front of
the producers, which is our use case.
We're using ZMQ_XPUB_VERBOSE flag with XPUB sockets because we want to
be notified whenever there is new consumers on certain topic. And we
expect to receive a unsubscribe message when there aren't consumers of
that topic any more.
- Our set up like this:
: /--> SUB :
App : XPUB ---> XSUB/proxy/XPUB --> +--> SUB : App
: \--> SUB :
The bug is manifested in the next way: with the ZMQ_XPUB_VERBOSE you
are expected to receive N subscription messages for N consumers
listening certain topic, and to receive a single unsubscription message
when the last of those consumers disconnects. But what really happens is
that when using the proxy (without the proxy it works fine) is that that
unsubscription message never arrives to the producer application.
The problem lies in that we filter (un)subscription messages in both,
xpub_t and xsub_t and this doesn't play well with the verbose flag and a
proxy.
The next diagram explains what happens with the verbose flag but without
the proxy:
- Here we have a direct connection between one publisher and three
consumers. Each consumer will send a subscription message when they join
the same topic and a unsubscription message when they stop following
that topic. The application using the XPUB socket in verbose mode will
see three subscription messages (1st frame \0x01) and only one
unsubscription message (1st frame \0x00) for the last disconnect. This
works as intended:
: /--> SUB :
App : XPUB -------------------> +--> SUB : App
: \--> SUB :
<-sub : <---------------------------sub :
<-sub : <---------------------------sub :
<-sub : <---------------------------sub :
: <---------------------------unsub :
: <---------------------------unsub :
<-unsub : <---------------------------unsub :
(xpub_t filters all unsubcription msgs except the last one to the app)
- The problematic scenario is when we put a proxy in the middle where we
we also enable the verbose flag in its XPUB socket.
: /--> SUB :
App : XPUB ---> XSUB/proxy/XPUB -->+--> SUB : App
: \--> SUB :
<-sub : <-sub <---sub <-----sub :
<-sub : <-sub <---sub <-----sub :
<-sub : <-sub <---sub <-----sub :
: <-----unsub :
: <-----unsub :
: <---unsub <-----unsub :
(xpub_t filters all unsubcription msgs except the last one in the proxy
and also in the app, but the app never gets to see that unsubscription
message)
- The cause of this bug is:
Basically, the XSUB socket in the proxy doesn't see enough
unsubscription messages to consider that the last subscriber has been
disconnected and propagate this to the XPUB in the publisher
application. It's expecting to see three unsubscribe messages before
passing the last one to the XPUB in the producer application.
- Two possible solution for his bug:
(a) Stop filtering in XSUB socket at all and rely on this in XPUB
sockets. So that we modify the trie_t to not to have a refcnt field
and xsub_t forwards all the (un)subscription messages it received.
This is my preferred solution although it could break applications
that subscribe multiple times to the same topic and expect to stop
receiving messages only when they unsubscribe the same number of
times. Although I'm not aware that this behavior is documented
which could mean it isn't really a problem.
(b) Add a ZMQ_XSUB_VERBOSE flag specific flag to the XSUB socket to
forwards all the unsubscribe messages that should be enabled when
XSUB is used in a proxy.
I don't like the idea of adding a new flag, but if keeping the old
semantics is necessary, this might be the only way.
- A bit of backgroun of the current logic implemented in libzmq 4.x:
Both xpub_t and xsub_t contain a trie to keep track of the subscribers.
And xpub_t only sends to the application a subscription message when the
first subscriber connects to certain topic and an unsubscription message
when the last subscriber disconnects from that topic. When the
ZMQ_XPUB_VERBOSE is enabled, all subscription messages are passed to the
application unfiltered.
The funny bit is that xsub_t keeps track of the same on its own with
another trie, but doesn't filter subscription messages at all, and
sending only a unsubscription message when the last subscriber
disconnects.
The code snippets below shows the described logic of XSUB and XPUB:
---8< xsub.cpp ---------------------------------------------------------
int zmq::xsub_t::xsend (msg_t *msg_)
{
size_t size = msg_->size ();
unsigned char *data = (unsigned char *) msg_->data ();
if (size > 0 && *data == 1) {
// Process subscribe message
// This used to filter out duplicate subscriptions,
// however this is alread done on the XPUB side and
// doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved.
subscriptions.add (data + 1, size - 1);
return dist.send_to_all (msg_);
}
else
if (size > 0 && *data == 0) {
// Process unsubscribe message
if (subscriptions.rm (data + 1, size - 1))
return dist.send_to_all (msg_);
}
else
// User message sent upstream to XPUB socket
return dist.send_to_all (msg_);
[...]
}
------------------------------------------------------------------------
---8< xpub.cpp ---------------------------------------------------------
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
// There are some subscriptions waiting. Let's process them.
msg_t sub;
while (pipe_->read (&sub)) {
// Apply the subscription to the trie
unsigned char *const data = (unsigned char *) sub.data ();
const size_t size = sub.size ();
if (size > 0 && (*data == 0 || *data == 1)) {
[...]
else
{
bool unique;
if (*data == 0)
unique = subscriptions.rm(data + 1, size - 1, pipe_);
else
unique = subscriptions.add(data + 1, size - 1, pipe_);
// If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call. (Unsubscribe is not verbose.)
if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
pending_data.push_back(blob_t(data, size));
pending_flags.push_back(0);
}
}
}
else {
[...]
}
sub.close ();
}
}
------------------------------------------------------------------------
/Ricardo
More information about the zeromq-dev
mailing list