[zeromq-dev] Error Using XSUB and XPUB in CZMQ

Doron Somech somdoron at gmail.com
Tue Jul 8 00:32:05 CEST 2014


On the client code your main method destroy the socket immediately after
the fork of the publisher and subscriber.

Block the client main method.
On Jul 7, 2014 11:06 AM, "Pieter Hintjens" <ph at imatix.com> wrote:

> The subscribe error seems impossible; this would happen if you tried
> to subscribe on the pipe, for instance, yet not if you subscribe on a
> ZMQ_SUB socket.
>
> Can you strip down that case to a minimal example?
>
> On Mon, Jul 7, 2014 at 4:47 PM, Mellitus Ezeme
> <mellitus.ezeme at mail.utoronto.ca> wrote:
> > Hi,
> >
> > I tried using XSUB and XPUB to link two applications and I was surprised
> the
> > socket creation for XSUB and XPUB kept returning error as I try to use
> it as
> > a proxy. First, I made two of them bound to the endpoints of the frontend
> > and backend respectively and then connected the PUB and SUB and this is
> the
> > error I get from the server side.
> >
> >
> > Comm publisher Socket creation succesful in the server.
> >
> > Power subscriber Socket creation succesful in the server.
> > co_simServer: co_simServer.c:88: main: Assertion `sc == 0' failed.
> > Aborted (core dumped)
> >
> >
> > Below is the server code using CZMQ:
> >
> >
> > // The server for the co-simulator
> > // Contains both the Pub and Sub socket for the server
> > // And the proxy queue
> >
> > #include "czmq.h"
> >
> > static void
> > subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> > {
> >     // Subscribe to Power input from client
> >
> >     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
> >     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
> >     if (rc != 0) {
> >         printf("Power subscriber Socket creation failed in the
> server.\n");
> >     }
> >     else {
> >         printf("Power subscriber Socket creation succesful in the
> > server.\n");
> >     }
> >
> >     zsocket_set_subscribe (subscriber, "P");
> >
> >     // Creating the number of messages it will receive
> >
> >     int count = 0;
> >
> >     while (count < 10) {
> >
> >     char *string = zstr_recv (subscriber);
> >     char symbol;
> >     float power_value, current_value;
> >     if (!string)
> >
> >         break; // It was interrupted
> >
> >     sscanf (string, "%c %01f %01f",&symbol, &power_value,
> &current_value);
> >     printf("The values are %01f watts and %01f amps for power and current
> > respectively for %d count \n", power_value, current_value, count);
> >
> >     free(string);
> >
> >     count++;
> >        }
> >
> >     zsocket_destroy (ctx, subscriber);
> >   }
> >
> >     // The publisher sends messages starting with "C"
> >
> > static void
> > publisher_thread (void *args, zctx_t *ctx, void *pipe)
> > {
> >         void *publisher = zsocket_new (ctx, ZMQ_PUB);
> >
> >         int rc = zsocket_connect(publisher, "tcp://localhost:6000");
> >
> >     if (rc != 0) {
> >         printf("Comm publisher Socket creation failed in the server.\n");
> >     }
> >     else {
> >         printf("Comm publisher Socket creation succesful in the
> server.\n");
> >     }
> >
> >     while (!zctx_interrupted) {
> >         char string[15];
> >         int breaker_value = randof(5);
> >         int fault_value = randof(10);
> >         char symbol = 'C';
> >         sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);
> >
> >     if (zstr_send (publisher, string) == -1)
> >         break;  // interrupted
> >     zclock_sleep (100); // Wait for 1/100 seconds
> >     }
> >  }
> >
> > // Creating the default thread which will start the other threads
> >
> > int main (void)
> > {
> >     // Start the child threads
> >     zctx_t *ctx = zctx_new();
> >     zthread_fork (ctx, publisher_thread, NULL);
> >     zthread_fork (ctx, subscriber_thread, NULL);
> >
> >     // Creating the proxy sockets
> >     void *frontend = zsocket_new (ctx, ZMQ_XSUB);
> >     int sc = zsocket_bind (frontend, "tcp://*:6000");
> >     assert (sc == 0);
> >
> >     void *backend = zsocket_new (ctx, ZMQ_XPUB);
> >     int pc = zsocket_bind (backend, "tcp://*:6001");
> >     assert (pc == 0);
> >
> >     // calling the proxy fucntion
> >     zmq_proxy (frontend, backend, NULL);
> >
> >     // Tell attached threads to exit
> >     zctx_destroy (&ctx);
> >     return 0;
> > }
> >
> >
> > And on the Client side, I have this error:
> >
> >
> > Comm subscriber Socket creation failed in the client.
> > Power publisher Socket creation succesful in the client.
> > ZMQ_SUBSCRIBE is not valid on PAIR sockets
> > co_SimClient: zsockopt.c:877: zsocket_set_subscribe: Assertion `0'
> failed.
> > Aborted (core dumped)
> >
> >
> > I am so surprised at the highlighted error because the document said we
> must
> > always subscribe to receive a message and similar socket was created
> > successfully on the server side.
> >
> > Below is the client code:
> >
> >
> > // Contains both the Pub and Sub socket for the client
> > //which is the power simulator
> >
> >
> > #include "czmq.h"
> >
> > static void
> > subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> > {
> >     // Subscribe to control input from Comm simulator from server
> >
> >     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
> >     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
> >     if (rc != 0) {
> >         printf("Comm subscriber Socket creation failed in the
> client.\n");
> >     }
> >     else {
> >         printf("Comm subscriber Socket creation succesful in the
> > client.\n");
> >     }
> >
> >     // Error is being reported here but it works in server side
> >     zsocket_set_subscribe (subscriber, "C");
> >
> >     // Creating the number of messages it will receive
> >
> >     int count = 0;
> >
> >     while (count < 10) {
> >
> >     char *string = zstr_recv (subscriber);
> >     int breaker_value;
> >     int fault_value;
> >     char symbol;
> >     if (!string)
> >
> >         break; // It was interrupted
> >
> >     sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
> >     printf("The values are %c symbol and %d for breaker and %d for fault
> for
> > %d count \n", symbol, breaker_value, fault_value, count);
> >
> >     free(string);
> >
> >     count++;
> >        }
> >
> >     zsocket_destroy (ctx, subscriber);
> >   }
> >
> >     // The publisher sends messages starting with "P" to the server
> >
> > static void
> > publisher_thread (void *args, zctx_t *ctx, void *pipe)
> > {
> >         void *publisher = zsocket_new (ctx, ZMQ_PUB);
> >
> >         int rc = zsocket_connect (publisher, "tcp://localhost:6000");
> >
> >     if (rc != 0) {
> >         printf("Power publisher Socket creation failed in the
> client.\n");
> >     }
> >     else {
> >         printf("Power publisher Socket creation succesful in the
> > client.\n");
> >     }
> >
> >     while (!zctx_interrupted) {
> >         char string[15];
> >         char symbol = 'P';
> >         float power_value = randof(15);
> >         float current_value = randof(20);
> >         sprintf(string, "%c %01f %01f",symbol,power_value,current_value);
> >
> >     if (zstr_send (publisher, string) == -1)
> >         break;  // interrupted
> >     zclock_sleep (10); // Wait for 1/100 seconds
> >     }
> >  }
> >
> > // Creating the default thread which will start the other threads
> >
> > int main (void)
> > {
> >     // Start the child threads
> >     zctx_t *ctx = zctx_new();
> >     zthread_fork (ctx, publisher_thread, NULL);
> >     zthread_fork (ctx, subscriber_thread, NULL);
> >
> >
> >     // Tell attached threads to exit
> >     zctx_destroy (&ctx);
> >
> >     return 0;
> > }
> >
> >
> > Any help will be highly appreciated.
> >
> > Mel.
> >
> >
> >
> >
> >
> > _______________________________________________
> > zeromq-dev mailing list
> > zeromq-dev at lists.zeromq.org
> > http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20140708/af9518be/attachment.htm>


More information about the zeromq-dev mailing list