[zeromq-dev] FW: Error Using XSUB and XPUB in CZMQ

Doron Somech somdoron at gmail.com
Wed Jul 9 18:04:15 CEST 2014


My suggestion to you is to start over from scratch, do something very
simple and make it work, then add some more and make it work and continue
until you have what you want.


On Wed, Jul 9, 2014 at 3:32 PM, Mellitus Ezeme <
mellitus.ezeme at mail.utoronto.ca> wrote:

>  Greetings people,
>
>
>  Please, I am marginally close to making this project work and I believe
> this group can help me pull through this. The Server/Client now
> communicates but they only exchange one message and the server
> crashes(segmentation fault). The mail trail has the details and the code.
>
>
>  Mel.
>  ------------------------------
> *From:* zeromq-dev-bounces at lists.zeromq.org <
> zeromq-dev-bounces at lists.zeromq.org> on behalf of Mellitus Ezeme <
> mellitus.ezeme at mail.utoronto.ca>
> *Sent:* Tuesday, July 8, 2014 11:02 PM
> *To:* Procter, Stephen
> *Cc:* ZeroMQ development list
>
> *Subject:* Re: [zeromq-dev] Error Using XSUB and XPUB in CZMQ
>
>
> Hi,
>
>
>  Thanks very much for your reply. I am sorry am not yet an expert on this
> and had to bother you again. I have applied the correction you gave me but
> a new development occurred. I want the client and the server to talk to
> each other as long as I want but unfortunately, they only exchange the
> first values and the server crashes. I suspect the proxy but I cant explain
> it. The code snippet is below.
>
>
>  Server:
>
>
>  // The server for the co-simulator
> // Contains both the Pub and Sub socket for the server
> // And the proxy queue
>
> #include "czmq.h"
>
> static void
> subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> {
>     // Subscribe to Power input
>
>     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
>     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
>     if (rc != 0) {
>         printf("Power subscriber Socket creation failed in the server.\n");
>     }
>     else {
>         printf("Power subscriber Socket creation succesful in the
> server.\n");
>     }
>
>     zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "P", 1);
>
>     // Creating the number of messages it will receive
>
>     int count = 0;
>
>     while (1) {
>
>     char *string = zstr_recv (subscriber);
>     char symbol;
>     float power_value, current_value;
>     if (!string)
>
>         break; // It was interrupted
>
>     sscanf (string, "%c %01f %01f",&symbol, &power_value, &current_value);
>     printf("The values are %01f watts and %01f amps for %d count \n",
> power_value, current_value, count);
>
>     free(string);
>
>     count++;
>        }
>
>     zsocket_destroy (ctx, subscriber);
>   }
>
>     // The publisher sends messages starting with "C"
>
> static void
> publisher_thread (void *args, zctx_t *ctx, void *pipe)
> {
>         void *publisher = zsocket_new (ctx, ZMQ_PUB);
>
>         int rc = zsocket_connect(publisher, "tcp://localhost:6000");
>
>     if (rc != 0) {
>         printf("Comm publisher Socket creation failed in the server.\n");
>     }
>     else {
>         printf("Comm publisher Socket creation succesful in the
> server.\n");
>     }
>
>     while (!zctx_interrupted) {
>         char string[15];
>         int breaker_value = randof(5);
>         int fault_value = randof(10);
>         char symbol = 'C';
>         sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);
>
>     if (zstr_send (publisher, string) == -1)
>         break;  // interrupted
>
>     //printf("%s is the value sent for this session\n", string);
>     zclock_sleep (100); // Wait for 1/100 seconds
>     }
>
>     zsocket_destroy (ctx, publisher);
>  }
>
> // Creating the default thread which will start the other threads
>
> int main (void)
> {
>     // Start the child threads
>     zctx_t *ctx = zctx_new();
>
>
>     zthread_fork (ctx, publisher_thread, NULL);
>     zthread_fork (ctx, subscriber_thread, NULL);
>
>     // Creating the proxy sockets
>     void *frontend = zsocket_new (ctx, ZMQ_XSUB);
>     zsocket_bind (frontend, "tcp://*:6000");
>     //assert (sc == 0);
>
>     void *backend = zsocket_new (ctx, ZMQ_XPUB);
>     zsocket_bind (backend, "tcp://*:6001");
>     //assert (pc == 0);
>
>     // calling the proxy function
>     zmq_proxy (frontend, backend, NULL);
>
>
>     // Tell attached threads to exit
>     zctx_destroy (&ctx);
>     return 0;
> }
>
>
>  And the client code:
>
>
>  // Contains both the Pub and Sub socket for the client
> //which is the power simulator
>
>
> #include "czmq.h"
>
> static void
> subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> {
>     // Subscribe to control input from Comm simulator
>
>     void *subscriber_client = zsocket_new (ctx, ZMQ_SUB);
>     int rc = zsocket_connect (subscriber_client,"tcp://localhost:6001");
>     if (rc != 0) {
>         printf("Comm subscriber Socket creation failed in the client.\n");
>     }
>     else {
>         printf("Comm subscriber Socket creation succesful in the
> client.\n");
>     }
>
>     zmq_setsockopt (subscriber_client, ZMQ_SUBSCRIBE, "C", 1);
>
>     // Creating the number of messages it will receive
>
>     int count = 0;
>
>     while (1) {
>
>     char *string = zstr_recv (subscriber_client);
>     int breaker_value;
>     int fault_value;
>     char symbol;
>     if (!string)
>
>         break; // It was interrupted
>
>     sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
>     printf("The values are %c symbol and %d for breaker and %d for fault
> for %d count \n", symbol, breaker_value, fault_value, count);
>
>     free(string);
>
>     count++;
>        }
>
>     zsocket_destroy (ctx, subscriber_client);
>   }
>
>     // The publisher sends messages starting with "P"
>
> static void
> publisher_thread (void *args, zctx_t *ctx, void *pipe)
> {
>         void *publisher_client = zsocket_new (ctx, ZMQ_PUB);
>
>         int rc = zsocket_connect (publisher_client,
> "tcp://localhost:6000");
>
>     if (rc != 0) {
>         printf("Power publisher Socket creation failed in the client.\n");
>     }
>     else {
>         printf("Power publisher Socket creation succesful in the
> client.\n");
>     }
>
>     while (!zctx_interrupted) {
>         char string[15];
>         char symbol = 'P';
>         float power_value = randof(15);
>         float current_value = randof(20);
>         sprintf(string, "%c %01f %01f",symbol,power_value,current_value);
>
>     if (zstr_send (publisher_client, string) == -1)
>         break;  // interrupted
>     zclock_sleep (100); // Wait for 1/1000 seconds
>     }
>     zsocket_destroy (ctx, publisher_client);
>  }
>
> // Creating the default thread which will start the other threads
>
> int main (void)
> {
>     zctx_t *ctx = zctx_new();
>
>     // Start the threads
>     zthread_fork (ctx, publisher_thread, NULL);
>     zthread_fork (ctx, subscriber_thread, NULL);
>
>     while (1) {
>      ;
>     }
>
>     // Tell attached threads to exit
>     zctx_destroy (&ctx);
>
>     return 0;
> }
>
>
>  Once more, thanks for your help so far.
>
>
>  Mel.
>  ------------------------------
> *From:* Procter, Stephen <Stephen.Procter at salik.ae>
> *Sent:* Tuesday, July 8, 2014 12:25 AM
> *To:* Mellitus Ezeme
> *Cc:* ZeroMQ development list
> *Subject:* RE: [zeromq-dev] Error Using XSUB and XPUB in CZMQ
>
>
> Hi Mel,
>
>
>
> The assertion in the server code is failing because a successful call to zsocket_bind
> returns the port number rather than the usual zero.
>
>
>
> Regards,
>
>
>
> Steve
>
>
>
>
>
> *From:* zeromq-dev-bounces at lists.zeromq.org [mailto:
> zeromq-dev-bounces at lists.zeromq.org] *On Behalf Of *Mellitus Ezeme
> *Sent:* Tuesday, July 08, 2014 3:20 AM
> *To:* ZeroMQ development list
> *Subject:* Re: [zeromq-dev] Error Using XSUB and XPUB in CZMQ
>
>
>
> Hello,
>
>
>
> Thanks very much. I have used a loop to regulate the client side. Do you
> have any idea what I can do on the server side to rectify the proxy problem
> that keeps failing?
>
>
>
> Mel
>   ------------------------------
>
> *From:* zeromq-dev-bounces at lists.zeromq.org <
> zeromq-dev-bounces at lists.zeromq.org> on behalf of Doron Somech <
> somdoron at gmail.com>
> *Sent:* Monday, July 7, 2014 6:32 PM
> *To:* ZeroMQ development list
> *Subject:* Re: [zeromq-dev] Error Using XSUB and XPUB in CZMQ
>
>
>
> On the client code your main method destroy the socket immediately after
> the fork of the publisher and subscriber.
>
> Block the client main method.
>
> On Jul 7, 2014 11:06 AM, "Pieter Hintjens" <ph at imatix.com> wrote:
>
> The subscribe error seems impossible; this would happen if you tried
> to subscribe on the pipe, for instance, yet not if you subscribe on a
> ZMQ_SUB socket.
>
> Can you strip down that case to a minimal example?
>
> On Mon, Jul 7, 2014 at 4:47 PM, Mellitus Ezeme
> <mellitus.ezeme at mail.utoronto.ca> wrote:
> > Hi,
> >
> > I tried using XSUB and XPUB to link two applications and I was surprised
> the
> > socket creation for XSUB and XPUB kept returning error as I try to use
> it as
> > a proxy. First, I made two of them bound to the endpoints of the frontend
> > and backend respectively and then connected the PUB and SUB and this is
> the
> > error I get from the server side.
> >
> >
> > Comm publisher Socket creation succesful in the server.
> >
> > Power subscriber Socket creation succesful in the server.
> > co_simServer: co_simServer.c:88: main: Assertion `sc == 0' failed.
> > Aborted (core dumped)
> >
> >
> > Below is the server code using CZMQ:
> >
> >
> > // The server for the co-simulator
> > // Contains both the Pub and Sub socket for the server
> > // And the proxy queue
> >
> > #include "czmq.h"
> >
> > static void
> > subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> > {
> >     // Subscribe to Power input from client
> >
> >     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
> >     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
> >     if (rc != 0) {
> >         printf("Power subscriber Socket creation failed in the
> server.\n");
> >     }
> >     else {
> >         printf("Power subscriber Socket creation succesful in the
> > server.\n");
> >     }
> >
> >     zsocket_set_subscribe (subscriber, "P");
> >
> >     // Creating the number of messages it will receive
> >
> >     int count = 0;
> >
> >     while (count < 10) {
> >
> >     char *string = zstr_recv (subscriber);
> >     char symbol;
> >     float power_value, current_value;
> >     if (!string)
> >
> >         break; // It was interrupted
> >
> >     sscanf (string, "%c %01f %01f",&symbol, &power_value,
> &current_value);
> >     printf("The values are %01f watts and %01f amps for power and current
> > respectively for %d count \n", power_value, current_value, count);
> >
> >     free(string);
> >
> >     count++;
> >        }
> >
> >     zsocket_destroy (ctx, subscriber);
> >   }
> >
> >     // The publisher sends messages starting with "C"
> >
> > static void
> > publisher_thread (void *args, zctx_t *ctx, void *pipe)
> > {
> >         void *publisher = zsocket_new (ctx, ZMQ_PUB);
> >
> >         int rc = zsocket_connect(publisher, "tcp://localhost:6000");
> >
> >     if (rc != 0) {
> >         printf("Comm publisher Socket creation failed in the server.\n");
> >     }
> >     else {
> >         printf("Comm publisher Socket creation succesful in the
> server.\n");
> >     }
> >
> >     while (!zctx_interrupted) {
> >         char string[15];
> >         int breaker_value = randof(5);
> >         int fault_value = randof(10);
> >         char symbol = 'C';
> >         sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);
> >
> >     if (zstr_send (publisher, string) == -1)
> >         break;  // interrupted
> >     zclock_sleep (100); // Wait for 1/100 seconds
> >     }
> >  }
> >
> > // Creating the default thread which will start the other threads
> >
> > int main (void)
> > {
> >     // Start the child threads
> >     zctx_t *ctx = zctx_new();
> >     zthread_fork (ctx, publisher_thread, NULL);
> >     zthread_fork (ctx, subscriber_thread, NULL);
> >
> >     // Creating the proxy sockets
> >     void *frontend = zsocket_new (ctx, ZMQ_XSUB);
> >     int sc = zsocket_bind (frontend, "tcp://*:6000");
> >     assert (sc == 0);
> >
> >     void *backend = zsocket_new (ctx, ZMQ_XPUB);
> >     int pc = zsocket_bind (backend, "tcp://*:6001");
> >     assert (pc == 0);
> >
> >     // calling the proxy fucntion
> >     zmq_proxy (frontend, backend, NULL);
> >
> >     // Tell attached threads to exit
> >     zctx_destroy (&ctx);
> >     return 0;
> > }
> >
> >
> > And on the Client side, I have this error:
> >
> >
> > Comm subscriber Socket creation failed in the client.
> > Power publisher Socket creation succesful in the client.
> > ZMQ_SUBSCRIBE is not valid on PAIR sockets
> > co_SimClient: zsockopt.c:877: zsocket_set_subscribe: Assertion `0'
> failed.
> > Aborted (core dumped)
> >
> >
> > I am so surprised at the highlighted error because the document said we
> must
> > always subscribe to receive a message and similar socket was created
> > successfully on the server side.
> >
> > Below is the client code:
> >
> >
> > // Contains both the Pub and Sub socket for the client
> > //which is the power simulator
> >
> >
> > #include "czmq.h"
> >
> > static void
> > subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> > {
> >     // Subscribe to control input from Comm simulator from server
> >
> >     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
> >     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
> >     if (rc != 0) {
> >         printf("Comm subscriber Socket creation failed in the
> client.\n");
> >     }
> >     else {
> >         printf("Comm subscriber Socket creation succesful in the
> > client.\n");
> >     }
> >
> >     // Error is being reported here but it works in server side
> >     zsocket_set_subscribe (subscriber, "C");
> >
> >     // Creating the number of messages it will receive
> >
> >     int count = 0;
> >
> >     while (count < 10) {
> >
> >     char *string = zstr_recv (subscriber);
> >     int breaker_value;
> >     int fault_value;
> >     char symbol;
> >     if (!string)
> >
> >         break; // It was interrupted
> >
> >     sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
> >     printf("The values are %c symbol and %d for breaker and %d for fault
> for
> > %d count \n", symbol, breaker_value, fault_value, count);
> >
> >     free(string);
> >
> >     count++;
> >        }
> >
> >     zsocket_destroy (ctx, subscriber);
> >   }
> >
> >     // The publisher sends messages starting with "P" to the server
> >
> > static void
> > publisher_thread (void *args, zctx_t *ctx, void *pipe)
> > {
> >         void *publisher = zsocket_new (ctx, ZMQ_PUB);
> >
> >         int rc = zsocket_connect (publisher, "tcp://localhost:6000");
> >
> >     if (rc != 0) {
> >         printf("Power publisher Socket creation failed in the
> client.\n");
> >     }
> >     else {
> >         printf("Power publisher Socket creation succesful in the
> > client.\n");
> >     }
> >
> >     while (!zctx_interrupted) {
> >         char string[15];
> >         char symbol = 'P';
> >         float power_value = randof(15);
> >         float current_value = randof(20);
> >         sprintf(string, "%c %01f %01f",symbol,power_value,current_value);
> >
> >     if (zstr_send (publisher, string) == -1)
> >         break;  // interrupted
> >     zclock_sleep (10); // Wait for 1/100 seconds
> >     }
> >  }
> >
> > // Creating the default thread which will start the other threads
> >
> > int main (void)
> > {
> >     // Start the child threads
> >     zctx_t *ctx = zctx_new();
> >     zthread_fork (ctx, publisher_thread, NULL);
> >     zthread_fork (ctx, subscriber_thread, NULL);
> >
> >
> >     // Tell attached threads to exit
> >     zctx_destroy (&ctx);
> >
> >     return 0;
> > }
> >
> >
> > Any help will be highly appreciated.
> >
> > Mel.
> >
> >
> >
> >
> >
> > _______________________________________________
> > zeromq-dev mailing list
> > zeromq-dev at lists.zeromq.org
> > http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
> ------------------------------
> This transmission is intended solely for the person or organization to
> whom it is addressed. It may contain privileged and confidential
> information. If you are not the intended recipient, you should not copy,
> distribute or take any action in reliance on it. If you have received this
> transmission in error, please notify us immediately by e-mail at
> info at salik.ae
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20140709/ede56c97/attachment.htm>


More information about the zeromq-dev mailing list