[zeromq-dev] PUB/SUB assert(get_load () == 0) in zmq_ctx_destroy
Martin Townsend
martin.townsend at xsilon.com
Tue Jul 1 16:28:42 CEST 2014
Hi,
I think I have got to the bottom of the problem. My main app calls
zmq_ctx_destroy, which ends up calling the destructor for the io_thread
associated with the IPC socket. That destructor in turn calls the
destructor for the poller instance. The problem is that the io_thread
instance has added the mailbox handle to the poller using add_fd, but
expects it to be removed again in process_stop:
void zmq::io_thread_t::process_stop ()
{
    poller->rm_fd (mailbox_handle);
    poller->stop ();
}
However, the context destructor doesn't wait for the poller to stop; it
just calls
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
    io_threads [i]->stop ();

// Wait till I/O threads actually terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
    delete io_threads [i];
}
If I'm correct in my thinking, the poller is needed to process the stop
command in zmq::io_thread_t::in_event?
If so, we have a race between the io_thread receiving and processing
the stop command and the context deleting that io_thread. So I added
code to ensure the poller and io_thread are stopped in a coordinated
fashion, and with that change I no longer get the assert.
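To illustrate the ordering issue described above, here is a minimal sketch in standard C++. It is not libzmq code: toy_poller_t, send_stop, join and load_after_orderly_stop are hypothetical names, and the load counter only stands in for the value that get_load () checks. The sketch models one assumption, namely that the handle is deregistered on the worker thread when it processes the stop command, so the owner can rely on a zero load only after the worker has been joined.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a poller with one registered mailbox handle.
class toy_poller_t
{
public:
    toy_poller_t () : load (1), stop_requested (false),
        worker (&toy_poller_t::loop, this) {}

    // Owner side: post the stop command. Returns immediately.
    void send_stop ()
    {
        {
            std::lock_guard<std::mutex> lock (sync);
            stop_requested = true;
        }
        cond.notify_one ();
    }

    // Owner side: block until the worker thread has exited.
    void join ()
    {
        if (worker.joinable ())
            worker.join ();
    }

    int get_load () const { return load.load (); }

    ~toy_poller_t ()
    {
        send_stop ();
        join ();
        // Safe only because the worker has been joined above.
        assert (get_load () == 0);
    }

private:
    // Worker side: wait for the stop command, then deregister the handle.
    void loop ()
    {
        std::unique_lock<std::mutex> lock (sync);
        cond.wait (lock, [this] { return stop_requested; });
        load.fetch_sub (1);  // models rm_fd (mailbox_handle)
    }

    std::atomic<int> load;
    bool stop_requested;
    std::mutex sync;
    std::condition_variable cond;
    std::thread worker;  // declared last so the members above exist first
};

// Orderly shutdown: request the stop, wait for the worker, then read the load.
inline int load_after_orderly_stop ()
{
    toy_poller_t poller;
    poller.send_stop ();
    poller.join ();
    return poller.get_load ();
}
```

If the load were read between send_stop () and join (), it could still be 1, which is the window the assertion would fall into in this toy model.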
In the poller loop I track whether the worker is running:
void zmq::epoll_t::loop ()
{
    epoll_event ev_buf [max_io_events];

    printf("poller started\n");
    poller_running = true;
    while (!stopping) {
        ...
    }
    printf("poller stopped\n");
    poller_running = false;
}
In the destructor we wait for the poller worker to stop:
zmq::epoll_t::~epoll_t ()
{
    int i;

    printf("poller dying\n");
    // Wait till the worker thread exits.
    worker.stop ();

    close (epoll_fd);
    for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it)
        delete *it;

    i = 0;
    while(poller_running) {
        sleep(1);
        if(++i == 10)
            break;
    }
    printf("poller dead\n");
}
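One caveat on the flag, assuming poller_running is a plain bool member (its declaration is not shown here): it is written by the worker thread and read by the thread running the destructor, which is a data race under the C++11 memory model. The following is a minimal sketch of the same bounded wait using std::atomic<bool>. The names running_flag_t, wait_until_stopped and demo_bounded_wait are made up for illustration and are not part of libzmq.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical flag shared between a worker thread and its owner.
struct running_flag_t
{
    std::atomic<bool> running {false};
};

// Poll the flag until it clears or the timeout expires.
// Returns true if the worker was seen to stop in time.
inline bool wait_until_stopped (const running_flag_t &flag,
                                std::chrono::milliseconds timeout)
{
    const auto deadline = std::chrono::steady_clock::now () + timeout;
    while (flag.running.load ()) {
        if (std::chrono::steady_clock::now () >= deadline)
            return false;
        std::this_thread::sleep_for (std::chrono::milliseconds (10));
    }
    return true;
}

// Example: the flag is set before the worker starts, so the owner can
// never observe a stale 'false' from a worker that has not run yet.
inline bool demo_bounded_wait ()
{
    running_flag_t flag;
    flag.running.store (true);
    std::thread worker ([&flag] {
        std::this_thread::sleep_for (std::chrono::milliseconds (50));
        flag.running.store (false);  // last action of the worker
    });
    const bool stopped = wait_until_stopped (flag, std::chrono::seconds (10));
    worker.join ();
    assert (!flag.running.load ());
    return stopped;
}
```

Returning a bool lets the caller tell a clean stop from a timeout, which the fixed ten-iteration loop does not report.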
Then I added a stopped function to the poller that returns
!poller_running. The io_thread_t stopped function uses it, so I can
check it in the ctx destructor:
zmq::ctx_t::~ctx_t ()
{
    // Check that there are no remaining sockets.
    zmq_assert (sockets.empty ());

    printf("Stopping iothreads\n");
    // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    // thread subsequent invocation of destructor would hang-up.
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
        io_threads [i]->stop ();

    printf("Checking iothreads\n");
    // Wait till I/O threads actually terminate.
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        while (!io_threads [i]->stopped ()) {
            printf("iothread not stopped wait and try again\n");
            sleep (1);
        }
        delete io_threads [i];
    }
    ...
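The sleep (1) polling above adds up to a second of shutdown latency per thread. As a possible alternative, here is a minimal sketch of a blocking wait built on a condition variable. It is standard C++11 and not libzmq code: stop_latch_t, worker_stopped, wait_all, pending and run_workers are hypothetical names, and each worker is assumed to report in as its very last action.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: lets an owner block until N workers have reported in.
class stop_latch_t
{
public:
    explicit stop_latch_t (std::size_t count) : remaining (count) {}

    // Called by each worker as its last action.
    void worker_stopped ()
    {
        std::lock_guard<std::mutex> lock (sync);
        assert (remaining > 0);
        if (--remaining == 0)
            cond.notify_all ();
    }

    // Called by the owner. Returns once every worker has reported.
    void wait_all ()
    {
        std::unique_lock<std::mutex> lock (sync);
        cond.wait (lock, [this] { return remaining == 0; });
    }

    // Number of workers that have not reported yet.
    std::size_t pending ()
    {
        std::lock_guard<std::mutex> lock (sync);
        return remaining;
    }

private:
    std::size_t remaining;
    std::mutex sync;
    std::condition_variable cond;
};

// Start 'count' workers, block until all have reported, then join them.
inline std::size_t run_workers (std::size_t count)
{
    stop_latch_t latch (count);
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i != count; i++)
        workers.emplace_back ([&latch] { latch.worker_stopped (); });
    latch.wait_all ();
    for (std::size_t i = 0; i != workers.size (); i++)
        workers [i].join ();
    return latch.pending ();
}
```

The owner wakes as soon as the last worker reports, and a worker that never reports blocks the owner indefinitely, so a real implementation would probably want a timed wait as well.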
I can create a patch but I would like to get people's thoughts first.
Regards,
Martin.
On 01/07/14 11:45, Martin Townsend wrote:
> Apologies I forgot to say that I'm using v4.0.4 with epoll.
>
> On 01/07/14 11:44, Martin Townsend wrote:
>> Hi,
>>
>> I'm getting an assert when trying to gracefully exit
>>
>> At the bottom of the mail is my pthread code that handles subscribing in
>> a pub/sub model and reads power values. On SIGINT and SIGTERM it sets
>> quit to 1 and the thread exits but I get an exception on the call to
>> zmq_ctx_destroy (zcontext);
>>
>> Assertion failed: get_load () == 0 (poller_base.cpp:31)
>> Thread [2] (Suspended: Signal 'SIGABRT' received. Description: Aborted.)
>> 15 __GI_raise() raise.c:56 0x482c9ff4
>> 14 __GI_abort() abort.c:89 0x482ce4d0
>> 13 zmq::zmq_abort() err.cpp:74 0x4803c558
>> 12 zmq::poller_base_t::~poller_base_t() poller_base.cpp:31 0x480519d4
>> 11 zmq::epoll_t::~epoll_t() epoll.cpp:42 0x4803b8f8
>> 10 zmq::epoll_t::~epoll_t() epoll.cpp:50 0x4803b930
>> 9 zmq::io_thread_t::~io_thread_t() io_thread.cpp:39 0x4803d2f0
>> 8 zmq::io_thread_t::~io_thread_t() io_thread.cpp:40 0x4803d3ac
>> 7 zmq::ctx_t::~ctx_t() ctx.cpp:82 0x48033a9c
>> 6 zmq::ctx_t::terminate() ctx.cpp:153 0x480367d4
>> 5 zmq_ctx_term() zmq.cpp:171 0x4806f13c
>> 4 zmq_ctx_destroy() zmq.cpp:241 0x4806f3e0
>> 3 zsub_thread() xcoapd.c:470 0x10002b9c
>> 2 start_thread() pthread_create.c:314 0x48095180
>> 1 clone() clone.S:65 0x48383e88
>>
>> Am I doing anything incorrect below? If I put a sleep(4) before
>> destroying the context I don't get the assert so I'm suspecting a race
>> condition somewhere. I'm running on a 100MHz Microblaze.
>>
>> /* ZeroMq Subscriber task */
>> static void * zsub_thread(void * arg) {
>>     int rc;
>>     /* Counts consecutive unknown zmq_poll errors, so it must live
>>      * outside the loop */
>>     int unknown_err_count = 0;
>>
>>     zcontext = zmq_ctx_new ();
>>     zmq_ctx_set (zcontext, ZMQ_MAX_SOCKETS, 256);
>>     int max_sockets = zmq_ctx_get (zcontext, ZMQ_MAX_SOCKETS);
>>     assert (max_sockets == 256);
>>
>>     subscriber = zmq_socket (zcontext, ZMQ_SUB);
>>     printf("Subscribing\n");
>>     rc = zmq_connect (subscriber, "ipc://xpwrd.ipc");
>>     assert (rc == 0);
>>     // Subscribe to messages that start with "Pwr"
>>     const char *filter = "Pwr";
>>     rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
>>     assert (rc == 0);
>>
>>     while(!quit) {
>>         char buf[32];
>>         char *s;
>>         zmq_pollitem_t items [] = {
>>             { subscriber, 0, ZMQ_POLLIN, 0 }
>>         };
>>
>>         rc = zmq_poll (items, 1, 1000);
>>         if(rc == 1) {
>>             /* As we only have one poll item we can safely call recv
>>              * on this */
>>             unknown_err_count = 0;
>>             s = s_recv (subscriber, buf, sizeof(buf));
>>             if(s)
>>                 sscanf(s, "Pwr:%04dW T:%dC", &pwr, &temp);
>>
>>             printf("Pwr:%04dW T:%dC\n", pwr, temp);
>>         } else if(rc == 0) {
>>             unknown_err_count = 0;
>>             printf("Poller Timed out\n");
>>         } else {
>>             /* error */
>>             if(errno == ETERM) {
>>                 printf("Poller has detected that socket was terminated\n");
>>                 break;
>>             } else if(errno == EFAULT) {
>>                 printf("zmq_poll has invalid parameters!!!!\n");
>>                 break;
>>             } else if(errno != EINTR) {
>>                 /* Ignore EINTR as we will just poll again on next
>>                  * iteration of loop, all other errors are undocumented
>>                  * in zmq so print a message and set a flag and if it
>>                  * occurs 3 times exit loop */
>>                 printf("Poller returned unknown error %d", errno);
>>                 printf("%s\n", strerror(errno));
>>                 unknown_err_count++;
>>                 if(unknown_err_count == 3) {
>>                     break;
>>                 }
>>             }
>>         }
>>     }
>>
>>     printf("Subscriber thread closed\n");
>>     zmq_close (subscriber);
>>     zmq_ctx_destroy (zcontext);
>>
>>     return (void *)0;
>> }
>>
>>
>> Best Regards,
>> Martin.
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev at lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev