[zeromq-dev] Error Using XSUB and XPUB in CZMQ

Mellitus Ezeme mellitus.ezeme at mail.utoronto.ca
Mon Jul 7 16:47:08 CEST 2014


Hi,

I tried using XSUB and XPUB to link two applications and I was surprised the socket creation for XSUB and XPUB kept returning error as I try to use it as a proxy. First, I made two of them bound to the endpoints of the frontend and backend respectively and then connected the PUB and SUB and this is the error I get from the server side.


Comm publisher Socket creation succesful in the server.

Power subscriber Socket creation succesful in the server.
co_simServer: co_simServer.c:88: main: Assertion `sc == 0' failed.
Aborted (core dumped)


Below is the server code using CZMQ:


// The server for the co-simulator
// Contains both the Pub and Sub socket for the server
// And the proxy queue

#include "czmq.h"

static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
    // Subscribe to Power input from client

    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
    if (rc != 0) {
        printf("Power subscriber Socket creation failed in the server.\n");
    }
    else {
        printf("Power subscriber Socket creation succesful in the server.\n");
    }

    zsocket_set_subscribe (subscriber, "P");

    // Creating the number of messages it will receive

    int count = 0;

    while (count < 10) {

    char *string = zstr_recv (subscriber);
    char symbol;
    float power_value, current_value;
    if (!string)

        break; // It was interrupted

    sscanf (string, "%c %01f %01f",&symbol, &power_value, &current_value);
    printf("The values are %01f watts and %01f amps for power and current respectively for %d count \n", power_value, current_value, count);

    free(string);

    count++;
       }

    zsocket_destroy (ctx, subscriber);
  }

    // The publisher sends messages starting with "C"

static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
        void *publisher = zsocket_new (ctx, ZMQ_PUB);

        int rc = zsocket_connect(publisher, "tcp://localhost:6000");

    if (rc != 0) {
        printf("Comm publisher Socket creation failed in the server.\n");
    }
    else {
        printf("Comm publisher Socket creation succesful in the server.\n");
    }

    while (!zctx_interrupted) {
        char string[15];
        int breaker_value = randof(5);
        int fault_value = randof(10);
        char symbol = 'C';
        sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);

    if (zstr_send (publisher, string) == -1)
        break;  // interrupted
    zclock_sleep (100); // Wait for 1/100 seconds
    }
 }

// Creating the default thread which will start the other threads

int main (void)
{
    // Start the child threads
    zctx_t *ctx = zctx_new();
    zthread_fork (ctx, publisher_thread, NULL);
    zthread_fork (ctx, subscriber_thread, NULL);

    // Creating the proxy sockets
    void *frontend = zsocket_new (ctx, ZMQ_XSUB);
    int sc = zsocket_bind (frontend, "tcp://*:6000");
    assert (sc == 0);

    void *backend = zsocket_new (ctx, ZMQ_XPUB);
    int pc = zsocket_bind (backend, "tcp://*:6001");
    assert (pc == 0);

    // calling the proxy fucntion
    zmq_proxy (frontend, backend, NULL);

    // Tell attached threads to exit
    zctx_destroy (&ctx);
    return 0;
}


And on the Client side, I have this error:


Comm subscriber Socket creation failed in the client.
Power publisher Socket creation succesful in the client.
ZMQ_SUBSCRIBE is not valid on PAIR sockets
co_SimClient: zsockopt.c:877: zsocket_set_subscribe: Assertion `0' failed.
Aborted (core dumped)


I am so surprised at the highlighted error because the document said we must always subscribe to receive a message and similar socket was created successfully on the server side.

Below is the client code:


// Contains both the Pub and Sub socket for the client
//which is the power simulator


#include "czmq.h"

static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
    // Subscribe to control input from Comm simulator from server

    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
    if (rc != 0) {
        printf("Comm subscriber Socket creation failed in the client.\n");
    }
    else {
        printf("Comm subscriber Socket creation succesful in the client.\n");
    }

    // Error is being reported here but it works in server side
    zsocket_set_subscribe (subscriber, "C");

    // Creating the number of messages it will receive

    int count = 0;

    while (count < 10) {

    char *string = zstr_recv (subscriber);
    int breaker_value;
    int fault_value;
    char symbol;
    if (!string)

        break; // It was interrupted

    sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
    printf("The values are %c symbol and %d for breaker and %d for fault for %d count \n", symbol, breaker_value, fault_value, count);

    free(string);

    count++;
       }

    zsocket_destroy (ctx, subscriber);
  }

    // The publisher sends messages starting with "P" to the server

static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
        void *publisher = zsocket_new (ctx, ZMQ_PUB);

        int rc = zsocket_connect (publisher, "tcp://localhost:6000");

    if (rc != 0) {
        printf("Power publisher Socket creation failed in the client.\n");
    }
    else {
        printf("Power publisher Socket creation succesful in the client.\n");
    }

    while (!zctx_interrupted) {
        char string[15];
        char symbol = 'P';
        float power_value = randof(15);
        float current_value = randof(20);
        sprintf(string, "%c %01f %01f",symbol,power_value,current_value);

    if (zstr_send (publisher, string) == -1)
        break;  // interrupted
    zclock_sleep (10); // Wait for 1/100 seconds
    }
 }

// Creating the default thread which will start the other threads

int main (void)
{
    // Start the child threads
    zctx_t *ctx = zctx_new();
    zthread_fork (ctx, publisher_thread, NULL);
    zthread_fork (ctx, subscriber_thread, NULL);


    // Tell attached threads to exit
    zctx_destroy (&ctx);

    return 0;
}


Any help will be highly appreciated.

Mel.



-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20140707/016ef6b5/attachment.htm>


More information about the zeromq-dev mailing list