[zeromq-dev] PubSub server strange behavior when scaling

Blair Anson blair.anson at gmail.com
Mon Aug 31 18:01:58 CEST 2015


Thanks Pieter,
I found out this morning that it was actually a Linux kernel limit
(vm.max_map_count, set in /etc/sysctl.conf) that controls the maximum number
of sockets an individual process can have. Note that this is separate from
the open file descriptor limit I've seen people mention on numerous occasions.
Anyhow, it's now scaling up to 75k connections on an m3.medium, which is good.
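
For reference, the change amounts to a single line in /etc/sysctl.conf (the
value below is only illustrative; pick one large enough for your expected
connection count), followed by a "sysctl -p" to reload it:

    # /etc/sysctl.conf -- raise the per-process memory map limit (illustrative value)
    vm.max_map_count = 262144
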
However, I am now seeing some new odd behavior. When the server publishes a
message, about every 3 minutes on my test servers, I can see the number of
open sockets & files drop by a few thousand and then steadily climb back up
again. It appears that the act of publishing a message causes some sort of
resource contention and the kernel copes by temporarily dropping some
connections. I haven't expanded the test to see what each of the 70k clients
is experiencing, nor whether they successfully receive the published message,
but I suspect the ZMQ socket is performing a reconnect whenever the server
disconnects.
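
If that is what's going on, one thing I may try is spacing the reconnects out
on the client side so 70k subscribers don't all reconnect at once. A sketch of
what I mean, using the CZMQ socket-option setters the client already uses (the
intervals are guesses, not tested values):

    //  In the client, when creating the subscriber socket:
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_set_reconnect_ivl     (subscriber, 1000);     //  first retry after 1s
    zsocket_set_reconnect_ivl_max (subscriber, 30000);    //  back off up to 30s
    zsocket_set_subscribe (subscriber, SUBTREE);
    zsocket_connect (subscriber,
        "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5557");
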
Your comment about resource contention might be applicable here, and I'll
take your advice and see if I can remove the key-value store.
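
The stripped-down retest I have in mind is roughly the sketch below: keep the
PULL collector and PUB publisher from the server, but drop the kvmsg parsing
and the key-value store entirely and just pass each update straight through
(untested, so treat it as a sketch rather than working code):

    #include <czmq.h>

    //  Minimal relay: whatever arrives on the PULL socket is re-published
    //  verbatim on the PUB socket. No kvmsg parsing, no key-value store.
    int main (void)
    {
        zctx_t *ctx = zctx_new ();
        void *publisher = zsocket_new (ctx, ZMQ_PUB);
        zsocket_bind (publisher, "tcp://*:5557");
        void *collector = zsocket_new (ctx, ZMQ_PULL);
        zsocket_bind (collector, "tcp://*:5558");

        while (!zctx_interrupted) {
            zmsg_t *msg = zmsg_recv (collector);
            if (!msg)
                break;          //  Interrupted
            zmsg_send (&msg, publisher);
        }
        zctx_destroy (&ctx);
        return 0;
    }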

Cheers,
   Blair


On 31 August 2015 at 12:34, Pieter Hintjens <ph at imatix.com> wrote:

> You could retest this with a simpler example. The key-value store does
> a fair amount of work per message and it's possible your set-up
> provokes scheduler thrashing, i.e. the OS spends more time swapping
> threads in and out than it leaves for the threads...
>
> On Fri, Aug 28, 2015 at 5:31 PM, Blair Anson <blair.anson at gmail.com> wrote:
> > Hi, I have been load testing a pubsub server (essentially clonesrv3 from
> > the examples) and have come across some curious behavior that is stopping
> > me from adding additional clients once I get to around 400. I was
> > wondering if anyone has an idea as to what is happening when I reach 400
> > clients, and how I can resolve the issue?
> >
> > Each client performs:
> > 1. a snapshot request
> > 2. receives the snapshot
> > 3. loops over subscriber & publisher calls until the process is killed
> >
> > (The code for the client and server is at the bottom of the email.)
> >
> > The ZeroMQ version is 4.0.4 (I have some project dependencies, so I'm
> > unable to update to the latest version at the moment).
> >
> > I'm running the PubSub server on an m3.medium in Amazon EC2.
> >
> > Increasing the number of clients in steps of 100 up to 300 results in the
> > following metrics
> >
> > CPU Avg%                 14
> > CPU Peak%                23
> > lsof | grep -c server   2784
> >
> > So overall the box isn't working very hard. However, if I bump the clients
> > up to 400 then I get:
> >
> > CPU Avg%                  98
> > CPU Peak%                100
> > lsof | grep -c server   3111
> >
> > the box suddenly starts to thrash the CPU. Delving into top, we can see
> > the following...
> >
> > %Cpu(s): 15.6 us, 43.1 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.5 si, 40.7 st
> >
> > It appears that most of the load is attributed to "sy" and "st", which are:
> >
> > us: user cpu time     - % CPU time spent in user space
> > sy: system cpu time   - % CPU time spent in kernel space
> > ni: user nice cpu time - % CPU time spent on low-priority (niced) processes
> > id: idle cpu time     - % CPU time spent idle
> > wa: io wait cpu time  - % CPU time spent waiting on I/O (disk)
> > hi: hardware irq      - % CPU time spent servicing hardware interrupts
> > si: software irq      - % CPU time spent servicing software interrupts
> > st: steal time        - % CPU time stolen from the virtual machine, i.e.
> >                         involuntary wait while the hypervisor services
> >                         another virtual CPU
> >
> > I don't understand how increasing the load on a ZMQ user-space process
> > results in the kernel & hypervisor consuming all the available CPU. I
> > would have expected the user CPU time to increase, as that is the type of
> > CPU usage the ZMQ pubsub server consumes.
> > Does anyone have any ideas as to why I'm hitting a limit?
> >
> >
> > ----------- Code of the Client -----------------------------------------------------------------
> >
> > #include "kvsimple.c"
> >
> > //  This client is identical to clonecli3 except for where we
> > //  handle subtrees.
> > #define SUBTREE "/a14d5afa-edec-4545-82a4-d6db4a5441e0:eda2f56f-4afe-4955-95e0-dc13a32c7663/"
> >
> > #define LOCATION "{\"_latlon\":[{\"longitude\":-0.074397,\"latitude\":51.5452491,\"iM\":1},{\"longitude\":-0.074297,\"latitude\":51.5452298,\"iM\":1},{\"longitude\":-0.0742712,\"latitude\":51.5452147,\"iM\":1}],\"_findmUserId\":\"eda2f56f-4afe-4955-95e0-dc13a32c7663\",\"_bearing\":0.0,\"maxLocationPoints\":10}"
> >
> > #define SUBTREERESULTS "/results/"
> >
> > int main (void)
> > {
> >     //  Prepare our context and subscriber
> >     zctx_t *ctx = zctx_new ();
> >     void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
> >     zsocket_connect (snapshot,
> > "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5556");
> >     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
> >
> >     zsocket_set_subscribe (subscriber, SUBTREE);
> >
> >     zsocket_connect (subscriber,
> > "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5557");
> >
> >
> >     void *publisher = zsocket_new (ctx, ZMQ_PUSH);
> >     zsocket_connect (publisher,
> > "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5558");
> >
> >     zhash_t *kvmap = zhash_new ();
> >     srandom ((unsigned) time (NULL));
> >
> >
> >     //  .until
> >     //  We first request a state snapshot:
> >     int64_t sequence = 0;
> >     zstr_sendm (snapshot, "ICANHAZ?");
> >     zstr_send  (snapshot, SUBTREE);
> >     //  .skip
> >     while (true) {
> >         kvmsg_t *kvmsg = kvmsg_recv (snapshot);
> >         if (!kvmsg)
> >             break;          //  Interrupted
> >         if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
> >             sequence = kvmsg_sequence (kvmsg);
> >             printf ("I: received snapshot=%d\n", (int) sequence);
> >             kvmsg_dump(kvmsg);
> >             kvmsg_destroy (&kvmsg);
> >             break;          //  Done
> >         }
> >         kvmsg_store (&kvmsg, kvmap);
> >     }
> >     int64_t alarm = zclock_time () + 1000;
> >     int64_t startTime = zclock_time ();
> >
> >     while (!zctx_interrupted) {
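> >         //  Compute how long until the next alarm (at most 1s away) and
> >         //  poll only that long, so each client publishes roughly once a
> >         //  second while still handling incoming updates promptly.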
> >         zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
> >         int tickless = (int) ((alarm - zclock_time ()));
> >         if (tickless < 0)
> >             tickless = 0;
> >         int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
> >         if (rc == -1)
> >             break;              //  Context has been shut down
> >
> >         if (items [0].revents & ZMQ_POLLIN) {
> >             kvmsg_t *kvmsg = kvmsg_recv (subscriber);
> >             if (!kvmsg)
> >                 break;          //  Interrupted
> >
> >             //  Discard out-of-sequence kvmsgs, incl. heartbeats
> >             if (kvmsg_sequence (kvmsg) > sequence) {
> >                 kvmsg_dump(kvmsg);
> >                 sequence = kvmsg_sequence (kvmsg);
> >                 kvmsg_store (&kvmsg, kvmap);
> >                 printf ("I: received update=%d\n", (int) sequence);
> >                 int64_t elapsedtime = zclock_time ()-startTime;
> >                 printf (" Time elapsed %" PRId64"\n", elapsedtime);
> >
> >             }
> >             else
> >                 kvmsg_destroy (&kvmsg);
> >         }
> >
> >
> >
> >         //  .until
> >         //  If we timed out, generate a random kvmsg
> >         if (zclock_time () >= alarm) {
> >             kvmsg_t *kvmsg = kvmsg_new (0);
> >             kvmsg_fmt_key  (kvmsg, "%s%d", SUBTREE, randof (10000));
> >             //kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
> >             //kvmsg_fmt_body (kvmsg, "%s", LOCATION);
> >             kvmsg_set_body (kvmsg, (byte *) LOCATION, strlen(LOCATION));
> >             kvmsg_send     (kvmsg, publisher);
> >             kvmsg_destroy (&kvmsg);
> >             alarm = zclock_time () + 1000;
> >         }
> >
> >
> >
> >         //  .skip
> >     }
> >     printf (" Interrupted\n%d messages in\n", (int) sequence);
> >     zhash_destroy (&kvmap);
> >     zctx_destroy (&ctx);
> >
> >     if((int)sequence>0){
> >       exit(0);
> >     } else {
> >       exit(1);
> >     }
> > }
> >
> >
> >
> >
> > ---------- Code of the Server -----------------------------------------------------------------
> >
> >
> > #include "kvsimple.c"
> >
> > //  Routing information for a key-value snapshot
> > typedef struct {
> >     void *socket;           //  ROUTER socket to send to
> >     zframe_t *identity;     //  Identity of peer who requested state
> >     char *subtree;          //  Client subtree specification
> > } kvroute_t;
> >
> > //  Send one state snapshot key-value pair to a socket
> > //  Hash item data is our kvmsg object, ready to send
> > static int
> > s_send_single (const char *key, void *data, void *args)
> > {
> >     kvroute_t *kvroute = (kvroute_t *) args;
> >     kvmsg_t *kvmsg = (kvmsg_t *) data;
> >     if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
> >     &&  memcmp (kvroute->subtree,
> >                 kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
> >         //  Send identity of recipient first
> >         zframe_send (&kvroute->identity,
> >             kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
> >         kvmsg_send (kvmsg, kvroute->socket);
> >     }
> >     return 0;
> > }
> >
> > //  The main task is identical to clonesrv3 except for where it
> > //  handles subtrees.
> > //  .skip
> >
> > int main (void)
> > {
> >     //  Prepare our context and sockets
> >     zctx_t *ctx = zctx_new ();
> >     void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
> >     zsocket_bind (snapshot, "tcp://*:5556");
> >     void *publisher = zsocket_new (ctx, ZMQ_PUB);
> >     zsocket_bind (publisher, "tcp://*:5557");
> >     void *collector = zsocket_new (ctx, ZMQ_PULL);
> >     zsocket_bind (collector, "tcp://*:5558");
> >
> >     int64_t sequence = 0;
> >     zhash_t *kvmap = zhash_new ();
> >
> >     zmq_pollitem_t items [] = {
> >         { collector, 0, ZMQ_POLLIN, 0 },
> >         { snapshot, 0, ZMQ_POLLIN, 0 }
> >     };
> >     while (!zctx_interrupted) {
> >         int rc = zmq_poll (items, 2, 10 * ZMQ_POLL_MSEC);
> >
> >         //  Apply state update sent from client
> >         if (items [0].revents & ZMQ_POLLIN) {
> >             kvmsg_t *kvmsg = kvmsg_recv (collector);
> >             kvmsg_dump(kvmsg); // stderr msg body
> >             if (!kvmsg)
> >                 break;          //  Interrupted
> >             kvmsg_set_sequence (kvmsg, ++sequence);
> >             kvmsg_send (kvmsg, publisher);
> >             //kvmsg_store (&kvmsg, kvmap); // don't store to save time
> >             kvmsg_destroy(&kvmsg); // remove message from memory
> >             printf ("I: publishing update %5d\n", (int) sequence);
> >             //kvmsg_dump(kvmsg); // stderr msg body
> >         }
> >         //  Execute state snapshot request
> >         if (items [1].revents & ZMQ_POLLIN) {
> >             zframe_t *identity = zframe_recv (snapshot);
> >             if (!identity)
> >                 break;          //  Interrupted
> >
> >             //  .until
> >             //  Request is in second frame of message
> >             char *request = zstr_recv (snapshot);
> >             char *subtree = NULL;
> >             if (streq (request, "ICANHAZ?")) {
> >                 free (request);
> >                 subtree = zstr_recv (snapshot);
> >             }
> >             //  .skip
> >             else {
> >                 printf ("E: bad request, aborting\n");
> >                 break;
> >             }
> >             //  .until
> >             //  Send state snapshot to client
> >             kvroute_t routing = { snapshot, identity, subtree };
> >             //  .skip
> >
> >             //  For each entry in kvmap, send kvmsg to client
> >             zhash_foreach (kvmap, s_send_single, &routing);
> >
> >             //  .until
> >             //  Now send END message with sequence number
> >             printf ("I: sending snapshot=%d\n", (int) sequence);
> >             zframe_send (&identity, snapshot, ZFRAME_MORE);
> >             kvmsg_t *kvmsg = kvmsg_new (sequence);
> >             kvmsg_set_key  (kvmsg, "KTHXBAI");
> >             kvmsg_set_body (kvmsg, (byte *) subtree, 0);
> >             kvmsg_send     (kvmsg, snapshot);
> >             kvmsg_destroy (&kvmsg);
> >             free (subtree);
> >         }
> >     }
> >     //  .skip
> >     printf (" Interrupted\n%d messages handled\n", (int) sequence);
> >     zhash_destroy (&kvmap);
> >     zctx_destroy (&ctx);
> >
> >     return 0;
> > }
> >
> >
> >