[zeromq-dev] Recv() picking up identity delimiter (empty msg)
Matt Weinstein
mattweinstein at gmail.com
Tue Jun 15 01:42:37 CEST 2010
Folks -
This code is based on the trivial client server example with the
client converted to internal threads, using inproc instead of TCP,
running on 2.0.7.
Here's the output (Leopard/Xcode 3.1.2):
[Session started at 2010-06-14 18:31:48 -0400.]
request size 0
terminate called after throwing an instance of 'zmq::error_t'
what(): Operation cannot be accomplished in current state
When using inproc: on the client side (the REQ-XREP pair), the
identity delimiter is not being stripped off at the REP side. This
only happens when you use inproc, the code works fine when the client
is running over TCP (undefine BROKEN :-) )
I'd been seeing the same behavior in the app I'm developing.
Any thoughts? Am I wrong or missing something?
Thanks!
Best,
Matt
PS The trivial client server example needs to be edited to the latest
API interface, e.g. removing user thread count from zmq::context.
#include <assert.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <zmq.hpp>
#include <iostream>
using namespace std;
#define BROKEN
void* internal_client(void *arg)
{
zmq::context_t *ctx = (zmq::context_t*) arg;
// This client is a requester.
zmq::socket_t s (*ctx, ZMQ_REQ);
// Connect to the server.
#ifdef BROKEN
s.connect ("inproc://clients");
#else
s.connect ("tcp://localhost:5555");
#endif BROKEN
// Send 20 requests and receive 20 replies.
for (int i = 0; i != 20; i++) {
// Send the request. No point in filling the content in as
server
// is a dummy and won't use it anyway.
zmq::message_t request (10);
memset (request.data (), 0, request.size ());
s.send (request);
// Get the reply.
zmq::message_t reply;
s.recv (&reply);
}
return 0;
}
void *worker_routine (void *arg)
{
// This is the body of the worker thread(s).
zmq::context_t *ctx = (zmq::context_t*) arg;
// Worker thread is a 'replier', i.e. it receives requests and
returns
// replies.
zmq::socket_t s (*ctx, ZMQ_REP);
// Connect to the dispatcher (queue) running in the main thread.
s.connect ("inproc://workers");
while (true) {
// Get a request from the dispatcher.
zmq::message_t request;
s.recv (&request);
cout << "request size " << request.size() << endl;
// Our server does no real processing. So let's sleep for a
while
// to simulate actual processing.
sleep (1);
// Send the reply. No point in filling the data in as the
client
// is a dummy and won't check it anyway.
zmq::message_t reply (10);
memset (reply.data (), 0, reply.size ());
s.send (reply);
}
}
int main ()
{
// One I/O thread.
zmq::context_t ctx (1);
// Create an endpoint for worker threads to connect to.
// We are using XREQ socket so that processing of one request
// won't block other requests.
zmq::socket_t workers (ctx, ZMQ_XREQ);
workers.bind ("inproc://workers");
// Create an endpoint for client applications to connect to.
// We are usign XREP socket so that processing of one request
// won't block other requests.
zmq::socket_t clients (ctx, ZMQ_XREP);
#ifdef BROKEN
clients.bind ("inproc://clients");
#else
clients.bind ("tcp://lo0:5555");
#endif BROKEN
// Launch worker threads.
for (int i = 0; i != 1; i++) {
pthread_t worker;
int rc = pthread_create (&worker, NULL, worker_routine,
(void*) &ctx);
assert (rc == 0);
}
// Launch client threads.
for (int i = 0; i != 1; i++) {
pthread_t worker;
int rc = pthread_create (&worker, NULL, internal_client,
(void*) &ctx);
assert (rc == 0);
}
// Use queue device as a dispatcher of messages from clients to
worker
// threads.
zmq::device (ZMQ_QUEUE, clients, workers);
return 0;
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.zeromq.org/pipermail/zeromq-dev/attachments/20100614/d8520df8/attachment-0001.htm
More information about the zeromq-dev
mailing list