[zeromq-dev] Paranoid Pirate Pattern

Riskybiz riskybizlive at live.com
Fri Jul 4 17:51:29 CEST 2014


Dear zeromq list,

 

I cannot get the Paranoid Pirate Pattern example to work fully on either
Windows 7 or Debian (Wheezy). I have compiled the example code in both
environments, shortening a few delay intervals to avoid long waits and
adding print statements to trace what is going on.

 

I nearly have the Paranoid Pirate Pattern working on Debian with
zeromq-4.0.4, except for one fundamental problem: lpclient never manages to
communicate with ppqueue. It retries repeatedly and ultimately gives up.
Meanwhile, as far as I can tell, ppqueue and the ppworker(s) are happily
exchanging messages and heartbeats with one another.

 

The lpclient program is reused from the 'Lazy Pirate' example.  My lpclient
example functions as expected with lpserver.

 

However, when the same lpclient is used together with ppqueue and
ppworker to try the Paranoid Pirate Pattern, the ZeroMQ connection
between lpclient and ppqueue never seems to be established or to work.

 

I'd like to study the fully functioning example so that I can work out how
to integrate it into a real project!

 

Please, does anyone know how to get the Paranoid Pirate Pattern example
working?

 

Thanks,

 

Riskybiz.

 

Included below are the slightly modified .cpp files from my Debian attempts
at ppqueue, ppworker and lpclient.

//

//  Paranoid Pirate queue

//

//     Andreas Hoelzlwimmer <andreas.hoelzlwimmer at fh-hagenberg.at>

//

#include "zmsg.hpp"


 

#include <stdint.h>

#include <vector>


 

//  Heartbeat tuning. A queued worker's expiry is set to
//  now + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS msecs and it is purged
//  once that deadline passes. ppworker defines the same two values.
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable

#define HEARTBEAT_INTERVAL  1000    //  msecs


 

//  One entry in the queue of available workers: the worker's routing
//  identity and the s_clock() time (msecs) after which the queue
//  considers it dead and purges it.
struct worker_t {
    std::string identity;   //  Routing identity (address) of the worker
    int64_t     expiry;     //  Purged once s_clock() goes past this value
};


 

//  Insert worker at end of queue, reset expiry

//  Worker must not already be in queue

static void

s_worker_append (std::vector<worker_t> &queue, std::string &identity)

{

    bool found = false;

    for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {

        if (it->identity.compare(identity) == 0) {

            std::cout << "E: queue has duplicate worker identity " <<
identity.c_str() << std::endl;

            found = true;

            break;

        }

    }

    if (!found) {

        worker_t worker;

        worker.identity = identity;

        worker.expiry = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;

        queue.push_back(worker);

std::cout << "I: queue appends worker (" << identity << ")" << std::endl;

    }

}


 

//  Remove worker from queue, if present

static void

s_worker_delete (std::vector<worker_t> &queue, std::string &identity)

{

    for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {

        if (it->identity.compare(identity) == 0) {

            it = queue.erase(it);

std::cout << "I: queue deletes worker (" << identity << ")" << std::endl;

            break;

         }

    }

}


 

//  Reset worker expiry, worker must be present

static void

s_worker_refresh (std::vector<worker_t> &queue, std::string &identity)

{

    bool found = false;

    for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {

        if (it->identity.compare(identity) == 0) {

           it->expiry = s_clock ()

                 + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;

           found = true;

  std::cout << "I: queue refreshes worker (" << identity << ")" <<
std::endl;

           break;

        }

    }

    if (!found) {

       std::cout << "E: queue reports worker " << identity << " not ready"
<< std::endl;

    }

}


 

//  Pop next available worker off queue, return identity

static std::string

s_worker_dequeue (std::vector<worker_t> &queue)

{

    assert (queue.size());

    std::string identity = queue[0].identity;

    queue.erase(queue.begin());

std::cout << "I: queue accesses next available worker (" << identity << ")"
<< std::endl;

    return identity;

}


 

//  Look for & kill expired workers

static void

s_queue_purge (std::vector<worker_t> &queue)

{

    int64_t clock = s_clock();

    for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {

        if (clock > it->expiry) {

           it = queue.erase(it)-1;

  std::cout << "I: queue purges worker (" << it->identity << ")" <<
std::endl;

        }

    }

}


 

int main (void)

