[zeromq-dev] using majordomo broker with asynchronous clients

Pieter Hintjens ph at imatix.com
Fri Dec 12 09:06:10 CET 2014


Vishal,

Please see the freelance model; a broker design won't scale to 1M
messages a second without significant extra work.

-Pieter

On Fri, Dec 12, 2014 at 7:57 AM, Vishal Ahuja <vahuja4 at gmail.com> wrote:
> Andre, thank you for your help! I changed the client to send 10k requests at
> once, and then send two requests for every reply that it receives. I chose
> 10k because that allowed me to run up to ten clients (each sending 100k
> messages) in parallel without any packet loss. Here is the client code:
>
> #include "../include/mdp.h"
> #include <sys/time.h>
> #include <time.h>
>
> int main (int argc, char *argv [])
> {
>     int verbose = (argc > 1 && streq (argv [1], "-v"));
>     mdp_client_t *session = mdp_client_new (argv [1], verbose);
>     //mdp_client_t *session = mdp_client_new ("tcp://localhost:5555", verbose);
>     int count1;
>     struct timeval start, end;
>     gettimeofday (&start, NULL);
>
>     //  Send the first 10k requests up front
>     for (count1 = 0; count1 < 10000; count1++) {
>         zmsg_t *request = zmsg_new ();
>         zmsg_pushstr (request, "Hello world");
>         mdp_client_send (session, "echo", &request);
>     }
>     printf ("sent all\n");
>
>     //  For each of the next 45k replies, send two new requests
>     for (count1 = 0; count1 < 45000; count1++) {
>         zmsg_t *reply = mdp_client_recv (session, NULL, NULL);
>         if (reply) {
>             zmsg_destroy (&reply);
>             zmsg_t *request = zmsg_new ();
>             zmsg_pushstr (request, "Hello world");
>             mdp_client_send (session, "echo", &request);
>
>             request = zmsg_new ();
>             zmsg_pushstr (request, "Hello world");
>             mdp_client_send (session, "echo", &request);
>         }
>         else
>             break;              //  Interrupted by Ctrl-C
>     }
>
>     //  Receive the remaining 55k replies
>     for (count1 = 45000; count1 < 100000; count1++) {
>         zmsg_t *reply = mdp_client_recv (session, NULL, NULL);
>         if (reply)
>             zmsg_destroy (&reply);
>         else
>             break;
>     }
>
>     gettimeofday (&end, NULL);
>     long elapsed = (end.tv_sec - start.tv_sec)
>         + (end.tv_usec - start.tv_usec) / 1000000;
>     printf ("time = %ld\n", elapsed);
>     printf ("%d replies received\n", count1);
>     mdp_client_destroy (&session);
>     return 0;
> }
>
>
> I ran the broker, worker, and the clients within the same machine. Here is
> the recorded time:
>
> # of clients in parallel
> (each client sends 100k)    Time elapsed (seconds)
> 1                            4
> 2                            9
> 3                           12
> 4                           16
> 5                           21
> 10                          43
>
> So for every 100k requests, the broker is taking about 4 seconds. Is this
> the expected behavior? I am not sure how to achieve a million messages per
> second. Please advise.
>
> Sincerely,
>
> Vishal
>
>
>
>
> On Thu, Dec 11, 2014 at 10:22 PM, André Caron <andre.l.caron at gmail.com>
> wrote:
>>
>> You are observing lost replies because the broker is sending replies
>> faster than the client is consuming them: each time the broker sends a
>> reply, it is stored in an internal queue for that client.  When the broker
>> sends more replies than the client can consume, this internal queue fills up
>> to tolerate short bursts (the client's RCVHWM and the broker's SNDHWM
>> indirectly control this tolerance).  However, if the broker's send rate is
>> constantly faster than the client's recv rate, you would need infinite
>> memory to avoid losing replies.
>>
>> In this scenario, the clients are operating in request-reply, so if this
>> happens, they are sending requests faster than they can consume the replies.
>> Basically, your client keeps sending requests until both the client's reply
>> queue and the broker's reply queue (for that client) are saturated and then,
>> instead of reading replies, it continues sending requests.  This doesn't
>> make sense in any real-world scenario.  The "real" problem here is that
>> the test you are performing is not realistic.
>>
>> Note that it would be different if it was pub-sub instead of req-rep
>> because the client would not be in control of the broker's "reply" rate.
>>
>> To fix this to match a real-world scenario, you should modify your client
>> program so that it has at most N outstanding requests, where N is
>> guaranteed to fit in the client's reply queue and the broker's reply queue:
>> - initially send N requests;
>> - each time you get a reply, send a new request.
>>
>> If you do this and the client's RCVHWM and broker's SNDHWM are
>> sufficiently high to hold a total of N replies, then you will go down to 0
>> lost replies -- unless you hit other problems like disconnections, of
>> course.
>>
>> Instead of doing that, you could technically increase the server's SNDHWM
>> to hold 100,000 replies per client (assuming the broker has enough memory),
>> but the same problem would creep up again if you increased the number
>> of outstanding requests to more than 100,000.
>>
>> Cheers,
>>
>> André
>>
>> On Thu, Dec 11, 2014 at 8:05 AM, Vishal Ahuja <vahuja4 at gmail.com> wrote:
>>>
>>> Sure, but when I have 3 parallel clients (300k messages), I am observing
>>> packet loss.
>>>
>>> Sincerely,
>>>
>>> Vishal
>>>
>>> On Thu, Dec 11, 2014 at 5:43 PM, André Caron <andre.l.caron at gmail.com>
>>> wrote:
>>>>
>>>> Throughput is bound by multiple factors.  In your case, my guess is that
>>>> your bottleneck is the throughput of a single TCP socket on your system.  I
>>>> doubt that you can reach millions of messages per second with only 3
>>>> clients.  If you add more clients, you should see the server's throughput
>>>> increase well over 30k.
>>>>
>>>> Cheers,
>>>>
>>>> André
>>>>
>>>> On Thu, Dec 11, 2014 at 4:40 AM, Vishal Ahuja <vahuja4 at gmail.com> wrote:
>>>>>
>>>>> I removed the printfs, and increased the TCP send and receive buffers
>>>>> to the max possible. When I have two clients (running in parallel) sending
>>>>> 100k messages, there is no packet loss. But when I increase the number of
>>>>> clients to three, each client receives only around 93k packets. Also, the
>>>>> majordomo specification (http://rfc.zeromq.org/spec:7) says that the broker
>>>>> should be able to switch millions of messages per second, whereas I am
>>>>> seeing a throughput of around 30k messages per second. Can someone tell me
>>>>> how to improve the throughput?
>>>>>
>>>>> Sincerely,
>>>>>
>>>>> Vishal
>>>>>
>>>>> On Thu, Dec 11, 2014 at 10:37 AM, Andre Caron <andre.l.caron at gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> The broker uses a router socket, which silently drops outbound
>>>>>> messages when the sndhwm is reached.  If your client is reading more slowly
>>>>>> than the broker is writing, you'll lose messages.
>>>>>>
>>>>>> In your case, the slow part is writing to the terminal.  Try removing
>>>>>> the printf in the recv loop and you should lose fewer messages.
>>>>>>
>>>>>> André
>>>>>>
>>>>>> > On Dec 10, 2014, at 9:40 PM, Vishal Ahuja <vahuja4 at gmail.com> wrote:
>>>>>> >
>>>>>> > While reading the zeromq guide, I came across client code
>>>>>> > (pertaining to the majordomo pattern) which sends 100k requests in a loop,
>>>>>> > and then receives the reply in a second loop.
>>>>>> >
>>>>>> >     #include "../include/mdp.h"
>>>>>> >     #include <time.h>
>>>>>> >
>>>>>> >
>>>>>> >     int main (int argc, char *argv [])
>>>>>> >     {
>>>>>> >         int verbose = (argc > 1 && streq (argv [1], "-v"));
>>>>>> >         mdp_client_t *session = mdp_client_new
>>>>>> > ("tcp://localhost:5555", verbose);
>>>>>> >         int count;
>>>>>> >         for (count = 0; count < 100000; count++) {
>>>>>> >             zmsg_t *request = zmsg_new ();
>>>>>> >             zmsg_pushstr (request, "Hello world");
>>>>>> >             mdp_client_send (session, "echo", &request);
>>>>>> >         }
>>>>>> >         printf("sent all\n");
>>>>>> >
>>>>>> >         for (count = 0; count < 100000; count++) {
>>>>>> >             zmsg_t *reply = mdp_client_recv (session,NULL,NULL);
>>>>>> >             if (reply)
>>>>>> >                 zmsg_destroy (&reply);
>>>>>> >             else
>>>>>> >                 break;              //  Interrupted by Ctrl-C
>>>>>> >             printf("reply received:%d\n", count);
>>>>>> >         }
>>>>>> >         printf ("%d replies received\n", count);
>>>>>> >         mdp_client_destroy (&session);
>>>>>> >         return 0;
>>>>>> >     }
>>>>>> >
>>>>>> > I have added a counter to count the number of replies that the
>>>>>> > worker (test_worker.c) sends to the broker, and another counter in
>>>>>> > mdp_broker.c to count the number of replies the broker sends to a client.
>>>>>> > Both of these count up to 100k, but the client is receiving only around 37k
>>>>>> > replies.
>>>>>> >
>>>>>> > If the number of client requests is set to around 40k, then it
>>>>>> > receives all the replies. Can someone please tell me why packets are lost
>>>>>> > when the client sends more than 40k asynchronous requests?
>>>>>> >
>>>>>> > I tried setting the HWM to 100k for the broker socket, but the
>>>>>> > problem persists:
>>>>>> >
>>>>>> >     static broker_t *
>>>>>> >     s_broker_new (int verbose)
>>>>>> >     {
>>>>>> >         broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
>>>>>> >         int64_t hwm = 100000;
>>>>>> >         //  Initialize broker state
>>>>>> >         self->ctx = zctx_new ();
>>>>>> >         self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
>>>>>> >         zmq_setsockopt(self->socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
>>>>>> >         zmq_setsockopt(self->socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
>>>>>> >         self->verbose = verbose;
>>>>>> >         self->services = zhash_new ();
>>>>>> >         self->workers = zhash_new ();
>>>>>> >         self->waiting = zlist_new ();
>>>>>> >         self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
>>>>>> >         return self;
>>>>>> >     }
>>>>>> >
>>>>>> > Vishal
>>>>>> > _______________________________________________
>>>>>> > zeromq-dev mailing list
>>>>>> > zeromq-dev at lists.zeromq.org
>>>>>> > http://lists.zeromq.org/mailman/listinfo/zeromq-dev