[zeromq-dev] Error Using XSUB and XPUB in CZMQ

Mellitus Ezeme mellitus.ezeme at mail.utoronto.ca
Wed Jul 9 05:02:19 CEST 2014


Hi,


Thanks very much for your reply. I am sorry am not yet an expert on this and had to bother you again. I have applied the correction you gave me but a new development occurred. I want the client and the server to talk to each other as long as I want but unfortunately, they only exchange the first values and the server crashes. I suspect the proxy but I cant explain it. The code snippet is below.


Server:


// The server for the co-simulator
// Contains both the Pub and Sub socket for the server
// And the proxy queue

#include "czmq.h"

static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
    // Subscribe to Power input

    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
    if (rc != 0) {
        printf("Power subscriber Socket creation failed in the server.\n");
    }
    else {
        printf("Power subscriber Socket creation succesful in the server.\n");
    }

    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "P", 1);

    // Creating the number of messages it will receive

    int count = 0;

    while (1) {

    char *string = zstr_recv (subscriber);
    char symbol;
    float power_value, current_value;
    if (!string)

        break; // It was interrupted

    sscanf (string, "%c %01f %01f",&symbol, &power_value, &current_value);
    printf("The values are %01f watts and %01f amps for %d count \n", power_value, current_value, count);

    free(string);

    count++;
       }

    zsocket_destroy (ctx, subscriber);
  }

    // The publisher sends messages starting with "C"

static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
        void *publisher = zsocket_new (ctx, ZMQ_PUB);

        int rc = zsocket_connect(publisher, "tcp://localhost:6000");

    if (rc != 0) {
        printf("Comm publisher Socket creation failed in the server.\n");
    }
    else {
        printf("Comm publisher Socket creation succesful in the server.\n");
    }

    while (!zctx_interrupted) {
        char string[15];
        int breaker_value = randof(5);
        int fault_value = randof(10);
        char symbol = 'C';
        sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);

    if (zstr_send (publisher, string) == -1)
        break;  // interrupted

    //printf("%s is the value sent for this session\n", string);
    zclock_sleep (100); // Wait for 1/100 seconds
    }

    zsocket_destroy (ctx, publisher);
 }

// Creating the default thread which will start the other threads

int main (void)
{
    // Start the child threads
    zctx_t *ctx = zctx_new();


    zthread_fork (ctx, publisher_thread, NULL);
    zthread_fork (ctx, subscriber_thread, NULL);

    // Creating the proxy sockets
    void *frontend = zsocket_new (ctx, ZMQ_XSUB);
    zsocket_bind (frontend, "tcp://*:6000");
    //assert (sc == 0);

    void *backend = zsocket_new (ctx, ZMQ_XPUB);
    zsocket_bind (backend, "tcp://*:6001");
    //assert (pc == 0);

    // calling the proxy function
    zmq_proxy (frontend, backend, NULL);


    // Tell attached threads to exit
    zctx_destroy (&ctx);
    return 0;
}


And the client code:


// Contains both the Pub and Sub socket for the client
//which is the power simulator


#include "czmq.h"

static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
    // Subscribe to control input from Comm simulator

    void *subscriber_client = zsocket_new (ctx, ZMQ_SUB);
    int rc = zsocket_connect (subscriber_client,"tcp://localhost:6001");
    if (rc != 0) {
        printf("Comm subscriber Socket creation failed in the client.\n");
    }
    else {
        printf("Comm subscriber Socket creation succesful in the client.\n");
    }

    zmq_setsockopt (subscriber_client, ZMQ_SUBSCRIBE, "C", 1);

    // Creating the number of messages it will receive

    int count = 0;

    while (1) {

    char *string = zstr_recv (subscriber_client);
    int breaker_value;
    int fault_value;
    char symbol;
    if (!string)

        break; // It was interrupted

    sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
    printf("The values are %c symbol and %d for breaker and %d for fault for %d count \n", symbol, breaker_value, fault_value, count);

    free(string);

    count++;
       }

    zsocket_destroy (ctx, subscriber_client);
  }

    // The publisher sends messages starting with "P"

static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
        void *publisher_client = zsocket_new (ctx, ZMQ_PUB);

        int rc = zsocket_connect (publisher_client, "tcp://localhost:6000");

    if (rc != 0) {
        printf("Power publisher Socket creation failed in the client.\n");
    }
    else {
        printf("Power publisher Socket creation succesful in the client.\n");
    }

    while (!zctx_interrupted) {
        char string[15];
        char symbol = 'P';
        float power_value = randof(15);
        float current_value = randof(20);
        sprintf(string, "%c %01f %01f",symbol,power_value,current_value);

    if (zstr_send (publisher_client, string) == -1)
        break;  // interrupted
    zclock_sleep (100); // Wait for 1/1000 seconds
    }
    zsocket_destroy (ctx, publisher_client);
 }

