[zeromq-dev] subscriber stopped receiving messages from XPUB socket

Pieter Hintjens ph at imatix.com
Fri Mar 22 09:23:52 CET 2013


Hi Winston,

I think you really should install git and make a pull request with
your patch, which looks great. We use a pretty simple process; it's
explained in some detail in Chapter 6 of the Guide. It would be great
to see you join the contributors to 0MQ.

If you need any help at all installing or using git for this, just ask!

-Pieter

On Fri, Mar 22, 2013 at 5:05 AM, Winston Huang <weiqingh at mail.com> wrote:
> There was a typo in the code below. It should be:
> pipes.swap(pipes.index(pipe_), eligible - 1);
>
> The entire snippet should look like this:
>
>
>
>     if (pipes.index (pipe_) < matching) {
>         pipes.swap (pipes.index (pipe_), matching - 1);
>         matching--;
>     }
>     if (pipes.index (pipe_) < active) {
>         pipes.swap (pipes.index (pipe_), active - 1);
>         active--;
>     }
>     if (pipes.index (pipe_) < eligible) {
>         pipes.swap(pipes.index(pipe_), eligible - 1);
>         eligible--;
>     }
>     pipes.erase (pipe_);
>
>
>
> ----- Original Message -----
>
> From: Winston Huang
>
> Sent: 03/21/13 09:13 PM
>
> To: ZeroMQ development list
>
> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>
>
> That's a good point. I am actually not familiar with the inner workings of
> zmq at all; this is my first time digging into the code to fix a problem.
> That's why I suspected my fix was not complete anyway.
>
> Now I have the new version, as you suggested:
>
>     if (pipes.index (pipe_) < matching) {
>         pipes.swap (pipes.index (pipe_), matching - 1);
>         matching--;
>     }
>     if (pipes.index (pipe_) < active) {
>         pipes.swap (pipes.index (pipe_), active - 1);
>         active--;
>     }
>     if (pipes.index (pipe_) < eligible) {
>         pipes.swap(pipes.index(pipe_), eligible);
>         eligible--;
>     }
>     pipes.erase (pipe_);
>
>
>
> ----- Original Message -----
>
> From: Martin Hurton
>
> Sent: 03/21/13 07:19 PM
>
> To: ZeroMQ development list
>
> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>
>
>
> Hi Winston,
>
> Thanks for the patch. I think there are still some cases which are not
> handled though.
>
> For example, if there are 8 pipes handled by the dist: 2 are matching,
> 4 are active, 6 are eligible and the last two are congested. What
> happens when the first active pipe terminates?
>
> I would suggest fixing the terminated method so that the terminating
> pipe is moved beyond the eligible pointer the same way as in the write
> method. Then it can be safely erased. What do you think?
>
> - Martin
>
>> void zmq::dist_t::terminated (pipe_t *pipe_)
>> {
>>     //  Remove the pipe from the list; adjust number of matching, active and/or
>>     //  eligible pipes accordingly.
>>     if (pipes.index (pipe_) < matching)
>>         matching--;
>>     if (pipes.index (pipe_) < active)
>>         active--;
>>     bool swapEligible = false;
>>     if (pipes.index (pipe_) < eligible) {
>>         eligible--;
>>         swapEligible = true;
>>     }
>>     if (swapEligible) {
>>         pipes.swap(pipes.index(pipe_), eligible);
>>         pipes.erase(eligible);
>>     } else {
>>         pipes.erase (pipe_);
>>     }
>> }
>>
>>
>>
>>
>>
>> ----- Original Message -----
>>
>> From: Pieter Hintjens
>>
>> Sent: 03/21/13 10:06 AM
>>
>> To: ZeroMQ development list
>>
>> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>>
>>
>>
>> Hi Winston,
>>
>> Great analysis of the problem! Would you like to send a pull request
>> with a patch?
>>
>> -Pieter
>>
>> On Thu, Mar 21, 2013 at 2:43 PM, Winston Huang <weiqingh at mail.com> wrote:
>>> Pieter,
>>>
>>> Thanks for your reply. I think I might have found the problem. I have an
>>> XPUB socket that has about 10 subscribers. The following events happened:
>>>
>>> 1) One subscriber's HWM is reached and it's moved to the end of the pipes
>>> (in zmq::dist_t::write).
>>>
>>> 2) Another subscriber was terminated (zmq::xpub_t::xterminated), causing its
>>> pipe to be removed from the dist. However, in zmq::dist_t::terminated, the
>>> terminated pipe was removed by moving the last pipe into the to-be-removed
>>> pipe's spot. Therefore the deactivated pipe from step 1 is moved to the
>>> front of the pipes. Meanwhile, the values of eligible and active are
>>> decremented. Therefore the last eligible pipe (which was in front of the
>>> deactivated pipe before this event) now becomes ineligible, and it will
>>> not receive any messages after this.
>>>
>>> Let me know if this makes any sense. It's hard for me to write a
>>> standalone test case for this. I hope my explanation is clear, and if you
>>> can suggest any fix, let me know. One fix I can see is to swap with the
>>> last eligible pipe and then delete that position.
>>>
>>> Thanks,
>>> Winston
>>>
>>>
>>>
>>> ----- Original Message -----
>>>
>>> From: Pieter Hintjens
>>>
>>> Sent: 03/15/13 09:15 AM
>>>
>>> To: ZeroMQ development list
>>>
>>> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>>>
>>>
>>>
>>> It sounds like a problem in the subscription forwarding, yet it's not
>>> clear how a subscriber could be affected by the publisher restarting,
>>> with the proxy in between.
>>>
>>> Do you need the proxy at all? First thing would be to connect
>>> subscribers directly to the publisher. If the problem then still
>>> happens we can try to make a reproducible test case.
>>>
>>> -Pieter
>>>
>>>
>>>
>>> On Fri, Mar 15, 2013 at 4:59 AM, Winston Huang <weiqingh at mail.com> wrote:
>>>> Hi there,
>>>>
>>>> I have multiple (5-10) subscribers subscribing to the same topic published
>>>> by one publisher. They are connected via an XSUB-XPUB proxy. All the
>>>> subscribers are always up and the publisher may come and go at times. I
>>>> have noticed that at times, after the publisher is restarted, one of the
>>>> subscribers might stop receiving any messages at all. It's not a
>>>> slow-joiner kind of issue, because the publisher continues to publish a
>>>> message every second and that subscriber may never get any messages,
>>>> whereas other subscribers are getting messages at the same time. I also
>>>> verified that the subscriber is waiting for messages (it's calling the
>>>> receive function), and if I restart the subscriber, it will get messages
>>>> again.
>>>>
>>>> Could someone enlighten me as to what I may be doing wrong? Is there
>>>> anything I should be looking into?
>>>>
>>>> Thanks in advance.
>>>> Winston
>>>> _______________________________________________
>>>> zeromq-dev mailing list
>>>> zeromq-dev at lists.zeromq.org
>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>>