[zeromq-dev] inproc: If first connect occurs before bind, then retries don't seem to succeed...

Andy Ballingall TF ballingall at thefoundry.co.uk
Wed Dec 19 12:40:35 CET 2012


Hi,

I've been following the discussions about needing to call
zsocket_bind() before zsocket_connect(). In one discussion, it was
suggested that in an asynchronous system where you're not able to
guarantee order, you could retry the connect until it succeeds:

(See http://grokbase.com/p/zeromq/zeromq-dev/12aj8bn29c/inproc-need-to-bind-to-an-address-before-connect)

However, I've not been able to get this to work in my use case, a
simplified version of which is attached below. Briefly, the idea is a
logging thread which coordinates the logging of messages sent from
other threads (one of which may be the logging thread's parent... is
this an issue?). The only data shared between threads is the czmq
context.

In the example below, the parent first spawns the logger thread, then
a worker thread. Then both the worker thread and the parent try to
connect to the logger thread to send a number of messages.

If I introduce a small pause (e.g. 1 millisecond or more) *after*
starting the logger thread but before connecting, then everything
seems to work. If there is no pause, then not only does the initial
connect never work, but even after trying hundreds of times over a
number of seconds, connection still fails (CONNECTION REFUSED).

I'd prefer to use this retry connection approach if possible. Should
it work? Or am I doing something else dumb?

The small program below demonstrates the problem reliably on my 4 core
centos 6 VM (running on windows 7 via VMware). It runs with one
argument - the number of milliseconds to wait after creating the
logger thread. Set this value to 0 to see the problem, or to a value
of 1 or more to see it working.

When working, messages from the parent and worker threads should be
atomically echoed to stdout. When not working, neither thread manages
to connect.

Thanks!
Andy

//START OF bindwait.c
////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// This test creates a logger thread which listens via an inproc 'PULL'
// socket for messages to output, and then creates a worker thread. The
// worker thread and the parent then both attempt to send messages to the
// logger thread using a 'PUSH' connection.
//
// COMPILING:
// (assumes zmq and czmq have been installed)
// gcc -o bindwait -I/usr/local/include/ bindwait.c -lczmq -lzmq
//
// RUN TO SHOW PROBLEM:
//
// bindwait 0
//
// RUN WITHOUT PROBLEM - by specifying a period in milliseconds to wait
// after starting the logger thread:
//
// bindwait 200


#include "czmq.h"
#include <stdio.h>

// Open a connection to the logger thread and send it a message a
number of times
static void
send_log_messages( zctx_t* ctx, const char* message, int num_sends,
int ms_between_sends)
{
  int num_tries = 500;
  int sleep_ms = 10;
  int rc,tries;

  void* src_socket = zsocket_new(ctx, ZMQ_PUSH);
  assert(src_socket);

  // zsocket_connect() will fail if the corresponding zsocket_bind() in the
  // logger thread hasn't completed, so try a number of times until it succeeds:
  for(tries = 1; tries <= num_tries; tries++) {
    rc = zsocket_connect(src_socket, "inproc://PushPull");

    if (rc == 0)
      break;
    else
      zclock_sleep(sleep_ms);
  }

  if (rc != 0) {
    printf("Child failed to connect after %d tries, error = %d - %s\n",
           tries, zmq_errno(), zmq_strerror(zmq_errno()) );
  }
  else {
    // Send some messages to the logger thread!
    if (src_socket) {

      int m;
      for (m = 0; m < num_sends; m++) {
        int rc = zstr_send (src_socket, message);
        assert(rc == 0);
        zclock_sleep(ms_between_sends);
      }
    }
  }
  zsocket_destroy (ctx, src_socket);
}

// Reactor callback registered by logger_thread: reads one log message from
// dst_socket without blocking and echoes it to stdout.
//
// reactor and log_poller are part of the callback signature but are not
// used. Always returns 0 (presumably "keep the reactor running" - confirm
// against the zloop documentation).
static int
output_log_message(zloop_t* reactor, zmq_pollitem_t* log_poller, void*
dst_socket)
{
  (void) reactor;
  (void) log_poller;

  assert(dst_socket);
  char* message = zstr_recv_nowait(dst_socket);

  // The non-blocking receive can come back empty-handed; passing NULL to
  // printf's %s would be undefined behaviour, so skip the output instead.
  if (message == NULL)
    return 0;

  printf("Message received: %s \n", message);
  free(message);   // the received string is owned by this function
  return 0;
}


// Logger thread entry point: binds a PULL socket to "inproc://PushPull" and
// runs a zloop reactor that calls output_log_message() for each readable
// event on that socket.
//
// arg is the zctx_t* supplied by the thread's creator. Returns NULL once the
// reactor stops.
static void*
logger_thread(void* arg)
{
  zctx_t* ctx = ( zctx_t* )arg;

  void* dst_socket = zsocket_new(ctx, ZMQ_PULL);
  assert(dst_socket);
  int rc = zsocket_bind(dst_socket, "inproc://PushPull");
  assert(rc == 0);

  zloop_t* reactor = zloop_new();
  assert(reactor);

  zmq_pollitem_t log_poller = { dst_socket, 0, ZMQ_POLLIN };

  rc = zloop_poller(reactor, &log_poller, output_log_message, dst_socket);
  assert(rc == 0);
  (void) rc;   // keeps NDEBUG builds free of unused-variable warnings

  // Blocks here until the reactor is stopped.
  zloop_start(reactor);

  // Release the reactor; previously it was leaked when the thread ended.
  // dst_socket is left alone: it was created through ctx, and ctx is
  // destroyed by main.
  zloop_destroy(&reactor);

  printf("Logger Thread Exiting...\n");
  return NULL;
}


// Worker thread entry point: pushes a burst of log messages to the logger
// thread and then announces its own exit on stdout.
//
// arg is the zctx_t* supplied by the thread's creator. Always returns NULL.
static void*
worker_thread(void* arg)
{
  zctx_t* shared_ctx = arg;

  // 20 copies of "Worker!", 10 ms apart.
  send_log_messages( shared_ctx, "Worker!", 20, 10);

  fputs("Worker Thread Exiting...\n", stdout);
  return NULL;
}



// Entry point. Takes exactly one argument: the number of milliseconds to
// sleep after starting the logger thread (0 or a negative value means no
// sleep). Starts the logger and worker threads, sends log messages from the
// parent as well, waits one second, then destroys the context.
//
// Prints a usage line and exits with status -1 if the argument is missing
// or is not an integer.
int main(int argc, char* argv[])
{
  int thread_wait_ms = 0;

  // Reject a non-numeric argument too; otherwise thread_wait_ms would be
  // read without ever having been assigned by sscanf.
  if ( argc != 2 || sscanf( argv[1], "%d", &thread_wait_ms ) != 1 ) {
    printf("Usage: %s <milliseconds to sleep after starting logger thread>\n",  argv[0]);
    exit(-1);
  }

  // Create a context which will be shared by all threads
  // NOTE(review): ctx is used concurrently by the parent, the logger and the
  // worker. Whether zctx_t may be shared between threads started with
  // zthread_new() is not visible here - confirm against the czmq zctx /
  // zthread documentation (zthread_fork() is the attached-thread variant).
  zctx_t* ctx = zctx_new();
  assert(ctx);

  // Create a thread which will listen for log messages to output
  int rc = zthread_new(logger_thread, (void*)ctx);
  assert(rc == 0);

  if (thread_wait_ms > 0) {
    printf("Waiting for %d milliseconds after logger thread creation...\n", thread_wait_ms);
    zclock_sleep(thread_wait_ms);
  }

  // Create a thread which will generate some log messages to be output
  rc = zthread_new(worker_thread, (void*)ctx);
  assert(rc == 0);
  (void) rc;   // keeps NDEBUG builds free of unused-variable warnings

  // We're going to send log messages from the parent too:
  send_log_messages( ctx, "Parent!", 20, 10);

  // Wait a bit before exiting:
  zclock_sleep(1000);
  zctx_destroy (&ctx);
  return 0;
}

//END OF bindwait.c
////////////////////////////////////////////////////////////////////////////////////////////////////////////////





--
Andy Ballingall
Senior Software Engineer

The Foundry
6th Floor, The Communications Building,
48, Leicester Square,
London, WC2H 7LT, UK
Tel: +44 (0)20 7968 6828 - Fax: +44 (0)20 7930 8906
Web: http://www.thefoundry.co.uk/

The Foundry Visionmongers Ltd.
Registered in England and Wales No: 4642027



More information about the zeromq-dev mailing list