[zeromq-dev] Something's broken in session_base or there's something I really ought to know about multithreading.

Claudio Carbone erupter at libero.it
Mon Jan 14 13:34:09 CET 2013


Problem: creating a PGM socket outside of the main thread triggers an 
assert error on line 490 of session_base.cpp.
Previously the test case was using a custom library I've been writing, 
but I have rewritten the test case, stripping out all my custom code; it 
now uses nothing but plain ZMQ calls. So it can't be my library.

The problem is still there: try to run it through gdb and it will exit 
with an assert error.
Run it normally and it will function.

Please guys have a look at this.
The only thing I should really mention is: change "wlan0" to "eth0" if you 
don't have a wireless interface, or otherwise change the name to match 
your installed interface.

Thanks
Claudio




/*
  * File:   main.cpp
  * Author: erupter
  *
  * Created on January 9, 2013, 2:03 PM
  */

#include <arpa/inet.h>
#include <net/if.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>
#include <sstream>
#include <string>

#include <zhelpers.h>
// Endpoint of the in-process control channel: main() binds a PUB socket to
// it and th_subscriber() connects a SUB socket to it.
#define THREAD_CONTROL_IPC      "ipc://saetta_th_control"
// Multicast group used when main() builds the "epgm://" endpoint.
#define MULTICAST_ADDRESS       "239.192.1.1"
// Port appended to the multicast group in the "epgm://" endpoint.
#define MULTICAST_PORT          5678
// Not referenced anywhere in this file.
#define ROUTER_PORT             5679
// First frame of every control message, and the subscription filter that
// th_subscriber() installs on its control socket.
#define HEADER_CTRL             "CTRL"
// Topic main() places in parameters::topics for the data subscriber.
#define HEADER_SERVER_INFO      "SERVER_INFO"

using namespace std;
// Copy of the "epgm://" endpoint built in main(); written there and not read
// anywhere else in this file.
std::string _zmq_sub_skt_string;
// Dotted-decimal address of the chosen interface; filled in by
// get_iface_address() and read by main() when composing the endpoint.
std::string _local_ip_address;
// Scratch stream main() uses to assemble strings. th_subscriber() declares
// its own local `ss`, which shadows this one inside that function.
std::stringstream ss;
// Argument bundle that main() fills in and hands to th_subscriber() through
// the void* parameter of pthread_create().
struct parameters{
     // 0MQ context from which the thread creates its sockets.
     void * zmqcont;
     // Called by the thread with the text of each control message it receives.
     void (*callback) (std::string);
     // Endpoint the thread's data subscriber connects to.
     std::string ip;
     // Set by main() to ZMQCPP_CONNECT; th_subscriber() does not read it.
     int conntype;
     // Subscription prefixes applied to the data subscriber.
     std::list<std::string> topics;
};
// Set to 1 by s_signal_handler() on SIGTERM/SIGSTOP/SIGINT; polled by the
// publishing loop in main() to start the shutdown sequence.
volatile sig_atomic_t s_interrupted = 0;
// Set to 1 by s_signal_handler() on SIGALRM; not read anywhere in this file.
volatile sig_atomic_t alarm_expired = 0;


// Thread entry point; `params` must point to a `struct parameters`.
void *th_subscriber (void * params);
// Callback installed by main(); prints the string it is given.
void server_location_callback(std::string ip);
// Stores the IPv4 address of interface `interf` in _local_ip_address.
static inline bool get_iface_address(std::string interf);
// Handler installed by s_catch_signals(); records signals in the global flags.
static void s_signal_handler (int signal_value);
// Installs s_signal_handler() for the signals this program reacts to.
static void s_catch_signals (void);
/*
 * Program entry point.
 *
 * Resolves the address of interface "wlan0", builds the "epgm://" endpoint
 * for the subscriber thread, binds a PUB control socket on
 * THREAD_CONTROL_IPC and starts th_subscriber(). It then publishes a
 * "CTRL"/"Test" message once per second until a termination signal sets
 * s_interrupted, at which point it publishes "CTRL"/"KILL", joins the thread
 * and releases the 0MQ resources.
 *
 * Returns EXIT_SUCCESS after a clean shutdown, or EXIT_FAILURE when the
 * interface address, the 0MQ context/socket/bind or the thread cannot be set
 * up.
 */
