[zeromq-dev] Error Using XSUB and XPUB in CZMQ

Pieter Hintjens ph at imatix.com
Mon Jul 7 17:05:06 CEST 2014


The subscribe error seems impossible; this would happen if you tried
to subscribe on the pipe, for instance, yet not if you subscribe on a
ZMQ_SUB socket.

Can you strip down that case to a minimal example?

On Mon, Jul 7, 2014 at 4:47 PM, Mellitus Ezeme
<mellitus.ezeme at mail.utoronto.ca> wrote:
> Hi,
>
> I tried using XSUB and XPUB to link two applications and I was surprised the
> socket creation for XSUB and XPUB kept returning error as I try to use it as
> a proxy. First, I made two of them bound to the endpoints of the frontend
> and backend respectively and then connected the PUB and SUB and this is the
> error I get from the server side.
>
>
> Comm publisher Socket creation succesful in the server.
>
> Power subscriber Socket creation succesful in the server.
> co_simServer: co_simServer.c:88: main: Assertion `sc == 0' failed.
> Aborted (core dumped)
>
>
> Below is the server code using CZMQ:
>
>
> // The server for the co-simulator
> // Contains both the Pub and Sub socket for the server
> // And the proxy queue
>
> #include "czmq.h"
>
> static void
> subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> {
>     // Subscribe to Power input from client
>
>     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
>     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
>     if (rc != 0) {
>         printf("Power subscriber Socket creation failed in the server.\n");
>     }
>     else {
>         printf("Power subscriber Socket creation succesful in the
> server.\n");
>     }
>
>     zsocket_set_subscribe (subscriber, "P");
>
>     // Creating the number of messages it will receive
>
>     int count = 0;
>
>     while (count < 10) {
>
>     char *string = zstr_recv (subscriber);
>     char symbol;
>     float power_value, current_value;
>     if (!string)
>
>         break; // It was interrupted
>
>     sscanf (string, "%c %01f %01f",&symbol, &power_value, &current_value);
>     printf("The values are %01f watts and %01f amps for power and current
> respectively for %d count \n", power_value, current_value, count);
>
>     free(string);
>
>     count++;
>        }
>
>     zsocket_destroy (ctx, subscriber);
>   }
>
>     // The publisher sends messages starting with "C"
>
> static void
> publisher_thread (void *args, zctx_t *ctx, void *pipe)
> {
>         void *publisher = zsocket_new (ctx, ZMQ_PUB);
>
>         int rc = zsocket_connect(publisher, "tcp://localhost:6000");
>
>     if (rc != 0) {
>         printf("Comm publisher Socket creation failed in the server.\n");
>     }
>     else {
>         printf("Comm publisher Socket creation succesful in the server.\n");
>     }
>
>     while (!zctx_interrupted) {
>         char string[15];
>         int breaker_value = randof(5);
>         int fault_value = randof(10);
>         char symbol = 'C';
>         sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);
>
>     if (zstr_send (publisher, string) == -1)
>         break;  // interrupted
>     zclock_sleep (100); // Wait for 1/100 seconds
>     }
>  }
>
> // Creating the default thread which will start the other threads
>
> int main (void)
> {
>     // Start the child threads
>     zctx_t *ctx = zctx_new();
>     zthread_fork (ctx, publisher_thread, NULL);
>     zthread_fork (ctx, subscriber_thread, NULL);
>
>     // Creating the proxy sockets
>     void *frontend = zsocket_new (ctx, ZMQ_XSUB);
>     int sc = zsocket_bind (frontend, "tcp://*:6000");
>     assert (sc == 0);
>
>     void *backend = zsocket_new (ctx, ZMQ_XPUB);
>     int pc = zsocket_bind (backend, "tcp://*:6001");
>     assert (pc == 0);
>
>     // calling the proxy fucntion
>     zmq_proxy (frontend, backend, NULL);
>
>     // Tell attached threads to exit
>     zctx_destroy (&ctx);
>     return 0;
> }
>
>
> And on the Client side, I have this error:
>
>
> Comm subscriber Socket creation failed in the client.
> Power publisher Socket creation succesful in the client.
> ZMQ_SUBSCRIBE is not valid on PAIR sockets
> co_SimClient: zsockopt.c:877: zsocket_set_subscribe: Assertion `0' failed.
> Aborted (core dumped)
>
>
> I am so surprised at the highlighted error because the document said we must
> always subscribe to receive a message and similar socket was created
> successfully on the server side.
>
> Below is the client code:
>
>
> // Contains both the Pub and Sub socket for the client
> //which is the power simulator
>
>
> #include "czmq.h"
>
> static void
> subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> {
>     // Subscribe to control input from Comm simulator from server
>
>     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
>     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
>     if (rc != 0) {
>         printf("Comm subscriber Socket creation failed in the client.\n");
>     }
>     else {
>         printf("Comm subscriber Socket creation succesful in the
> client.\n");
>     }
>
>     // Error is being reported here but it works in server side
>     zsocket_set_subscribe (subscriber, "C");
>
>     // Creating the number of messages it will receive
>
>     int count = 0;
>
>     while (count < 10) {
>
>     char *string = zstr_recv (subscriber);
>     int breaker_value;
>     int fault_value;
>     char symbol;
>     if (!string)
>
>         break; // It was interrupted
>
>     sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
>     printf("The values are %c symbol and %d for breaker and %d for fault for
> %d count \n", symbol, breaker_value, fault_value, count);
>
>     free(string);
>
>     count++;
>        }
>
>     zsocket_destroy (ctx, subscriber);
>   }
>
>     // The publisher sends messages starting with "P" to the server
>
> static void
> publisher_thread (void *args, zctx_t *ctx, void *pipe)
> {
>         void *publisher = zsocket_new (ctx, ZMQ_PUB);
>
>         int rc = zsocket_connect (publisher, "tcp://localhost:6000");
>
>     if (rc != 0) {
>         printf("Power publisher Socket creation failed in the client.\n");
>     }
>     else {
>         printf("Power publisher Socket creation succesful in the
> client.\n");
>     }
>
>     while (!zctx_interrupted) {
>         char string[15];
>         char symbol = 'P';
>         float power_value = randof(15);
>         float current_value = randof(20);
>         sprintf(string, "%c %01f %01f",symbol,power_value,current_value);
>
>     if (zstr_send (publisher, string) == -1)
>         break;  // interrupted
>     zclock_sleep (10); // Wait for 1/100 seconds
>     }
>  }
>
> // Creating the default thread which will start the other threads
>
> int main (void)
> {
>     // Start the child threads
>     zctx_t *ctx = zctx_new();
>     zthread_fork (ctx, publisher_thread, NULL);
>     zthread_fork (ctx, subscriber_thread, NULL);
>
>
>     // Tell attached threads to exit
>     zctx_destroy (&ctx);
>
>     return 0;
> }
>
>
> Any help will be highly appreciated.
>
> Mel.
>
>
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>



More information about the zeromq-dev mailing list