[zeromq-dev] Error Using XSUB and XPUB in CZMQ
Mellitus Ezeme
mellitus.ezeme at mail.utoronto.ca
Mon Jul 7 16:47:08 CEST 2014
Hi,
I tried using XSUB and XPUB to link two applications and I was surprised the socket creation for XSUB and XPUB kept returning error as I try to use it as a proxy. First, I made two of them bound to the endpoints of the frontend and backend respectively and then connected the PUB and SUB and this is the error I get from the server side.
Comm publisher Socket creation succesful in the server.
Power subscriber Socket creation succesful in the server.
co_simServer: co_simServer.c:88: main: Assertion `sc == 0' failed.
Aborted (core dumped)
Below is the server code using CZMQ:
// The server for the co-simulator
// Contains both the Pub and Sub socket for the server
// And the proxy queue
#include "czmq.h"
static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
// Subscribe to Power input from client
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
if (rc != 0) {
printf("Power subscriber Socket creation failed in the server.\n");
}
else {
printf("Power subscriber Socket creation succesful in the server.\n");
}
zsocket_set_subscribe (subscriber, "P");
// Creating the number of messages it will receive
int count = 0;
while (count < 10) {
char *string = zstr_recv (subscriber);
char symbol;
float power_value, current_value;
if (!string)
break; // It was interrupted
sscanf (string, "%c %01f %01f",&symbol, &power_value, ¤t_value);
printf("The values are %01f watts and %01f amps for power and current respectively for %d count \n", power_value, current_value, count);
free(string);
count++;
}
zsocket_destroy (ctx, subscriber);
}
// The publisher sends messages starting with "C"
static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
void *publisher = zsocket_new (ctx, ZMQ_PUB);
int rc = zsocket_connect(publisher, "tcp://localhost:6000");
if (rc != 0) {
printf("Comm publisher Socket creation failed in the server.\n");
}
else {
printf("Comm publisher Socket creation succesful in the server.\n");
}
while (!zctx_interrupted) {
char string[15];
int breaker_value = randof(5);
int fault_value = randof(10);
char symbol = 'C';
sprintf(string, "%c %d %d", symbol,breaker_value,fault_value);
if (zstr_send (publisher, string) == -1)
break; // interrupted
zclock_sleep (100); // Wait for 1/100 seconds
}
}
// Creating the default thread which will start the other threads
int main (void)
{
// Start the child threads
zctx_t *ctx = zctx_new();
zthread_fork (ctx, publisher_thread, NULL);
zthread_fork (ctx, subscriber_thread, NULL);
// Creating the proxy sockets
void *frontend = zsocket_new (ctx, ZMQ_XSUB);
int sc = zsocket_bind (frontend, "tcp://*:6000");
assert (sc == 0);
void *backend = zsocket_new (ctx, ZMQ_XPUB);
int pc = zsocket_bind (backend, "tcp://*:6001");
assert (pc == 0);
// calling the proxy fucntion
zmq_proxy (frontend, backend, NULL);
// Tell attached threads to exit
zctx_destroy (&ctx);
return 0;
}
And on the Client side, I have this error:
Comm subscriber Socket creation failed in the client.
Power publisher Socket creation succesful in the client.
ZMQ_SUBSCRIBE is not valid on PAIR sockets
co_SimClient: zsockopt.c:877: zsocket_set_subscribe: Assertion `0' failed.
Aborted (core dumped)
I am so surprised at the highlighted error because the document said we must always subscribe to receive a message and similar socket was created successfully on the server side.
Below is the client code:
// Contains both the Pub and Sub socket for the client
//which is the power simulator
#include "czmq.h"
static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
// Subscribe to control input from Comm simulator from server
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
int rc = zsocket_connect (subscriber,"tcp://localhost:6001");
if (rc != 0) {
printf("Comm subscriber Socket creation failed in the client.\n");
}
else {
printf("Comm subscriber Socket creation succesful in the client.\n");
}
// Error is being reported here but it works in server side
zsocket_set_subscribe (subscriber, "C");
// Creating the number of messages it will receive
int count = 0;
while (count < 10) {
char *string = zstr_recv (subscriber);
int breaker_value;
int fault_value;
char symbol;
if (!string)
break; // It was interrupted
sscanf (string, "%c %d %d",&symbol, &breaker_value, &fault_value);
printf("The values are %c symbol and %d for breaker and %d for fault for %d count \n", symbol, breaker_value, fault_value, count);
free(string);
count++;
}
zsocket_destroy (ctx, subscriber);
}
// The publisher sends messages starting with "P" to the server
static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
void *publisher = zsocket_new (ctx, ZMQ_PUB);
int rc = zsocket_connect (publisher, "tcp://localhost:6000");
if (rc != 0) {
printf("Power publisher Socket creation failed in the client.\n");
}
else {
printf("Power publisher Socket creation succesful in the client.\n");
}
while (!zctx_interrupted) {
char string[15];
char symbol = 'P';
float power_value = randof(15);
float current_value = randof(20);
sprintf(string, "%c %01f %01f",symbol,power_value,current_value);
if (zstr_send (publisher, string) == -1)
break; // interrupted
zclock_sleep (10); // Wait for 1/100 seconds
}
}
// Creating the default thread which will start the other threads
int main (void)
{
// Start the child threads
zctx_t *ctx = zctx_new();
zthread_fork (ctx, publisher_thread, NULL);
zthread_fork (ctx, subscriber_thread, NULL);
// Tell attached threads to exit
zctx_destroy (&ctx);
return 0;
}
Any help will be highly appreciated.
Mel.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20140707/016ef6b5/attachment.htm>
More information about the zeromq-dev
mailing list