[zeromq-dev] [PATCH] Publisher side filtering

Gerard Toonstra gtoonstra at gmail.com
Wed Feb 2 10:46:47 CET 2011


Hi Martin,

Success!

With this change, I'm no longer able to reproduce the issue in about 40-50
restarts, so this looks like it has solved it.

The following was verified:

- The subscription is now forwarded upstream only once, even when there are
6 subscribers connected. So I no longer see the strange behavior where every
subscriber restart would trigger a resend of the subscription.
- On restart of the zmq_forwarder and the reconnect, there's also only one
sub forward happening.
- When the publisher restarts, it only receives one sub forward, so that works.
- Restarting subs, pubs and forwarders no longer seems to cause any trouble
in terms of forwarding subscriptions.

I did modify the send.cpp to use known data, instead of whatever was in
memory:

-=-=-=-=-=-=-=-=-=-=-=-=

const char *TOPIC = "Special.Topic";

        strncpy( (char *)msg.data(), TOPIC, strlen( TOPIC ) );

-=-=-=-=-=-=-=-=-=-=-=-=

This was to test that the msgs actually arrived at the subs. For continued
testing, I modified recv.cpp to explicitly unsubscribe:

-=-=-=-=-=-=-=-=-=

    int ctr = 0;

    while (ctr < 10) {
        zmq::message_t msg;
        s.recv (&msg);
        printf ("got message\n");
        ctr++;
    }

    s.setsockopt( ZMQ_UNSUBSCRIBE, TOPIC, strlen( TOPIC ));

    return 0;

-=-=-=-=-=-=-=-=-=


Looks like the things left to do are:
- subscription matching at the pub side.
- subscription ref counting @ XSUB and subscriptions.rm functionality
testing. See patch.
- removal of printf's and some review?
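
For the first item, a minimal sketch of what pub-side matching could look
like, assuming the usual 0MQ rule that a subscription is a prefix of the
message body. The helper name matches() and the flat vector of
subscriptions are hypothetical and only for illustration; the real code
would presumably walk the subscriptions trie instead.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical helper, not part of the 0MQ sources: returns true when the
// message body starts with at least one of the subscribed prefixes.
inline bool matches (const std::vector<std::string> &subs,
                     const void *data, std::size_t size)
{
    assert (data != NULL || size == 0);
    for (const std::string &prefix : subs) {
        // An empty subscription matches every message.
        if (prefix.empty ())
            return true;
        // Otherwise the body must be at least as long as the prefix
        // and begin with exactly those bytes.
        if (prefix.size () <= size
            && std::memcmp (data, prefix.data (), prefix.size ()) == 0)
            return true;
    }
    return false;
}
```

With a subscription of "Special.", a message body of "Special.Topic" would
pass this check and "Other.Topic" would be dropped at the publisher.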


Some testing with subscription removals demonstrated that unsub messages are
sent upstream every time. The subscriptions.rm method only returns
true or false, so the caller cannot detect when the last refcnt was just
decreased. The patch proposes an approach to do that.
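
To illustrate the idea (this is not the content of the attached patch), here
is a minimal sketch that uses a plain map as a stand-in for the
subscriptions trie. The names refcounted_subs_t and rm_result are
hypothetical. The point is only that rm reports three outcomes instead of
two, so the caller can tell when the last reference went away.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical result type: distinguishes "still referenced" from
// "last reference just removed".
enum class rm_result { not_found, decremented, last_removed };

// Hypothetical stand-in for the subscriptions trie: one refcount per topic.
class refcounted_subs_t
{
public:
    // Returns true only for the first reference to a topic, which is
    // the moment the subscription should be forwarded upstream.
    bool add (const std::string &topic)
    {
        return ++refs_[topic] == 1;
    }

    // Decrements the refcount and reports whether this call removed
    // the last reference, i.e. whether an unsub should go upstream.
    rm_result rm (const std::string &topic)
    {
        auto it = refs_.find (topic);
        if (it == refs_.end ())
            return rm_result::not_found;
        assert (it->second > 0);
        if (--it->second > 0)
            return rm_result::decremented;
        refs_.erase (it);
        return rm_result::last_removed;
    }

private:
    std::map<std::string, unsigned> refs_;
};
```

With something like this, the xsub side would send the unsub upstream only
when rm returns last_removed.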

The other thing is that when subscribers get killed, the subscriptions trie
doesn't get updated. So subscribers connecting and disconnecting
eventually lead to a state where the refcnt is higher than it should be. Is
there a smart approach for this on the xsub side in the forwarder?
It sounds like it requires keeping track of which subscriptions were
received on which pipe, so that unsub messages can be generated
on behalf of the disconnected sub.
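
One possible shape for that bookkeeping, as a rough sketch only: keep a set
of topics per pipe next to the global refcounts, and walk that set when the
pipe terminates. Everything here is hypothetical (pipes are plain ints, the
class name pipe_subs_t is made up), and duplicate subscriptions from the
same pipe are ignored for simplicity.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical bookkeeping: remembers which pipe sent which subscription.
class pipe_subs_t
{
public:
    // Records a subscription received on a pipe. Returns true when this
    // is the first reference to the topic overall (forward upstream).
    bool subscribe (int pipe, const std::string &topic)
    {
        if (!by_pipe_[pipe].insert (topic).second)
            return false; // duplicate from the same pipe, ignored here
        return ++refs_[topic] == 1;
    }

    // Called when a pipe goes away without unsubscribing. Returns the
    // topics whose last reference was dropped, so that unsub messages
    // can be generated on behalf of the dead subscriber.
    std::vector<std::string> pipe_terminated (int pipe)
    {
        std::vector<std::string> unsubs;
        auto it = by_pipe_.find (pipe);
        if (it == by_pipe_.end ())
            return unsubs;
        for (const std::string &topic : it->second) {
            auto ref = refs_.find (topic);
            assert (ref != refs_.end () && ref->second > 0);
            if (--ref->second == 0) {
                refs_.erase (ref);
                unsubs.push_back (topic);
            }
        }
        by_pipe_.erase (it);
        return unsubs;
    }

private:
    std::map<int, std::set<std::string> > by_pipe_;
    std::map<std::string, unsigned> refs_;
};
```

The cost is one extra set per pipe, but the refcnt can then no longer drift
upwards when subscribers are killed.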

Rgds,

G.

On Tue, Feb 1, 2011 at 8:56 PM, Martin Sustrik <sustrik at 250bpm.com> wrote:

> Hi Gerard,
>
>
>> At some point after a number of restarts of the zmq_forwarder, there are
>> messages getting stuck in the TCP RecvQ buffer (see netstat output).
>>
>
> Hopefully, I've found and fixed the problem.
>
> The problem was that on disconnection, termination of the pipe for
> subscriptions was initiated. When a new connection was created before the
> termination was over, the new subscription was accidentally pushed
> to the pipe that was being closed. I've blocked any input/output from
> the socket until the pipe is re-created.
>
> Please, do check whether it works now.
>
> Martin
>



-- 
Gerard Toonstra
-----------------------
http://www.radialmind.org
-------------- next part --------------
A non-text attachment was scrubbed...
Name: 0001-Allow-xsub-to-detect-when-last-refcnt-was-decreased-.patch
Type: text/x-patch
Size: 1964 bytes
Desc: not available
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20110202/6f07e6c9/attachment.bin>