// Creating the default thread which will start the other threads

int main (void)
{
    zctx_t *ctx = zctx_new();

    // Start the threads
    zthread_fork (ctx, publisher_thread, NULL);
    zthread_fork (ctx, subscriber_thread, NULL);

    while (1) {
     ;
    }

    // Tell attached threads to exit
    zctx_destroy (&ctx);

    return 0;
}


Once more, thanks for your help so far.


Mel.

________________________________
From: Procter, Stephen <Stephen.Procter at salik.ae>
Sent: Tuesday, July 8, 2014 12:25 AM
To: Mellitus Ezeme
Cc: ZeroMQ development list
Subject: RE: [zeromq-dev] Error Using XSUB and XPUB in CZMQ

Hi Mel,

The assertion in the server code is failing because a successful call to zsocket_bind returns the port number rather than the usual zero.

Regards,

Steve


From: zeromq-dev-bounces at lists.zeromq.org [mailto:zeromq-dev-bounces at lists.zeromq.org] On Behalf Of Mellitus Ezeme
Sent: Tuesday, July 08, 2014 3:20 AM
To: ZeroMQ development list
Subject: Re: [zeromq-dev] Error Using XSUB and XPUB in CZMQ


Hello,



Thanks very much. I have used a loop to regulate the client side. Do you have any idea what I can do on the server side to rectify the proxy problem that keeps failing?



Mel

________________________________
From: zeromq-dev-bounces at lists.zeromq.org<mailto:zeromq-dev-bounces at lists.zeromq.org> <zeromq-dev-bounces at lists.zeromq.org<mailto:zeromq-dev-bounces at lists.zeromq.org>> on behalf of Doron Somech <somdoron at gmail.com<mailto:somdoron at gmail.com>>
Sent: Monday, July 7, 2014 6:32 PM
To: ZeroMQ development list
Subject: Re: [zeromq-dev] Error Using XSUB and XPUB in CZMQ


On the client code your main method destroy the socket immediately after the fork of the publisher and subscriber.

Block the client main method.
On Jul 7, 2014 11:06 AM, "Pieter Hintjens" <ph at imatix.com<mailto:ph at imatix.com>> wrote:
The subscribe error seems impossible; this would happen if you tried
to subscribe on the pipe, for instance, yet not if you subscribe on a
ZMQ_SUB socket.

Can you strip down that case to a minimal example?