int main(int argc, char** argv) {
     (void)argc;
     (void)argv;

     struct parameters myparams;
     s_catch_signals();
     printf( "Requested interface wlan0, attempting to fetch address...\n");
     if (!get_iface_address("wlan0"))
         return (EXIT_FAILURE);

     // Endpoint format: epgm://<local address>;<multicast group>:<port>
     ss << "epgm://" << _local_ip_address << ";" << MULTICAST_ADDRESS
        << ":" << MULTICAST_PORT;
     _zmq_sub_skt_string = ss.str();
     myparams.ip = _zmq_sub_skt_string;

     void *mycontext = zmq_ctx_new();
     if (mycontext == NULL) {
         perror("zmq_ctx_new");
         return (EXIT_FAILURE);
     }

     myparams.topics.push_back(HEADER_SERVER_INFO);
     // NOTE(review): ZMQCPP_CONNECT is not defined in this file; presumably
     // it comes from an included header - confirm.
     myparams.conntype = ZMQCPP_CONNECT;
     myparams.zmqcont = mycontext;
     myparams.callback = &server_location_callback;

     void *zmq_ctrlpublisher = zmq_socket(mycontext, ZMQ_PUB);
     if (zmq_ctrlpublisher == NULL) {
         perror("zmq_socket(ZMQ_PUB)");
         zmq_ctx_destroy(mycontext);
         return (EXIT_FAILURE);
     }
     if (zmq_bind(zmq_ctrlpublisher, THREAD_CONTROL_IPC) != 0) {
         perror("zmq_bind");
         zmq_close(zmq_ctrlpublisher);
         zmq_ctx_destroy(mycontext);
         return (EXIT_FAILURE);
     }

     // myparams lives on this stack frame; that is safe because the thread
     // is joined below before main() returns.
     pthread_t th_hndl_subber;
     const int create_rc =
         pthread_create(&th_hndl_subber, NULL, th_subscriber, &myparams);
     if (create_rc != 0) {
         // pthread_create reports the error code as its return value.
         fprintf(stderr, "pthread_create: %s\n", strerror(create_rc));
         zmq_close(zmq_ctrlpublisher);
         zmq_ctx_destroy(mycontext);
         return (EXIT_FAILURE);
     }

     // Heartbeat: one control message per second until a signal arrives.
     while (s_interrupted == 0)
     {
         s_sendmore(zmq_ctrlpublisher, HEADER_CTRL);
         s_send(zmq_ctrlpublisher, "Test");
         sleep(1);
     }

     printf("\n\n!!!!!    KILL NODE COMMAND RECEIVED !!!!!\n\n");
     s_sendmore(zmq_ctrlpublisher, HEADER_CTRL);
     s_send(zmq_ctrlpublisher, "KILL");
     sleep(1);
     printf("Main thread shutting down.\n");

     // The thread must close its sockets before the context is destroyed.
     pthread_join(th_hndl_subber, NULL);
     printf("Main thread cleaning.\n");
     zmq_close(zmq_ctrlpublisher);
     zmq_ctx_destroy(mycontext);
     printf("Main thread cleaned succesfully.");
     return (EXIT_SUCCESS);
}

/*
 * Looks up the IPv4 address assigned to a network interface and stores its
 * dotted-decimal form in the global _local_ip_address.
 *
 * interf: interface name, e.g. "wlan0" or "eth0". Empty names and names
 *         that do not fit in ifreq::ifr_name are rejected instead of being
 *         silently truncated.
 *
 * Returns true when the address was obtained (and _local_ip_address was
 * updated), false otherwise. A diagnostic is printed in both cases.
 */
