[zeromq-dev] need help with PUB/SUB
sven.koebnick at t-online.de
sven.koebnick at t-online.de
Fri Mar 6 16:58:24 CET 2015
Hi all!
After migrating from ZMQ2 to ZMQ4 I have had problems with
vanished messages (on REQ/REP as well as on PUB/SUB).
I have now managed to build a dirty "minimal" test case (see below)
that shows at least >one< of my problems.
The test case does the following:
- create a router binding one SUB socket (for clients) and one PUB socket (for workers)
- create some workers, each listening on a SUB socket connected to the router's PUB port
- WAIT some seconds!
- send 10000 numbers at full speed on a PUB socket connected to the router's SUB port
- the workers expect the numbers 0 to n to be sent as messages in the correct order
  and complain about any number that does not fit
The result shows that ABOUT 1400 TO 3000 OF THE 10000 MESSAGES GET LOST before the rest reaches the workers.
WHAT AM I DOING WRONG?
It seems to get a LITTLE better if I
- increase the number of I/O threads in the context with zmq_ctx_set()
- increase the number of workers
- increase the send and receive high-water marks using zmq_setsockopt()
but it never gets really good.
When I send 100,000 messages, the first 3,500 get lost (?!?).
When I send 1000 messages, none arrives ;o(
When I set ZMQ_SNDHWM and ZMQ_RCVHWM to 0 (zero), which should mean
"unlimited", nothing ever arrives.
Many thanks,
sven
=====SNIP===output==================================
starting Router ...
thread id -1212605632
starting 10 Workers ...
finished starting Workers ...
sending messages ...
sending messages done
worker 2 expected message 0, got message 4616
worker 5 expected message 0, got message 4616
worker 9 expected message 0, got message 4616
worker 0 expected message 0, got message 4616
worker 7 expected message 0, got message 4616
worker 4 expected message 0, got message 4616
worker 1 expected message 0, got message 4616
worker 8 expected message 0, got message 4616
worker 6 expected message 0, got message 4616
worker 3 expected message 0, got message 4616
worker 9 ending after 5383 good messages. last message was 9999
worker 2 ending after 5383 good messages. last message was 9999
worker 7 ending after 5383 good messages. last message was 9999
worker 5 ending after 5383 good messages. last message was 9999
worker 8 ending after 5383 good messages. last message was 9999
worker 4 ending after 5383 good messages. last message was 9999
worker 1 ending after 5383 good messages. last message was 9999
worker 3 ending after 5383 good messages. last message was 9999
worker 6 ending after 5383 good messages. last message was 9999
worker 0 ending after 5383 good messages. last message was 9999
all own thread have terminated
router has terminated
=====SNIP===code==================================
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <atomic>
#include <string>

#include <zmq.h>
#include <czmq.h>
#include <zframe.h>
int numWorkers=10;
void sethwm(int
o,void*s) {
int opt;
size_t len=4;
opt=500000;
int
rc=zmq_setsockopt(s,o,&opt,len);
if (rc!=0)
printf("error setting
HWMn");
opt=-1;
rc=zmq_getsockopt(s,o,&opt,&len);
if (rc!=0)
printf("error getting HWMn");
}
class Router {
public: volatile bool
leave=false;
private:
void inline goRouter() {
void
*clients=zmq_socket(ctx, ZMQ_SUB);
sethwm(ZMQ_RCVHWM,clients);
void
*workers=zmq_socket(ctx, ZMQ_PUB);
sethwm(ZMQ_SNDHWM,workers);
int
rc=zmq_setsockopt(clients,ZMQ_SUBSCRIBE,"",0);
if (rc!=0) printf("Error
in line %in",__LINE__ -1);
rc=zmq_bind(clients,"tcp://127.0.0.1:10000");
if (rc!=0) printf("Error
%i %s in line %in",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
rc=zmq_bind(workers,"tcp://127.0.0.1:10001");
if (rc!=0) printf("Error
%i %s in line %in",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
zmq_pollitem_t items[1];
items[0].socket=clients;
items[0].events=ZMQ_POLLIN;
int currentWorker=-1,nextWorker=0;
int
workerCount=1;
bool lastReceived=false;
do {
items[0].revents=0;
rc=zmq_poll(items,1,10);
if (rc<0) printf("Error in line %in",__LINE__
-1);
if (rc==0) continue;
if (items[0].revents) {
// we got a
request
zmsg_t* request=zmsg_recv(items[0].socket);
if (request==0)
printf("Error in line %in",__LINE__ -1);
zmsg_first(request);
zmsg_next(request);
zframe_t* data_f=zmsg_next(request);
char*
data=(char*)zframe_data(data_f);
rc=zmsg_send(&request,workers);
if
(request!=0) printf("Error in line %in",__LINE__ -1);
if (rc<0)
printf("error sending in router: %is %sn",errno,strerror(errno));
}
}
while (workerCount>0 && !leave);
}
public:
static void* ctx;
static
void* lauch_me(void*p) {
static_cast<Router*>(p)->goRouter();
return
0;
}
};
class Worker {
public: volatile bool leave=false;
private:
void inline goWorker() {
void *router=zmq_socket(ctx, ZMQ_SUB);
sethwm(ZMQ_RCVHWM,router);
char instanceName[10];
memcpy(instanceName,"Worker000",10);
instanceName[6]+=instanceNumber/100;
instanceName[7]+=(instanceNumber%100)/10;
instanceName[8]+=instanceNumber%10;
int
rc=zmq_setsockopt(router,ZMQ_SUBSCRIBE,0,0);
if (rc!=0) printf("Error
in line %in",__LINE__ -1);
rc=zmq_connect(router,"tcp://127.0.0.1:10001");
if (rc!=0)
printf("Error in line %in",__LINE__ -1);
zmq_pollitem_t items[1];
items[0].socket=router;
items[0].events=ZMQ_POLLIN;
int
lastMessageNumber=-1,currentMessageNumber=-1;
bool
leaveByMessage=false;
int OKcount=0;
while (!leave) {
items[0].revents=0;
rc=zmq_poll(items,1,10);
if (rc<0) printf("Error
in line %in",__LINE__ -1);
if (rc==0) continue;
if (items[0].revents)
{
zmsg_t* request=zmsg_recv(items[0].socket);
zframe_t
*data_f=zmsg_first(request);
char *data=(char*)zframe_data(data_f);
if
(strncmp(data,"Die!",4)==0) {
leave=true; leaveByMessage=true;
} else
{
currentMessageNumber= *((int*)data);
if (currentMessageNumber ==
lastMessageNumber+1)
OKcount++;
else
printf("worker %i expected
message %i, got message
%in",instanceNumber,lastMessageNumber+1,currentMessageNumber);
lastMessageNumber=currentMessageNumber;
}
}
}
printf("worker %i
ending after %i good messages%s. last message was %i
n",instanceNumber,OKcount,leaveByMessage?"":" by hard
termination",lastMessageNumber);
sleep(1);
rc=zmq_disconnect(router,"tcp://127.0.0.1:10001");
if (rc!=0)
printf("Error in line %in",__LINE__ -1);
rc=zmq_close (router);
if
(rc!=0) printf("Error in line %in",__LINE__ -1);
}
public:
byte
instanceNumber;
static void* ctx;
static void* lauch_me(void*p) {
static_cast<Worker*>(p)->goWorker();
return 0;
}
};
void*
Router::ctx=0;
void* Worker::ctx=0;
int main(int argc, char**argv) {
Router::ctx=zmq_ctx_new(); // create global context
zmq_ctx_set(Router::ctx,ZMQ_IO_THREADS,100);
Worker::ctx=Router::ctx;
// use same context in worker and router
Router *router=new Router();
pthread_t r_tid;
printf("starting Router ...n");
int
ret=pthread_create(&r_tid, 0, Router::lauch_me, router);
if (ret!=0)
exit(1);
bool a=true;
// while (a) sleep (1);
sleep(1);
printf("
thread id %lin",r_tid);
Worker *workers[100];
pthread_t
worker_ids[100];
printf("starting %i Workers ...n",numWorkers);
for(int i=0; i<numWorkers; i++) {
workers[i]=new Worker();
workers[i]->instanceNumber=i;
int ret=pthread_create(&worker_ids[i], 0,
Worker::lauch_me, workers[i]);
if (ret!=0) exit(1);
}
printf("finished starting Workers ...n");
sleep(10);
printf("sending
messages ...n");
void* socket=zmq_socket(Router::ctx,ZMQ_PUB);
sethwm(ZMQ_SNDHWM,socket);
zmq_connect(socket,"tcp://127.0.0.1:10000");
for (int i=0; i<10000;
i++) {
// if (i%100==0) printf(".");
int rc;
int errorcount=0;
int
*datas=(int*)malloc(4); *datas=i;
rc=zmq_send(socket,datas,sizeof(*datas),0);
if (rc!=4) printf("fehler
beim senden von message %in",i);
}
printf("sending messages donen");
int rc=zmq_send(socket,"Die!",4,0);
if (rc!=4) {
printf("cound not
send termination message to workers, killing them by static
variablen");
for (int i=0; i<numWorkers; i++) {
workers[i]->leave=true;
}
}
sleep(30);
router->leave=true;
void*result;
for (int i=0; i<numWorkers; i++) {
workers[i]->leave=true;
pthread_join(worker_ids[i],&result);
}
printf("all own thread have terminatedn");
pthread_join(r_tid,&result);
printf("router has terminatedn");
}
printf("cound not send termination message to worker %i");
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20150306/61849d74/attachment.htm>
More information about the zeromq-dev mailing list