[zeromq-dev] PUB/SUB assert(get_load () == 0) in zmq_ctx_destroy

Martin Townsend martin.townsend at xsilon.com
Tue Jul 1 16:46:44 CEST 2014


This may be related to
https://github.com/zeromq/libzmq/issues/795

Regards,
Martin.


On 01/07/14 15:28, Martin Townsend wrote:
> Hi,
>
> I think I have got to the bottom of the problem. My main app calls
> zmq_ctx_destroy, which ends up calling the destructor for the io_thread
> associated with the IPC socket. This destructor calls the destructor
> for the poller instance. The problem is that the io_thread instance has
> added the mailbox handle to the poller using add_fd but expects it to be
> removed by:
> void zmq::io_thread_t::process_stop ()
> {
>       poller->rm_fd (mailbox_handle);
>       poller->stop ();
> }
>
> But the context destructor doesn't wait for the poller to stop; it
> just calls:
>
>       for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
>           io_threads [i]->stop ();
>
>       //  Wait till I/O threads actually terminate.
>       for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
>           delete io_threads [i];
>       }
>
> If I'm correct in my thinking, the poller is needed to process the stop
> command in zmq::io_thread_t::in_event?
>
> If so, we now have a race between the io_thread receiving and processing
> the stop command and the context deleting it. So I added code to
> ensure the poller and io_thread are stopped in a coordinated fashion, and
> I no longer get the assert.
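
To make the ordering concrete, here is a minimal sketch of the shutdown sequence described above, written with the C++ standard library only. It is not libzmq code: io_worker, its command queue, and the load counter are hypothetical stand-ins for io_thread_t, its mailbox, and the poller load. The point it tries to show is that the owner has to wait for the worker to process the stop command before checking that the load has dropped to zero.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical model of an I/O thread; none of these names are libzmq's.
class io_worker {
public:
    io_worker() { thread_ = std::thread([this] { loop(); }); }

    // Post a stop command and return immediately (asynchronous, like stop()).
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            commands_.push(command::stop);
        }
        ready_.notify_one();
    }

    // Block until the worker has processed the stop command and exited.
    void join() {
        if (thread_.joinable())
            thread_.join();
    }

    int load() const { return load_.load(); }

    ~io_worker() {
        stop();               // harmless if a stop was already posted
        join();               // wait first ...
        assert(load() == 0);  // ... so this check cannot race with the worker
    }

private:
    enum class command { stop };

    void loop() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_.wait(lock, [this] { return !commands_.empty(); });
            command cmd = commands_.front();
            commands_.pop();
            lock.unlock();
            if (cmd == command::stop) {
                load_ = 0;  // models removing the mailbox handle from the poller
                return;
            }
        }
    }

    std::atomic<int> load_{1};  // one registered handle: the mailbox
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<command> commands_;
    std::thread thread_;        // declared last so the other members exist first
};
```

Calling stop() and then join() on an io_worker leaves load() at 0 before the destructor runs its check. If the join were skipped, the check could run while the stop command is still sitting in the queue.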
>
> e.g. in the poller loop:
>
> void zmq::epoll_t::loop ()
> {
>       epoll_event ev_buf [max_io_events];
>
>       printf("poller started\n");
>       poller_running = true;
>       while (!stopping) {
>           ...
>       }
>       printf("poller stopped\n");
>       poller_running = false;
> }
>
> In the destructor we wait for the poller worker to stop:
> zmq::epoll_t::~epoll_t ()
> {
>       int i;
>
>       printf("poller dying\n");
>
>       //  Wait till the worker thread exits.
>       worker.stop ();
>
>       close (epoll_fd);
>       for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it)
>           delete *it;
>
>       i = 0;
>       while(poller_running) {
>           sleep(1);
>           if(++i == 10)
>               break;
>       }
>       printf("poller dead\n");
> }
>
>
> Then I created a poller stopped function that returns !poller_running,
> which is used by the io_thread_t stopped() function so that I can check
> it in the ctx destructor:
>
> zmq::ctx_t::~ctx_t ()
> {
>       //  Check that there are no remaining sockets.
>       zmq_assert (sockets.empty ());
>
>       printf("Stopping iothreads\n");
>       //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
>       //  thread subsequent invocation of destructor would hang-up.
>       for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
>           io_threads [i]->stop ();
>
>       printf("Checking iothreads\n");
>       //  Wait till I/O threads actually terminate.
>       for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
>           while (!io_threads [i]->stopped ()) {
>               printf("iothread not stopped wait and try again\n");
>               sleep (1);
>           }
>           delete io_threads [i];
>       }
>
>       ...
>
>
> I can create a patch but I would like to get people's thoughts first.
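
One possible refinement, offered as a sketch rather than a patch: the sleep(1) loops above add up to a second of latency per check, and poller_running is a plain flag shared between two threads. A condition variable with a timeout avoids both. The stopped_latch class below is hypothetical and uses only the C++ standard library, not libzmq's own thread or mutex wrappers.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper, not part of libzmq: a one-shot "worker has stopped" latch.
class stopped_latch {
public:
    // Called by the worker thread as the last thing it does.
    void set() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cond_.notify_all();
    }

    // Called by the owner. Returns true if the worker signalled within
    // the timeout, false if the timeout expired first.
    template <class Rep, class Period>
    bool wait_for(std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return cond_.wait_for(lock, timeout, [this] { return stopped_; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool stopped_ = false;
};

// Usage sketch: start a worker, then wait up to `timeout` for it to finish.
inline bool worker_stops_within(std::chrono::milliseconds timeout) {
    stopped_latch latch;
    std::thread worker([&latch] { latch.set(); });
    bool ok = latch.wait_for(timeout);
    worker.join();
    return ok;
}
```

The owner wakes as soon as the worker signals instead of on the next one-second tick, and the timeout still bounds the wait in the same way the ten-iteration loop does.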
>
> Regards,
> Martin.
>
>
>
> On 01/07/14 11:45, Martin Townsend wrote:
>> Apologies, I forgot to say that I'm using v4.0.4 with epoll.
>>
>> On 01/07/14 11:44, Martin Townsend wrote:
>>> Hi,
>>>
>>> I'm getting an assert when trying to exit gracefully.
>>>
>>> At the bottom of the mail is my pthread code that handles subscribing in
>>> a pub/sub model and reads power values. On SIGINT and SIGTERM it sets
>>> quit to 1 and the thread exits, but I get an assertion failure on the
>>> call to zmq_ctx_destroy (zcontext):
>>>
>>> Assertion failed: get_load () == 0 (poller_base.cpp:31)
>>> Thread [2] (Suspended: Signal 'SIGABRT' received. Description: Aborted.)
>>>         15 __GI_raise() raise.c:56 0x482c9ff4
>>>         14 __GI_abort() abort.c:89 0x482ce4d0
>>>         13 zmq::zmq_abort() err.cpp:74 0x4803c558
>>>         12 zmq::poller_base_t::~poller_base_t() poller_base.cpp:31 0x480519d4
>>>         11 zmq::epoll_t::~epoll_t() epoll.cpp:42 0x4803b8f8
>>>         10 zmq::epoll_t::~epoll_t() epoll.cpp:50 0x4803b930
>>>         9 zmq::io_thread_t::~io_thread_t() io_thread.cpp:39 0x4803d2f0
>>>         8 zmq::io_thread_t::~io_thread_t() io_thread.cpp:40 0x4803d3ac
>>>         7 zmq::ctx_t::~ctx_t() ctx.cpp:82 0x48033a9c
>>>         6 zmq::ctx_t::terminate() ctx.cpp:153 0x480367d4
>>>         5 zmq_ctx_term() zmq.cpp:171 0x4806f13c
>>>         4 zmq_ctx_destroy() zmq.cpp:241 0x4806f3e0
>>>         3 zsub_thread() xcoapd.c:470 0x10002b9c
>>>         2 start_thread() pthread_create.c:314 0x48095180
>>>         1 clone() clone.S:65 0x48383e88
>>>
>>> Am I doing anything incorrect below? If I put a sleep(4) before
>>> destroying the context I don't get the assert, so I suspect a race
>>> condition somewhere. I'm running on a 100MHz Microblaze.
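
The listing that follows relies on a quit flag set on SIGINT and SIGTERM, but the handler itself is not shown in the post. For readers following along, here is a minimal sketch of how such a flag is commonly set up. The handler name and the install function are assumptions of mine; only the name quit comes from the original code.

```cpp
#include <cassert>
#include <csignal>

// Flag polled by the subscriber loop. volatile sig_atomic_t is the type
// the C and C++ standards allow a signal handler to write to safely.
volatile std::sig_atomic_t quit = 0;

// Hypothetical handler: does nothing except record the request to exit.
extern "C" void on_terminate(int) { quit = 1; }

// Hypothetical helper: install the handler for SIGINT and SIGTERM.
// Returns true if both registrations succeeded.
bool install_quit_handlers() {
    return std::signal(SIGINT, on_terminate) != SIG_ERR &&
           std::signal(SIGTERM, on_terminate) != SIG_ERR;
}
```

Keeping the handler to a single assignment leaves all the real cleanup (zmq_close, zmq_ctx_destroy) to the thread that owns the socket, which is what the loop below does.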
>>>
>>> /* ZeroMq Subscriber task */
>>> static void * zsub_thread(void * arg) {
>>>         int rc;
>>>
>>>         zcontext = zmq_ctx_new ();
>>>         zmq_ctx_set (zcontext, ZMQ_MAX_SOCKETS, 256);
>>>         int max_sockets = zmq_ctx_get (zcontext, ZMQ_MAX_SOCKETS);
>>>         assert (max_sockets == 256);
>>>
>>>         subscriber = zmq_socket (zcontext, ZMQ_SUB);
>>>         printf("Subscribing\n");
>>>         rc = zmq_connect (subscriber, "ipc://xpwrd.ipc");
>>>         assert (rc == 0);
>>>         // Subscribe to messages that start with the "Pwr" prefix
>>>         char *filter = "Pwr";
>>>         rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
>>>
>>>
>>>         while(!quit) {
>>>             char buf[32];
>>>             char *s;
>>>             int unknown_err_count;
>>>             zmq_pollitem_t items [] = {
>>>                 { subscriber, 0, ZMQ_POLLIN, 0 }
>>>             };
>>>
>>>             rc = zmq_poll (items, 1, 1000);
>>>             if(rc == 1) {
>>>                 /* As we only have one poll item we can safely call recv on this */
>>>                 unknown_err_count = 0;
>>>                 s = s_recv (subscriber, buf, sizeof(buf));
>>>                 if(s)
>>>                     sscanf(s, "Pwr:%04dW T:%dC", &pwr, &temp);
>>>
>>>                 printf("Pwr:%04dW T:%dC\n", pwr, temp);
>>>             } else if(rc == 0) {
>>>                 unknown_err_count = 0;
>>>                 printf("Poller Timed out\n");
>>>             } else {
>>>                 /* error */
>>>                 if(errno == ETERM) {
>>>                     printf("Poller has detected that socket was terminated\n");
>>>                     break;
>>>                 } else if(errno == EFAULT) {
>>>                     printf("zmq_poll has invalid parameters!!!!\n");
>>>                     break;
>>>                 } else if(errno != EINTR) {
>>>                     /* Ignore EINTR as we will just poll again on the next
>>>                      * iteration of the loop. All other errors are
>>>                      * undocumented in zmq, so print a message, count the
>>>                      * error, and exit the loop if it occurs 3 times. */
>>>                     printf("Poller returned unknown error %d", errno);
>>>                     printf("%s\n", strerror(errno));
>>>                     unknown_err_count++;
>>>                     if(unknown_err_count == 3) {
>>>                         break;
>>>                     }
>>>                 }
>>>             }
>>>         }
>>>
>>>         printf("Subscriber thread closed\n");
>>>         zmq_close (subscriber);
>>>         zmq_ctx_destroy (zcontext);
>>>
>>>         return (void *)0;
>>> }
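
A side note on the loop above, unrelated to the assert: unknown_err_count is declared inside the while body without an initializer, so its value is indeterminate at the start of each iteration and the exit-after-three-errors check is not reliable. A minimal sketch of the intended counting, with the counter kept outside the loop, could look like this. The poll_result enum and run_until_errors function are hypothetical and stand in for the zmq_poll call and its return codes.

```cpp
#include <cassert>
#include <functional>

// Hypothetical outcome of one poll attempt.
enum class poll_result { ok, timeout, error };

// Calls poll_once repeatedly until it reports max_consecutive errors in a
// row, then returns the total number of calls made. It does not return if
// that many consecutive errors never occur.
int run_until_errors(const std::function<poll_result()>& poll_once,
                     int max_consecutive) {
    int consecutive_errors = 0;  // outside the loop, so it survives iterations
    int calls = 0;
    for (;;) {
        ++calls;
        if (poll_once() == poll_result::error) {
            if (++consecutive_errors == max_consecutive)
                break;
        } else {
            consecutive_errors = 0;  // any success or timeout resets the streak
        }
    }
    return calls;
}
```

In the original function the equivalent change would be to declare the counter next to rc and initialize it to 0 there.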
>>>
>>>
>>> Best Regards,
>>> Martin.
>>>
>>> _______________________________________________
>>> zeromq-dev mailing list
>>> zeromq-dev at lists.zeromq.org
>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev