[zeromq-dev] IPC address validity
Claudio Carbone
erupter at libero.it
Tue Jan 8 18:42:00 CET 2013
Ok problems are getting worrisome.
I wrote this simple test program
#define THREAD_CONTROL_IPC "ipc://th_control"
int main (void)
Zmqcpp::Context* mycontext = new Zmqcpp::Context(1);
Zmqcpp::Publisher *ctrlpublisher = new Zmqcpp::Publisher(mycontext,
THREAD_CONTROL_IPC, ZMQCPP_BIND);
Zmqcpp::Subscriber *ctrlsubber = new
Zmqcpp::Subscriber(mycontext,THREAD_CONTROL_IPC, ZMQCPP_CONNECT);
ctrlsubber->SubscribeTopic(HEADER_CTRL);
sleep(1);
while(1)
{
//cout << "Sending messages..." << std::endl;
ctrlpublisher->PubMsg(2,HEADER_CTRL,"Test");
//cout << "Receiving messages..." << std::endl;
ctrlsubber->RecvMsg();
cout << "Received <CTRL> msg: " << ctrlsubber->RecvMsg() <<
std::endl;
sleep(1);
}
return(0);
}
and it works: I receive one message per second.
Now I want to put this into a bigger program, and use the ipc sockets to
message between threads.
And this is giving me headache: when ran normally I don't know if it
works or not, but it does not hang.
When ran in debug mode it throws the following assert error:
Invalid argument (session_base.cpp:490)
when executing the publishing line in the main thread's main
ctrlpublisher->PubMsg(2,HEADER_CTRL,"Test");
Which strikes me as not possible as I'm not using pgm but rather ipc.
Following is the relevant source of the "/complete/" (in development)
program
struct parameters{
Zmqcpp::Context * zmqcont;
void (*callback) (std::string);
std::string ip;
int conntype;
std::list<std::string> topics;
};
void *th_subscriber (void * params);
void server_location_callback(std::string ip);
int main(int argc, char** argv) {
Zmqcpp::Context* mycontext = new Zmqcpp::Context(1);
Zmqcpp::Publisher *ctrlpublisher = new Zmqcpp::Publisher(mycontext,
THREAD_CONTROL_IPC, ZMQCPP_BIND);
ss.str("");
ss.str(HEADER_SERVER_INFO);
myparams.topics.insert(myparams.topics.end(),ss.str());
myparams.conntype = ZMQCPP_CONNECT;
myparams.zmqcont=mycontext;
myparams.callback=&server_location_callback;
pthread_t th_hndl_subber;
pthread_create(&th_hndl_subber, NULL, th_subscriber, (void
*)&myparams);
while(1)
{
if (s_interrupted == 1) {
fprintf(outstream, "\n");
if (MAIN_DEBUG)
dbg_print(MAIN_PROC_NAME,"SIGINT interrupt received,
killing node\n");
else
fprintf(outstream, "\n!!!!! KILL NODE COMMAND
RECEIVED !!!!!\n\n");
break;
}
ctrlpublisher->PubMsg(2,HEADER_CTRL,"Test");
}
mycontext->~Context();
return (EXIT_SUCCESS);
}
void *th_subscriber (void * params)
{
struct parameters *myparams = (struct parameters *) params;
Zmqcpp::Subscriber *msgsubber = new
Zmqcpp::Subscriber(myparams->zmqcont,myparams->ip.c_str(),
myparams->conntype);
for (std::list<std::string>::iterator it =
myparams->topics.begin(); it != myparams->topics.end(); it++)
{
cout << "Subscribing to <" << *it <<">"<<std::endl;
msgsubber->SubscribeTopic(*it);
}
Saetta_Server::Server_Info serverinfomsg;
Zmqcpp::Subscriber *ctrlsubber = new
Zmqcpp::Subscriber(myparams->zmqcont,THREAD_CONTROL_IPC, ZMQCPP_CONNECT);
ctrlsubber->SubscribeTopic(HEADER_CTRL);
std::string server_ip;
std::stringstream ss;
std::string msgtype;
std::string msgmaster = HEADER_SERVER_INFO;
std::string temp;
sleep(1);
while(1)
{
ss.str("");
msgtype.clear();
ss << msgsubber->RecvMsg();
msgtype = ss.str();
if (!msgtype.compare(msgmaster))
{
//cout << mysubber->RecvMsg() << std::endl;
serverinfomsg.ParseFromString(msgsubber->RecvMsg());
//cout << "Address: " << serverinfomsg.address() << std::endl;
if (strcmp(server_ip.c_str(),serverinfomsg.address().c_str()))
{
ss.str("");
server_ip.clear();
ss << serverinfomsg.address();
server_ip.assign(ss.str());
ss.str("");
cout << "Updated" << std::endl;
myparams->callback(server_ip.c_str());
}
}
ss.str("");
ss << ctrlsubber->RecvMsg();
msgtype.clear();
msgtype = ss.str();
if (!msgtype.compare(HEADER_CTRL))
{
ss.str("");
ss << "Control message: " << ctrlsubber->RecvMsg();
temp.clear();
temp.assign(ss.str());
myparams->callback(temp.c_str());
}
}
return(EXIT_SUCCESS);
}
void server_location_callback(std::string ip)
{
cout << "AAAAAAAAAA Callback fired! AAAAAAAAAA" << std::endl;
cout << "AAAAAAAAAA New server address: " << ip.c_str() << "
AAAAAAAAAA" << std::endl;
}
I fail to see the problem as the subscriber on pgm is working by itself
and is not giving me problems, and the ipc couple has been tested in the
other program reported above.
I know I use custom classes, but up until now I didn't have any problem
with them.
If anyone wants to dig in the source, here is my entire project
https://github.com/erupter/saettang
Thank you for your patience.
Regards
Claudio
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20130108/d7a89076/attachment.htm>
More information about the zeromq-dev
mailing list