[zeromq-dev] 0MQ programming models

Marcin Gozdalik gozdal at gmail.com
Thu Jun 10 18:31:56 CEST 2010


2010/6/9 Martin Sustrik <sustrik at 250bpm.com>

> Marcin,
>
> > The typical architecture would look like this:
> >
> > C1 \              /W1
> > C2  ---- queue --- W2
> > ..                 ..
> > CN /              \WM
> >
> > Queue handles workers registration, load-balancing and retrying. What is
> > the best way to implement it using 0MQ?
>
> Have a look here:
>
> http://www.zeromq.org/blog:multithreaded-server
>
> You want basically the same thing, the only difference being that the
> queue should be standalone application (zmq_queue) rather than
> in-process component.
>

Martin, thanks for the quick answer. I have tried it and I am seeing
deterministic crashes. I don't know whether the problem is in ZeroMQ 2.0.7,
which I am using, or in my code.

In one process I am launching several client threads:

void client_thread (zmq::context_t *ctx)
{
  //  This client is a requester.
  zmq::socket_t s (*ctx, ZMQ_REQ);

  //  Connect to the server.
  s.connect ("ipc://queue");

  while (!stop_clients) {
    //  Send the request. No point in filling the content in as server
    //  is a dummy and won't use it anyway.
    zmq::message_t request (req_size);
    memset (request.data (), 0, request.size ());
    s.send (request);

    //  Get the reply.
    zmq::message_t reply;
    s.recv (&reply);
  }
}

In another process I am launching the queue:

    zmq::context_t ctx (thread_count);

    //  Create an endpoint for worker threads to connect to.
    //  We are using XREQ socket so that processing of one request
    //  won't block other requests.
    zmq::socket_t workers (ctx, ZMQ_XREQ);
    string bind_addr = string("tcp://") + interface + string(":") +
        boost::lexical_cast<string>(port);
    workers.bind (bind_addr.c_str());

    //  Create an endpoint for client applications to connect to.
    //  We are using XREP socket so that processing of one request
    //  won't block other requests.
    zmq::socket_t clients (ctx, ZMQ_XREP);
    clients.bind ("ipc://queue");

    //  Use queue device as a dispatcher of messages from clients to worker
    //  threads.
    zmq::device (ZMQ_QUEUE, clients, workers);

And in a third process I am launching the worker threads:

void worker_thread (zmq::context_t *ctx)
{
  zmq::socket_t s (*ctx, ZMQ_REP);

  s.connect ("inproc://workers");

  pt::time_duration td = pt::microsec(1000000/reqs_sec);

  while (!stop_workers) {

    //  Get a request from the dispatcher.
    zmq::message_t request;
    s.recv (&request);

    //  Our server does no real processing. So let's sleep for a while
    //  to simulate actual processing.
    this_thread::sleep (td);

    //  Send the reply. No point in filling the data in as the client
    //  is a dummy and won't check it anyway.
    zmq::message_t reply (resp_size);
    memset (reply.data (), 0, reply.size ());
    s.send (reply);
  }
}

and, in that same process, a queue from the TCP socket to the inproc workers:

    zmq::context_t ctx (thread_count);

    //  Create an endpoint for worker threads to connect to.
    //  We are using XREQ socket so that processing of one request
    //  won't block other requests.
    zmq::socket_t workers (ctx, ZMQ_XREQ);
    workers.bind ("inproc://workers");

    //  Connect to queue
    //  We are using XREP socket so that processing of one request
    //  won't block other requests.
    zmq::socket_t clients (ctx, ZMQ_XREP);
    string connect_addr = string("tcp://") + host + string(":") +
        boost::lexical_cast<string>(port);
    clients.connect (connect_addr.c_str());

    //  Use queue device as a dispatcher of messages from clients to worker
    //  threads.
    zmq::device (ZMQ_QUEUE, clients, workers);
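
Both queue processes build their TCP endpoint by the same string
concatenation. A minimal sketch of that step as a shared helper, assuming a
C++11 compiler (the posted sources use boost::lexical_cast instead); the name
make_tcp_endpoint is hypothetical and does not appear in the sources:

```cpp
#include <string>

//  Hypothetical helper, not part of the posted sources.
//  Builds a 0MQ TCP endpoint string such as "tcp://127.0.0.1:5555"
//  from a host (or interface) name and a port number.
//  std::to_string requires C++11; it stands in for boost::lexical_cast.
std::string make_tcp_endpoint (const std::string &host, int port)
{
    return "tcp://" + host + ":" + std::to_string (port);
}
```

The result would be passed on as workers.bind (make_tcp_endpoint (interface,
port).c_str ()) in the first queue and as clients.connect (make_tcp_endpoint
(host, port).c_str ()) in the second one.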

After starting the queue, the workers and the clients, I get a SIGSEGV in the
client process:

Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff5c49710 (LWP 9812)]
zmq::zmq_engine_t::in_event (this=0x688e70) at zmq_engine.cpp:123
123         inout->flush ();
(gdb) bt
#0  zmq::zmq_engine_t::in_event (this=0x688e70) at zmq_engine.cpp:123
#1  0x00007ffff7bba195 in zmq::epoll_t::loop (this=0x66a4e0) at epoll.cpp:197
#2  0x00007ffff7bc9a67 in zmq::thread_t::thread_routine (arg_=0x66a520) at thread.cpp:99
#3  0x00007ffff79879ca in start_thread (arg=<value optimized out>) at pthread_create.c:300
#4  0x00007ffff6f376cd in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:112
#5  0x0000000000000000 in ?? ()

The sources are available at http://gozdal.com/zeromq.tar.bz2, together with
a CMakeLists.txt for building them (Boost is required).

-- 
Marcin Gozdalik