[zeromq-dev] Paranoid Pirate Pattern
Riskybiz
riskybizlive at live.com
Fri Jul 4 17:51:29 CEST 2014
Dear zeromq list,
I cannot get the Paranoid Pirate Pattern example to function fully on either
Windows 7 or Debian (Wheezy). I have compiled the example code in both
environments, making a few modifications to delay intervals to fix extended
waiting periods and adding print statements to track whatever is going on.
I nearly have the Paranoid Pirate Pattern working on Debian with
zeromq-4.0.4, except for one fundamental problem: lpclient never connects
to ppqueue. It tries to connect repeatedly and ultimately gives up.
Meanwhile (as far as I can tell) ppqueue and the ppworker(s) are happily
communicating and heartbeating with one another.
The lpclient program is reused from the 'Lazy Pirate' example. My lpclient
example functions as expected with lpserver.
However when the same lpclient is used in conjunction with ppqueue and
ppworker to attempt the Paranoid Pirate Pattern then the zeromq network
connection between lpclient and ppqueue is not made/working.
I'd like to study the fully functioning example so that I can work out how
to integrate it into a real project!
Please, does anyone know how to get the Paranoid Pirate Pattern example
working?
Thanks,
Riskybiz.
Included below are the slightly modified .cpp files from my Debian attempts
at ppqueue, ppworker and lpclient.
//
// Paranoid Pirate queue
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer at fh-hagenberg.at>
//
#include "zmsg.hpp"
#include <stdint.h>
#include <vector>
// Heartbeat tuning for the queue.
// A worker's queue entry expires HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
// after it was last appended or refreshed.
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
// Also used as the poll timeout and as the spacing between heartbeats sent to workers
#define HEARTBEAT_INTERVAL 1000 // msecs
// This defines one active worker in our worker queue
// One entry in the queue of available workers
struct worker_t {
    std::string identity;   // Address of worker
    int64_t     expiry;     // Expires at this time
};
// Insert worker at end of queue, reset expiry
// Worker must not already be in queue
static void
s_worker_append (std::vector<worker_t> &queue, std::string &identity)
{
bool found = false;
for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {
if (it->identity.compare(identity) == 0) {
std::cout << "E: queue has duplicate worker identity " <<
identity.c_str() << std::endl;
found = true;
break;
}
}
if (!found) {
worker_t worker;
worker.identity = identity;
worker.expiry = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
queue.push_back(worker);
std::cout << "I: queue appends worker (" << identity << ")" << std::endl;
}
}
// Remove worker from queue, if present
static void
s_worker_delete (std::vector<worker_t> &queue, std::string &identity)
{
for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {
if (it->identity.compare(identity) == 0) {
it = queue.erase(it);
std::cout << "I: queue deletes worker (" << identity << ")" << std::endl;
break;
}
}
}
// Reset worker expiry, worker must be present
static void
s_worker_refresh (std::vector<worker_t> &queue, std::string &identity)
{
bool found = false;
for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {
if (it->identity.compare(identity) == 0) {
it->expiry = s_clock ()
+ HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
found = true;
std::cout << "I: queue refreshes worker (" << identity << ")" <<
std::endl;
break;
}
}
if (!found) {
std::cout << "E: queue reports worker " << identity << " not ready"
<< std::endl;
}
}
// Pop next available worker off queue, return identity
static std::string
s_worker_dequeue (std::vector<worker_t> &queue)
{
assert (queue.size());
std::string identity = queue[0].identity;
queue.erase(queue.begin());
std::cout << "I: queue accesses next available worker (" << identity << ")"
<< std::endl;
return identity;
}
// Look for & kill expired workers
static void
s_queue_purge (std::vector<worker_t> &queue)
{
int64_t clock = s_clock();
for (std::vector<worker_t>::iterator it = queue.begin(); it <
queue.end(); it++) {
if (clock > it->expiry) {
it = queue.erase(it)-1;
std::cout << "I: queue purges worker (" << it->identity << ")" <<
std::endl;
}
}
}
// Paranoid Pirate queue entry point.
//
// Binds a ROUTER socket for clients (port 5555) and one for workers
// (port 5556), then loops forever:
//   - worker traffic on the backend either registers/refreshes the worker
//     (READY / HEARTBEAT control messages) or is forwarded to the frontend
//     as a reply, after which the worker is queued as available again;
//   - client requests on the frontend are routed to the next available worker;
//   - heartbeats are sent to every queued worker once per HEARTBEAT_INTERVAL;
//   - expired workers are purged.
// Never returns in practice.
int main (void)
{
    s_version_assert (2, 1);

    // Prepare our context and sockets
    zmq::context_t context(1);
    zmq::socket_t frontend(context, ZMQ_ROUTER);
    zmq::socket_t backend (context, ZMQ_ROUTER);
    frontend.bind("tcp://*:5555"); // For clients
    backend.bind ("tcp://*:5556"); // For workers

    // Queue of available workers
    std::vector<worker_t> queue;

    // Send out heartbeats at regular intervals
    int64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;

    while (1) {
        zmq::pollitem_t items [] = {
            { backend, 0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        // Poll frontend only if we have available workers.
        // The timeout is passed unscaled; it used to be
        // HEARTBEAT_INTERVAL * 1000 and was reduced to fix extended waits.
        if (queue.size()) {
            zmq::poll (items, 2, HEARTBEAT_INTERVAL);
        } else {
            // items[1].revents stays 0, so the frontend branch below is skipped
            zmq::poll (items, 1, HEARTBEAT_INTERVAL);
        }

        // Handle worker activity on backend
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg msg (backend);
            std::string identity(msg.unwrap ());

            // Return reply to client if it's not a control message
            if (msg.parts () == 1) {
                if (strcmp (msg.address (), "READY") == 0) {
                    // Drop any stale entry first so the append cannot report a duplicate
                    s_worker_delete (queue, identity);
                    s_worker_append (queue, identity);
                }
                else {
                    if (strcmp (msg.address (), "HEARTBEAT") == 0) {
                        s_worker_refresh (queue, identity);
                    } else {
                        std::cout << "E: queue recieved invalid message from "
                                  << identity << std::endl;
                        msg.dump ();
                    }
                }
            }
            else {
                msg.send (frontend);
                std::cout << "I: queue sends message via frontend (" << identity << ")"
                          << std::endl;
                // The worker has answered, so it is available again
                s_worker_append (queue, identity);
            }
        }
        if (items [1].revents & ZMQ_POLLIN) {
            // Now get next client request, route to next worker
            zmsg msg (frontend);
            std::string identity = s_worker_dequeue (queue);
            msg.push_front(const_cast<char *>(identity.c_str()));
            msg.send (backend);
        }

        // Send heartbeats to idle workers if it's time
        if (s_clock () > heartbeat_at) {
            for (std::vector<worker_t>::iterator it = queue.begin();
                 it != queue.end(); ++it) {
                zmsg msg ("HEARTBEAT");
                msg.wrap (it->identity.c_str(), NULL);
                msg.send (backend);
                std::cout << "I: queue sends heartbeat to worker (" << it->identity
                          << ") via backend" << std::endl;
            }
            heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
        }
        s_queue_purge(queue);
    }
    // We never exit the main loop
    // But pretend to do the right shutdown anyhow
    queue.clear();
    return 0;
}
//
// Paranoid Pirate worker
//
//
// Andreas Hoelzlwimmer <andreas.hoelzlwimmer at fh-hagenberg.at>
//
#include "zmsg.hpp"
#include <iomanip>
// Worker-side heartbeat and reconnect tuning.
// Liveness starts at HEARTBEAT_LIVENESS and is decremented on each poll
// that returns without input; reaching zero triggers a reconnect.
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
// Poll timeout and spacing between heartbeats sent to the queue
#define HEARTBEAT_INTERVAL 1000 // msecs
// Reconnect delay passed to s_sleep(); reset to this value after any received message
#define INTERVAL_INIT 1000 // Initial reconnect
// The reconnect delay is doubled after each failure only while it is below this value
#define INTERVAL_MAX 32000 // After exponential backoff
// Helper function that returns a new configured socket
// connected to the Paranoid Pirate queue (backend, port 5556)
//
// Identity of the current worker socket; reassigned on every call to
// s_worker_socket() and used in the worker's log output.
std::string identity;

// Build a fresh DEALER socket for this worker: give it an identity via
// s_set_id(), connect it to tcp://localhost:5556, disable linger, and
// announce the worker with a "READY" message.
// The socket is allocated with new; the caller is responsible for deleting it.
static zmq::socket_t *
s_worker_socket (zmq::context_t &context) {
    zmq::socket_t *sock = new zmq::socket_t (context, ZMQ_DEALER);

    // Set random identity to make tracing easier
    identity = s_set_id (*sock);
    sock->connect ("tcp://localhost:5556");

    // Configure socket to not wait at close time
    int no_linger = 0;
    sock->setsockopt (ZMQ_LINGER, &no_linger, sizeof (no_linger));

    // Tell queue we're ready for work
    std::cout << "I: (" << identity << ") worker ready" << std::endl;
    s_send (*sock, "READY");
    return sock;
}
int main (void)
{
s_version_assert (2, 1);
srandom ((unsigned) time (NULL));
zmq::context_t context (1);
zmq::socket_t * worker = s_worker_socket (context);
// If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
// Send out heartbeats at regular intervals
int64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
int cycles = 0;
while (1) {
zmq::pollitem_t items [] = { { *worker, 0, ZMQ_POLLIN, 0 } };
zmq::poll (items, 1, HEARTBEAT_INTERVAL);//HEARTBEAT_INTERVAL * 1000
if (items [0].revents & ZMQ_POLLIN) {
// Get message
// - 3-part envelope + content -> request
// - 1-part "HEARTBEAT" -> heartbeat
zmsg msg (*worker);
if (msg.parts () == 3) {
// Simulate various problems, after a few cycles
cycles++;
if (cycles > 3 && within (5) == 0) {
std::cout << "I: worker (" << identity << ") simulating
a crash" << std::endl;
msg.clear ();
break;
}
else {
if (cycles > 3 && within (5) == 0) {
std::cout << "I: worker (" << identity << ")
simulating CPU overload" << std::endl;
sleep (5);
}
}
std::cout << "I: worker (" << identity << ") received normal
reply - " << msg.body() << std::endl;
msg.send (*worker);
liveness = HEARTBEAT_LIVENESS;
sleep (1); // Do some heavy work
}
else {
if (msg.parts () == 1
&& strcmp (msg.body (), "HEARTBEAT") == 0) {
liveness = HEARTBEAT_LIVENESS;
}
else {
std::cout << "E: worker (" << identity << ") recieved
invalid message" << std::endl;
msg.dump ();
}
}
interval = INTERVAL_INIT;
}
else
if (--liveness == 0) {
std::cout << "W: worker (" << identity << ") heartbeat failure,
can't reach queue" << std::endl;
std::cout << "W: worker (" << identity << ") reconnecting in "
<< interval << " msec..." << std::endl;
s_sleep (interval);
if (interval < INTERVAL_MAX) {
interval *= 2;
}
delete worker;
worker = s_worker_socket (context);
liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (s_clock () > heartbeat_at) {
heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
std::cout << "I: worker (" << identity << ") sends heartbeat" <<
std::endl;
s_send (*worker, "HEARTBEAT");
}
}
delete worker;
return 0;
}
//
// Lazy Pirate client
// Use zmq_poll to do a safe request-reply
// To run, start lpserver and then randomly kill/restart it
//
#include "zhelpers.hpp"
#include <sstream>
// Timeout passed to zmq::poll while waiting for each reply
#define REQUEST_TIMEOUT 2500 // msecs, (> 1000!)
// Number of consecutive poll timeouts tolerated before the client gives up;
// the counter is reset after every reply that matches the request sequence
#define REQUEST_RETRIES 3 // Before we abandon
// Helper function that returns a new configured socket
// connected to the Hello World server
//
// Build a fresh REQ socket connected to tcp://localhost:5555 with linger
// disabled. The socket is allocated with new; the caller is responsible
// for deleting it.
static zmq::socket_t * s_client_socket (zmq::context_t & context)
{
    std::cout << "I: client connecting to server..." << std::endl;
    zmq::socket_t *sock = new zmq::socket_t (context, ZMQ_REQ);
    sock->connect ("tcp://localhost:5555");

    // Configure socket to not wait at close time
    int no_linger = 0;
    sock->setsockopt (ZMQ_LINGER, &no_linger, sizeof (no_linger));
    return sock;
}
int main ()
{
zmq::context_t context (1);
zmq::socket_t * client = s_client_socket (context);
int sequence = 0;
int retries_left = REQUEST_RETRIES;
while (retries_left)
{
std::stringstream request;
request << ++sequence;
s_send (*client, request.str());
sleep (1);
bool expect_reply = true;
while (expect_reply)
{
// Poll socket for a reply, with timeout
zmq::pollitem_t items[] = { { *client, 0, ZMQ_POLLIN, 0 } };
zmq::poll (&items[0], 1, REQUEST_TIMEOUT);//corrected this, was
REQUEST_TIMEOUT * 1000
// If we got a reply, process it
if (items[0].revents & ZMQ_POLLIN)
{
// We got a reply from the server, must match sequence
std::string reply = s_recv (*client);
if (atoi (reply.c_str ()) == sequence)
{
std::cout << "I: client received: (" << reply << ") in reply from server" <<
std::endl;
retries_left = REQUEST_RETRIES;
expect_reply = false;
}
else
{
std::cout << "E: client received malformed reply from server: " << reply <<
std::endl;
}
}
else if (--retries_left == 0)
{
std::cout << "E: client reports; server seems to be offline, abandoning" <<
std::endl;
expect_reply = false;
break;
}
else
{
std::cout << "W: client reports; no response from server, retrying..." <<
std::endl;
// Old socket will be confused; close it and open a new one
delete client;
client = s_client_socket (context);
// Send request again, on new socket
s_send (*client, request.str());
}
}
}
delete client;
return 0;
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20140704/71039531/attachment.htm>
More information about the zeromq-dev
mailing list