[zeromq-dev] Something's broken in session_base or there's something I really ought to know about multithreading.
Claudio Carbone
erupter at libero.it
Mon Jan 14 13:34:09 CET 2013
Problem: creating a PMG socket outside of the main thread triggers an
assert error on line 490 of session_base.cpp.
Previously the test case was using a custom library I've been writing
but I have rewrote my test case stripping all my custom code, now it
just runs out of plain ZMQ instructions. So it can't be my library.
The problem is still there: try to run it through gdb and it will exit
with an assert error.
Run it normally and it will function.
Please guys have a look at this.
Only thing I should really mention is: change "wlan0" to "eth0" if you
don't have a wireless interface, or either change the name accordingly
to your installed interface.
Thanks
Claudio
/*
* File: main.cpp
* Author: erupter
*
* Created on January 9, 2013, 2:03 PM
*/
#include <cstdlib>
#include <net/if.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <list>
#include <zhelpers.h>
#define THREAD_CONTROL_IPC "ipc://saetta_th_control"
#define MULTICAST_ADDRESS "239.192.1.1"
#define MULTICAST_PORT 5678
#define ROUTER_PORT 5679
#define HEADER_CTRL "CTRL"
#define HEADER_SERVER_INFO "SERVER_INFO"
using namespace std;
std::string _zmq_sub_skt_string;
std::string _local_ip_address;
std::stringstream ss;
struct parameters{
void * zmqcont;
void (*callback) (std::string);
std::string ip;
int conntype;
std::list<std::string> topics;
};
volatile sig_atomic_t s_interrupted = 0;
volatile sig_atomic_t alarm_expired = 0;
void *th_subscriber (void * params);
void server_location_callback(std::string ip);
static inline bool get_iface_address(std::string interf);
static void s_signal_handler (int signal_value);
static void s_catch_signals (void);
/*
*
*/
int main(int argc, char** argv) {
struct parameters myparams;
s_catch_signals();
printf( "Requested interface wlan0, attempting to fetch address...\n");
if (!get_iface_address("wlan0"))
return (EXIT_FAILURE);
ss << "epgm://" << _local_ip_address << ";" << MULTICAST_ADDRESS <<
":" << MULTICAST_PORT;
_zmq_sub_skt_string = ss.str();
myparams.ip = ss.str();
void *mycontext = zmq_ctx_new();
ss.str("");
ss.str(HEADER_SERVER_INFO);
myparams.topics.insert(myparams.topics.end(),ss.str());
myparams.conntype = ZMQCPP_CONNECT;
myparams.zmqcont=mycontext;
myparams.callback=&server_location_callback;
ss.str("");
ss << THREAD_CONTROL_IPC;
void * zmq_ctrlpublisher = zmq_socket(mycontext, ZMQ_PUB);
zmq_bind(zmq_ctrlpublisher,THREAD_CONTROL_IPC);
pthread_t th_hndl_subber;
pthread_create(&th_hndl_subber, NULL, th_subscriber, (void
*)&myparams);
while(1)
{
if (s_interrupted == 1) {
printf("\n\n!!!!! KILL NODE COMMAND RECEIVED !!!!!\n\n");
s_sendmore(zmq_ctrlpublisher,(const char*)HEADER_CTRL);
s_send(zmq_ctrlpublisher,(const char*)"KILL");
sleep(1);
printf("Main thread shutting down.\n");
break;
}
s_sendmore(zmq_ctrlpublisher,(const char*)HEADER_CTRL);
s_send(zmq_ctrlpublisher,(const char*)"Test");
sleep(1);
}
void *status;
pthread_join(th_hndl_subber, &status);
printf("Main thread cleaning.\n");
zmq_close(zmq_ctrlpublisher);
zmq_ctx_destroy(mycontext);
printf("Main thread cleaned succesfully.");
return (EXIT_SUCCESS);
}
static inline bool get_iface_address(std::string interf)
{
int s;
struct ifreq ifr = {};
s = socket(PF_INET, SOCK_DGRAM, 0);
strncpy(ifr.ifr_name, interf.c_str(), sizeof(ifr.ifr_name));
if (ioctl(s, SIOCGIFADDR, &ifr) >= 0)
{
_local_ip_address = inet_ntoa(((struct sockaddr_in
*)&ifr.ifr_addr)->sin_addr);
printf("Detected interface IP address is %s.\nCreating sockets
on this address.\n", _local_ip_address.c_str());
return true;
}
printf("Unable to get interface IP address: is the interface
configured?\n");
return false;
}
void *th_subscriber (void * params)
{
int th_continue=1;
std::string server_ip;
std::stringstream ss;
std::string msgtype;
std::string msgmaster = HEADER_SERVER_INFO;
std::string temp;
struct parameters *myparams = (struct parameters *) params;
void * zmq_subscriber = zmq_socket(myparams->zmqcont,ZMQ_SUB);
zmq_connect(zmq_subscriber,(&(myparams->ip))->c_str());
for (std::list<std::string>::iterator it =
myparams->topics.begin(); it != myparams->topics.end(); it++)
{
cout << "Subscribing to <" << *it <<">"<<std::endl;
printf("Subscribing to: <%s>\n",it->c_str());
zmq_setsockopt(zmq_subscriber,ZMQ_SUBSCRIBE,(*it).c_str(),
(*it).length());
}
ss.str("");
ss << THREAD_CONTROL_IPC;
void * zmq_ctrlsub = zmq_socket(myparams->zmqcont,ZMQ_SUB);
zmq_connect(zmq_ctrlsub,THREAD_CONTROL_IPC);
zmq_setsockopt(zmq_ctrlsub,ZMQ_SUBSCRIBE,HEADER_CTRL,((std::string)HEADER_CTRL).length());
sleep(1);
while(th_continue==1)
{
ss.str("");
ss << s_recv(zmq_ctrlsub);
msgtype.clear();
msgtype = ss.str();
if (!msgtype.compare(HEADER_CTRL))
{
ss.str("");
ss << "Control message: " << s_recv(zmq_ctrlsub);
temp.clear();
temp.assign(ss.str());
myparams->callback(temp.c_str());
if (temp.compare("Control message: KILL")==0)
th_continue=0;
}
}
printf("Child thread cleaning resources.\n");
zmq_close(zmq_ctrlsub);
zmq_close(zmq_subscriber);
printf("Child thread terminating.\n");
return(EXIT_SUCCESS);
}
void server_location_callback(std::string ip)
{
cout << ip.c_str() << std::endl;
}
static void s_signal_handler (int signal_value)
{
if (signal_value == SIGTERM || signal_value == SIGSTOP ||
signal_value == SIGINT)
s_interrupted = 1;
if (signal_value == SIGALRM)
//signal(SIGALRM,alarm_wakeup);
alarm_expired = 1;
if (signal_value == SIGTSTP)
{
printf("SIGTSTP received\n");
}
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
sigaction (SIGALRM, &action, NULL);
sigaction (SIGSTOP, &action, NULL);
sigaction (SIGTSTP, &action, NULL);
}
More information about the zeromq-dev
mailing list