static inline bool get_iface_address(std::string interf)
{
     static const char failure_text[] =
         "Unable to get interface IP address: is the interface configured?\n";

     struct ifreq ifr = {};
     if (interf.empty() || interf.size() >= sizeof(ifr.ifr_name))
     {
         printf("%s", failure_text);
         return false;
     }
     // ifr is zero-filled, so copying at most size-1 bytes keeps the name
     // NUL-terminated.
     strncpy(ifr.ifr_name, interf.c_str(), sizeof(ifr.ifr_name) - 1);

     // Any datagram socket will do; it only serves as a handle for the ioctl.
     const int s = socket(PF_INET, SOCK_DGRAM, 0);
     if (s < 0)
     {
         printf("%s", failure_text);
         return false;
     }

     const bool have_address = ioctl(s, SIOCGIFADDR, &ifr) >= 0;
     close(s);   // the probe socket is no longer needed on either path

     if (!have_address)
     {
         printf("%s", failure_text);
         return false;
     }

     _local_ip_address = inet_ntoa(
         reinterpret_cast<struct sockaddr_in *>(&ifr.ifr_addr)->sin_addr);
     printf("Detected interface IP address is %s.\n"
            "Creating sockets on this address.\n",
            _local_ip_address.c_str());
     return true;
}

/*
 * Receives one frame from a 0MQ socket into a std::string.
 *
 * s_recv() (zhelpers.h) hands back a heap-allocated C string that the caller
 * must free, or NULL when the receive fails; this wrapper copies the text
 * and releases the buffer so nothing leaks.
 *
 * Returns true and fills `frame` on success, false when s_recv() gave NULL.
 */
static bool recv_frame(void *socket, std::string &frame)
{
     char *raw = s_recv(socket);
     if (raw == NULL)
         return false;
     frame.assign(raw);
     free(raw);
     return true;
}

/*
 * Subscriber thread entry point.
 *
 * params: pointer to a `struct parameters` that must stay valid until the
 *         thread has been joined.
 *
 * Creates a SUB socket connected to params->ip and subscribes it to every
 * entry of params->topics (this socket is never read from here), then
 * creates a second SUB socket on THREAD_CONTROL_IPC filtered on HEADER_CTRL.
 * Each control message is passed to params->callback as
 * "Control message: <body>"; the loop ends when the body is "KILL" or when
 * receiving fails for a reason other than an interrupting signal.
 *
 * Always returns NULL.
 */