On Mon, Jul 7, 2014 at 4:47 PM, Mellitus Ezeme
<mellitus.ezeme at mail.utoronto.ca<mailto:mellitus.ezeme at mail.utoronto.ca>> wrote:
> Hi,
>
> I tried using XSUB and XPUB to link two applications and I was surprised the
> socket creation for XSUB and XPUB kept returning error as I try to use it as
> a proxy. First, I made two of them bound to the endpoints of the frontend
> and backend respectively and then connected the PUB and SUB and this is the
> error I get from the server side.
>
>
> Comm publisher Socket creation succesful in the server.
>
> Power subscriber Socket creation succesful in the server.
> co_simServer: co_simServer.c:88: main: Assertion `sc == 0' failed.
> Aborted (core dumped)
>
>
> Below is the server code using CZMQ:
>
>
> // The server for the co-simulator
> // Contains both the Pub and Sub socket for the server
> // And the proxy queue
>
> #include "czmq.h"
>
> static void
> subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> {
>     // Subscribe to Power input from client
>
>     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
>     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
>     if (rc != 0) {
>         printf("Power subscriber Socket creation failed in the server.\n");
>     }
>     else {
>         printf("Power subscriber Socket creation succesful in the
> server.\n");
>     }
>
>     zsocket_set_subscribe (subscriber, "P");
>
>     // Creating the number of messages it will receive
>
>     int count = 0;
>
>     while (count < 10) {
>
>     char *string = zstr_recv (subscriber);
>     char symbol;
>     float power_value, current_value;
>     if (!string)
>
>         break; // It was interrupted
>
>     sscanf (string, "%c %01f %01f",&symbol, &power_value, &current_value);
>     printf("The values are %01f watts and %01f amps for power and current
> respectively for %d count \n", power_value, current_value, count);
>
>     free(string);
>
>     count++;
>        }
>
>     zsocket_destroy (ctx, subscriber);
>   }
>
>     // The publisher sends messages starting with "C"
>
> static void
> publisher_thread (void *args, zctx_t *ctx, void *pipe)
> {
>         void *publisher = zsocket_new (ctx, ZMQ_PUB);
>
>         int rc = zsocket_connect(publisher, "tcp://localhost:6000");
>
>     if (rc != 0) {
>         printf("Comm publisher Socket creation failed in the server.\n");
>     }
>     else {
>         printf("Comm publisher Socket creation succesful in the server.\n");
>     }
>
>     while (!zctx_interrupted) {
>         char string[15];
>         int breaker_value = randof(5);
>         int fault_value = randof(10);
>         char symbol = 'C';
>         sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);
>
>     if (zstr_send (publisher, string) == -1)
>         break;  // interrupted
>     zclock_sleep (100); // Wait for 1/100 seconds
>     }
>  }
>
> // Creating the default thread which will start the other threads
>
> int main (void)
> {
>     // Start the child threads
>     zctx_t *ctx = zctx_new();
>     zthread_fork (ctx, publisher_thread, NULL);
>     zthread_fork (ctx, subscriber_thread, NULL);
>
>     // Creating the proxy sockets
>     void *frontend = zsocket_new (ctx, ZMQ_XSUB);
>     int sc = zsocket_bind (frontend, "tcp://*:6000");
>     assert (sc == 0);
>
>     void *backend = zsocket_new (ctx, ZMQ_XPUB);
>     int pc = zsocket_bind (backend, "tcp://*:6001");
>     assert (pc == 0);
>
>     // calling the proxy fucntion
>     zmq_proxy (frontend, backend, NULL);
>
>     // Tell attached threads to exit
>     zctx_destroy (&ctx);
>     return 0;
> }
>
>
> And on the Client side, I have this error:
>
>
> Comm subscriber Socket creation failed in the client.
> Power publisher Socket creation succesful in the client.
> ZMQ_SUBSCRIBE is not valid on PAIR sockets
> co_SimClient: zsockopt.c:877: zsocket_set_subscribe: Assertion `0' failed.
> Aborted (core dumped)
>
>
> I am so surprised at the highlighted error because the document said we must
> always subscribe to receive a message and similar socket was created
> successfully on the server side.
>
> Below is the client code:
>
>
> // Contains both the Pub and Sub socket for the client
> //which is the power simulator
>
>
> #include "czmq.h"
>
> static void
> subscriber_thread (void *args, zctx_t *ctx, void *pipe)
> {
>     // Subscribe to control input from Comm simulator from server
>
>     void *subscriber = zsocket_new (ctx, ZMQ_SUB);
>     int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
>     if (rc != 0) {
>         printf("Comm subscriber Socket creation failed in the client.\n");
>     }
>     else {
>         printf("Comm subscriber Socket creation succesful in the
> client.\n");
>     }
>
>     // Error is being reported here but it works in server side
>     zsocket_set_subscribe (subscriber, "C");
>
>     // Creating the number of messages it will receive
>
>     int count = 0;
>
>     while (count < 10) {
>
>     char *string = zstr_recv (subscriber);
>     int breaker_value;
>     int fault_value;
>     char symbol;
>     if (!string)
>
>         break; // It was interrupted
>
>     sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
>     printf("The values are %c symbol and %d for breaker and %d for fault for
> %d count \n", symbol, breaker_value, fault_value, count);
>
>     free(string);
>
>     count++;
>        }
>
>     zsocket_destroy (ctx, subscriber);
>   }
>
>     // The publisher sends messages starting with "P" to the server
>
> static void
> publisher_thread (void *args, zctx_t *ctx, void *pipe)
> {
>         void *publisher = zsocket_new (ctx, ZMQ_PUB);
>
>         int rc = zsocket_connect (publisher, "tcp://localhost:6000");
>
>     if (rc != 0) {
>         printf("Power publisher Socket creation failed in the client.\n");
>     }
>     else {
>         printf("Power publisher Socket creation succesful in the
> client.\n");
>     }
>
>     while (!zctx_interrupted) {
>         char string[15];
>         char symbol = 'P';
>         float power_value = randof(15);
>         float current_value = randof(20);
>         sprintf(string, "%c %01f %01f",symbol,power_value,current_value);
>
>     if (zstr_send (publisher, string) == -1)
>         break;  // interrupted
>     zclock_sleep (10); // Wait for 1/100 seconds
>     }
>  }
>
> // Creating the default thread which will start the other threads
>
> int main (void)
> {
>     // Start the child threads
>     zctx_t *ctx = zctx_new();
>     zthread_fork (ctx, publisher_thread, NULL);
>     zthread_fork (ctx, subscriber_thread, NULL);
>
>
>     // Tell attached threads to exit
>     zctx_destroy (&ctx);
>
>     return 0;
> }
>
>
> Any help will be highly appreciated.
>
> Mel.
>
>
>
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org<mailto:zeromq-dev at lists.zeromq.org>
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
_______________________________________________
zeromq-dev mailing list
zeromq-dev at lists.zeromq.org<mailto:zeromq-dev at lists.zeromq.org>
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

________________________________
This transmission is intended solely for the person or organization to whom it is addressed. It may contain privileged and confidential information. If you are not the intended recipient, you should not copy, distribute or take any action in reliance on it. If you have received this transmission in error, please notify us immediately by e-mail at info at salik.ae
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20140709/317b4976/attachment.htm>


More information about the zeromq-dev mailing list