[zeromq-dev] PubSub server strange behavior when scaling

Blair Anson blair.anson at gmail.com
Fri Aug 28 17:31:04 CEST 2015

Hi, I have been load testing a pubsub server (essentially CloneServer3 from
the examples) and have come across some curious behavior that is stopping
me from adding additional clients once I get to around 400. I was wondering
if anyone has an idea as to what is happening when I reach 400 clients, and
how I can resolve the issue?

Each client:
1. sends a snapshot request
2. receives the snapshot
3. loops over subscriber and publisher calls until the process is killed

(code for the client and server are at the bottom of the email)

The ZeroMQ version is 4.0.4 (I have some project dependencies, so I'm unable to
update to the latest version at the moment).

I'm running the PubSub server on an m3.medium instance on Amazon EC2.

Increasing the number of clients in steps of 100, up to 300, results in the
following metrics:

CPU Avg%                 14
CPU Peak%                23
lsof | grep -c server    2784

So overall the box isn't working very hard. However, if I bump the number of
clients up to 400, I get:

CPU Avg%                 98
CPU Peak%                100
lsof | grep -c server    3111

The box suddenly starts to thrash the CPU. Delving into top, we can see:

%Cpu(s): 15.6 us, 43.1 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.5 si, 40.7 st

It appears that most of the load is attributed to "sy" and "st", which are:

us: user CPU time, i.e. % CPU time spent in user space
sy: system CPU time, i.e. % CPU time spent in kernel space
ni: user nice CPU time, i.e. % CPU time spent on low-priority processes
id: idle CPU time, i.e. % CPU time spent idle
wa: I/O wait CPU time, i.e. % CPU time spent waiting (on disk)
hi: hardware IRQ, i.e. % CPU time spent servicing/handling hardware interrupts
si: software IRQ, i.e. % CPU time spent servicing/handling software interrupts
st: steal time, i.e. % CPU time in involuntary wait by the virtual CPU while
the hypervisor is servicing another processor (% CPU time stolen from the
virtual machine)

I don't understand how increasing the load on a ZeroMQ user process results in
the kernel and hypervisor consuming all the available CPU. I would have
expected user CPU time to increase, as that is the type of CPU usage the
ZeroMQ pub-sub server consumes.
Does anyone have any ideas as to why I'm hitting a limit?

----------- Code of the Client

#include "kvsimple.c"

//  This client is identical to clonecli3 except for where we
//  handle subtrees.
//
//  SUBTREE: key prefix this client subscribes to, requests in its
//  snapshot, and publishes its own keys under.
//  NOTE(review): SUBTREE expands to nothing in this listing, yet it is
//  used as a string argument (zsocket_set_subscribe, zstr_send, and the
//  "%s" in kvmsg_fmt_key); its string value appears to have been lost
//  from the post - restore it before compiling.
#define SUBTREE

//  LOCATION: body sent with every update this client publishes.
//  NOTE(review): also empty here, but it is passed to strlen() and cast
//  to (byte *); restore its string value before compiling.
#define LOCATION

//  Not referenced anywhere in the visible client code.
#define SUBTREERESULTS "/results/"

