[zeromq-dev] PUB/SUB assert(get_load () == 0) in zmq_ctx_destroy

Pieter Hintjens ph at imatix.com
Tue Jul 1 18:17:24 CEST 2014


Could be related, yes. Since you can reproduce it, why not make the
patch and send it to master?

On Tue, Jul 1, 2014 at 4:46 PM, Martin Townsend
<martin.townsend at xsilon.com> wrote:
> This may be related to
> https://github.com/zeromq/libzmq/issues/795
>
> Regards,
> Martin.
>
>
> On 01/07/14 15:28, Martin Townsend wrote:
>> Hi,
>>
>> I think I have got to the bottom of the problem. My main app calls
>> zmq_ctx_destroy, which ends up calling the destructor for the io_thread
>> associated with the IPC socket. This destructor calls the destructor
>> for the poller instance. The problem is that the io_thread instance has
>> added the mailbox handle to the poller using add_fd but expects it to be
>> removed using
>> void zmq::io_thread_t::process_stop ()
>> {
>>       poller->rm_fd (mailbox_handle);
>>       poller->stop ();
>> }
>>
>> but the context destructor doesn't wait for the poller to stop; it
>> just calls
>>
>>       for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
>>           io_threads [i]->stop ();
>>
>>       //  Wait till I/O threads actually terminate.
>>       for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
>>           delete io_threads [i];
>>       }
>>
>> If I'm correct in my thinking, the poller is needed to process the stop
>> command in zmq::io_thread_t::in_event?
>>
>> If so, we have a race condition between the io_thread receiving and
>> processing the stop command and the context deleting it. So I added code to
>> ensure the poller and io_thread are stopped in a coordinated fashion, and
>> I no longer get the assert.
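
To illustrate the ordering concern described above, here is a minimal C++ sketch that uses only the standard library, not libzmq. The names worker_t, load, stop and join are hypothetical stand-ins: load plays the role of the poller's registered-handle count, and the decrement in loop plays the role of rm_fd in process_stop. It is a sketch under those assumptions, not the library's actual shutdown path.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for an I/O thread and its poller; not libzmq code.
class worker_t {
public:
    // One handle is registered up front (analogous to add_fd on the mailbox).
    worker_t() : load_(1), stop_requested_(false), thread_(&worker_t::loop, this) {}

    // Ask the worker to stop (analogous to sending the stop command).
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_requested_ = true;
        }
        cv_.notify_one();
    }

    // Block until the worker thread has processed the stop and exited.
    void join() {
        if (thread_.joinable())
            thread_.join();
    }

    int load() const { return load_.load(); }

    // Joining before the check guarantees the worker has released its handle.
    // As in the thread's code, this would hang if stop() was never called.
    ~worker_t() {
        join();
        assert(load() == 0);
    }

private:
    void loop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return stop_requested_; });
        load_.fetch_sub(1);  // analogous to rm_fd (mailbox_handle)
    }

    std::atomic<int> load_;
    bool stop_requested_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::thread thread_;  // declared last so the members above exist before it starts
};
```

The point of the sketch is only the order of operations: request the stop, wait for the worker to act on it, and only then check the load and destroy the object.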
>>
>> e.g. in the poller loop:
>>
>> void zmq::epoll_t::loop ()
>> {
>>       epoll_event ev_buf [max_io_events];
>>
>>       printf("poller started\n");
>>       poller_running = true;
>>       while (!stopping) {
>>           ...
>>       }
>>       printf("poller stopped\n");
>>       poller_running = false;
>> }
>>
>> In the destructor we wait for the poller worker to stop:
>> zmq::epoll_t::~epoll_t ()
>> {
>>       int i;
>>
>>       printf("poller dying\n");
>>
>>       //  Wait till the worker thread exits.
>>       worker.stop ();
>>
>>       close (epoll_fd);
>>       for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it)
>>           delete *it;
>>
>>       i = 0;
>>       while(poller_running) {
>>           sleep(1);
>>           if(++i == 10)
>>               break;
>>       }
>>       printf("poller dead\n");
>> }
>>
>>
>> Then I created a poller stopped function that returns !poller_running,
>> which is used by an io_thread_t stopped function so I can check this in the
>> ctx destructor:
>>
>> zmq::ctx_t::~ctx_t ()
>> {
>>       //  Check that there are no remaining sockets.
>>       zmq_assert (sockets.empty ());
>>
>>       printf("Stopping iothreads\n");
>>       //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
>>       //  thread subsequent invocation of destructor would hang-up.
>>       for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
>>           io_threads [i]->stop ();
>>
>>       printf("Checking iothreads\n");
>>       //  Wait till I/O threads actually terminate.
>>       for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
>>           while (!io_threads [i]->stopped ()) {
>>               printf("iothread not stopped wait and try again\n");
>>               sleep (1);
>>           }
>>           delete io_threads [i];
>>       }
>>
>>       ...
>>
>>
>> I can create a patch but I would like to get people's thoughts first.
>>
>> Regards,
>> Martin.
>>
>>
>>
>> On 01/07/14 11:45, Martin Townsend wrote:
>>> Apologies, I forgot to say that I'm using v4.0.4 with epoll.
>>>
>>> On 01/07/14 11:44, Martin Townsend wrote:
>>>> Hi,
>>>>
>>>> I'm getting an assert when trying to exit gracefully.
>>>>
>>>> At the bottom of the mail is my pthread code that handles subscribing in
>>>> a pub/sub model and reads power values. On SIGINT and SIGTERM it sets
>>>> quit to 1 and the thread exits, but I get an assertion failure on the call to
>>>> zmq_ctx_destroy (zcontext);
>>>>
>>>> Assertion failed: get_load () == 0 (poller_base.cpp:31)
>>>> Thread [2] (Suspended: Signal 'SIGABRT' received. Description: Aborted.)
>>>>         15 __GI_raise() raise.c:56 0x482c9ff4
>>>>         14 __GI_abort() abort.c:89 0x482ce4d0
>>>>         13 zmq::zmq_abort() err.cpp:74 0x4803c558
>>>>         12 zmq::poller_base_t::~poller_base_t() poller_base.cpp:31 0x480519d4
>>>>         11 zmq::epoll_t::~epoll_t() epoll.cpp:42 0x4803b8f8
>>>>         10 zmq::epoll_t::~epoll_t() epoll.cpp:50 0x4803b930
>>>>         9 zmq::io_thread_t::~io_thread_t() io_thread.cpp:39 0x4803d2f0
>>>>         8 zmq::io_thread_t::~io_thread_t() io_thread.cpp:40 0x4803d3ac
>>>>         7 zmq::ctx_t::~ctx_t() ctx.cpp:82 0x48033a9c
>>>>         6 zmq::ctx_t::terminate() ctx.cpp:153 0x480367d4
>>>>         5 zmq_ctx_term() zmq.cpp:171 0x4806f13c
>>>>         4 zmq_ctx_destroy() zmq.cpp:241 0x4806f3e0
>>>>         3 zsub_thread() xcoapd.c:470 0x10002b9c
>>>>         2 start_thread() pthread_create.c:314 0x48095180
>>>>         1 clone() clone.S:65 0x48383e88
>>>>
>>>> Am I doing anything incorrect below? If I put a sleep(4) before
>>>> destroying the context I don't get the assert, so I suspect a race
>>>> condition somewhere. I'm running on a 100MHz Microblaze.
>>>>
>>>> /* ZeroMq Subscriber task */
>>>> static void * zsub_thread(void * arg) {
>>>>         int rc;
>>>>
>>>>         zcontext = zmq_ctx_new ();
>>>>         zmq_ctx_set (zcontext, ZMQ_MAX_SOCKETS, 256);
>>>>         int max_sockets = zmq_ctx_get (zcontext, ZMQ_MAX_SOCKETS);
>>>>         assert (max_sockets == 256);
>>>>
>>>>         subscriber = zmq_socket (zcontext, ZMQ_SUB);
>>>>         printf("Subscribing\n");
>>>>         rc = zmq_connect (subscriber, "ipc://xpwrd.ipc");
>>>>         assert (rc == 0);
>>>>         /* Subscribe to messages whose prefix matches the filter "Pwr" */
>>>>         const char *filter = "Pwr";
>>>>         rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
>>>>         assert (rc == 0);
>>>>
>>>>
>>>>         /* Declared outside the loop so the count persists across iterations */
>>>>         int unknown_err_count = 0;
>>>>         while(!quit) {
>>>>             char buf[32];
>>>>             char *s;
>>>>             zmq_pollitem_t items [] = {
>>>>                 { subscriber, 0, ZMQ_POLLIN, 0 }
>>>>             };
>>>>
>>>>             rc = zmq_poll (items, 1, 1000);
>>>>             if(rc == 1) {
>>>>                 /* As we only have one poll item we can safely call recv on this */
>>>>                 unknown_err_count = 0;
>>>>                 s = s_recv (subscriber, buf, sizeof(buf));
>>>>                 /* Only print when both values were parsed successfully */
>>>>                 if(s && sscanf(s, "Pwr:%04dW T:%dC", &pwr, &temp) == 2)
>>>>                     printf("Pwr:%04dW T:%dC\n", pwr, temp);
>>>>             } else if(rc == 0) {
>>>>                 unknown_err_count = 0;
>>>>                 printf("Poller Timed out\n");
>>>>             } else {
>>>>                 /* error */
>>>>                 if(errno == ETERM) {
>>>>                     printf("Poller has detected that socket was terminated\n");
>>>>                     break;
>>>>                 } else if(errno == EFAULT) {
>>>>                     printf("zmq_poll has invalid parameters!!!!\n");
>>>>                     break;
>>>>                 } else if(errno != EINTR) {
>>>>                     /* Ignore EINTR as we will just poll again on the next
>>>>                      * iteration of the loop. All other errors are undocumented
>>>>                      * in zmq, so print a message and exit the loop if this
>>>>                      * occurs 3 times. */
>>>>                     printf("Poller returned unknown error %d: %s\n", errno, strerror(errno));
>>>>                     unknown_err_count++;
>>>>                     if(unknown_err_count == 3) {
>>>>                         break;
>>>>                     }
>>>>                 }
>>>>             }
>>>>         }
>>>>
>>>>         printf("Subscriber thread closed\n");
>>>>         zmq_close (subscriber);
>>>>         zmq_ctx_destroy (zcontext);
>>>>
>>>>         return (void *)0;
>>>> }
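
The loop above depends on a quit flag that is set on SIGINT and SIGTERM, but the handler itself is not shown. A minimal sketch of one way to set such a flag safely is below, assuming it is written from a signal handler; quit_requested, on_signal and install_quit_handlers are hypothetical names, with quit_requested standing in for quit.

```cpp
#include <csignal>

// Hypothetical stand-in for the `quit` flag used by the subscriber loop.
// volatile sig_atomic_t is the type that may be safely written from a handler.
static volatile std::sig_atomic_t quit_requested = 0;

// The handler only sets the flag; all cleanup happens in the thread itself.
extern "C" void on_signal(int) {
    quit_requested = 1;
}

// Install the handler for SIGINT and SIGTERM; returns true on success.
bool install_quit_handlers() {
    return std::signal(SIGINT, on_signal) != SIG_ERR
        && std::signal(SIGTERM, on_signal) != SIG_ERR;
}
```

With a one-second zmq_poll timeout, as in the code above, the loop would notice the flag within about a second of the signal arriving.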
>>>>
>>>>
>>>> Best Regards,
>>>> Martin.
>>>>
>>>> _______________________________________________
>>>> zeromq-dev mailing list
>>>> zeromq-dev at lists.zeromq.org
>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev


