[zeromq-dev] vanished messages (hoping this will be the last help request ;o)

sven.koebnick at t-online.de sven.koebnick at t-online.de
Wed Mar 18 14:47:49 CET 2015


 

Hi again ! 

I worked on a demonstrating test case and built up the router code below (see the end of this mail).

Please excuse the ugly look of the code; it was typed without planning and got worse during testing ;o)


It does the following:

 	* create a router (binding 2 ZMQ_ROUTER sockets on TCP)
 	* create 20 workers (each connecting with a ZMQ_REQ socket to one of the router sockets)
 	* create 30 clients (each connecting with a ZMQ_REQ socket to the other router socket)

Each client fires 300 numbered messages at the router and expects each number to be sent back as the answer. Each message is 2008 bytes long: an 8-byte header (message number and client number) followed by 2000 bytes of payload.
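The request layout can be sketched as a pair of helpers. These are hypothetical (`Header`, `buildRequest` and `parseHeader` are not in the test program): they copy the two header ints with `memcpy` instead of casting the frame pointer to `int*`, and they refuse frames shorter than the header, such as the 4-byte "Die!" control message. Like the test program, the sketch assumes 32-bit ints in native byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical view of the 8-byte request header used by the test:
// message number first, then client number, both in native byte order.
struct Header {
    std::int32_t messageNumber;
    std::int32_t clientNumber;
};

// Builds a request of 8 + payloadSize bytes; the payload is zero-filled here
// (the test program leaves it uninitialised).
std::vector<unsigned char> buildRequest(std::int32_t msgNo, std::int32_t clientNo,
                                        std::size_t payloadSize) {
    std::vector<unsigned char> buf(sizeof(Header) + payloadSize, 0);
    std::memcpy(buf.data(), &msgNo, sizeof msgNo);
    std::memcpy(buf.data() + sizeof msgNo, &clientNo, sizeof clientNo);
    return buf;
}

// Reads the header back. Returns false for frames too short to hold it,
// e.g. the 4-byte "Die!" message, instead of reading past the frame's end.
bool parseHeader(const unsigned char* data, std::size_t size, Header* out) {
    if (size < sizeof(Header)) return false;
    std::memcpy(&out->messageNumber, data, sizeof out->messageNumber);
    std::memcpy(&out->clientNumber, data + sizeof out->messageNumber,
                sizeof out->clientNumber);
    return true;
}
```

For example, `buildRequest(73, 12, 2000)` yields a 2008-byte buffer whose header parses back to message 73 from client 12.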

The router keeps a list of connected workers and distributes the requests among them round robin.
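A sketch of that round-robin selection, assuming (as the router code does) that an empty string marks a slot whose worker has left; `pickWorker` is a hypothetical name. Note that this rotation is blind: it does not check whether the chosen worker is still busy with an earlier request, unlike the load-balancing broker pattern in the ZeroMQ guide, where workers announce when they are ready.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper mirroring the router's worker selection: slots[i] holds a
// worker identity, and an empty string marks a slot whose worker has left.
// Starting at `next`, empty slots are skipped; -1 means every slot is empty.
// On success `next` moves past the chosen slot, so the rotation continues there.
int pickWorker(const std::vector<std::string>& slots, int& next) {
    const int n = static_cast<int>(slots.size());
    for (int tried = 0; tried < n; ++tried) {
        const int candidate = (next + tried) % n;
        if (!slots[candidate].empty()) {
            next = (candidate + 1) % n;
            return candidate;
        }
    }
    return -1;  // all workers are gone
}
```

With slots `{"Worker00", "", "Worker02"}` and `next == 0`, successive calls return 0, 2, 0, ...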

The code
gives the following output: 

sven@Micro-PC:~/workspace/ZMQRouterTest> Debug/ZMQRouterTest
starting Router ...
creating 20 workers ...
creating 30 clients ...
starting all clients ...
polling in client 12 for answer 73 returned after 30 secs with zmq_strerror()=Resource temporarily unavailable
polling in client 22 for answer 110 returned after 30 secs with zmq_strerror()=Resource temporarily unavailable
polling in client 29 for answer 119 returned after 30 secs with zmq_strerror()=Resource temporarily unavailable
polling in client 9 for answer 118 returned after 30 secs with zmq_strerror()=Resource temporarily unavailable
polling in client 13 for answer 249 returned after 30 secs with zmq_strerror()=Resource temporarily unavailable
polling in client 10 for answer 239 returned after 30 secs with zmq_strerror()=Resource temporarily unavailable
router has terminated
sven@Micro-PC:~/workspace/ZMQRouterTest> 

The high-water marks are all set to 250,000, deliberately oversized so that the ROUTER sockets never reach them and drop messages (with REQ clients, at most one request per client, i.e. 30 in this run, is in flight at any time).

Nonetheless, the output shows that 6 clients did not get a reply, the first one for request number 73 (pretty early).

WHAT IS WRONG IN THIS CODE THAT LEADS TO LOST MESSAGES?

If I enable some printf's (currently commented out) to get debugging information, I have to increase the number of messages, and the first losses occur later than #73 in the example above.
Likewise, when the program is not started in the debugger, the number of clients or messages may need to be increased to show the problem.
For the number of worker and client threads, the number of messages and the message size, I added some command-line arguments,
 e.g. calling _ZMQRouterTest 10 20 100 3000_
 starts 10 workers and 20 clients, each sending 100 messages of 3000 (+8) bytes.
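A minimal sketch of stricter handling for the same four positional arguments. `TestConfig` and `parseArgs` are hypothetical; the defaults are the global values from the test program. Unlike `atoi()`, `strtol()` allows non-numeric or non-positive input to be rejected instead of silently becoming 0.

```cpp
#include <cassert>
#include <climits>
#include <cstdlib>

// Hypothetical configuration holder; defaults mirror the test program's
// globals WORKERNUM, CLIENTNUM, MSGNUM and MSGSIZE.
struct TestConfig {
    int workers = 20;
    int clients = 30;
    int messages = 300;
    int msgSize = 2000;
};

// Parses up to four optional positional arguments:
//   workers clients messages msgSize
// Missing arguments keep their defaults. Returns false on the first argument
// that is not a complete positive decimal number that fits in an int.
bool parseArgs(int argc, char** argv, TestConfig* cfg) {
    int* fields[] = {&cfg->workers, &cfg->clients, &cfg->messages, &cfg->msgSize};
    for (int i = 1; i < argc && i <= 4; ++i) {
        char* end = nullptr;
        long v = std::strtol(argv[i], &end, 10);
        if (end == argv[i] || *end != '\0' || v <= 0 || v > INT_MAX)
            return false;
        *fields[i - 1] = static_cast<int>(v);
    }
    return true;
}
```

Called with `ZMQRouterTest 10 20 100 3000`, this yields 10 workers, 20 clients, 100 messages and a 3000-byte payload; called without arguments, the defaults remain.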

The code exactly reflects my stress test case for the larger system, which has been failing since switching from zmq2 to zmq4.

I have been searching for this bug for months!!!

Thanks for any help

sven 

PS: Is there any way to be notified when a ZMQ_ROUTER socket reaches the mute state? I could not find any function to query it.
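Regarding the PS, one hedged option: libzmq does not seem to offer a call that reports the mute state directly, but the `zmq_setsockopt` documentation of recent libzmq 4.x releases says that with `ZMQ_ROUTER_MANDATORY=1` (which the router code below already sets) a send that cannot be routed fails with `EHOSTUNREACH`, and a send with `ZMQ_DONTWAIT` to a peer whose `SNDHWM` is reached fails with `EAGAIN` instead of being dropped silently. The sketch below only interprets those errno values; `describeRouterSendError` is a hypothetical helper, and the exact behaviour should be checked against the man page of the libzmq version in use.

```cpp
#include <cassert>
#include <cerrno>
#include <string>

// Hypothetical helper: turns the errno of a failed send on a ZMQ_ROUTER socket
// with ZMQ_ROUTER_MANDATORY=1 into a message, following the libzmq 4.x
// zmq_setsockopt documentation (verify against the version actually used):
//   EAGAIN       - peer's send queue is full (SNDHWM reached, ZMQ_DONTWAIT set)
//   EHOSTUNREACH - no connected peer with the given identity
// Call site sketch, where rc is the result of zmq_send(..., ZMQ_DONTWAIT):
//   if (rc == -1) fprintf(stderr, "%s\n", describeRouterSendError(zmq_errno()).c_str());
std::string describeRouterSendError(int err) {
    switch (err) {
        case EAGAIN:       return "peer queue full (SNDHWM reached)";
        case EHOSTUNREACH: return "peer identity not routable";
        default:           return "other send error";
    }
}
```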


======SNIP========================== 

/*
 * ZMQRouterTest.cpp
 *
 *  Created on: Feb 9, 2015
 *      Author: sven
 */

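#include <stdlib.h>
#include <stdio.h>
#include <string.h>   // strncmp, strlen, memcpy, strerror
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <zmq.h>
#include <czmq.h>     // also defines the 'byte' type used below
#include <zmsg.h>
#include <zframe.h>
#include <string>
#include <map>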
bool printOutput=false;

int WORKERNUM=20;
int CLIENTNUM=30;
int MSGNUM=300;
int MSGSIZE=2000;
int POLLTIMEOUT=30;   // seconds
int HWM=250000;
class Router {
  std::map<int,int> requestNumbers;
  std::map<int,int> answerNumbers;
  void inline goRouter() {
    void *clients=zmq_socket(ctx, ZMQ_ROUTER);
    void *workers=zmq_socket(ctx, ZMQ_ROUTER);
    int arg=1;
    zmq_setsockopt(clients,ZMQ_IMMEDIATE,&arg,sizeof(arg));
    zmq_setsockopt(workers,ZMQ_IMMEDIATE,&arg,sizeof(arg));
    zmq_setsockopt(clients,ZMQ_ROUTER_MANDATORY,&arg,sizeof(arg));
    zmq_setsockopt(workers,ZMQ_ROUTER_MANDATORY,&arg,sizeof(arg));
    arg=HWM;
    zmq_setsockopt(clients,ZMQ_SNDHWM,&arg,sizeof(arg));
    zmq_setsockopt(workers,ZMQ_SNDHWM,&arg,sizeof(arg));
    zmq_setsockopt(clients,ZMQ_RCVHWM,&arg,sizeof(arg));
    zmq_setsockopt(workers,ZMQ_RCVHWM,&arg,sizeof(arg));
    int rc=zmq_bind(clients,"tcp://127.0.0.1:10000");
    rc=zmq_bind(workers,"tcp://127.0.0.1:10001");
    zmq_pollitem_t items[2];
    items[0].socket=clients;
    items[1].socket=workers;
    items[0].events=ZMQ_POLLIN;
    items[1].events=ZMQ_POLLIN;
    int currentWorker=-1,nextWorker=0;
    std::string workerAddr[WORKERNUM]; // variable-length array: a GCC extension, not standard C++
    int workerCount=1;
    bool lastReceived=false;
    do {
      items[0].revents=0;items[1].revents=0;
      rc=zmq_poll(items,2,-1);
      if (items[0].revents) {
        // we got a request: [client-id][empty][data]
        zmsg_t* request=zmsg_recv(items[0].socket);
        zmsg_first(request);
        zmsg_next(request);
        zframe_t* data_f=zmsg_next(request);
        char* data=(char*)zframe_data(data_f);
        zframe_t* zero_f=zframe_new("",0);
        int testedWorkers=0;
        while (workerAddr[nextWorker]=="" && testedWorkers<WORKERNUM) {
          nextWorker++;nextWorker%=WORKERNUM;
          testedWorkers++;
        }
        if (workerAddr[nextWorker]=="") break; // end this router, all workers are gone
        zframe_t* addr_f=zframe_new(workerAddr[nextWorker].c_str(),workerAddr[nextWorker].length()+1);
        if (strncmp(data,"Die!",4)==0) {
          // kill this worker entry, the worker will terminate soon.
          workerAddr[nextWorker].assign("");
          workerCount--;
        }
        zmsg_prepend(request,&zero_f);
        zmsg_prepend(request,&addr_f);
        if (strncmp("Die!",data,4)!=0) {
          // only real requests carry the 8-byte header; "Die!" is only 4 bytes
          // long, so reading ((int*)data)[1] from it would run past the frame
          int currentMessageNumber= ((int*)data)[0];
          int currentMessageSender= ((int*)data)[1];
          if (currentMessageNumber != requestNumbers[currentMessageSender]+1)
            printf("expected request %i from client %i, but got request %i\n",
                   requestNumbers[currentMessageSender]+1,currentMessageSender,currentMessageNumber);
          else
            printOutput && printf("forwarding request %i from client %i to worker %i\n",
                                  requestNumbers[currentMessageSender]+1,currentMessageSender,nextWorker);
          requestNumbers[currentMessageSender]=currentMessageNumber;
        }
        rc=zmsg_send(&request,items[1].socket);
        nextWorker++;nextWorker%=WORKERNUM;
        if (rc<0)
          printf("error sending in router: %i %s\n",errno,strerror(errno));
      }
      if (items[1].revents) {
        // we got an answer from a worker
        zmsg_t* reply=zmsg_recv(items[1].socket);
        zframe_t *addr_f=zmsg_first(reply);
        /*zframe_t *zero_f=*/zmsg_next(reply);
        zframe_t *data_f=zmsg_next(reply);
        char* addr=(char*)zframe_data(addr_f);
        char* data=(char*)zframe_data(data_f);
        if (strncmp(data,"Hello",5)==0) {
          // new worker arrived
          workerCount++;
          currentWorker++;
          if (currentWorker>=WORKERNUM) {
            printf("only %i workers supported\n",WORKERNUM);
            exit(1);
          }
          workerAddr[currentWorker].assign(addr);
          // printf("welcomed a new worker: %s\n",addr);
          zmsg_destroy(&reply); // the "Hello" itself is not forwarded
        } else {
          // real reply: [worker-id][empty][client-id][empty][data]
          zframe_t* f1=zmsg_pop(reply); // worker identity
          zframe_t* f2=zmsg_pop(reply); // empty delimiter
          zframe_destroy(&f2);
          /*zframe_t* f3=*/zmsg_first(reply); // client identity
          /*zframe_t* f4=*/zmsg_next(reply);  // empty delimiter
          zframe_t* f5=zmsg_next(reply);      // data
          char* data=(char*)zframe_data(f5);
          if (strncmp("Die!",data,4)!=0) {
            // only real replies carry the 8-byte header ("Die!" is only 4 bytes)
            int currentMessageNumber= ((int*)data)[0];
            int currentMessageSender= ((int*)data)[1];
            if (currentMessageNumber != requestNumbers[currentMessageSender] ||
                currentMessageNumber != answerNumbers[currentMessageSender]+1)
              printf("expected answer %i for client %i, but got answer %i\n",
                     answerNumbers[currentMessageSender]+1,currentMessageSender,currentMessageNumber);
            else
              printOutput && printf("forwarding answer %i for client %i, answer %i\n",
                                    answerNumbers[currentMessageSender]+1,currentMessageSender,currentMessageNumber);
            answerNumbers[currentMessageSender]=currentMessageNumber;
          }
          zframe_destroy(&f1); // kill address frame of worker
          rc=zmsg_send(&reply,items[0].socket);
          if (rc<0) // with ZMQ_ROUTER_MANDATORY a failed send must not go unnoticed
            printf("error sending reply in router: %i %s\n",errno,strerror(errno));
          if (workerCount==1) {
            workerCount--;
            lastReceived=true;
          }
        }
      }
    } while (workerCount>0);
  }
 public:
  static void* ctx;
  static void* lauch_me(void*p) {
    static_cast<Router*>(p)->goRouter();
    return 0;
  }
};

class Worker {
  void inline goWorker() {
    void *router=zmq_socket(ctx, ZMQ_REQ);
    int arg=1;
    zmq_setsockopt(router,ZMQ_IMMEDIATE,&arg,sizeof(arg));
    arg=HWM;
    zmq_setsockopt(router,ZMQ_SNDHWM,&arg,sizeof(arg));
    zmq_setsockopt(router,ZMQ_RCVHWM,&arg,sizeof(arg));
    char instanceName[10];
    memcpy(instanceName,"Worker00",8);
    instanceName[6]+=instanceNumber/10;
    instanceName[7]+=instanceNumber%10;
    instanceName[8]=0;
    // include trailing NUL in ID
    zmq_setsockopt(router,ZMQ_IDENTITY,instanceName,strlen(instanceName)+1);
    zmq_connect(router,"tcp://127.0.0.1:10001");
    zmq_pollitem_t items[1];
    items[0].socket=router;
    items[0].events=ZMQ_POLLIN;
    bool leave=false;
    // construct initial "Hello"
    zframe_t *hello_f=zframe_new("Hello",5);
    zframe_send(&hello_f,items[0].socket,0);
    while (!leave) {
      items[0].revents=0;
      zmq_poll(items,1,-1);
      if (items[0].revents) {
        // the REQ socket has already stripped the empty delimiter:
        // [client-id][empty][data]
        zmsg_t* request=zmsg_recv(items[0].socket);
        zmsg_first(request);  // client identity (unused here)
        zmsg_next(request);   // empty delimiter
        zframe_t *data_f=zmsg_next(request);
        char *data=(char*)zframe_data(data_f);
        if (strncmp(data,"Die!",4)==0)
          leave=true;
        printOutput && printf("got request %i from client %i\n",((int*)data)[0],((int*)data)[1]);
        // echo the request back unchanged as the reply
        zmsg_send(&request,items[0].socket);
      }
    }
    // leave requested
    // printf("worker %i disconnecting\n",instanceNumber);
    sleep(1);
    zmq_disconnect(router,"tcp://127.0.0.1:10001");
    zmq_close(router);
    // printf("worker %i ending\n",instanceNumber);
  }
 public:
  byte instanceNumber;
  static void* ctx;
  static void* lauch_me(void*p) {
    static_cast<Worker*>(p)->goWorker();
    return 0;
  }
};

class Client {
 public:
  static volatile bool wait4go;
  void inline goClient() {
    void *socket=zmq_socket(ctx, ZMQ_REQ);
    int arg=1;
    zmq_setsockopt(socket,ZMQ_IMMEDIATE,&arg,sizeof(arg));
    arg=HWM;
    zmq_setsockopt(socket,ZMQ_SNDHWM,&arg,sizeof(arg));
    zmq_setsockopt(socket,ZMQ_RCVHWM,&arg,sizeof(arg));
    char instanceName[10];
    memcpy(instanceName,"Client00",8);
    instanceName[6]+=instanceNumber/10;
    instanceName[7]+=instanceNumber%10;
    instanceName[8]=0;
    // include trailing NUL in ID
    zmq_setsockopt(socket,ZMQ_IDENTITY,instanceName,strlen(instanceName)+1);
    zmq_connect(socket,"tcp://127.0.0.1:10000");
    zmq_pollitem_t items[1];
    items[0].socket=socket;
    items[0].events=ZMQ_POLLIN;
    while (wait4go)
      sleep(1);
    // send MSGNUM requests, numbered 1..MSGNUM
    for (int i=1; i<=MSGNUM; i++) {
      int rc;
      int *datas=(int*)malloc(8+MSGSIZE);
      datas[0]=i; datas[1]=instanceNumber;
      rc=zmq_send(socket,(void*)datas,8+MSGSIZE,0);
      while ((rc != 8+MSGSIZE) && (errno==EAGAIN || errno==EINTR) && datas!=0) {
        printf("error sending in client: %s\n",zmq_strerror(zmq_errno()));
        rc=zmq_send(socket,datas,8+MSGSIZE,0);
      }
      free(datas); // zmq_send() copies the buffer, so it can be released now
      items[0].revents=0;
      rc=zmq_poll(items,1,POLLTIMEOUT*1000);
      if (rc<=0) {
        // errno is only meaningful when zmq_poll() returns -1; on a timeout
        // (rc==0) the text printed here is whatever errno happened to hold
        printf("polling in client %i for answer %i returned after %i secs with zmq_strerror()=%s\n",
               instanceNumber,i,POLLTIMEOUT,zmq_strerror(zmq_errno()));
        break;
      } else {
        zmsg_t* reply=zmsg_recv(socket);
        zframe_t* data_f=zmsg_first(reply);
        int* data=(int*)zframe_data(data_f);
        if (*data != i)
          printf("expected message reply %i, but got %i\n",i,*data);
        zmsg_destroy(&reply);
      }
    }
    printOutput && printf("client %i ending\n",instanceNumber);
    sleep(1);
    zmq_disconnect(socket,"tcp://127.0.0.1:10000");
    zmq_close(socket);
  }
 public:
  byte instanceNumber;
  static void* ctx;
  static void* lauch_me(void*p) {
    static_cast<Client*>(p)->goClient();
    return 0;
  }
};

void* Router::ctx=0;
void* Worker::ctx=0;
void* Client::ctx=0;
volatile bool Client::wait4go=true;

int main(int argc, char**argv) {
  if (argc > 1) WORKERNUM=atoi(argv[1]);
  if (argc > 2) CLIENTNUM=atoi(argv[2]);
  if (argc > 3) MSGNUM=atoi(argv[3]);
  if (argc > 4) MSGSIZE=atoi(argv[4]);
  Router::ctx=zmq_ctx_new(); // create global context
  zmq_ctx_set(Router::ctx,ZMQ_IO_THREADS,10);
  Worker::ctx=Router::ctx; // use same context in worker and router
  Client::ctx=Router::ctx; // use same context in client and router

  Router *router=new Router();
  printf("starting Router ...\n");

  pthread_t r_tid;
  int ret=pthread_create(&r_tid, 0, Router::lauch_me, router);
  if (ret!=0) exit(1);
  sleep(1);

  printf("creating %i workers ...\n",WORKERNUM);
  Worker *workers[WORKERNUM];
  pthread_t worker_ids[WORKERNUM];
  for(int i=0; i<WORKERNUM; i++) {
    workers[i]=new Worker();
    workers[i]->instanceNumber=i;
    int ret=pthread_create(&worker_ids[i], 0, Worker::lauch_me, workers[i]);
    if (ret!=0) exit(1);
    // printf("worker thread id %li\n",worker_ids[i]);
  }
  sleep(2);

  printf("creating %i clients ...\n",CLIENTNUM);
  Client *clients[CLIENTNUM];
  pthread_t client_ids[CLIENTNUM];
  for(int i=0; i<CLIENTNUM; i++) {
    clients[i]=new Client();
    clients[i]->instanceNumber=i;
    int ret=pthread_create(&client_ids[i], 0, Client::lauch_me, clients[i]);
    if (ret!=0) exit(1);
    // printf("client thread id %li\n",client_ids[i]);
  }
  sleep(10);
  printf("starting all clients ...\n");
  Client::wait4go=false;

  void*result;
  for (int i=0; i<CLIENTNUM; i++) {
    pthread_join(client_ids[i],&result);
    // printf("client %i has terminated\n",i);
  }

  void* socket=zmq_socket(Router::ctx,ZMQ_REQ);
  int one=1;
  zmq_setsockopt(socket,ZMQ_IMMEDIATE,&one,sizeof(one));
  zmq_connect(socket,"tcp://127.0.0.1:10000");
  // one "Die!" per worker
  for (int i=0; i<WORKERNUM; i++) {
    zmq_send(socket,"Die!",4,0);
    zmsg_t* ack=zmsg_recv(socket);
    zmsg_destroy(&ack);
  }
  for (int i=0; i<WORKERNUM; i++) {
    pthread_join(worker_ids[i],&result);
    // printf("worker %i has terminated\n",i);
  }
  pthread_join(r_tid,&result);
  printf("router has terminated\n");
  // sometimes the thing does not start fine ... check the cause !!!!
}
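To make the envelope handling in the router easier to follow, here is a simulation of the frames that travel through it, with one `std::string` per frame and no ZeroMQ calls at all. The socket behaviour it models (REQ adds an empty delimiter frame on send and strips it on receive; ROUTER prepends the sender's identity on receive and uses the first frame as the destination on send) is standard ZeroMQ behaviour. The function names are hypothetical, and the identities are written without the trailing NUL that the test program includes in them.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// One std::string per frame of a multipart message (pure simulation, no ZeroMQ).
using Frames = std::vector<std::string>;

// REQ socket: adds an empty delimiter frame on send, strips it on receive.
Frames reqSend(Frames app) {
    app.insert(app.begin(), std::string());
    return app;
}
Frames reqRecv(Frames wire) {
    assert(!wire.empty() && wire.front().empty());  // delimiter must be present
    wire.erase(wire.begin());
    return wire;
}

// ROUTER socket: prepends the sending peer's identity on receive; on send the
// first frame selects the destination peer and is not transmitted.
Frames routerRecv(const std::string& peer, Frames wire) {
    wire.insert(wire.begin(), peer);
    return wire;
}
std::pair<std::string, Frames> routerSend(Frames app) {
    std::string dest = app.front();
    app.erase(app.begin());
    return {dest, app};
}

// The test router's logic: a client request gets [worker-id][""] prepended;
// a worker reply loses those two frames again before going to the client.
Frames toWorker(Frames request, const std::string& worker) {
    request.insert(request.begin(), std::string());
    request.insert(request.begin(), worker);
    return request;
}
Frames toClient(Frames reply) {
    reply.erase(reply.begin(), reply.begin() + 2);
    return reply;
}
```

Traced end to end, a request "req#73" from Client12 reaches Worker03 as [Client12][""][req#73], comes back to the router as [Worker03][""][Client12][""][req#73], and arrives at the client as the single frame "req#73".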