void *th_subscriber (void * params)
{
     struct parameters *myparams = static_cast<struct parameters *>(params);

     void *zmq_subscriber = zmq_socket(myparams->zmqcont, ZMQ_SUB);
     if (zmq_subscriber == NULL)
     {
         perror("zmq_socket(data subscriber)");
         return NULL;
     }
     // A failed connect is reported but not fatal: the control channel below
     // must still run so that main() can shut this thread down.
     if (zmq_connect(zmq_subscriber, myparams->ip.c_str()) != 0)
         perror("zmq_connect(data subscriber)");

     for (std::list<std::string>::const_iterator it = myparams->topics.begin();
          it != myparams->topics.end(); ++it)
     {
         std::cout << "Subscribing to <" << *it << ">" << std::endl;
         printf("Subscribing to: <%s>\n", it->c_str());
         if (zmq_setsockopt(zmq_subscriber, ZMQ_SUBSCRIBE,
                            it->c_str(), it->length()) != 0)
             perror("zmq_setsockopt(ZMQ_SUBSCRIBE)");
     }

     void *zmq_ctrlsub = zmq_socket(myparams->zmqcont, ZMQ_SUB);
     if (zmq_ctrlsub == NULL)
     {
         perror("zmq_socket(control subscriber)");
         zmq_close(zmq_subscriber);
         return NULL;
     }
     // Without a working control channel the loop below could never be told
     // to stop, so setup failures here end the thread.
     if (zmq_connect(zmq_ctrlsub, THREAD_CONTROL_IPC) != 0 ||
         zmq_setsockopt(zmq_ctrlsub, ZMQ_SUBSCRIBE,
                        HEADER_CTRL, strlen(HEADER_CTRL)) != 0)
     {
         perror("control subscriber setup");
         zmq_close(zmq_ctrlsub);
         zmq_close(zmq_subscriber);
         return NULL;
     }

     // NOTE(review): presumably gives the connections time to settle before
     // the first receive - confirm whether this delay is still needed.
     sleep(1);

     const std::string kill_text = "Control message: KILL";
     std::string header;
     std::string body;
     bool keep_running = true;
     while (keep_running)
     {
         if (!recv_frame(zmq_ctrlsub, header))
         {
             // The handlers are installed without SA_RESTART, so a signal can
             // interrupt the receive; retry in that case.
             // NOTE(review): assumes s_recv() leaves errno as set by libzmq.
             if (errno == EINTR)
                 continue;
             perror("s_recv(control header)");
             break;
         }
         if (header != HEADER_CTRL)
             continue;

         if (!recv_frame(zmq_ctrlsub, body))
         {
             if (errno == EINTR)
                 continue;
             perror("s_recv(control body)");
             break;
         }

         const std::string text = "Control message: " + body;
         myparams->callback(text);
         if (text == kill_text)
             keep_running = false;
     }

     printf("Child thread cleaning resources.\n");
     zmq_close(zmq_ctrlsub);
     zmq_close(zmq_subscriber);
     printf("Child thread terminating.\n");
     return NULL;
}

/*
 * Callback handed to the subscriber thread: writes the received text to
 * standard output as one line and flushes the stream.
 */
void server_location_callback(std::string ip)
{
     const char *text = ip.c_str();
     std::cout << text << '\n' << std::flush;
}

/*
 * Signal handler installed by s_catch_signals().
 *
 * SIGTERM / SIGINT set s_interrupted, SIGALRM sets alarm_expired, and
 * SIGTSTP only emits a notice on standard output. Only async-signal-safe
 * operations are used: the notice goes through write() rather than printf(),
 * and errno is restored so interrupted code does not see it change.
 */
static void s_signal_handler (int signal_value)
{
     switch (signal_value)
     {
     case SIGTERM:
     case SIGINT:
     case SIGSTOP:   // cannot actually be delivered to a handler; kept for parity
         s_interrupted = 1;
         break;

     case SIGALRM:
         alarm_expired = 1;
         break;

     case SIGTSTP:
     {
         static const char notice[] = "SIGTSTP received\n";
         const int saved_errno = errno;
         const ssize_t written = write(STDOUT_FILENO, notice, sizeof(notice) - 1);
         (void)written;   // nothing useful can be done about a failed write here
         errno = saved_errno;
         break;
     }

     default:
         break;
     }
}

/*
 * Installs s_signal_handler() for SIGINT, SIGTERM, SIGALRM and SIGTSTP.
 *
 * SIGSTOP is deliberately not registered: it cannot be caught, so asking
 * sigaction() for it always fails. The handlers are installed with
 * sa_flags = 0, i.e. without SA_RESTART, so blocking calls such as sleep()
 * return early when a signal arrives. A failed registration is reported on
 * stderr and the remaining signals are still attempted.
 */
static void s_catch_signals (void)
{
     struct sigaction action = {};   // zero every field, not just the three set below
     action.sa_handler = s_signal_handler;
     action.sa_flags = 0;
     sigemptyset (&action.sa_mask);

     static const int handled_signals[] = { SIGINT, SIGTERM, SIGALRM, SIGTSTP };
     const size_t handled_count =
         sizeof(handled_signals) / sizeof(handled_signals[0]);

     for (size_t i = 0; i < handled_count; ++i)
     {
         if (sigaction (handled_signals[i], &action, NULL) != 0)
             perror("sigaction");
     }
}



More information about the zeromq-dev mailing list