//  Load-test client. It first requests a snapshot of the SUBTREE keys.
//  It then loops until interrupted: it prints every in-sequence update
//  received on the SUB socket, and pushes one new key-value update
//  roughly every 1000 ms.
//
//  Sockets: DEALER (snapshot request/reply), SUB (updates filtered by the
//  SUBTREE prefix), PUSH (updates sent to the server's collector).
//  NOTE(review): in this listing, each zsocket_connect call is cut off
//  after its first argument, and the opening/closing braces of this
//  function and its inner blocks are not shown. Restore them before
//  compiling.
//
//  Load note: the keys this client publishes start with SUBTREE, the same
//  prefix it subscribes to, and the server republishes every collected
//  update on its PUB socket. With N clients, each client therefore
//  receives about N updates per second (about N * N deliveries in total).
//  It prints two lines to stdout for each of them.
int main (void)
    //  Prepare our context and subscriber
    zctx_t *ctx = zctx_new ();
    void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
    //  NOTE(review): endpoint missing; presumably the server's snapshot
    //  ROUTER port (5556) - confirm host.
    zsocket_connect (snapshot,
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);

    //  Only receive updates whose key starts with SUBTREE.
    zsocket_set_subscribe (subscriber, SUBTREE);

    //  NOTE(review): endpoint missing; presumably the server's PUB port (5557).
    zsocket_connect (subscriber,

    //  Updates this client generates are pushed to the server's collector.
    void *publisher = zsocket_new (ctx, ZMQ_PUSH);
    //  NOTE(review): endpoint missing; presumably the server's PULL port (5558).
    zsocket_connect (publisher,

    //  Local copy of the key-value state (snapshot entries plus updates).
    zhash_t *kvmap = zhash_new ();
    //  Seed the C library PRNG (random()).
    srandom ((unsigned) time (NULL));

    //  .until
    //  We first request a state snapshot:
    int64_t sequence = 0;
    zstr_sendm (snapshot, "ICANHAZ?");
    zstr_send  (snapshot, SUBTREE);
    //  .skip
    //  Store snapshot entries until the server's KTHXBAI terminator. Its
    //  sequence number becomes the baseline for accepting updates.
    while (true) {
        kvmsg_t *kvmsg = kvmsg_recv (snapshot);
        if (!kvmsg)
            break;          //  Interrupted
        if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
            sequence = kvmsg_sequence (kvmsg);
            printf ("I: received snapshot=%d\n", (int) sequence);
            kvmsg_destroy (&kvmsg);
            break;          //  Done
        kvmsg_store (&kvmsg, kvmap);
    //  alarm: deadline (ms) for publishing our next update.
    //  startTime: reference point for the "Time elapsed" output.
    int64_t alarm = zclock_time () + 1000;
    int64_t startTime = zclock_time ();

    while (!zctx_interrupted) {
        //  Wait for an update, but never past the next publish deadline.
        zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
        int tickless = (int) ((alarm - zclock_time ()));
        if (tickless < 0)
            tickless = 0;
        int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Context has been shut down

        if (items [0].revents & ZMQ_POLLIN) {
            kvmsg_t *kvmsg = kvmsg_recv (subscriber);
            if (!kvmsg)
                break;          //  Interrupted

            //  Discard out-of-sequence kvmsgs, incl. heartbeats
            if (kvmsg_sequence (kvmsg) > sequence) {
                sequence = kvmsg_sequence (kvmsg);
                kvmsg_store (&kvmsg, kvmap);
                //  Two stdout lines per accepted update; this output grows
                //  with the total update rate across all clients.
                printf ("I: received update=%d\n", (int) sequence);
                int64_t elapsedtime = zclock_time ()-startTime;
                printf (" Time elapsed %" PRId64"\n", elapsedtime);

                //  NOTE(review): presumably kvmsg_store (kvsimple.c, not
                //  shown) takes ownership and clears kvmsg, which would make
                //  this a no-op. No destroy is visible on the out-of-sequence
                //  path, so those messages would leak unless an `else`
                //  branch was lost from this listing - confirm.
                kvmsg_destroy (&kvmsg);

        //  .until
        //  If we timed out, generate a random kvmsg
        //  (key = SUBTREE followed by a random number below 10000,
        //  body = LOCATION), push it to the server, then re-arm the
        //  1000 ms alarm.
        if (zclock_time () >= alarm) {
            kvmsg_t *kvmsg = kvmsg_new (0);
            kvmsg_fmt_key  (kvmsg, "%s%d", SUBTREE, randof (10000));
            //kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
            //kvmsg_fmt_body (kvmsg, "%s", LOCATION);
            kvmsg_set_body (kvmsg, (byte *) LOCATION, strlen(LOCATION));
            kvmsg_send     (kvmsg, publisher);
            kvmsg_destroy (&kvmsg);
            alarm = zclock_time () + 1000;

        //  .skip
    printf (" Interrupted\n%d messages in\n", (int) sequence);
    zhash_destroy (&kvmap);
    zctx_destroy (&ctx);

    //  NOTE(review): this `} else {` has no matching `if` in the visible
    //  listing and looks like a stray fragment; remove it before compiling.
    } else {

---------- Code of the Server

#include "kvsimple.c"

//  Everything needed to route one snapshot reply: the socket to send it
//  on, the peer that asked for it, and the key prefix that peer wants.
//  Field order is part of the contract: instances are built with the
//  positional initializer { socket, identity, subtree }.
struct kvroute_s {
    void *socket;           //  ROUTER socket the snapshot is sent on
    zframe_t *identity;     //  Identity frame of the requesting peer
    char *subtree;          //  Key prefix requested by the client
};
typedef struct kvroute_s kvroute_t;

//  Send one state snapshot key-value pair to a socket
//  Hash item data is our kvmsg object, ready to send
static int
s_send_single (const char *key, void *data, void *args)
    kvroute_t *kvroute = (kvroute_t *) args;
    kvmsg_t *kvmsg = (kvmsg_t *) data;
    if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
    &&  memcmp (kvroute->subtree,
                kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
        //  Send identity of recipient first
        zframe_send (&kvroute->identity,
            kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
        kvmsg_send (kvmsg, kvroute->socket);
    return 0;

//  The main task is identical to clonesrv3 except for where it
//  handles subtrees.
//  .skip

int main (void)
    //  Prepare our context and sockets
    zctx_t *ctx = zctx_new ();
    void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (snapshot, "tcp://*:5556");
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (publisher, "tcp://*:5557");
    void *collector = zsocket_new (ctx, ZMQ_PULL);
    zsocket_bind (collector, "tcp://*:5558");

    int64_t sequence = 0;
    zhash_t *kvmap = zhash_new ();

    zmq_pollitem_t items [] = {
        { collector, 0, ZMQ_POLLIN, 0 },
        { snapshot, 0, ZMQ_POLLIN, 0 }
    while (!zctx_interrupted) {
        int rc = zmq_poll (items, 2, 10 * ZMQ_POLL_MSEC);

        //  Apply state update sent from client
        if (items [0].revents & ZMQ_POLLIN) {
            kvmsg_t *kvmsg = kvmsg_recv (collector);
            kvmsg_dump(kvmsg); // stderr msg body
            if (!kvmsg)
                break;          //  Interrupted
            kvmsg_set_sequence (kvmsg, ++sequence);
            kvmsg_send (kvmsg, publisher);
            //kvmsg_store (&kvmsg, kvmap); // don't store to save time
            kvmsg_destroy(&kvmsg); // remove message from memory
            printf ("I: publishing update %5d\n", (int) sequence);
            //kvmsg_dump(kvmsg); // stderr msg body
        //  Execute state snapshot request
        if (items [1].revents & ZMQ_POLLIN) {
            zframe_t *identity = zframe_recv (snapshot);
            if (!identity)
                break;          //  Interrupted

            //  .until
            //  Request is in second frame of message
            char *request = zstr_recv (snapshot);
            char *subtree = NULL;
            if (streq (request, "ICANHAZ?")) {
                free (request);
                subtree = zstr_recv (snapshot);
            //  .skip
            else {
                printf ("E: bad request, aborting\n");
            //  .until
            //  Send state snapshot to client
            kvroute_t routing = { snapshot, identity, subtree };
            //  .skip

            //  For each entry in kvmap, send kvmsg to client
            zhash_foreach (kvmap, s_send_single, &routing);

            //  .until
            //  Now send END message with sequence number
            printf ("I: sending shapshot=%d\n", (int) sequence);
            zframe_send (&identity, snapshot, ZFRAME_MORE);
            kvmsg_t *kvmsg = kvmsg_new (sequence);
            kvmsg_set_key  (kvmsg, "KTHXBAI");
            kvmsg_set_body (kvmsg, (byte *) subtree, 0);
            kvmsg_send     (kvmsg, snapshot);
            kvmsg_destroy (&kvmsg);
            free (subtree);
    //  .skip
    printf (" Interrupted\n%d messages handled\n", (int) sequence);
    zhash_destroy (&kvmap);
    zctx_destroy (&ctx);

    return 0;
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20150828/81d7e72d/attachment.htm>

More information about the zeromq-dev mailing list