[zeromq-dev] PubSub server strange behavior when scaling

Pieter Hintjens ph at imatix.com
Mon Aug 31 13:34:16 CEST 2015


You could retest this with a simpler example. The key-value store does
a fair amount of work per message and it's possible your set-up
provokes scheduler thrashing, i.e. the OS spends more time swapping
threads in and out than it leaves for the threads...
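
One way to test the thrashing theory during a retest is to have the server log its own context-switch counters, for example once per second, and check whether involuntary switches climb sharply between 300 and 400 clients. This is a minimal sketch that assumes Linux and POSIX getrusage(). The helper names are hypothetical:

```c
#include <sys/resource.h>

/* Hypothetical helpers: context-switch counters for the calling process.
 * With RUSAGE_SELF, Linux sums all threads of the process, which includes
 * libzmq's background I/O thread(s). Both return -1 on error. */

/* Voluntary switches: a thread blocked (e.g. in zmq_poll or a send). */
long ctxsw_voluntary (void)
{
    struct rusage ru;
    if (getrusage (RUSAGE_SELF, &ru) != 0)
        return -1;
    return ru.ru_nvcsw;
}

/* Involuntary switches: the scheduler preempted a runnable thread. */
long ctxsw_involuntary (void)
{
    struct rusage ru;
    if (getrusage (RUSAGE_SELF, &ru) != 0)
        return -1;
    return ru.ru_nivcsw;
}
```

You could log the per-second deltas of both counters next to the existing "I: publishing update" lines. That would show whether involuntary switches jump at the same client count as the sy figure. Steal time is accounted by the hypervisor, so it does not appear in these counters.
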

On Fri, Aug 28, 2015 at 5:31 PM, Blair Anson <blair.anson at gmail.com> wrote:
> Hi, I have been load testing a pubsub server (essentially CloneServer3 from
> the examples) and have come across some curious behavior that is stopping me
> from adding additional clients once I get to around 400. I was wondering if
> anyone has an idea as to what is happening when I reach 400 clients, and how
> I can resolve the issue?
>
> Each client performs:
> 1. sends a snapshot request
> 2. receives the snapshot
> 3. loops over subscriber and publisher calls until the process is killed
>
> (code for the client and server are at the bottom of the email)
>
> zeromq version is 4.0.4 (I have some project dependencies so I'm unable to
> update to the latest version at the moment)
>
> I'm running the PubSub server on an m3.medium instance in Amazon EC2.
>
> Increasing the number of clients in steps of 100 up to 300 results in the
> following metrics:
>
> CPU Avg%                 14
> CPU Peak%                23
> lsof | grep -c server   2784
>
> So overall the box isn't working very hard. However, if I bump up the
> clients to 400, I get:
>
> CPU Avg%                  98
> CPU Peak%                100
> lsof | grep -c server   3111
>
> The box suddenly starts to thrash the CPU. Delving into top, we can see the
> following:
>
> %Cpu(s): 15.6 us, 43.1 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.5 si, 40.7 st
>
> It appears that most of the load is attributed to "sy" and "st". The fields
> reported by top are:
>
> us: user CPU time, i.e. % of CPU time spent in user space
> sy: system CPU time, i.e. % of CPU time spent in kernel space
> ni: user nice CPU time, i.e. % of CPU time spent on low-priority processes
> id: idle CPU time, i.e. % of CPU time spent idle
> wa: I/O wait CPU time, i.e. % of CPU time spent waiting (on disk)
> hi: hardware IRQ, i.e. % of CPU time spent servicing/handling hardware
>     interrupts
> si: software IRQ, i.e. % of CPU time spent servicing/handling software
>     interrupts
> st: steal time, i.e. % of CPU time the virtual CPU spends in involuntary
>     wait while the hypervisor is servicing another processor (CPU time
>     stolen from the virtual machine)
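
Reading the quoted top line against these definitions, kernel time (sy, 43.1%) and steal time (st, 40.7%) together account for 83.8% of the CPU, which leaves 15.6% for user space. A small parser makes this check scriptable. The sketch below assumes the procps-ng top layout shown above, and the type and function names are hypothetical:

```c
#include <stdio.h>

/* Hypothetical container for the eight fields of top's "%Cpu(s):" line. */
typedef struct {
    double us, sy, ni, id, wa, hi, si, st;
} cpu_split_t;

/* Parse e.g. "%Cpu(s): 15.6 us, 43.1 sy,  0.0 ni, ... 40.7 st".
 * In the format, "%%" matches a literal '%' and each space matches any
 * run of whitespace. Returns 0 if all eight fields were read, else -1. */
int cpu_split_parse (const char *line, cpu_split_t *c)
{
    int n = sscanf (line,
        "%%Cpu(s): %lf us, %lf sy, %lf ni, %lf id,"
        " %lf wa, %lf hi, %lf si, %lf st",
        &c->us, &c->sy, &c->ni, &c->id,
        &c->wa, &c->hi, &c->si, &c->st);
    return n == 8 ? 0 : -1;
}

/* CPU share unavailable to user-space code in this guest: kernel time
 * plus time stolen by the hypervisor. Returns -1.0 if parsing fails. */
double cpu_kernel_plus_steal (const char *line)
{
    cpu_split_t c;
    if (cpu_split_parse (line, &c) != 0)
        return -1.0;
    return c.sy + c.st;
}
```
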
>
> I don't understand how increasing the load of a zmq user process results in
> the kernel and hypervisor consuming all the available CPU. I would have
> expected user CPU time to increase, as that is the kind of CPU usage the
> zmq pubsub server consumes.
> Does anyone have any ideas as to why I'm hitting a limit?
>
>
> ----------- Code of the Client -------------------------------------------
>
> #include "kvsimple.c"
>
> //  This client is identical to clonecli3 except for where we
> //  handle subtrees.
> #define SUBTREE \
>     "/a14d5afa-edec-4545-82a4-d6db4a5441e0:eda2f56f-4afe-4955-95e0-dc13a32c7663/"
>
> #define LOCATION \
>     "{\"_latlon\":[{\"longitude\":-0.074397,\"latitude\":51.5452491,\"iM\":1}," \
>     "{\"longitude\":-0.074297,\"latitude\":51.5452298,\"iM\":1}," \
>     "{\"longitude\":-0.0742712,\"latitude\":51.5452147,\"iM\":1}]," \
>     "\"_findmUserId\":\"eda2f56f-4afe-4955-95e0-dc13a32c7663\"," \
>     "\"_bearing\":0.0,\"maxLocationPoints\":10}"
>
> #define SUBTREERESULTS "/results/"
>
> int main (void)
> {
>     //  Prepare our context and subscriber
>     zctx_t *ctx = zctx_new ();
>     void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
>     zsocket_connect (snapshot,
>         "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5556");
>     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
>
>     zsocket_set_subscribe (subscriber, SUBTREE);
>
>     zsocket_connect (subscriber,
>         "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5557");
>
>
>     void *publisher = zsocket_new (ctx, ZMQ_PUSH);
>     zsocket_connect (publisher,
>         "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5558");
>
>     zhash_t *kvmap = zhash_new ();
>     srandom ((unsigned) time (NULL));
>
>
>     //  .until
>     //  We first request a state snapshot:
>     int64_t sequence = 0;
>     zstr_sendm (snapshot, "ICANHAZ?");
>     zstr_send  (snapshot, SUBTREE);
>     //  .skip
>     while (true) {
>         kvmsg_t *kvmsg = kvmsg_recv (snapshot);
>         if (!kvmsg)
>             break;          //  Interrupted
>         if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
>             sequence = kvmsg_sequence (kvmsg);
>             printf ("I: received snapshot=%d\n", (int) sequence);
>             kvmsg_dump(kvmsg);
>             kvmsg_destroy (&kvmsg);
>             break;          //  Done
>         }
>         kvmsg_store (&kvmsg, kvmap);
>     }
>     int64_t alarm = zclock_time () + 1000;
>     int64_t startTime = zclock_time ();
>
>     while (!zctx_interrupted) {
>         zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
>         int tickless = (int) ((alarm - zclock_time ()));
>         if (tickless < 0)
>             tickless = 0;
>         int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
>         if (rc == -1)
>             break;              //  Context has been shut down
>
>         if (items [0].revents & ZMQ_POLLIN) {
>             kvmsg_t *kvmsg = kvmsg_recv (subscriber);
>             if (!kvmsg)
>                 break;          //  Interrupted
>
>             //  Discard out-of-sequence kvmsgs, incl. heartbeats
>             if (kvmsg_sequence (kvmsg) > sequence) {
>                 kvmsg_dump(kvmsg);
>                 sequence = kvmsg_sequence (kvmsg);
>                 kvmsg_store (&kvmsg, kvmap);
>                 printf ("I: received update=%d\n", (int) sequence);
>                 int64_t elapsedtime = zclock_time ()-startTime;
>                 printf (" Time elapsed %" PRId64"\n", elapsedtime);
>
>             }
>             else
>                 kvmsg_destroy (&kvmsg);
>         }
>
>
>
>         //  .until
>         //  If we timed out, generate a random kvmsg
>         if (zclock_time () >= alarm) {
>             kvmsg_t *kvmsg = kvmsg_new (0);
>             kvmsg_fmt_key  (kvmsg, "%s%d", SUBTREE, randof (10000));
>             //kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
>             //kvmsg_fmt_body (kvmsg, "%s", LOCATION);
>             kvmsg_set_body (kvmsg, (byte *) LOCATION, strlen(LOCATION));
>             kvmsg_send     (kvmsg, publisher);
>             kvmsg_destroy (&kvmsg);
>             alarm = zclock_time () + 1000;
>         }
>
>
>
>         //  .skip
>     }
>     printf (" Interrupted\n%d messages in\n", (int) sequence);
>     zhash_destroy (&kvmap);
>     zctx_destroy (&ctx);
>
>     if((int)sequence>0){
>       exit(0);
>     } else {
>       exit(1);
>     }
> }
>
>
>
>
> ---------- Code of the Server ---------------------------------------------
>
>
> #include "kvsimple.c"
>
> //  Routing information for a key-value snapshot
> typedef struct {
>     void *socket;           //  ROUTER socket to send to
>     zframe_t *identity;     //  Identity of peer who requested state
>     char *subtree;          //  Client subtree specification
> } kvroute_t;
>
> //  Send one state snapshot key-value pair to a socket
> //  Hash item data is our kvmsg object, ready to send
> static int
> s_send_single (const char *key, void *data, void *args)
> {
>     kvroute_t *kvroute = (kvroute_t *) args;
>     kvmsg_t *kvmsg = (kvmsg_t *) data;
>     if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
>     &&  memcmp (kvroute->subtree,
>                 kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
>         //  Send identity of recipient first
>         zframe_send (&kvroute->identity,
>             kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
>         kvmsg_send (kvmsg, kvroute->socket);
>     }
>     return 0;
> }
>
> //  The main task is identical to clonesrv3 except for where it
> //  handles subtrees.
> //  .skip
>
> int main (void)
> {
>     //  Prepare our context and sockets
>     zctx_t *ctx = zctx_new ();
>     void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
>     zsocket_bind (snapshot, "tcp://*:5556");
>     void *publisher = zsocket_new (ctx, ZMQ_PUB);
>     zsocket_bind (publisher, "tcp://*:5557");
>     void *collector = zsocket_new (ctx, ZMQ_PULL);
>     zsocket_bind (collector, "tcp://*:5558");
>
>     int64_t sequence = 0;
>     zhash_t *kvmap = zhash_new ();
>
>     zmq_pollitem_t items [] = {
>         { collector, 0, ZMQ_POLLIN, 0 },
>         { snapshot, 0, ZMQ_POLLIN, 0 }
>     };
>     while (!zctx_interrupted) {
>         int rc = zmq_poll (items, 2, 10 * ZMQ_POLL_MSEC);
>         if (rc == -1)
>             break;              //  Context has been shut down
>
>         //  Apply state update sent from client
>         if (items [0].revents & ZMQ_POLLIN) {
>             kvmsg_t *kvmsg = kvmsg_recv (collector);
>             if (!kvmsg)
>                 break;          //  Interrupted
>             kvmsg_dump (kvmsg); //  stderr msg body
>             kvmsg_set_sequence (kvmsg, ++sequence);
>             kvmsg_send (kvmsg, publisher);
>             //kvmsg_store (&kvmsg, kvmap); // don't store to save time
>             kvmsg_destroy (&kvmsg); //  remove message from memory
>             printf ("I: publishing update %5d\n", (int) sequence);
>         }
>         //  Execute state snapshot request
>         if (items [1].revents & ZMQ_POLLIN) {
>             zframe_t *identity = zframe_recv (snapshot);
>             if (!identity)
>                 break;          //  Interrupted
>
>             //  .until
>             //  Request is in second frame of message
>             char *request = zstr_recv (snapshot);
>             char *subtree = NULL;
>             if (request && streq (request, "ICANHAZ?")) {
>                 free (request);
>                 subtree = zstr_recv (snapshot);
>             }
>             //  .skip
>             else {
>                 printf ("E: bad request, aborting\n");
>                 free (request);             //  free (NULL) is a no-op
>                 zframe_destroy (&identity);
>                 break;
>             }
>             //  .until
>             //  Send state snapshot to client
>             kvroute_t routing = { snapshot, identity, subtree };
>             //  .skip
>
>             //  For each entry in kvmap, send kvmsg to client
>             zhash_foreach (kvmap, s_send_single, &routing);
>
>             //  .until
>             //  Now send END message with sequence number
>             printf ("I: sending snapshot=%d\n", (int) sequence);
>             zframe_send (&identity, snapshot, ZFRAME_MORE);
>             kvmsg_t *kvmsg = kvmsg_new (sequence);
>             kvmsg_set_key  (kvmsg, "KTHXBAI");
>             kvmsg_set_body (kvmsg, (byte *) subtree, 0);
>             kvmsg_send     (kvmsg, snapshot);
>             kvmsg_destroy (&kvmsg);
>             free (subtree);
>         }
>     }
>     //  .skip
>     printf (" Interrupted\n%d messages handled\n", (int) sequence);
>     zhash_destroy (&kvmap);
>     zctx_destroy (&ctx);
>
>     return 0;
> }
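
The subtree filter in s_send_single is a plain prefix test: a key belongs to a subtree when the subtree string is a leading substring of the key. ZeroMQ SUB subscriptions match the same way, against the first frame of each message, which here is the kvmsg key. Because every test client subscribes to the identical SUBTREE, every update matches every subscriber. Here is a standalone equivalent of the check, with a hypothetical name:

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical standalone version of the check in s_send_single:
 * true when `subtree` is a prefix of `key`. strncmp stops at the first
 * difference or NUL, so a key shorter than the subtree never matches,
 * and an empty subtree matches every key. */
bool key_in_subtree (const char *subtree, const char *key)
{
    return strncmp (key, subtree, strlen (subtree)) == 0;
}
```
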
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>


