2010/6/9 Martin Sustrik <span dir="ltr"><<a href="mailto:sustrik@250bpm.com" target="_blank">sustrik@250bpm.com</a>></span><br><div class="gmail_quote"><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">


Marcin,<br>
<div><div></div><div><br>> The typical architecture would like this:<br>
><br>
> C1 \              /W1<br>
> C2  ---- queue --- W2<br>
> ..                 ..<br>
> CN /              \WM<br>
><br>
> Queue handles workers registration, load-balancing and retrying. What is<br>
> the best way to implement it using 0MQ?<br>
<br>
</div></div>Have a look here:<br>
<br>
<a href="http://www.zeromq.org/blog:multithreaded-server" target="_blank">http://www.zeromq.org/blog:multithreaded-server</a><br>
<br>
You want basically the same thing, the only difference being that the<br>
queue should be standalone application (zmq_queue) rather than<br>
in-process component.<br></blockquote><div><br></div><div>Martin, thanks for the quick answer. I have tried it and see some deterministic crashes. I don't know if this is a problem with ZeroMQ 2.0.7 I have used or my code.</div>


<div><br></div><div>I am launching in one process several client threads:</div><div><br></div><div><div>void client_thread (zmq::context_t *ctx)</div><div>{</div><div>  //  This client is a requester.</div><div>  zmq::socket_t s (*ctx, ZMQ_REQ);</div>


<div><br></div><div>  //  Connect to the server.</div><div>  s.connect ("ipc://queue");</div><div><br></div><div>  while (!stop_clients) {</div><div>    //  Send the request. No point in filling the content in as server</div>


<div>    //  is a dummy and won't use it anyway.</div><div>    zmq::message_t request (req_size);</div><div>    memset (request.data (), 0, request.size ());</div><div>    s.send (request);</div><div><br></div><div>    //  Get the reply. </div>


<div>    zmq::message_t reply;</div><div>    s.recv (&reply);</div><div>  }</div><div>}</div></div><div><br></div><div>In other process I am launching the queue:</div><div><br></div><div><div>    zmq::context_t ctx (thread_count);</div>


<div><br></div><div>    //  Create an endpoint for worker threads to connect to.</div><div>    //  We are using XREQ socket so that processing of one request</div><div>    //  won't block other requests.</div><div>    zmq::socket_t workers (ctx, ZMQ_XREQ);</div>


<div>    string bind_addr = string("tcp://")+interface+string(":")+boost::lexical_cast<string>(port);</div><div>    workers.bind (bind_addr.c_str());</div><div><br></div><div>    //  Create an endpoint for client applications to connect to.</div>


<div>    //  We are usign XREP socket so that processing of one request</div><div>    //  won't block other requests.</div><div>    zmq::socket_t clients (ctx, ZMQ_XREP);</div><div>    clients.bind ("ipc://queue");</div>


<div><br></div><div>    //  Use queue device as a dispatcher of messages from clients to worker</div><div>    //  threads.</div><div>    zmq::device (ZMQ_QUEUE, clients, workers);</div></div><div><br></div><div>And in another process I am launching the workers:</div>


<div><br></div><div><div>void worker_thread (zmq::context_t *ctx)</div><div>{</div><div>  zmq::socket_t s (*ctx, ZMQ_REP);</div><div><br></div><div>  s.connect ("inproc://workers");</div><div><br></div><div>  pt::time_duration td = pt::microsec(1000000/reqs_sec);</div>


<div><br></div><div>  while (!stop_workers) {</div><div><br></div><div>    //  Get a request from the dispatcher.</div><div>    zmq::message_t request;</div><div>    s.recv (&request);</div><div><br></div><div>    //  Our server does no real processing. So let's sleep for a while</div>


<div>    //  to simulate actual processing.</div><div>    this_thread::sleep (td);</div><div><br></div><div>    //  Send the reply. No point in filling the data in as the client</div><div>    //  is a dummy and won't check it anyway.</div>


<div>    zmq::message_t reply (resp_size);</div><div>    memset (reply.data (), 0, reply.size ());</div><div>    s.send (reply);</div><div>  }</div><div>}</div></div><div><br></div><div>and a queue from TCP socket to inproc workers:</div>


<div><br></div><div><div>    zmq::context_t ctx (thread_count);</div><div><br></div><div>    //  Create an endpoint for worker threads to connect to.</div><div>    //  We are using XREQ socket so that processing of one request</div>


<div>    //  won't block other requests.</div><div>    zmq::socket_t workers (ctx, ZMQ_XREQ);</div><div>    workers.bind ("inproc://workers");</div><div><br></div><div>    //  Connect to queue</div><div>    //  We are usign XREP socket so that processing of one request</div>


<div>    //  won't block other requests.</div><div>    zmq::socket_t clients (ctx, ZMQ_XREP);</div><div>    string connect_addr = string("tcp://")+host+string(":")+boost::lexical_cast<string>(port);</div>


<div>    clients.connect (connect_addr.c_str());</div><div><br></div><div>    //  Use queue device as a dispatcher of messages from clients to worker</div><div>    //  threads.</div><div>    zmq::device (ZMQ_QUEUE, clients, workers);</div>


</div><div><br></div><div>After running queue, worker and client I get a SIGSEGV in client:</div><div><br></div><div><div>Program received signal SIGSEGV, Segmentation fault.</div><div>[Switching to Thread 0x7ffff5c49710 (LWP 9812)]</div>

<div>zmq::zmq_engine_t::in_event (this=0x688e70) at zmq_engine.cpp:123</div><div>123         inout->flush ();</div><div>(gdb) bt</div><div>#0  zmq::zmq_engine_t::in_event (this=0x688e70) at zmq_engine.cpp:123</div><div>

#1  0x00007ffff7bba195 in zmq::epoll_t::loop (this=0x66a4e0) at epoll.cpp:197</div><div>#2  0x00007ffff7bc9a67 in zmq::thread_t::thread_routine (arg_=0x66a520) at thread.cpp:99</div><div>#3  0x00007ffff79879ca in start_thread (arg=<value optimized out>) at pthread_create.c:300</div>

<div>#4  0x00007ffff6f376cd in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:112</div><div>#5  0x0000000000000000 in ?? ()</div></div><div><br></div><div>The sources are available at <a href="http://gozdal.com/zeromq.tar.bz2">http://gozdal.com/zeromq.tar.bz2</a> (together with CMakeLists.txt for building them - you need boost to build them).</div>

<div><br></div></div>-- <br>Marcin Gozdalik<br>