[zeromq-dev] Slow joiner syndrome solution for XSUB/XPUB broker based system

Tomer Eliyahu tomereliyahu1 at gmail.com
Tue May 29 12:07:13 CEST 2018

You are right - my method will cause high CPU usage for the duration of the
sync, which is certainly a disadvantage.
The advantage, though, is that both the broker and the
publishers/subscribers stay simple - the sync is done "under the hood" by the
infrastructure which encapsulates zmq from the users, so that both
subscribers and publishers are unaware of the broker in the middle. The
broker also stays as simple as the zmq example for an XPUB/XSUB proxy.

We found a bug in our sync implementation, which was the reason I opened
this thread. Now that it is solved I can confirm that this solution works
from both sides - subscribers and publishers. Details below for those who
are interested.


In our framework, "controllers" and "agents" all use a "BrokerInterface",
which is a zeromq wrapper with a PUB socket and a SUB socket.
A controller is an entity on the bus which sends commands to agents, and
expects replies.
An agent is an entity on the bus which receives commands from controllers
and sends replies.
From the zeromq perspective, both are publishers and subscribers, therefore
they use the BrokerInterface class for IPC.

BrokerInterface class
Init() - connects the SUB and PUB sockets to the bus's (broker) subscribers
and publishers endpoints
Subscribe(topic) - subscribes to <topic> via the SUB socket
Send(msg) - send message via the PUB socket (message contains a topic and
Recv() - Receive a message via the SUB socket
Sync() - Synchronize for Send() and Subscribe() - subscribe("Hello"), then
send "Hello" messages until the first one is received, then
unsubscribe("Hello")
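
As a rough sketch of the Sync() loop described above (not the actual
BrokerInterface code): the transport is hidden behind hypothetical callbacks
(bus_ops), which in a real wrapper would presumably map to zmq_setsockopt()
with ZMQ_SUBSCRIBE / ZMQ_UNSUBSCRIBE, zmq_send() and a non-blocking
zmq_recv(). The pause hook and the max_attempts bound are assumptions added
to avoid pegging the CPU and looping forever; fake_bus is only an in-memory
stand-in so the loop can be exercised without a broker.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical transport hooks (names are illustrative, not a zmq API). */
typedef struct {
    void *ctx;
    void (*subscribe)(void *ctx, const char *topic);
    void (*unsubscribe)(void *ctx, const char *topic);
    void (*send)(void *ctx, const char *topic);
    bool (*try_recv)(void *ctx, const char *topic); /* true once topic arrived */
    void (*pause)(void *ctx);                       /* short sleep between sends */
} bus_ops;

/* Subscribe to topic, send it until it is echoed back, then unsubscribe.
 * Returns the number of sends that were needed, or -1 if max_attempts
 * sends went by without an echo. */
int bus_sync(const bus_ops *ops, const char *topic, int max_attempts)
{
    assert(ops != NULL && topic != NULL && max_attempts > 0);
    int result = -1;
    ops->subscribe(ops->ctx, topic);
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        ops->send(ops->ctx, topic);
        if (ops->try_recv(ops->ctx, topic)) {
            result = attempt;
            break;
        }
        ops->pause(ops->ctx);
    }
    ops->unsubscribe(ops->ctx, topic);
    return result;
}

/* In-memory stand-in for the bus: the echo "arrives" once deliver_after
 * messages have been sent. */
typedef struct {
    int deliver_after;
    int sends;
    int pauses;
    bool subscribed;
} fake_bus;

static void fake_subscribe(void *ctx, const char *topic)
{
    (void)topic;
    ((fake_bus *)ctx)->subscribed = true;
}

static void fake_unsubscribe(void *ctx, const char *topic)
{
    (void)topic;
    ((fake_bus *)ctx)->subscribed = false;
}

static void fake_send(void *ctx, const char *topic)
{
    (void)topic;
    ((fake_bus *)ctx)->sends++;
}

static bool fake_try_recv(void *ctx, const char *topic)
{
    fake_bus *f = ctx;
    (void)topic;
    return f->subscribed && f->sends >= f->deliver_after;
}

static void fake_pause(void *ctx)
{
    ((fake_bus *)ctx)->pauses++;
}

bus_ops fake_bus_ops(fake_bus *f)
{
    return (bus_ops){ f, fake_subscribe, fake_unsubscribe,
                      fake_send, fake_try_recv, fake_pause };
}
```

With a fake_bus whose deliver_after is 3, bus_sync() returns 3 after pausing
twice. The length of the pause is the trade-off between CPU usage and how
quickly the sync is detected.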

Bug description
In one of our unit tests, we forked a broker, and a test process using the
The test subscribed to topics A, B and C, called the Sync() method (which
subscribed to Hello, sent Hello until received, then unsubscribed from
Hello), and sent messages to topics A, B and C, which were received as
expected.
Then it subscribed to topic D, called Sync() again, then sent messages to
topic D, which were lost.
We found out that if we add a delay before sending messages to topic D,
everything works.

Root cause
The first Sync() added a subscription to "Hello" in the broker, sent
messages, and then unsubscribed from "Hello". We know that the subscribe
completed because we received a "Hello" message.
What we don't know for certain is that the unsubscribe completed - and in
fact it hadn't...
Recall that the Sync() after Subscribe() is done to ensure that previous
subscriptions are registered in the broker.
What happened was that the 2nd Sync() used the same "Hello" topic that we
had already subscribed to and unsubscribed from in the 1st Sync(), but the
unsubscribe had not yet taken effect in the broker. The "Hello" messages
from the 2nd Sync() were therefore received right away, marking the
synchronization as done, and the test failed because the subscription to
topic "D" had not been registered yet.
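
The race can be replayed with a small deterministic model. This is only a
sketch under simplifying assumptions, not ZeroMQ code: the broker is modelled
as applying the client's subscribe/unsubscribe commands in order but with a
lag, topics are matched exactly rather than by prefix, and all names
(broker_model, second_sync_is_premature, ...) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_CMDS 16

typedef struct {
    bool subscribe;      /* true = subscribe, false = unsubscribe */
    const char *topic;
} sub_cmd;

typedef struct {
    sub_cmd queue[MAX_CMDS]; /* commands issued by the client, in order */
    int sent;                /* how many commands the client has issued */
    int applied;             /* how many the broker has processed so far */
} broker_model;

void client_issue(broker_model *b, bool subscribe, const char *topic)
{
    assert(b->sent < MAX_CMDS);
    b->queue[b->sent++] = (sub_cmd){ subscribe, topic };
}

/* The broker processes up to n more pending commands, in order. */
void broker_process(broker_model *b, int n)
{
    b->applied += n;
    if (b->applied > b->sent)
        b->applied = b->sent;
}

/* True if the broker currently holds a subscription for topic, i.e. a
 * message published on topic would be forwarded back to the client. */
bool broker_has(const broker_model *b, const char *topic)
{
    bool active = false;
    for (int i = 0; i < b->applied; ++i)
        if (strcmp(b->queue[i].topic, topic) == 0)
            active = b->queue[i].subscribe;
    return active;
}

/* Replays the unit test. Returns true if the 2nd sync would report "done"
 * while the broker has not yet registered the subscription to "D". */
bool second_sync_is_premature(const char *sync1, const char *sync2)
{
    broker_model b = { .sent = 0, .applied = 0 };
    client_issue(&b, true, "A");
    client_issue(&b, true, sync1);
    broker_process(&b, 2);          /* 1st sync received its echo */
    client_issue(&b, false, sync1); /* unsubscribe is still in flight */
    client_issue(&b, true, "D");
    client_issue(&b, true, sync2);  /* 2nd sync starts sending right away */
    return broker_has(&b, sync2) && !broker_has(&b, "D");
}
```

In this model second_sync_is_premature("Hello", "Hello") is true, because the
stale "Hello" subscription echoes the message back before "D" is registered,
while second_sync_is_premature("Hello.1", "Hello.2") is false: the echo for
"Hello.2" can only arrive after the broker has processed everything queued
before it, including "D".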

The fix is to make sure that the Sync() topic is unique to the process
which is using it.
The sync topic we now use contains the process name and a unique identifier
which changes on every Sync() call.
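
One possible way to build such a topic is sketched below. The "SYNC." prefix,
the function name make_sync_topic and the counter are assumptions for
illustration, not our actual code. Since ZeroMQ subscriptions match by
prefix, the counter is printed with a fixed width so that no sync topic is a
prefix of a later one (with a plain "%u", a stale subscription ending in "1"
would still match a topic ending in "10").

```c
#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

/* Incremented on every call; not thread-safe in this sketch. */
static uint32_t sync_counter = 0;

/* Writes "SYNC.<process_name>.<8 hex digits>" into buf.
 * Returns the topic length, or -1 if buf is too small. */
int make_sync_topic(char *buf, size_t size, const char *process_name)
{
    assert(buf != NULL && process_name != NULL);
    assert(strlen(process_name) > 0);
    int n = snprintf(buf, size, "SYNC.%s.%08" PRIx32,
                     process_name, ++sync_counter);
    if (n < 0 || (size_t)n >= size)
        return -1;
    return n;
}
```

If several instances of the same program can run at once, the process name
alone is not unique, and something like the process ID would have to be added
to the topic as well.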

On Tue, May 29, 2018 at 4:13 AM, Chris Billington <
chrisjbillington at gmail.com> wrote:

> I think that mostly makes sense. I'm not sure if zmq_proxy will work,
> because it will probably attempt to forward subscription requests read from
> the XPUB to the PULL socket. However I think it will work if you go back to
> the original spec of using a PUB socket for the senders and a XSUB in the
> broker.
> The only downside of that approach compared to mine I think is that the
> 'sending repeatedly' of SYNC messages will require some small sleep in
> between sends to not peg the CPU, leading to a likely slower than necessary
> time to establish that the subscription is complete, whereas my method has
> no sleeping and so will complete more deterministically and in a shorter
> time. But the simplicity of your approach might be better all things
> considered.
> -Chris
> On Mon, May 28, 2018 at 9:46 PM, Tomer Eliyahu <tomereliyahu1 at gmail.com>
> wrote:
>> Thanks for sharing Chris, this is interesting.
>> From the subscribers perspective, I think that using the same sync
>> mechanism I described for the publishers can solve the subscribers side so
>> that they know the subscription is complete in the broker while keeping the
>> broker "dumb" -
>> Subscribe first to the real topics, then perform a sync - subscribe to
>> "SYNC", then send messages via the PUB interface (or PUSH) with "SYNC"
>> topic until the first message is received.
>> Once this happens, you know for certain that the previous subscriptions
>> were also received by the broker.
>> Then the broker can go back to using zmq_proxy() instead of handling
>> subscriptions.. What do you think?
>> On Mon, May 28, 2018 at 1:21 PM, Chris Billington <
>> chrisjbillington at gmail.com> wrote:
>>> I've handled this problem by avoiding using a PUB socket for the senders
>>> of messages:
>>> a) senders of messages send them on a PUSH socket and the broker
>>> forwards from a PULL to a XPUB. This means that there is no slow joiner
>>> problem with the senders starting up (PUSH won't drop messages), but has
>>> the downside that the messages are *always* sent to the broker even if
>>> there are no subscribers. They will instead be dropped by the XPUB if there
>>> are no subscribers.
>>> b) Subscribers request and wait for subscription confirmation messages
>>> from the broker when they subscribe to a topic so calling code can be sure
>>> they are subscribed before starting the senders.
>>> See here for my Python project that implements this (the EventBroker and
>>> Event classes):
>>> https://bitbucket.org/cbillington/zprocess/src/default/zprocess/process_tree.py?at=default&fileviewer=file-view-default#process_tree.py-102
>>> On Mon, May 28, 2018 at 7:40 PM, Tomer Eliyahu <tomereliyahu1 at gmail.com>
>>> wrote:
>>>> Hi Gyorgy,
>>>> Thank you - but assuming the subscriber connect and subscribe happen
>>>> long before the publisher starts, is there still a risk for the slow joiner
>>>> problem?
>>>> Assume the following flow:
>>>> broker:
>>>>    zmq_bind(frontend, "ipc:///tmp/publishers");
>>>>    zmq_bind(backend, "ipc:///tmp/subscribers");
>>>>    zmq_proxy(frontend, backend, NULL);
>>>> <wait 2 seconds and start subscriber process>
>>>> subscriber:
>>>>    zmq_connect(sub_socket, "ipc:///tmp/subscribers");
>>>>    <subscribe to "TEST" topic>
>>>>    <receive message from sub_socket - blocking>
>>>> <wait 2 seconds and start publisher process>
>>>> publisher:
>>>>    zmq_connect(pub_socket, "ipc:///tmp/publishers");
>>>>    zmq_connect(sub_socket, "ipc:///tmp/subscribers");
>>>>    <subscribe to "SYNC" topic>
>>>>    <sync - send DUMMY messages until received>
>>>>    <unsubscribe from "SYNC" topic>
>>>>    <send message with "TEST" topic through pub_socket>
>>>>    <terminate>
>>>> Bottom line - is there some sort of synchronization done under the hood
>>>> by ZMQ when the publisher first sends a message with the topic on which the
>>>> subscriber subscribed? or is this all handled between the broker and the
>>>> subscriber?
>>>> Thanks,
>>>> Tomer
>>>> On Mon, May 28, 2018 at 12:23 PM, Gyorgy Szekely <hoditohod at gmail.com>
>>>> wrote:
>>>>> Hi Tomer
>>>>> As far as I know the message from the publisher will reach the broker.
>>>>> According to the docs, the PUB socket drops messages in mute-state (HWM
>>>>> reached), and it's not the case here. The message will be sent as soon as
>>>>> the connection is established, and the socket termination blocks until the
>>>>> send is complete. Unless you set linger to zero.
>>>>> The slow joiner problem means that subscriptions may not be active by
>>>>> the time the publisher sends the message. Either because the subscriber is
>>>>> not yet running, or because the subscribe calls themselves are asynchronous
>>>>> (by the time setsockopt(SUBSCRIBE) returns the broker is not yet aware of
>>>>> this). The zmq guide shows mitigations for this problem in the Advanced
>>>>> Publish Subscribe chapter.
>>>>> Regards,
>>>>>   Gyorgy
>>>>> On Mon, May 28, 2018 at 11:06 AM, Tomer Eliyahu <
>>>>> tomereliyahu1 at gmail.com> wrote:
>>>>>> Hi,
>>>>>> I know this topic was probably discussed before, but I couldn't find a
>>>>>> proper solution, so I implemented something a bit different. I'm not sure
>>>>>> if this solves all pitfalls, I'll be grateful for comments.
>>>>>> We have a system with a XPUB-XSUB broker running as a separate
>>>>>> process in the system (binds frontend to ipc:///tmp/publishers  and
>>>>>> backend to ipc:///tmp/subscribers).
>>>>>> Clients of the broker have both SUB socket for receiving messages,
>>>>>> and a PUB socket for sending messages. When a client boots, it connects
>>>>>> both its PUB and SUB sockets to the broker's endpoints, and subscribes to
>>>>>> the topic of interest.
>>>>>> For the sake of simplicity, let's assume we have only the
>>>>>> broker, a publisher and a subscriber process in the system:
>>>>>> We make sure that the broker process starts first, then a subscriber
>>>>>> which connects and subscribes to the topic, and only then start the
>>>>>> publisher. The publisher then sends a single message and terminates.
>>>>>> Obviously, the message is lost due to the slow joiner syndrome - I
>>>>>> assume the reason for that is because the publisher process zmq_connect()
>>>>>> call is asynchronous, therefore the connect is not actually complete by the
>>>>>> time we send the message.
>>>>>> I thought of a possible solution for this - basically we want to
>>>>>> synchronize the connect operation done by the publisher. Having both PUB
>>>>>> and SUB socket, we can simply send a dummy message from PUB to SUB on the
>>>>>> same publisher process until the first message is received, and then it is
>>>>>> guaranteed that the connect is done and consecutive messages (now to "real"
>>>>>> topics with actual subscribers) will not be lost.
>>>>>> The only part I'm not sure about is the subscriber side - assuming
>>>>>> the subscriber boots, connects and subscribes _before_ we start the
>>>>>> publisher - is it guaranteed that no message will be lost (assuming
>>>>>> of course the subscriber doesn't crash / unsubscribe / etc.) ?
>>>>>> Thanks,
>>>>>> Tomer
>>>>>> _______________________________________________
>>>>>> zeromq-dev mailing list
>>>>>> zeromq-dev at lists.zeromq.org
>>>>>> https://lists.zeromq.org/mailman/listinfo/zeromq-dev