[zeromq-dev] need help with PUB/SUB
Kenneth Adam Miller
kennethadammiller at gmail.com
Sat Mar 7 17:56:58 CET 2015
Replied inline.
On Sat, Mar 7, 2015 at 8:33 AM, <sven.koebnick at t-online.de> wrote:
> Oops ... I overlooked that there's no time between connect and send in
> the client ;o)
>
> Please excuse this silly mistake.
>
>
>
> Result: I still don't have an easy test case for my vanished messages
> ;o((((((
>
>
>
> On 2015-03-07 13:41, Sven Koebnick wrote:
>
> Thanks for the fast response ;-)
> Did you notice that all workers get the same first message? The bug should
> be in the router or the client (main) I assume.
>
> Is there some way to ask the socket whether the connection has been
> established? I didn't find one.
>
>
No, the canonical way to ensure that messages go through is to make sure
that the server, or whichever side binds, is already running and bound
before the other side calls connect. Then the connect call will go
through. The other way to get reliability is an application-layer
fallback strategy for message loss: on the client side, connect, send,
and then wait for a response in a loop. Retry the send with exponential
backoff; continual message loss means that the listener is not
available.
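
A minimal sketch of that retry loop, assuming C++11 and nothing beyond the
standard library. The names retry_with_backoff and try_once are made up for
illustration (they are not part of libzmq or czmq); try_once stands for
whatever sends the request and waits, with a timeout, for the reply, and
returns true on success:

```cpp
#include <algorithm>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper, not a libzmq/czmq function.
// Calls try_once until it returns true or max_attempts is used up.
// The wait between attempts starts at initial_delay and doubles after
// each failure, capped at max_delay.
// Returns the number of the successful attempt (1-based), or 0 if every
// attempt failed.
inline int retry_with_backoff(const std::function<bool()>& try_once,
                              int max_attempts,
                              std::chrono::milliseconds initial_delay,
                              std::chrono::milliseconds max_delay)
{
    std::chrono::milliseconds delay = initial_delay;
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (try_once())
            return attempt;              // got a reply
        if (attempt == max_attempts)
            break;                       // no point sleeping after the last try
        std::this_thread::sleep_for(delay);
        delay = std::min<std::chrono::milliseconds>(delay * 2, max_delay);
    }
    return 0;                            // treat the listener as unavailable
}
```

In your client, try_once would be the part that sends the request and polls
for the reply with a timeout. A return value of 0 is the "continual message
loss" case, where you give up and report that the listener is not there.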
>
> Indeed, in my larger system I have the combination you suggested: Pub as
> well as Rep in the router, Sub and Req in the workers.
> It is assured that the connection is there, because sometimes the second or
> third reply from the router to the workers is lost, but not (I think) before
> the first Pub message is sent to the worker. But I did not manage to
> reproduce this exact situation in a minimal test case.
>
>
Ok, well my replies are going off the source that you have provided so far.
I might be better off modifying that if you didn't understand, but I don't
have anything to go by when you mention your other structures, so I can't
do much there.
>
> Are there any "don'ts" or important initializations that I missed?
> In ZMQ2 everything worked well. The problems started with the switch to ZMQ4.
>
> If you want, I could draw a small picture of the structure I use.
>
>
Please do this.
>
> What I noticed in your API is that some functions take a message** argument
> and NULL the data pointer as a sign that "this data belongs to zmq now", but
> some do not (e.g. Send(socket*, byte*, size), which sends data directly
> without wrapping it in a zmsg).
> I already tried leaking ALL data, so that nothing that may still linger in
> zmq gets free()d, but with only little improvement.
> I also checked with tdmalloc, valgrind, efence and duma, but they all say
> that I'm fine with memory access (or does zmq somehow work around those
> memory libs?).
>
>
Check the API reference for the specific zmq library you are using; more
than likely, there is a convention regarding resource ownership that the
API strictly obeys. Otherwise, I would advise you to abandon delete and
free calls in favor of RAII and language facilities like std::unique_ptr.
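
As a rough sketch of what I mean, assuming C++11 and a buffer that came from
malloc(): FreeDeleter, MallocBuffer, make_int_payload and read_int_payload are
names I made up for this example, not anything from zmq or czmq.

```cpp
#include <cstdlib>
#include <cstring>
#include <memory>
#include <utility>

// Deleter that releases memory obtained from malloc().
struct FreeDeleter {
    void operator()(void* p) const noexcept { std::free(p); }
};

// Owning handle for a malloc'd byte buffer; free() runs automatically
// when the handle goes out of scope.
using MallocBuffer = std::unique_ptr<unsigned char, FreeDeleter>;

// Hypothetical helper: copy an int into a freshly malloc'd buffer.
// Returns an empty handle if malloc() fails.
inline MallocBuffer make_int_payload(int value)
{
    MallocBuffer buf(static_cast<unsigned char*>(std::malloc(sizeof value)));
    if (buf)
        std::memcpy(buf.get(), &value, sizeof value);
    return buf;
}

// Hypothetical helper: read the int back out of a non-empty payload.
inline int read_int_payload(const MallocBuffer& buf)
{
    int value = 0;
    std::memcpy(&value, buf.get(), sizeof value);
    return value;
}
```

You would pass buf.get() and sizeof(int) to whatever send call you use, and
the buffer is released when buf leaves scope. That is only safe if the send
call copies the data or does not keep the pointer, so check the ownership
rule for that call first.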
>
> Kind regards
> Sven Koebnick
> --------------------------------------------------------
> E = mc² ± 2dBA ----- everything is relative
> ---------------------------------------------------------
>
> On 06.03.2015 at 17:29, Kenneth Adam Miller <
> kennethadammiller at gmail.com> wrote:
>
> I notice that in your public code, you wait until you get a request on
> the sub socket, and then publish on the workers socket. The workers socket
> is a pub socket, and just receiving a request on the sub socket doesn't
> mean that all workers have connected. Why not, before you start publishing
> at all, add a rep socket and wait in a loop to receive from and respond to
> exactly as many workers as you intend to start, before you start
> listening on the sub socket at all? The workers would then send a req
> message to that rep socket and wait for a reply before sending anything
> to the sub socket. I'm not sure what the sub socket is supposed to do; I
> suppose you identified the need for it since it's a many-to-one
> relationship: you have many workers, one publisher. But you can replace
> the sub socket's role of waiting for worker messages with a rep socket.
> In the worker, before sending to the rep socket, make a connect call to
> the publisher.
>
> On Fri, Mar 6, 2015 at 10:58 AM, <sven.koebnick at t-online.de> wrote:
>
>> Hi all!
>>
>>
>> after migration from ZMQ2 to ZMQ4 I had problems with vanished messages
>> (on REP/REQ as well as on PUB/SUB).
>>
>> I now *managed to build some DIRTY "minimal" testcase* (see below)
>> displaying at least >one< of my problems.
>>
>> The test case does the following:
>>
>> - create a router binding one SUB socket (for clients) and one PUB
>> socket (for workers)
>>
>> - create some workers listening on a SUB socket connected to the router's
>> PUB port
>>
>> - WAIT some seconds !!!
>>
>> - send 10000 numbers at full speed on a PUB socket connected to the
>> router's SUB port
>>
>> - workers expect the numbers 0 to n to be sent as messages in the correct
>> order and complain about mismatched numbers
>>
>>
>>
>> The result shows that *about 1400 to 3000 of 10000 messages get lost*
>> before the rest reaches the workers.
>>
>> WHAT AM I DOING WRONG ?
>>
>>
>> It seems to get a *little* better if I
>>
>> - increase the threads in context with zmq_ctx_set()
>>
>> - increase the number of workers
>>
>> - increase the SND and RCV high-water marks using zmq_setsockopt()
>>
>> but it never gets really good.
>>
>>
>>
>> When I send 100,000 messages, the first 3,500 get lost (?!?).
>>
>> When I send 1000 messages, none arrive ;o(
>>
>> When I set SNDHWM and RCVHWM to 0 (zero), which should mean "unlimited",
>> nothing ever arrives.
>>
>> many thanks
>>
>> sven
>>
>> =====SNIP===output==================================
>>
>> starting Router ...
>> thread id -1212605632
>> starting 10 Workers ...
>> finished starting Workers ...
>> sending messages ...
>> sending messages done
>> worker 2 expected message 0, got message 4616
>> worker 5 expected message 0, got message 4616
>> worker 9 expected message 0, got message 4616
>> worker 0 expected message 0, got message 4616
>> worker 7 expected message 0, got message 4616
>> worker 4 expected message 0, got message 4616
>> worker 1 expected message 0, got message 4616
>> worker 8 expected message 0, got message 4616
>> worker 6 expected message 0, got message 4616
>> worker 3 expected message 0, got message 4616
>> worker 9 ending after 5383 good messages. last message was 9999
>> worker 2 ending after 5383 good messages. last message was 9999
>> worker 7 ending after 5383 good messages. last message was 9999
>> worker 5 ending after 5383 good messages. last message was 9999
>> worker 8 ending after 5383 good messages. last message was 9999
>> worker 4 ending after 5383 good messages. last message was 9999
>> worker 1 ending after 5383 good messages. last message was 9999
>> worker 3 ending after 5383 good messages. last message was 9999
>> worker 6 ending after 5383 good messages. last message was 9999
>> worker 0 ending after 5383 good messages. last message was 9999
>> all own thread have terminated
>> router has terminated
>>
>> =====SNIP===code==================================
>>
>> #include <stdlib.h>
>> #include <stdio.h>
>> #include <pthread.h>
>> #include <unistd.h>
>> #include <zmq.h>
>> #include <czmq.h>
>> #include <zframe.h>
>> #include <string.h> // memcpy, strncmp, strerror
>>
>> int numWorkers=10;
>>
>> void sethwm(int o,void*s) {
>> int opt;
>> size_t len=4;
>> opt=500000;
>> int rc=zmq_setsockopt(s,o,&opt,len);
>> if (rc!=0)
>> printf("error setting HWM\n");
>> opt=-1;
>> rc=zmq_getsockopt(s,o,&opt,&len);
>> if (rc!=0)
>> printf("error getting HWM\n");
>> }
>>
>> class Router {
>> public: volatile bool leave=false;
>> private:
>> void inline goRouter() {
>> void *clients=zmq_socket(ctx, ZMQ_SUB);
>> sethwm(ZMQ_RCVHWM,clients);
>> void *workers=zmq_socket(ctx, ZMQ_PUB);
>> sethwm(ZMQ_SNDHWM,workers);
>> int rc=zmq_setsockopt(clients,ZMQ_SUBSCRIBE,"",0);
>> if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>> rc=zmq_bind(clients,"tcp://127.0.0.1:10000");
>> if (rc!=0) printf("Error %i %s in line %i\n",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
>> rc=zmq_bind(workers,"tcp://127.0.0.1:10001");
>> if (rc!=0) printf("Error %i %s in line %i\n",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
>> zmq_pollitem_t items[1];
>> items[0].socket=clients;
>> items[0].events=ZMQ_POLLIN;
>> int currentWorker=-1,nextWorker=0;
>> int workerCount=1;
>> bool lastReceived=false;
>> do {
>> items[0].revents=0;
>> rc=zmq_poll(items,1,10);
>> if (rc<0) printf("Error in line %i\n",__LINE__ -1);
>> if (rc==0) continue;
>> if (items[0].revents) {
>> // we got a request
>> zmsg_t* request=zmsg_recv(items[0].socket);
>> if (request==0) printf("Error in line %i\n",__LINE__ -1);
>> zmsg_first(request);
>> zmsg_next(request);
>> zframe_t* data_f=zmsg_next(request);
>> char* data=(char*)zframe_data(data_f);
>> rc=zmsg_send(&request,workers);
>> if (request!=0) printf("Error in line %i\n",__LINE__ -1);
>> if (rc<0)
>> printf("error sending in router: %i %s\n",errno,strerror(errno));
>> }
>> } while (workerCount>0 && !leave);
>> }
>> public:
>> static void* ctx;
>> static void* lauch_me(void*p) {
>> static_cast<Router*>(p)->goRouter();
>> return 0;
>> }
>> };
>>
>> class Worker {
>> public: volatile bool leave=false;
>> private:
>> void inline goWorker() {
>> void *router=zmq_socket(ctx, ZMQ_SUB);
>> sethwm(ZMQ_RCVHWM,router);
>> char instanceName[10];
>> memcpy(instanceName,"Worker000",10);
>> instanceName[6]+=instanceNumber/100;
>> instanceName[7]+=(instanceNumber%100)/10;
>> instanceName[8]+=instanceNumber%10;
>> int rc=zmq_setsockopt(router,ZMQ_SUBSCRIBE,0,0);
>> if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>> rc=zmq_connect(router,"tcp://127.0.0.1:10001");
>> if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>> zmq_pollitem_t items[1];
>> items[0].socket=router;
>> items[0].events=ZMQ_POLLIN;
>> int lastMessageNumber=-1,currentMessageNumber=-1;
>> bool leaveByMessage=false;
>> int OKcount=0;
>> while (!leave) {
>> items[0].revents=0;
>> rc=zmq_poll(items,1,10);
>> if (rc<0) printf("Error in line %i\n",__LINE__ -1);
>> if (rc==0) continue;
>> if (items[0].revents) {
>> zmsg_t* request=zmsg_recv(items[0].socket);
>> zframe_t *data_f=zmsg_first(request);
>> char *data=(char*)zframe_data(data_f);
>> if (strncmp(data,"Die!",4)==0) {
>> leave=true; leaveByMessage=true;
>> } else {
>> currentMessageNumber= *((int*)data);
>> if (currentMessageNumber == lastMessageNumber+1)
>> OKcount++;
>> else
>> printf("worker %i expected message %i, got message %i\n",instanceNumber,lastMessageNumber+1,currentMessageNumber);
>> lastMessageNumber=currentMessageNumber;
>> }
>> }
>> }
>> printf("worker %i ending after %i good messages%s. last message was %i \n",instanceNumber,OKcount,leaveByMessage?"":" by hard termination",lastMessageNumber);
>> sleep(1);
>> rc=zmq_disconnect(router,"tcp://127.0.0.1:10001");
>> if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>> rc=zmq_close (router);
>> if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>> }
>> public:
>> byte instanceNumber;
>> static void* ctx;
>> static void* lauch_me(void*p) {
>> static_cast<Worker*>(p)->goWorker();
>> return 0;
>> }
>> };
>>
>> void* Router::ctx=0;
>> void* Worker::ctx=0;
>>
>> int main(int argc, char**argv) {
>> Router::ctx=zmq_ctx_new(); // create global context
>> zmq_ctx_set(Router::ctx,ZMQ_IO_THREADS,100);
>> Worker::ctx=Router::ctx; // use same context in worker and router
>>
>> Router *router=new Router();
>> pthread_t r_tid;
>> printf("starting Router ...\n");
>> int ret=pthread_create(&r_tid, 0, Router::lauch_me, router);
>> if (ret!=0) exit(1);
>> bool a=true;
>> // while (a) sleep (1);
>> sleep(1);
>> printf(" thread id %li\n",r_tid);
>>
>> Worker *workers[100];
>> pthread_t worker_ids[100];
>> printf("starting %i Workers ...\n",numWorkers);
>> for(int i=0; i<numWorkers; i++) {
>> workers[i]=new Worker();
>> workers[i]->instanceNumber=i;
>> int ret=pthread_create(&worker_ids[i], 0, Worker::lauch_me,
>> workers[i]);
>> if (ret!=0) exit(1);
>> }
>> printf("finished starting Workers ...\n");
>> sleep(10);
>>
>> printf("sending messages ...\n");
>>
>> void* socket=zmq_socket(Router::ctx,ZMQ_PUB);
>> sethwm(ZMQ_SNDHWM,socket);
>> zmq_connect(socket,"tcp://127.0.0.1:10000");
>> for (int i=0; i<10000; i++) {
>> // if (i%100==0) printf(".");
>> int rc;
>> int errorcount=0;
>> int *datas=(int*)malloc(4); *datas=i;
>> rc=zmq_send(socket,datas,sizeof(*datas),0);
>> if (rc!=4) printf("error sending message %i\n",i);
>> }
>> printf("sending messages done\n");
>> int rc=zmq_send(socket,"Die!",4,0);
>> if (rc!=4) {
>> printf("could not send termination message to workers, killing them by static variable\n");
>> for (int i=0; i<numWorkers; i++) {
>> workers[i]->leave=true;
>> }
>> }
>> sleep(30);
>> router->leave=true;
>> void*result;
>> for (int i=0; i<numWorkers; i++) {
>> workers[i]->leave=true;
>> pthread_join(worker_ids[i],&result);
>> }
>> printf("all own thread have terminated\n");
>> pthread_join(r_tid,&result);
>> printf("router has terminated\n");
>> }
>>
>>
>>
>>
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev at lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>