[zeromq-dev] IPC address validity

Claudio Carbone erupter at libero.it
Tue Jan 8 18:42:00 CET 2013


Ok problems are getting worrisome.

I wrote this simple test program

#define THREAD_CONTROL_IPC "ipc://th_control"

int main (void)
     Zmqcpp::Context* mycontext = new Zmqcpp::Context(1);
     Zmqcpp::Publisher *ctrlpublisher = new Zmqcpp::Publisher(mycontext, 
THREAD_CONTROL_IPC, ZMQCPP_BIND);
     Zmqcpp::Subscriber *ctrlsubber = new 
Zmqcpp::Subscriber(mycontext,THREAD_CONTROL_IPC, ZMQCPP_CONNECT);
     ctrlsubber->SubscribeTopic(HEADER_CTRL);
     sleep(1);
     while(1)
     {
         //cout << "Sending messages..." << std::endl;
         ctrlpublisher->PubMsg(2,HEADER_CTRL,"Test");
         //cout << "Receiving messages..." << std::endl;
         ctrlsubber->RecvMsg();
         cout <<  "Received <CTRL> msg: " << ctrlsubber->RecvMsg() << 
std::endl;
         sleep(1);
     }
     return(0);
}

and it works: I receive one message per second.
Now I want to put this into a bigger program, and use the ipc sockets to 
message between threads.
And this is giving me headache: when ran normally I don't know if it 
works or not, but it does not hang.
When ran in debug mode it throws the following assert error:
Invalid argument (session_base.cpp:490)

when executing the publishing line in the main thread's main
ctrlpublisher->PubMsg(2,HEADER_CTRL,"Test");

Which strikes me as not possible as I'm not using pgm but rather ipc.
Following is the relevant source of the "/complete/" (in development) 
program

struct parameters{
     Zmqcpp::Context * zmqcont;
     void (*callback) (std::string);
     std::string ip;
     int conntype;
     std::list<std::string> topics;
};

void *th_subscriber (void * params);
void server_location_callback(std::string ip);

int main(int argc, char** argv) {

     Zmqcpp::Context* mycontext = new Zmqcpp::Context(1);

     Zmqcpp::Publisher *ctrlpublisher = new Zmqcpp::Publisher(mycontext, 
THREAD_CONTROL_IPC, ZMQCPP_BIND);

     ss.str("");
     ss.str(HEADER_SERVER_INFO);
     myparams.topics.insert(myparams.topics.end(),ss.str());
     myparams.conntype = ZMQCPP_CONNECT;
     myparams.zmqcont=mycontext;
     myparams.callback=&server_location_callback;

     pthread_t th_hndl_subber;
     pthread_create(&th_hndl_subber, NULL, th_subscriber, (void 
*)&myparams);

     while(1)
     {
         if (s_interrupted == 1) {
             fprintf(outstream, "\n");
             if (MAIN_DEBUG)
                 dbg_print(MAIN_PROC_NAME,"SIGINT interrupt received, 
killing node\n");
             else
                 fprintf(outstream, "\n!!!!!    KILL NODE COMMAND 
RECEIVED    !!!!!\n\n");
             break;
         }

         ctrlpublisher->PubMsg(2,HEADER_CTRL,"Test");

     }
     mycontext->~Context();
     return (EXIT_SUCCESS);
}

void *th_subscriber (void * params)
{
     struct parameters *myparams = (struct parameters *) params;
     Zmqcpp::Subscriber *msgsubber = new 
Zmqcpp::Subscriber(myparams->zmqcont,myparams->ip.c_str(), 
myparams->conntype);
     for (std::list<std::string>::iterator it = 
myparams->topics.begin(); it != myparams->topics.end(); it++)
     {
         cout << "Subscribing to <" << *it <<">"<<std::endl;
         msgsubber->SubscribeTopic(*it);
     }
     Saetta_Server::Server_Info serverinfomsg;
     Zmqcpp::Subscriber *ctrlsubber = new 
Zmqcpp::Subscriber(myparams->zmqcont,THREAD_CONTROL_IPC, ZMQCPP_CONNECT);
     ctrlsubber->SubscribeTopic(HEADER_CTRL);
     std::string server_ip;
     std::stringstream ss;
     std::string msgtype;
     std::string msgmaster = HEADER_SERVER_INFO;
     std::string temp;
     sleep(1);
     while(1)
     {
         ss.str("");
         msgtype.clear();
         ss << msgsubber->RecvMsg();
         msgtype = ss.str();

         if (!msgtype.compare(msgmaster))
         {
             //cout << mysubber->RecvMsg() << std::endl;
serverinfomsg.ParseFromString(msgsubber->RecvMsg());
             //cout << "Address: " << serverinfomsg.address() << std::endl;
             if (strcmp(server_ip.c_str(),serverinfomsg.address().c_str()))
             {
                 ss.str("");
                 server_ip.clear();
                 ss << serverinfomsg.address();
                 server_ip.assign(ss.str());
                 ss.str("");
                 cout << "Updated" << std::endl;
                 myparams->callback(server_ip.c_str());
             }
         }

         ss.str("");
         ss << ctrlsubber->RecvMsg();
         msgtype.clear();
         msgtype = ss.str();
         if (!msgtype.compare(HEADER_CTRL))
         {
             ss.str("");
             ss << "Control message: " << ctrlsubber->RecvMsg();
             temp.clear();
             temp.assign(ss.str());
             myparams->callback(temp.c_str());
         }
     }
     return(EXIT_SUCCESS);

}

void server_location_callback(std::string ip)
{
     cout << "AAAAAAAAAA  Callback fired!  AAAAAAAAAA" << std::endl;
     cout << "AAAAAAAAAA  New server address: " << ip.c_str() << "  
AAAAAAAAAA" << std::endl;
}


I fail to see the problem as the subscriber on pgm is working by itself 
and is not giving me problems, and the ipc couple has been tested in the 
other program reported above.
I know I use custom classes, but up until now I didn't have any problem 
with them.
If anyone wants to dig in the source, here is my entire project
https://github.com/erupter/saettang

Thank you for your patience.
Regards
Claudio
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20130108/d7a89076/attachment.htm>


More information about the zeromq-dev mailing list