{

    s_version_assert (2, 1);


 

    //  Prepare our context and sockets

    zmq::context_t context(1);

    zmq::socket_t frontend(context, ZMQ_ROUTER);

    zmq::socket_t backend (context, ZMQ_ROUTER);

    frontend.bind("tcp://*:5555");    //  For clients

    backend.bind ("tcp://*:5556");    //  For workers


 

    //  Queue of available workers

    std::vector<worker_t> queue;


 

    //  Send out heartbeats at regular intervals

    int64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;


 

    while (1) {

        zmq::pollitem_t items [] = {

            { backend,  0, ZMQ_POLLIN, 0 },

            { frontend, 0, ZMQ_POLLIN, 0 }

        };

        //  Poll frontend only if we have available workers

        if (queue.size()) {

            zmq::poll (items, 2, HEARTBEAT_INTERVAL);//HEARTBEAT_INTERVAL *
1000

        } else {

            zmq::poll (items, 1, HEARTBEAT_INTERVAL);//HEARTBEAT_INTERVAL *
1000

        }


 

        //  Handle worker activity on backend

        if (items [0].revents & ZMQ_POLLIN) {

            zmsg msg (backend);

            std::string identity(msg.unwrap ());


 

            //  Return reply to client if it's not a control message

            if (msg.parts () == 1) {

                if (strcmp (msg.address (), "READY") == 0) {

                    s_worker_delete (queue, identity);

                    s_worker_append (queue, identity);

                }

                else {

                   if (strcmp (msg.address (), "HEARTBEAT") == 0) {

                       s_worker_refresh (queue, identity);

                   } else {

                       std::cout << "E: queue recieved invalid message from
" << identity << std::endl;

                       msg.dump ();

                   }

                }

            }

            else {

                msg.send (frontend);

std::cout << "I: queue sends message via frontend (" << identity << ")" <<
std::endl;

                s_worker_append (queue, identity);

            }

        }

        if (items [1].revents & ZMQ_POLLIN) {

            //  Now get next client request, route to next worker

            zmsg msg (frontend);

            std::string identity = std::string(s_worker_dequeue (queue));

            msg.push_front((char*)identity.c_str());

            msg.send (backend);

        }


 

        //  Send heartbeats to idle workers if it's time

        if (s_clock () > heartbeat_at) {

            for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {

                zmsg msg ("HEARTBEAT");

                msg.wrap (it->identity.c_str(), NULL);

                msg.send (backend);

std::cout << "I: queue sends heartbeat to worker (" << it->identity << ")
via backend" << std::endl;

            }

            heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;

        }

        s_queue_purge(queue);

    }

    //  We never exit the main loop

    //  But pretend to do the right shutdown anyhow

    queue.clear();

    return 0;

}


 

//

//  Paranoid Pirate worker

//

//

//     Andreas Hoelzlwimmer <andreas.hoelzlwimmer at fh-hagenberg.at>

//

#include "zmsg.hpp"


 

#include <iomanip>


 

//  Heartbeat settings (same values as in ppqueue). If HEARTBEAT_LIVENESS
//  consecutive polls of HEARTBEAT_INTERVAL msecs pass without hearing from
//  the queue, the worker reconnects after `interval` msecs. The interval
//  starts at INTERVAL_INIT and doubles on each failure while it is below
//  INTERVAL_MAX.
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable

#define HEARTBEAT_INTERVAL  1000    //  msecs

#define INTERVAL_INIT       1000    //  Initial reconnect

#define INTERVAL_MAX       32000    //  After exponential backoff


 

//  Identity of the current worker socket; reassigned every time
//  s_worker_socket() creates a new socket, and used in log messages.
std::string identity;

//  Create a new DEALER socket connected to the queue's worker-facing
//  backend (port 5556). The socket gets a random identity (stored in the
//  global `identity`) and announces itself with "READY".
//  The caller owns the returned socket and must delete it.
static zmq::socket_t *
s_worker_socket (zmq::context_t &context) {
    zmq::socket_t *sock = new zmq::socket_t(context, ZMQ_DEALER);

    //  A random identity makes tracing easier
    identity = s_set_id(*sock);
    sock->connect ("tcp://localhost:5556");

    //  Do not wait for unsent messages when the socket is closed
    int linger_ms = 0;
    sock->setsockopt (ZMQ_LINGER, &linger_ms, sizeof (linger_ms));

    //  Let the queue know this worker can take requests
    std::cout << "I: (" << identity << ") worker ready" << std::endl;
    s_send (*sock, "READY");
    return sock;
}


 

int main (void)

