[zeromq-dev] Error Using XSUB and XPUB in CZMQ
Pieter Hintjens
ph at imatix.com
Mon Jul 7 17:05:06 CEST 2014
The subscribe error seems impossible; this would happen if you tried
to subscribe on the pipe, for instance, yet not if you subscribe on a
ZMQ_SUB socket.
Can you strip down that case to a minimal example?
On Mon, Jul 7, 2014 at 4:47 PM, Mellitus Ezeme
<mellitus.ezeme at mail.utoronto.ca> wrote:
> Hi,
>
> I tried using XSUB and XPUB to link two applications, and I was surprised that
> socket creation for the XSUB and XPUB kept returning an error when I tried to
> use them as a proxy. First, I bound the two of them to the frontend and backend
> endpoints respectively, then connected the PUB and SUB sockets, and this is the
> error I get on the server side.
>
>
> Comm publisher Socket creation succesful in the server.
>
> Power subscriber Socket creation succesful in the server.
> co_simServer: co_simServer.c:88: main: Assertion `sc == 0' failed.
> Aborted (core dumped)
>
>
> Below is the server code using CZMQ:
>
>
> // The server for the co-simulator
> // Contains both the Pub and Sub socket for the server
> // And the proxy queue
>
> #include "czmq.h"
>
> static void
> subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> {
> // Subscribe to Power input from client
>
> void *subscriber = zsocket_new (ctx, ZMQ_SUB);
> int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
> if (rc != 0) {
> printf("Power subscriber Socket creation failed in the server.\n");
> }
> else {
> printf("Power subscriber Socket creation succesful in the
> server.\n");
> }
>
> zsocket_set_subscribe (subscriber, "P");
>
> // Creating the number of messages it will receive
>
> int count = 0;
>
> while (count < 10) {
>
> char *string = zstr_recv (subscriber);
> char symbol;
> float power_value, current_value;
> if (!string)
>
> break; // It was interrupted
>
> sscanf (string, "%c %01f %01f",&symbol, &power_value, &current_value);
> printf("The values are %01f watts and %01f amps for power and current
> respectively for %d count \n", power_value, current_value, count);
>
> free(string);
>
> count++;
> }
>
> zsocket_destroy (ctx, subscriber);
> }
>
> // The publisher sends messages starting with "C"
>
> static void
> publisher_thread (void *args, zctx_t *ctx, void *pipe)
> {
> void *publisher = zsocket_new (ctx, ZMQ_PUB);
>
> int rc = zsocket_connect(publisher, "tcp://localhost:6000");
>
> if (rc != 0) {
> printf("Comm publisher Socket creation failed in the server.\n");
> }
> else {
> printf("Comm publisher Socket creation succesful in the server.\n");
> }
>
> while (!zctx_interrupted) {
> char string[15];
> int breaker_value = randof(5);
> int fault_value = randof(10);
> char symbol = 'C';
> sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);
>
> if (zstr_send (publisher, string) == -1)
> break; // interrupted
> zclock_sleep (100); // Wait for 1/10 second (100 msec)
> }
> }
>
> // Creating the default thread which will start the other threads
>
> int main (void)
> {
> // Start the child threads
> zctx_t *ctx = zctx_new();
> zthread_fork (ctx, publisher_thread, NULL);
> zthread_fork (ctx, subscriber_thread, NULL);
>
> // Creating the proxy sockets
> void *frontend = zsocket_new (ctx, ZMQ_XSUB);
> int sc = zsocket_bind (frontend, "tcp://*:6000");
> assert (sc == 0);
>
> void *backend = zsocket_new (ctx, ZMQ_XPUB);
> int pc = zsocket_bind (backend, "tcp://*:6001");
> assert (pc == 0);
>
> // calling the proxy function
> zmq_proxy (frontend, backend, NULL);
>
> // Tell attached threads to exit
> zctx_destroy (&ctx);
> return 0;
> }
>
>
> And on the Client side, I have this error:
>
>
> Comm subscriber Socket creation failed in the client.
> Power publisher Socket creation succesful in the client.
> ZMQ_SUBSCRIBE is not valid on PAIR sockets
> co_SimClient: zsockopt.c:877: zsocket_set_subscribe: Assertion `0' failed.
> Aborted (core dumped)
>
>
> I am very surprised by the highlighted error, because the documentation says we
> must always subscribe in order to receive a message, and a similar socket was
> created successfully on the server side.
>
> Below is the client code:
>
>
> // Contains both the Pub and Sub socket for the client
> //which is the power simulator
>
>
> #include "czmq.h"
>
> static void
> subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> {
> // Subscribe to control input from Comm simulator from server
>
> void *subscriber = zsocket_new (ctx, ZMQ_SUB);
> int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
> if (rc != 0) {
> printf("Comm subscriber Socket creation failed in the client.\n");
> }
> else {
> printf("Comm subscriber Socket creation succesful in the
> client.\n");
> }
>
> // Error is being reported here but it works in server side
> zsocket_set_subscribe (subscriber, "C");
>
> // Creating the number of messages it will receive
>
> int count = 0;
>
> while (count < 10) {
>
> char *string = zstr_recv (subscriber);
> int breaker_value;
> int fault_value;
> char symbol;
> if (!string)
>
> break; // It was interrupted
>
> sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
> printf("The values are %c symbol and %d for breaker and %d for fault for
> %d count \n", symbol, breaker_value, fault_value, count);
>
> free(string);
>
> count++;
> }
>
> zsocket_destroy (ctx, subscriber);
> }
>
> // The publisher sends messages starting with "P" to the server
>
> static void
> publisher_thread (void *args, zctx_t *ctx, void *pipe)
> {
> void *publisher = zsocket_new (ctx, ZMQ_PUB);
>
> int rc = zsocket_connect (publisher, "tcp://localhost:6000");
>
> if (rc != 0) {
> printf("Power publisher Socket creation failed in the client.\n");
> }
> else {
> printf("Power publisher Socket creation succesful in the
> client.\n");
> }
>
> while (!zctx_interrupted) {
> char string[15];
> char symbol = 'P';
> float power_value = randof(15);
> float current_value = randof(20);
> sprintf(string, "%c %01f %01f",symbol,power_value,current_value);
>
> if (zstr_send (publisher, string) == -1)
> break; // interrupted
> zclock_sleep (10); // Wait for 1/100 seconds
> }
> }
>
> // Creating the default thread which will start the other threads
>
> int main (void)
> {
> // Start the child threads
> zctx_t *ctx = zctx_new();
> zthread_fork (ctx, publisher_thread, NULL);
> zthread_fork (ctx, subscriber_thread, NULL);
>
>
> // Tell attached threads to exit
> zctx_destroy (&ctx);
>
> return 0;
> }
>
>
> Any help will be highly appreciated.
>
> Mel.
>
>
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
More information about the zeromq-dev
mailing list