[zeromq-dev] using majordomo broker with asynchronous clients
Andre Caron
andre.l.caron at gmail.com
Thu Dec 11 06:07:20 CET 2014
The broker uses a ROUTER socket, which silently drops outbound messages once the send high-water mark (ZMQ_SNDHWM) is reached. If your client reads more slowly than the broker writes, you'll lose messages.
In your case, the slow part is writing to the terminal. Try removing the printf in the recv loop and you should lose fewer messages.
André
> On Dec 10, 2014, at 9:40 PM, Vishal Ahuja <vahuja4 at gmail.com> wrote:
>
> While reading the ZeroMQ guide, I came across client code (for the Majordomo pattern) that sends 100k requests in one loop and then receives the replies in a second loop.
>
> #include "../include/mdp.h"
> #include <time.h>
>
> int main (int argc, char *argv [])
> {
>     int verbose = (argc > 1 && streq (argv [1], "-v"));
>     mdp_client_t *session = mdp_client_new ("tcp://localhost:5555", verbose);
>     int count;
>     for (count = 0; count < 100000; count++) {
>         zmsg_t *request = zmsg_new ();
>         zmsg_pushstr (request, "Hello world");
>         mdp_client_send (session, "echo", &request);
>     }
>     printf("sent all\n");
>
>     for (count = 0; count < 100000; count++) {
>         zmsg_t *reply = mdp_client_recv (session,NULL,NULL);
>         if (reply)
>             zmsg_destroy (&reply);
>         else
>             break; // Interrupted by Ctrl-C
>         printf("reply received:%d\n", count);
>     }
>     printf ("%d replies received\n", count);
>     mdp_client_destroy (&session);
>     return 0;
> }
>
> I have added a counter to count the number of replies that the worker (test_worker.c) sends to the broker, and another counter in mdp_broker.c to count the number of replies the broker sends to a client. Both of these count up to 100k, but the client is receiving only around 37k replies.
>
> If the number of client requests is set to around 40k, then it receives all the replies. Can someone please tell me why packets are lost when the client sends more than 40k asynchronous requests?
>
> I tried setting the HWM to 100k for the broker socket, but the problem persists:
>
> static broker_t *
> s_broker_new (int verbose)
> {
>     broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
>     int64_t hwm = 100000;
>     // Initialize broker state
>     self->ctx = zctx_new ();
>     self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
>     zmq_setsockopt(self->socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
>     zmq_setsockopt(self->socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
>     self->verbose = verbose;
>     self->services = zhash_new ();
>     self->workers = zhash_new ();
>     self->waiting = zlist_new ();
>     self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
>     return self;
> }
>
> Vishal