[zeromq-dev] need help with PUB/SUB
sven.koebnick at t-online.de
Sat Mar 7 14:33:20 CET 2015
Oops ... I overlooked that there is no delay between connect and send in
the client ;o) Please excuse this silly mistake.

Result: I still don't have an easy test case for my vanished messages ;o((
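
One way to narrow such a test case down is to let each worker record which
sequence numbers never arrived, so that messages lost before the
subscription was active can be told apart from messages lost later. A
minimal sketch, assuming the sender numbers its messages 0, 1, 2, ... as in
the test case quoted below; GapTracker and its members are hypothetical
names, not part of ZeroMQ or CZMQ:

```cpp
#include <vector>

// Hypothetical helper (not a ZeroMQ/CZMQ API): records which sequence
// numbers a subscriber never saw, assuming the sender counts up from 0.
struct GapTracker {
    struct Gap { int first; int last; };  // inclusive range of missing numbers
    int next_expected = 0;                // sequence number expected next
    int received = 0;                     // messages accepted so far
    std::vector<Gap> gaps;                // every missing range seen so far

    // Feed one received sequence number. Numbers below next_expected
    // (duplicates or reordering) are ignored.
    void on_message(int seq) {
        if (seq < next_expected) return;
        if (seq > next_expected) gaps.push_back({next_expected, seq - 1});
        next_expected = seq + 1;
        ++received;
    }

    // Messages lost before the first one arrived (the startup gap).
    int lost_at_start() const {
        if (!gaps.empty() && gaps.front().first == 0)
            return gaps.front().last + 1;
        return 0;
    }

    // Messages lost after the stream had already started.
    int lost_mid_stream() const {
        int total = 0;
        for (const Gap& g : gaps)
            if (g.first != 0) total += g.last - g.first + 1;
        return total;
    }
};
```

A worker would call on_message() with the integer it reads from each frame.
If only lost_at_start() is non-zero, the loss happened before the
subscription was in place; a non-zero lost_mid_stream() would point at
something else.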
On 2015-03-07 13:41, Sven Koebnick wrote:
> Thanks for the fast response ;-)
>
> Did you notice that all workers get the same first message? I assume the
> bug is in the router or the client (main).
>
> Is there some way to ask the socket whether the connection has been
> established? I didn't find one.
>
> Indeed, in my larger system I have the combination you suggested: PUB and
> REP in the router, SUB and REQ in the workers.
> There the connection is certainly established, because sometimes the
> second or third reply from the router to the workers is lost, but not (I
> think) before the first PUB message has been sent to the worker. I did not
> manage to reproduce this exact situation in a minimal test case.
>
> Are there any "don'ts" or important initialization steps that I missed?
> In ZMQ2 everything worked well. The problems started when I switched to
> ZMQ4.
>
> If you want, I could draw a small picture of the structure I use.
>
> What I noticed in your API is that some functions take a message** so they
> can NULL the caller's pointer as a sign that "this data belongs to zmq
> now", while others (e.g. Send(socket*, byte*, size)) send data directly
> without wrapping it in a zmsg.
> I already tried leaking ALL data, so that nothing that may still linger in
> zmq gets free()d, but that brought only little improvement.
> I also checked with tdmalloc, valgrind, efence and duma, and they all say
> that my memory accesses are fine. (Or does zmq somehow work around those
> memory libraries?)
>
> Kind regards
> Sven Koebnick
>
> --------------------------------------------------------
> E = mc² ± 2dBA ----- everything is relative
> ---------------------------------------------------------
>
> On 06.03.2015 at 17:29, Kenneth Adam Miller <kennethadammiller at gmail.com> wrote:
>
>> I notice that in your public code you wait until you get a request on the
>> sub socket and then publish on the workers socket. The workers socket is a
>> pub socket, and just receiving a request on the sub socket doesn't mean
>> that all workers have connected.
>>
>> Why not, before you start publishing at all, have an additional rep socket
>> and wait in a loop to receive and respond to exactly as many workers as you
>> intend to kick off, before you start listening on the sub socket at all?
>> The workers would then send a req message to that rep socket and wait for
>> a reply before sending anything to the sub socket.
>>
>> I'm not sure what the sub socket is supposed to do. I suppose you
>> identified the need for it since it's a many-to-one relationship: you have
>> many workers, one publisher. But you can replace the sub socket's role in
>> waiting for worker messages with a rep socket. In the worker, before
>> sending to the rep socket, make a connect call to the publisher.
>>
>> On Fri, Mar 6, 2015 at 10:58 AM, <sven.koebnick at t-online.de> wrote:
>>
>>> Hi all!
>>>
>>> After migrating from ZMQ2 to ZMQ4 I had problems with vanished messages
>>> (on REP/REQ as well as on PUB/SUB).
>>>
>>> I have now managed to build a dirty "minimal" test case (see below) that
>>> displays at least >one< of my problems.
>>>
>>> The test case does the following:
>>>
>>> - create a router that binds one SUB socket (for clients) and one PUB
>>>   socket (for workers)
>>> - create some workers listening on a SUB socket connected to the
>>>   router's PUB port
>>> - WAIT some seconds !!!
>>> - send 10000 numbers at full speed on a PUB socket connected to the
>>>   router's SUB port
>>> - workers expect the numbers 0 to n to be sent as messages in correct
>>>   order and complain about mismatching numbers
>>>
>>> The result shows that about 1400 to 3000 of the 10000 messages get lost
>>> before the rest reaches the workers.
>>>
>>> What am I doing wrong?
>>>
>>> It seems to get a LITTLE better if I
>>>
>>> - increase the number of I/O threads in the context with zmq_ctx_set()
>>> - increase the number of workers
>>> - increase the send and receive high-water marks using zmq_setsockopt()
>>>
>>> but it never gets really good.
>>>
>>> When I send 100,000 messages, the first 3,500 get lost (?!?).
>>>
>>> When I send 1000 messages, none arrives ;o(
>>>
>>> When I set SNDHWM and RCVHWM to 0 (zero), which should mean "unlimited",
>>> nothing ever arrives.
>>>
>>> many thanks
>>>
>>> sven
>>>
>>> =====SNIP===output==================================
>>>
>>> starting Router ...
>>> thread id -1212605632
>>> starting 10 Workers ...
>>> finished starting Workers ...
>>> sending messages ...
>>> sending messages done
>>> worker 2 expected message 0, got message 4616
>>> worker 5 expected message 0, got message 4616
>>> worker 9 expected message 0, got message 4616
>>> worker 0 expected message 0, got message 4616
>>> worker 7 expected message 0, got message 4616
>>> worker 4 expected message 0, got message 4616
>>> worker 1 expected message 0, got message 4616
>>> worker 8 expected message 0, got message 4616
>>> worker 6 expected message 0, got message 4616
>>> worker 3 expected message 0, got message 4616
>>> worker 9 ending after 5383 good messages. last message was 9999
>>> worker 2 ending after 5383 good messages. last message was 9999
>>> worker 7 ending after 5383 good messages. last message was 9999
>>> worker 5 ending after 5383 good messages. last message was 9999
>>> worker 8 ending after 5383 good messages. last message was 9999
>>> worker 4 ending after 5383 good messages. last message was 9999
>>> worker 1 ending after 5383 good messages. last message was 9999
>>> worker 3 ending after 5383 good messages. last message was 9999
>>> worker 6 ending after 5383 good messages. last message was 9999
>>> worker 0 ending after 5383 good messages. last message was 9999
>>> all own thread have terminated
>>> router has terminated
>>>
>>> =====SNIP===code==================================
>>>
>>> #include <stdlib.h>
>>> #include <stdio.h>
>>> #include <pthread.h>
>>> #include <unistd.h>
>>> #include <zmq.h>
>>> #include <czmq.h>
>>> #include <zframe.h>
>>> #include <string>
>>>
>>> int numWorkers=10;
>>>
>>> void sethwm(int o,void*s) {
>>>   int opt;
>>>   size_t len=4;
>>>   opt=500000;
>>>   int rc=zmq_setsockopt(s,o,&opt,len);
>>>   if (rc!=0)
>>>     printf("error setting HWM\n");
>>>   opt=-1;
>>>   rc=zmq_getsockopt(s,o,&opt,&len);
>>>   if (rc!=0)
>>>     printf("error getting HWM\n");
>>> }
>>>
>>> class Router {
>>> public: volatile bool leave=false;
>>> private:
>>>   void inline goRouter() {
>>>     void *clients=zmq_socket(ctx, ZMQ_SUB);
>>>     sethwm(ZMQ_RCVHWM,clients);
>>>     void *workers=zmq_socket(ctx, ZMQ_PUB);
>>>     sethwm(ZMQ_SNDHWM,workers);
>>>     int rc=zmq_setsockopt(clients,ZMQ_SUBSCRIBE,"",0);
>>>     if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>>     rc=zmq_bind(clients,"tcp://127.0.0.1:10000");
>>>     if (rc!=0) printf("Error %i %s in line %i\n",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
>>>     rc=zmq_bind(workers,"tcp://127.0.0.1:10001");
>>>     if (rc!=0) printf("Error %i %s in line %i\n",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
>>>     zmq_pollitem_t items[1];
>>>     items[0].socket=clients;
>>>     items[0].events=ZMQ_POLLIN;
>>>     int currentWorker=-1,nextWorker=0;
>>>     int workerCount=1;
>>>     bool lastReceived=false;
>>>     do {
>>>       items[0].revents=0;
>>>       rc=zmq_poll(items,1,10);
>>>       if (rc<0) printf("Error in line %i\n",__LINE__ -1);
>>>       if (rc==0) continue;
>>>       if (items[0].revents) {
>>>         // we got a request
>>>         zmsg_t* request=zmsg_recv(items[0].socket);
>>>         if (request==0) printf("Error in line %i\n",__LINE__ -1);
>>>         zmsg_first(request);
>>>         zmsg_next(request);
>>>         zframe_t* data_f=zmsg_next(request);
>>>         char* data=(char*)zframe_data(data_f);
>>>         rc=zmsg_send(&request,workers);
>>>         if (request!=0) printf("Error in line %i\n",__LINE__ -1);
>>>         if (rc<0)
>>>           printf("error sending in router: %is %s\n",errno,strerror(errno));
>>>       }
>>>     } while (workerCount>0 && !leave);
>>>   }
>>> public:
>>>   static void* ctx;
>>>   static void* lauch_me(void*p) {
>>>     static_cast<Router*>(p)->goRouter();
>>>     return 0;
>>>   }
>>> };
>>>
>>> class Worker {
>>> public: volatile bool leave=false;
>>> private:
>>>   void inline goWorker() {
>>>     void *router=zmq_socket(ctx, ZMQ_SUB);
>>>     sethwm(ZMQ_RCVHWM,router);
>>>     char instanceName[10];
>>>     memcpy(instanceName,"Worker000",10);
>>>     instanceName[6]+=instanceNumber/100;
>>>     instanceName[7]+=(instanceNumber%100)/10;
>>>     instanceName[8]+=instanceNumber%10;
>>>     int rc=zmq_setsockopt(router,ZMQ_SUBSCRIBE,0,0);
>>>     if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>>     rc=zmq_connect(router,"tcp://127.0.0.1:10001");
>>>     if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>>     zmq_pollitem_t items[1];
>>>     items[0].socket=router;
>>>     items[0].events=ZMQ_POLLIN;
>>>     int lastMessageNumber=-1,currentMessageNumber=-1;
>>>     bool leaveByMessage=false;
>>>     int OKcount=0;
>>>     while (!leave) {
>>>       items[0].revents=0;
>>>       rc=zmq_poll(items,1,10);
>>>       if (rc<0) printf("Error in line %i\n",__LINE__ -1);
>>>       if (rc==0) continue;
>>>       if (items[0].revents) {
>>>         zmsg_t* request=zmsg_recv(items[0].socket);
>>>         zframe_t *data_f=zmsg_first(request);
>>>         char *data=(char*)zframe_data(data_f);
>>>         if (strncmp(data,"Die!",4)==0) {
>>>           leave=true; leaveByMessage=true;
>>>         } else {
>>>           currentMessageNumber= *((int*)data);
>>>           if (currentMessageNumber == lastMessageNumber+1)
>>>             OKcount++;
>>>           else
>>>             printf("worker %i expected message %i, got message %i\n",instanceNumber,lastMessageNumber+1,currentMessageNumber);
>>>           lastMessageNumber=currentMessageNumber;
>>>         }
>>>       }
>>>     }
>>>     printf("worker %i ending after %i good messages%s. last message was %i \n",instanceNumber,OKcount,leaveByMessage?"":" by hard termination",lastMessageNumber);
>>>     sleep(1);
>>>     rc=zmq_disconnect(router,"tcp://127.0.0.1:10001");
>>>     if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>>     rc=zmq_close (router);
>>>     if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>>   }
>>> public:
>>>   byte instanceNumber;
>>>   static void* ctx;
>>>   static void* lauch_me(void*p) {
>>>     static_cast<Worker*>(p)->goWorker();
>>>     return 0;
>>>   }
>>> };
>>>
>>> void* Router::ctx=0;
>>> void* Worker::ctx=0;
>>>
>>> int main(int argc, char**argv) {
>>>   Router::ctx=zmq_ctx_new(); // create global context
>>>   zmq_ctx_set(Router::ctx,ZMQ_IO_THREADS,100);
>>>   Worker::ctx=Router::ctx; // use same context in worker and router
>>>
>>>   Router *router=new Router();
>>>   pthread_t r_tid;
>>>   printf("starting Router ...\n");
>>>   int ret=pthread_create(&r_tid, 0, Router::lauch_me, router);
>>>   if (ret!=0) exit(1);
>>>   bool a=true;
>>>   // while (a) sleep (1);
>>>   sleep(1);
>>>   printf(" thread id %li\n",r_tid);
>>>
>>>   Worker *workers[100];
>>>   pthread_t worker_ids[100];
>>>   printf("starting %i Workers ...\n",numWorkers);
>>>   for(int i=0; i<numWorkers; i++) {
>>>     workers[i]=new Worker();
>>>     workers[i]->instanceNumber=i;
>>>     int ret=pthread_create(&worker_ids[i], 0, Worker::lauch_me, workers[i]);
>>>     if (ret!=0) exit(1);
>>>   }
>>>   printf("finished starting Workers ...\n");
>>>   sleep(10);
>>>
>>>   printf("sending messages ...\n");
>>>
>>>   void* socket=zmq_socket(Router::ctx,ZMQ_PUB);
>>>   sethwm(ZMQ_SNDHWM,socket);
>>>   zmq_connect(socket,"tcp://127.0.0.1:10000");
>>>   for (int i=0; i<10000; i++) {
>>>     // if (i%100==0) printf(".");
>>>     int rc;
>>>     int errorcount=0;
>>>     int *datas=(int*)malloc(4); *datas=i;
>>>     rc=zmq_send(socket,datas,sizeof(*datas),0);
>>>     if (rc!=4) printf("error sending message %i\n",i);
>>>   }
>>>   printf("sending messages done\n");
>>>   int rc=zmq_send(socket,"Die!",4,0);
>>>   if (rc!=4) {
>>>     printf("could not send termination message to workers, killing them by static variable\n");
>>>     for (int i=0; i<numWorkers; i++) {
>>>       workers[i]->leave=true;
>>>     }
>>>   }
>>>   sleep(30);
>>>   router->leave=true;
>>>   void*result;
>>>   for (int i=0; i<numWorkers; i++) {
>>>     workers[i]->leave=true;
>>>     pthread_join(worker_ids[i],&result);
>>>   }
>>>   printf("all own thread have terminated\n");
>>>   pthread_join(r_tid,&result);
>>>   printf("router has terminated\n");
>>> }
>>>
>>> printf("could not send termination message to worker %i");
>>>
>>> _______________________________________________
>>> zeromq-dev mailing list
>>> zeromq-dev at lists.zeromq.org
>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev [3]
Links:
------
[1] http://127.0.0.1:10000
[2] http://127.0.0.1:10001
[3] http://lists.zeromq.org/mailman/listinfo/zeromq-dev