[zeromq-dev] PUB/SUB assert(get_load () == 0) in zmq_ctx_destroy

Martin Townsend martin.townsend at xsilon.com
Tue Jul 1 16:28:42 CEST 2014


Hi,

I think I have got to the bottom of the problem. My main app calls 
zmq_ctx_destroy, which ends up calling the destructor of the io_thread 
associated with the IPC socket.  That destructor in turn calls the 
destructor of the poller instance.  The problem is that the io_thread 
instance has added the mailbox handle to the poller using add_fd, but 
expects it to be removed in process_stop:
void zmq::io_thread_t::process_stop ()
{
     poller->rm_fd (mailbox_handle);
     poller->stop ();
}

However, the context destructor doesn't wait for the poller to stop. It 
just calls:

     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
         io_threads [i]->stop ();

     //  Wait till I/O threads actually terminate.
     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
         delete io_threads [i];
     }

If I'm correct in my thinking, the poller is needed to process the stop 
command in zmq::io_thread_t::in_event?

If so, we have a race between the io_thread receiving and processing the 
stop command and the context deleting it.  So I added code to ensure the 
poller and io_thread are stopped in a coordinated fashion, and with that 
change I no longer get the assert.
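
As a minimal illustration of the coordination described above, here is a 
standalone sketch. It is not libzmq code: toy_poller_t, request_stop and 
join are hypothetical names, and it assumes C++11 threads. The worker only 
drops its registration once it has seen the stop request, so the owner has 
to wait for the worker before checking that the load is zero.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

//  Hypothetical stand-in for a poller. 'load' counts registered descriptors,
//  loosely modelled on the get_load () check in the assert.
class toy_poller_t
{
public:
    toy_poller_t () : load_ (0), stop_requested_ (false) {}

    //  Register one descriptor (stands in for add_fd) and start the worker.
    void start ()
    {
        load_.fetch_add (1);
        worker_ = std::thread (&toy_poller_t::loop, this);
    }

    //  Ask the worker to stop. Returns immediately, like sending a command.
    void request_stop ()
    {
        {
            std::lock_guard<std::mutex> lock (mutex_);
            stop_requested_ = true;
        }
        cond_.notify_one ();
    }

    //  Block until the worker has handled the stop request and exited.
    void join ()
    {
        if (worker_.joinable ())
            worker_.join ();
    }

    int load () const { return load_.load (); }

    ~toy_poller_t ()
    {
        //  Wait for the worker first; only then is the load check meaningful.
        join ();
        assert (load () == 0);
    }

private:
    void loop ()
    {
        std::unique_lock<std::mutex> lock (mutex_);
        cond_.wait (lock, [this] { return stop_requested_; });
        //  Stands in for process_stop: drop the registration, then exit.
        load_.fetch_sub (1);
    }

    std::atomic<int> load_;
    bool stop_requested_;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::thread worker_;
};
```

In this model, checking load () right after request_stop () without joining 
could still observe 1, which is the kind of ordering problem suspected here.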

For example, in the poller loop:

void zmq::epoll_t::loop ()
{
     epoll_event ev_buf [max_io_events];

     printf("poller started\n");
     poller_running = true;
     while (!stopping) {
         ...
     }
     printf("poller stopped\n");
     poller_running = false;
}

In the destructor we wait for the poller worker to stop:
zmq::epoll_t::~epoll_t ()
{
     int i;

     printf("poller dying\n");

     //  Wait till the worker thread exits.
     worker.stop ();

     close (epoll_fd);
     for (retired_t::iterator it = retired.begin (); it != retired.end ();
           ++it)
         delete *it;

     i = 0;
     while(poller_running) {
         sleep(1);
         if(++i == 10)
             break;
     }
     printf("poller dead\n");
}


Then I added a stopped function to the poller that returns 
!poller_running.  It is used by a stopped function on io_thread_t, so I 
can check it in the ctx destructor:

zmq::ctx_t::~ctx_t ()
{
     //  Check that there are no remaining sockets.
     zmq_assert (sockets.empty ());

     printf("Stopping iothreads\n");
     //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
     //  thread subsequent invocation of destructor would hang-up.
     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
         io_threads [i]->stop ();

     printf("Checking iothreads\n");
     //  Wait till I/O threads actually terminate.
     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
         while (!io_threads [i]->stopped ()) {
             printf("iothread not stopped wait and try again\n");
             sleep (1);
         }
         delete io_threads [i];
     }

     ...
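
The sleep (1) polling above could also be expressed with a condition 
variable, so the waiter wakes as soon as the worker finishes instead of up 
to a second later. A minimal standalone sketch, assuming C++11; 
stop_latch_t and its methods are hypothetical names and not part of libzmq:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

//  Hypothetical one-shot latch: the worker marks it when its loop exits,
//  the owner waits on it (with a timeout) before deleting the worker.
class stop_latch_t
{
public:
    stop_latch_t () : stopped_ (false) {}

    //  Called by the worker thread as the last thing it does.
    void mark_stopped ()
    {
        {
            std::lock_guard<std::mutex> lock (mutex_);
            stopped_ = true;
        }
        cond_.notify_all ();
    }

    //  Returns true if the latch was marked before the timeout expired.
    bool wait_stopped (std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock (mutex_);
        return cond_.wait_for (lock, timeout, [this] { return stopped_; });
    }

    //  Non-blocking query, comparable to the stopped () check above.
    bool stopped ()
    {
        std::lock_guard<std::mutex> lock (mutex_);
        return stopped_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool stopped_;
};

//  Hypothetical usage: a worker marks the latch on exit, the owner waits.
inline bool demo_wait_for_worker ()
{
    stop_latch_t latch;
    std::thread worker ([&latch] { latch.mark_stopped (); });
    bool ok = latch.wait_stopped (std::chrono::seconds (5));
    worker.join ();
    return ok;
}
```

The timeout plays the same role as the ten-iteration limit in the epoll_t 
destructor above: the owner gives up rather than hanging forever.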


I can create a patch but I would like to get people's thoughts first.

Regards,
Martin.



On 01/07/14 11:45, Martin Townsend wrote:
> Apologies, I forgot to say that I'm using v4.0.4 with epoll.
>
> On 01/07/14 11:44, Martin Townsend wrote:
>> Hi,
>>
>> I'm getting an assert when trying to exit gracefully.
>>
>> At the bottom of the mail is my pthread code that handles subscribing in
>> a pub/sub model and reads power values.  On SIGINT and SIGTERM it sets
>> quit to 1 and the thread exits, but I get an assertion failure on the
>> call to zmq_ctx_destroy (zcontext):
>>
>> Assertion failed: get_load () == 0 (poller_base.cpp:31)
>> Thread [2] (Suspended: Signal 'SIGABRT' received. Description: Aborted.)
>>        15 __GI_raise() raise.c:56 0x482c9ff4
>>        14 __GI_abort() abort.c:89 0x482ce4d0
>>        13 zmq::zmq_abort() err.cpp:74 0x4803c558
>>        12 zmq::poller_base_t::~poller_base_t() poller_base.cpp:31 0x480519d4
>>        11 zmq::epoll_t::~epoll_t() epoll.cpp:42 0x4803b8f8
>>        10 zmq::epoll_t::~epoll_t() epoll.cpp:50 0x4803b930
>>        9 zmq::io_thread_t::~io_thread_t() io_thread.cpp:39 0x4803d2f0
>>        8 zmq::io_thread_t::~io_thread_t() io_thread.cpp:40 0x4803d3ac
>>        7 zmq::ctx_t::~ctx_t() ctx.cpp:82 0x48033a9c
>>        6 zmq::ctx_t::terminate() ctx.cpp:153 0x480367d4
>>        5 zmq_ctx_term() zmq.cpp:171 0x4806f13c
>>        4 zmq_ctx_destroy() zmq.cpp:241 0x4806f3e0
>>        3 zsub_thread() xcoapd.c:470 0x10002b9c
>>        2 start_thread() pthread_create.c:314 0x48095180
>>        1 clone() clone.S:65 0x48383e88
>>
>> Am I doing anything incorrect below?  If I put a sleep(4) before
>> destroying the context I don't get the assert, so I suspect a race
>> condition somewhere.  I'm running on a 100 MHz Microblaze.
>>
>> /* ZeroMq Subscriber task */
>> static void * zsub_thread(void * arg) {
>>        int rc;
>>
>>        zcontext = zmq_ctx_new ();
>>        zmq_ctx_set (zcontext, ZMQ_MAX_SOCKETS, 256);
>>        int max_sockets = zmq_ctx_get (zcontext, ZMQ_MAX_SOCKETS);
>>        assert (max_sockets == 256);
>>
>>        subscriber = zmq_socket (zcontext, ZMQ_SUB);
>>        printf("Subscribing\n");
>>        rc = zmq_connect (subscriber, "ipc://xpwrd.ipc");
>>        assert (rc == 0);
>>        // Subscribe to messages that start with "Pwr"
>>        char *filter = "Pwr";
>>        rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter,
>>                             strlen (filter));
>>
>>
>>        /* Counts consecutive unknown errors; must live outside the loop */
>>        int unknown_err_count = 0;
>>        while(!quit) {
>>            char buf[32];
>>            char *s;
>>            zmq_pollitem_t items [] = {
>>                { subscriber, 0, ZMQ_POLLIN, 0 }
>>            };
>>
>>            rc = zmq_poll (items, 1, 1000);
>>            if(rc == 1) {
>>                /* As we only have one poll item we can safely call recv on
>>                 * this */
>>                unknown_err_count = 0;
>>                s = s_recv (subscriber, buf, sizeof(buf));
>>                if(s)
>>                    sscanf(s, "Pwr:%04dW T:%dC", &pwr, &temp);
>>
>>                printf("Pwr:%04dW T:%dC\n", pwr, temp);
>>            } else if(rc == 0) {
>>                unknown_err_count = 0;
>>                printf("Poller Timed out\n");
>>            } else {
>>                /* error */
>>                if(errno == ETERM) {
>>                    printf("Poller has detected that socket was terminated\n");
>>                    break;
>>                } else if(errno == EFAULT) {
>>                    printf("zmq_poll has invalid parameters!!!!\n");
>>                    break;
>>                } else if(errno != EINTR) {
>>                    /* Ignore EINTR as we will just poll again on the next
>>                     * iteration of the loop. All other errors are
>>                     * undocumented in zmq, so print a message and count
>>                     * them; if one occurs 3 times in a row, exit the loop */
>>                    printf("Poller returned unknown error %d: %s\n", errno,
>>                           strerror(errno));
>>                    unknown_err_count++;
>>                    if(unknown_err_count == 3) {
>>                        break;
>>                    }
>>                }
>>            }
>>        }
>>
>>        printf("Subscriber thread closed\n");
>>        zmq_close (subscriber);
>>        zmq_ctx_destroy (zcontext);
>>
>>        return (void *)0;
>> }
>>
>>
>> Best Regards,
>> Martin.
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev at lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev