[zeromq-dev] PubSub server strange behavior when scaling
Pieter Hintjens
ph at imatix.com
Mon Aug 31 13:34:16 CEST 2015
You could retest this with a simpler example. The key-value store does
a fair amount of work per message and it's possible your set-up
provokes scheduler thrashing, i.e. the OS spends more time swapping
threads in and out than it leaves for the threads...
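Note too that, if I read the code right, every client subscribes to the same
SUBTREE prefix, so traffic grows quadratically: each client publishes one
update a second, the server re-publishes every update on its PUB socket, and
each update fans out to all clients. That makes 300 clients roughly
300 x 300 = 90,000 deliveries/sec, but 400 clients roughly 160,000/sec; a
33% increase in clients nearly doubles the work.

A stripped-down subscriber like the sketch below (the endpoint is a
placeholder) would show whether plain SUB traffic at that rate reproduces
the CPU profile, without any kvmsg parsing or hashing:

#include <czmq.h>

int main (void)
{
    //  Bare subscriber: no kvmsg parsing, no hash store -- just
    //  receive multipart messages and count them
    zctx_t *ctx = zctx_new ();
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_set_subscribe (subscriber, "");
    //  Placeholder endpoint; point it at the server's PUB port
    zsocket_connect (subscriber, "tcp://localhost:5557");

    int64_t count = 0;
    int64_t start = zclock_time ();
    while (!zctx_interrupted) {
        zmsg_t *msg = zmsg_recv (subscriber);
        if (!msg)
            break;              //  Interrupted
        zmsg_destroy (&msg);
        if (++count % 10000 == 0)
            printf ("I: %d msgs in %d ms\n",
                (int) count, (int) (zclock_time () - start));
    }
    zctx_destroy (&ctx);
    return 0;
}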
On Fri, Aug 28, 2015 at 5:31 PM, Blair Anson <blair.anson at gmail.com> wrote:
> Hi, I have been load testing a pubsub server (essentially CloneServer3 from
> the examples) and have come across some curious behavior that is stopping me
> from adding additional clients once I get to around 400. I was wondering if
> anyone has an idea as to what is happening when I reach 400 clients, and how
> I can resolve the issue?
>
> Each client performs:
> 1. sends a snapshot request
> 2. receives the snapshot
> 3. loops over subscriber & publisher calls until the process is killed
>
> (code for the client and server are at the bottom of the email)
>
> zeromq version is 4.0.4 (I have some project dependencies so I'm unable to
> update to the latest version at the moment)
>
> I'm running the PubSub server on an m3-medium in Amazon.
>
> Increasing the number of clients in steps of 100 up to 300 results in the
> following metrics
>
> CPU Avg% 14
> CPU Peak% 23
> lsof | grep -c server 2784
>
> So overall the box isn't working very hard; however, if I bump the clients
> up to 400 then I get
>
> CPU Avg% 98
> CPU Peak% 100
> lsof | grep -c server 3111
>
> the box suddenly starts to thrash the CPU. Delving into top we can see the
> following...
>
> %Cpu(s): 15.6 us, 43.1 sy, 0.0 ni, 0.0 id, 0.0 wa, 0.0 hi, 0.5 si, 40.7
> st
>
> it appears that most of the load is attributed to "sy" and "st", which are...
>
> us: user cpu time (or) % CPU time spent in user space
> sy: system cpu time (or) % CPU time spent in kernel space
> ni: user nice cpu time (or) % CPU time spent on low-priority processes
> id: idle cpu time (or) % CPU time spent idle
> wa: io wait cpu time (or) % CPU time spent waiting on disk I/O
> hi: hardware irq (or) % CPU time spent servicing hardware interrupts
> si: software irq (or) % CPU time spent servicing software interrupts
> st: steal time (or) % CPU time stolen from the virtual machine, i.e. spent
> in involuntary wait while the hypervisor services another virtual processor
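>
> As a sanity check one could sample the process's context-switch counters
> with getrusage() -- a sketch (the helper name is mine, it is not in the
> code below) of something that could be called periodically from the
> server's main loop:
>
> #include <stdio.h>
> #include <sys/resource.h>
>
> //  Print cumulative context switches for this process. A fast-climbing
> //  involuntary count means the scheduler keeps preempting us, i.e. the
> //  process is fighting for CPU rather than doing useful work
> static void
> s_print_ctx_switches (void)
> {
>     struct rusage ru;
>     getrusage (RUSAGE_SELF, &ru);
>     printf ("I: ctxsw voluntary=%ld involuntary=%ld\n",
>         ru.ru_nvcsw, ru.ru_nivcsw);
> }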
>
> I don't understand how increasing the load of a zmq user process results in
> the kernel & hypervisor consuming all the available CPU. I would have
> expected user CPU time to increase, as that is the kind of CPU usage the
> zmq pubsub server consumes.
> Does anyone have any ideas as to why I'm hitting a limit?
>
>
> ----------- Code of the Client
> -----------------------------------------------------------------------------------------------------
>
> #include "kvsimple.c"
>
> // This client is identical to clonecli3 except for where we
> // handle subtrees.
> #define SUBTREE "/a14d5afa-edec-4545-82a4-d6db4a5441e0:eda2f56f-4afe-4955-95e0-dc13a32c7663/"
>
> #define LOCATION "{\"_latlon\":[{\"longitude\":-0.074397,\"latitude\":51.5452491,\"iM\":1},{\"longitude\":-0.074297,\"latitude\":51.5452298,\"iM\":1},{\"longitude\":-0.0742712,\"latitude\":51.5452147,\"iM\":1}],\"_findmUserId\":\"eda2f56f-4afe-4955-95e0-dc13a32c7663\",\"_bearing\":0.0,\"maxLocationPoints\":10}"
>
> #define SUBTREERESULTS "/results/"
>
> int main (void)
> {
> // Prepare our context and subscriber
> zctx_t *ctx = zctx_new ();
> void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
> zsocket_connect (snapshot,
> "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5556");
> void *subscriber = zsocket_new (ctx, ZMQ_SUB);
>
> zsocket_set_subscribe (subscriber, SUBTREE);
>
> zsocket_connect (subscriber,
> "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5557");
>
>
> void *publisher = zsocket_new (ctx, ZMQ_PUSH);
> zsocket_connect (publisher,
> "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5558");
>
> zhash_t *kvmap = zhash_new ();
> srandom ((unsigned) time (NULL));
>
>
> // .until
> // We first request a state snapshot:
> int64_t sequence = 0;
> zstr_sendm (snapshot, "ICANHAZ?");
> zstr_send (snapshot, SUBTREE);
> // .skip
> while (true) {
> kvmsg_t *kvmsg = kvmsg_recv (snapshot);
> if (!kvmsg)
> break; // Interrupted
> if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
> sequence = kvmsg_sequence (kvmsg);
> printf ("I: received snapshot=%d\n", (int) sequence);
> kvmsg_dump(kvmsg);
> kvmsg_destroy (&kvmsg);
> break; // Done
> }
> kvmsg_store (&kvmsg, kvmap);
> }
> int64_t alarm = zclock_time () + 1000;
> int64_t startTime = zclock_time ();
>
> while (!zctx_interrupted) {
> zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
> int tickless = (int) ((alarm - zclock_time ()));
> if (tickless < 0)
> tickless = 0;
> int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
> if (rc == -1)
> break; // Context has been shut down
>
> if (items [0].revents & ZMQ_POLLIN) {
> kvmsg_t *kvmsg = kvmsg_recv (subscriber);
> if (!kvmsg)
> break; // Interrupted
>
> // Discard out-of-sequence kvmsgs, incl. heartbeats
> if (kvmsg_sequence (kvmsg) > sequence) {
> kvmsg_dump(kvmsg);
> sequence = kvmsg_sequence (kvmsg);
> kvmsg_store (&kvmsg, kvmap);
> printf ("I: received update=%d\n", (int) sequence);
> int64_t elapsedtime = zclock_time ()-startTime;
> printf (" Time elapsed %" PRId64"\n", elapsedtime);
>
> }
> else
> kvmsg_destroy (&kvmsg);
> }
>
>
>
> // .until
> // If we timed out, generate a random kvmsg
> if (zclock_time () >= alarm) {
> kvmsg_t *kvmsg = kvmsg_new (0);
> kvmsg_fmt_key (kvmsg, "%s%d", SUBTREE, randof (10000));
> //kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
> //kvmsg_fmt_body (kvmsg, "%s", LOCATION);
> kvmsg_set_body (kvmsg, (byte *) LOCATION, strlen(LOCATION));
> kvmsg_send (kvmsg, publisher);
> kvmsg_destroy (&kvmsg);
> alarm = zclock_time () + 1000;
> }
>
>
>
> // .skip
> }
> printf (" Interrupted\n%d messages in\n", (int) sequence);
> zhash_destroy (&kvmap);
> zctx_destroy (&ctx);
>
> if ((int) sequence > 0)
>     exit (0);
> else
>     exit (1);
> }
>
>
>
>
> ---------- Code of the Server
> -----------------------------------------------------------------------------------------------------
>
>
> #include "kvsimple.c"
>
> // Routing information for a key-value snapshot
> typedef struct {
> void *socket; // ROUTER socket to send to
> zframe_t *identity; // Identity of peer who requested state
> char *subtree; // Client subtree specification
> } kvroute_t;
>
> // Send one state snapshot key-value pair to a socket
> // Hash item data is our kvmsg object, ready to send
> static int
> s_send_single (const char *key, void *data, void *args)
> {
> kvroute_t *kvroute = (kvroute_t *) args;
> kvmsg_t *kvmsg = (kvmsg_t *) data;
> if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
> && memcmp (kvroute->subtree,
> kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
> // Send identity of recipient first
> zframe_send (&kvroute->identity,
> kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
> kvmsg_send (kvmsg, kvroute->socket);
> }
> return 0;
> }
>
> // The main task is identical to clonesrv3 except for where it
> // handles subtrees.
> // .skip
>
> int main (void)
> {
> // Prepare our context and sockets
> zctx_t *ctx = zctx_new ();
> void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
> zsocket_bind (snapshot, "tcp://*:5556");
> void *publisher = zsocket_new (ctx, ZMQ_PUB);
> zsocket_bind (publisher, "tcp://*:5557");
> void *collector = zsocket_new (ctx, ZMQ_PULL);
> zsocket_bind (collector, "tcp://*:5558");
>
> int64_t sequence = 0;
> zhash_t *kvmap = zhash_new ();
>
> zmq_pollitem_t items [] = {
> { collector, 0, ZMQ_POLLIN, 0 },
> { snapshot, 0, ZMQ_POLLIN, 0 }
> };
> while (!zctx_interrupted) {
> int rc = zmq_poll (items, 2, 10 * ZMQ_POLL_MSEC);
>
> // Apply state update sent from client
> if (items [0].revents & ZMQ_POLLIN) {
> kvmsg_t *kvmsg = kvmsg_recv (collector);
> if (!kvmsg)
> break; // Interrupted
> kvmsg_dump(kvmsg); // stderr msg body (must come after the NULL check)
> kvmsg_set_sequence (kvmsg, ++sequence);
> kvmsg_send (kvmsg, publisher);
> //kvmsg_store (&kvmsg, kvmap); // don't store to save time
> kvmsg_destroy(&kvmsg); // remove message from memory
> printf ("I: publishing update %5d\n", (int) sequence);
> //kvmsg_dump(kvmsg); // stderr msg body
> }
> // Execute state snapshot request
> if (items [1].revents & ZMQ_POLLIN) {
> zframe_t *identity = zframe_recv (snapshot);
> if (!identity)
> break; // Interrupted
>
> // .until
> // Request is in second frame of message
> char *request = zstr_recv (snapshot);
> char *subtree = NULL;
> if (streq (request, "ICANHAZ?")) {
> free (request);
> subtree = zstr_recv (snapshot);
> }
> // .skip
> else {
> printf ("E: bad request, aborting\n");
> free (request);
> break;
> }
> // .until
> // Send state snapshot to client
> kvroute_t routing = { snapshot, identity, subtree };
> // .skip
>
> // For each entry in kvmap, send kvmsg to client
> zhash_foreach (kvmap, s_send_single, &routing);
>
> // .until
> // Now send END message with sequence number
> printf ("I: sending shapshot=%d\n", (int) sequence);
> zframe_send (&identity, snapshot, ZFRAME_MORE);
> kvmsg_t *kvmsg = kvmsg_new (sequence);
> kvmsg_set_key (kvmsg, "KTHXBAI");
> kvmsg_set_body (kvmsg, (byte *) subtree, 0);
> kvmsg_send (kvmsg, snapshot);
> kvmsg_destroy (&kvmsg);
> free (subtree);
> }
> }
> // .skip
> printf (" Interrupted\n%d messages handled\n", (int) sequence);
> zhash_destroy (&kvmap);
> zctx_destroy (&ctx);
>
> return 0;
> }
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>