{

    s_version_assert (2, 1);

    srandom ((unsigned) time (NULL));


 

    zmq::context_t context (1);

    zmq::socket_t * worker = s_worker_socket (context);


 

    //  If liveness hits zero, queue is considered disconnected

    size_t liveness = HEARTBEAT_LIVENESS;

    size_t interval = INTERVAL_INIT;


 

    //  Send out heartbeats at regular intervals

    int64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;


 

    int cycles = 0;

    while (1) {

        zmq::pollitem_t items [] = { { *worker,  0, ZMQ_POLLIN, 0 } };

        zmq::poll (items, 1, HEARTBEAT_INTERVAL);//HEARTBEAT_INTERVAL * 1000


 

        if (items [0].revents & ZMQ_POLLIN) {

            //  Get message

            //  - 3-part envelope + content -> request

            //  - 1-part "HEARTBEAT" -> heartbeat

            zmsg msg (*worker);


 

            if (msg.parts () == 3) {

                //  Simulate various problems, after a few cycles

                cycles++;

                if (cycles > 3 && within (5) == 0) {

                    std::cout << "I: worker (" << identity << ") simulating
a crash" << std::endl;

                    msg.clear ();

                    break;

                }

                else {

                   if (cycles > 3 && within (5) == 0) {

                      std::cout << "I: worker (" << identity << ")
simulating CPU overload" << std::endl;

                       sleep (5);

                   }

                }

                std::cout << "I: worker (" << identity << ") received normal
reply - " << msg.body() << std::endl;

                msg.send (*worker);

                liveness = HEARTBEAT_LIVENESS;

                sleep (1);              //  Do some heavy work

            }

            else {

               if (msg.parts () == 1

               && strcmp (msg.body (), "HEARTBEAT") == 0) {

                   liveness = HEARTBEAT_LIVENESS;

               }

               else {

                   std::cout << "E: worker (" << identity << ") recieved
invalid message" << std::endl;

                   msg.dump ();

               }

            }

            interval = INTERVAL_INIT;

        }

        else

        if (--liveness == 0) {

            std::cout << "W: worker (" << identity << ") heartbeat failure,
can't reach queue" << std::endl;

            std::cout << "W: worker (" << identity << ") reconnecting in "
<< interval << " msec..." << std::endl;

            s_sleep (interval);


 

            if (interval < INTERVAL_MAX) {

                interval *= 2;

            }

            delete worker;

            worker = s_worker_socket (context);

            liveness = HEARTBEAT_LIVENESS;

        }


 

        //  Send heartbeat to queue if it's time

        if (s_clock () > heartbeat_at) {

            heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;

            std::cout << "I: worker (" << identity << ") sends heartbeat" <<
std::endl;

            s_send (*worker, "HEARTBEAT");

        }

    }

    delete worker;

    return 0;

}


 

//

//  Lazy Pirate client

//  Use zmq_poll to do a safe request-reply

//  To run, start lpserver and then randomly kill/restart it

//

#include "zhelpers.hpp"

#include <cstdlib>
#include <ctime>
#include <sstream>


 

//  REQUEST_TIMEOUT is passed directly to zmq::poll as the reply timeout.
//  After REQUEST_RETRIES consecutive timeouts the client gives up.
#define REQUEST_TIMEOUT     2500    //  msecs, (> 1000!)

#define REQUEST_RETRIES     3       //  Before we abandon


 

//  Helper function that returns a new REQ socket connected to the server
//  (lpserver, or ppqueue's client frontend) on port 5555.
//  The caller owns the returned socket and must delete it.
//
//  The socket is given an explicit random identity before connecting.
//  Without one, libzmq 3.x/4.x lets the peer ROUTER generate a binary
//  identity that starts with a zero byte. ppqueue handles frames through
//  zmsg, which appears to treat them as C strings, so such an identity
//  would be lost and replies could never be routed back to this client.
//  (lpserver uses REP, which handles the envelope natively and was not
//  affected.)
//
static zmq::socket_t * s_client_socket (zmq::context_t & context)
{
    std::cout << "I: client connecting to server..." << std::endl;
    zmq::socket_t * client = new zmq::socket_t (context, ZMQ_REQ);

    //  The identity must be set before connect() to take effect
    s_set_id (*client);
    client->connect ("tcp://localhost:5555");

    //  Configure socket to not wait at close time
    int linger = 0;
    client->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
    return client;
}


 

int main ()

{

zmq::context_t context (1);


 

zmq::socket_t * client = s_client_socket (context);


 

int sequence = 0;

int retries_left = REQUEST_RETRIES;


 

while (retries_left)

{

std::stringstream request;

request << ++sequence;

s_send (*client, request.str());

sleep (1);


 

bool expect_reply = true;

while (expect_reply)

{

//  Poll socket for a reply, with timeout

zmq::pollitem_t items[] = { { *client, 0, ZMQ_POLLIN, 0 } };

zmq::poll (&items[0], 1, REQUEST_TIMEOUT);//corrected this, was
REQUEST_TIMEOUT * 1000


 

//  If we got a reply, process it

if (items[0].revents & ZMQ_POLLIN)

{

//  We got a reply from the server, must match sequence

std::string reply = s_recv (*client);

if (atoi (reply.c_str ()) == sequence)

{

std::cout << "I: client received: (" << reply << ") in reply from server" <<
std::endl;

retries_left = REQUEST_RETRIES;

expect_reply = false;

}

else

{

std::cout << "E: client received malformed reply from server: " << reply <<
std::endl;

}

}

else if (--retries_left == 0)

{

std::cout << "E: client reports; server seems to be offline, abandoning" <<
std::endl;

expect_reply = false;

break;

}

else

{

std::cout << "W: client reports; no response from server, retrying..." <<
std::endl;

//  Old socket will be confused; close it and open a new one

delete client;

client = s_client_socket (context);

//  Send request again, on new socket

s_send (*client, request.str());

}

}

}

delete client;

return 0;

}

 

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20140704/71039531/attachment.htm>


More information about the zeromq-dev mailing list