[zeromq-dev] [PATCH] Fix potential race when terminating io threads in ctx_t destructor.

Martin Townsend martin.townsend at xsilon.com
Wed Jul 2 21:37:43 CEST 2014


I've just remembered that there are pollers other than epoll, so this patch 
will break them. I would be interested to know whether the patch fixes issue #795, 
so I've cc'ed the author of that issue; if it does, I'll put together a patch that 
caters for all pollers.

Best Regards,
Martin.

On 02/07/14 10:05, Martin Townsend wrote:
> After stopping the I/O threads, wait for each associated poller to stop
> before destroying the threads, to avoid the get_load () == 0 assertion.
> This is potentially a fix for issue #795.
>
> ---
>    src/ctx.cpp       | 12 ++++++++++--
>    src/epoll.cpp     | 20 +++++++++++++++++++-
>    src/epoll.hpp     |  4 ++++
>    src/io_thread.cpp |  5 +++++
>    src/io_thread.hpp |  3 +++
>    5 files changed, 41 insertions(+), 3 deletions(-)
>
> diff --git a/src/ctx.cpp b/src/ctx.cpp
> index 0155ef2..72db140 100644
> --- a/src/ctx.cpp
> +++ b/src/ctx.cpp
> @@ -80,8 +80,16 @@ zmq::ctx_t::~ctx_t ()
>            io_threads [i]->stop ();
>
>        //  Wait till I/O threads actually terminate.
> -    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
> -        delete io_threads [i];
> +    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
> +        int j = 0;
> +
> +        while (!io_threads [i]->stopped ()) {  // NOTE(review): for epoll this reads poller_running, a plain bool written in loop (); unsynchronized cross-thread read
> +            sleep (1);  // NOTE(review): POSIX sleep (); whole-second polling can add up to 5 s per I/O thread to shutdown
> +            if (++j == 5)
> +                break;  // NOTE(review): on timeout the thread is deleted anyway, so this wait guarantees nothing
> +        }
> +        delete io_threads [i];  // NOTE(review): if this reaches epoll_t::~epoll_t, its worker.stop () already waits for the worker thread -- confirm
> +    }
>
>        //  Deallocate the reaper thread object.
>        delete reaper;
> diff --git a/src/epoll.cpp b/src/epoll.cpp
> index 324d4bd..2c3a1b9 100644
> --- a/src/epoll.cpp
> +++ b/src/epoll.cpp
> @@ -33,7 +33,8 @@
>    #include "i_poll_events.hpp"
>
>    zmq::epoll_t::epoll_t () :
> -    stopping (false)
> +    stopping (false),
> +    poller_running (false)  // NOTE(review): stays false until loop () runs, so stopped () reports true before the worker starts
>    {
>        epoll_fd = epoll_create (1);
>        errno_assert (epoll_fd != -1);
> @@ -41,12 +42,22 @@ zmq::epoll_t::epoll_t () :
>
>    zmq::epoll_t::~epoll_t ()
>    {
> +    int i;
> +
>        //  Wait till the worker thread exits.
>        worker.stop ();
>
>        close (epoll_fd);
>        for (retired_t::iterator it = retired.begin (); it != retired.end
> (); ++it)
>            delete *it;
> +
> +    // Allow 5 seconds for the poller worker thread to exit. NOTE(review): worker.stop () above already waits for that thread, and loop () clears poller_running on return, so this wait looks redundant -- confirm.
> +    i = 0;
> +    while (poller_running) {
> +        sleep (1);
> +        if (++i == 5)
> +            break;
> +    }
>    }
>
>    zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events
> *events_)
> @@ -126,6 +137,11 @@ void zmq::epoll_t::stop ()
>        stopping = true;
>    }
>
> +bool zmq::epoll_t::stopped ()
> +{
> +    return !poller_running;  // NOTE(review): also true before loop () has set the flag; needs an atomic or locked read
> +}
> +
>    int zmq::epoll_t::max_fds ()
>    {
>        return -1;
> @@ -135,6 +151,7 @@ void zmq::epoll_t::loop ()
>    {
>        epoll_event ev_buf [max_io_events];
>
> +    poller_running = true;  // NOTE(review): plain bool store, read from other threads via stopped (); consider std::atomic<bool>
>        while (!stopping) {
>
>            //  Execute any due timers.
> @@ -171,6 +188,7 @@ void zmq::epoll_t::loop ()
>                delete *it;
>            retired.clear ();
>        }
> +    poller_running = false;
>    }
>
>    void zmq::epoll_t::worker_routine (void *arg_)
> diff --git a/src/epoll.hpp b/src/epoll.hpp
> index fbf871c..4a29c48 100644
> --- a/src/epoll.hpp
> +++ b/src/epoll.hpp
> @@ -57,6 +57,7 @@ namespace zmq
>            void reset_pollout (handle_t handle_);
>            void start ();
>            void stop ();
> +        bool stopped ();  //  True when loop () is not running (after it exits, or before it starts).
>
>            static int max_fds ();
>
> @@ -85,6 +86,9 @@ namespace zmq
>            //  If true, thread is in the process of shutting down.
>            bool stopping;
>
> +        //  True while loop () is executing; read from other threads via stopped (). NOTE(review): should be atomic.
> +        bool poller_running;
> +
>            //  Handle of the physical thread doing the I/O work.
>            thread_t worker;
>
> diff --git a/src/io_thread.cpp b/src/io_thread.cpp
> index b05b56b..59295a0 100644
> --- a/src/io_thread.cpp
> +++ b/src/io_thread.cpp
> @@ -50,6 +50,11 @@ void zmq::io_thread_t::stop ()
>        send_stop ();
>    }
>
> +bool zmq::io_thread_t::stopped ()
> +{
> +    return poller->stopped ();  // NOTE(review): every poller backend now needs stopped (); this patch only adds it to epoll_t
> +}
> +
>    zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
>    {
>        return &mailbox;
> diff --git a/src/io_thread.hpp b/src/io_thread.hpp
> index 49a2e80..e4c7797 100644
> --- a/src/io_thread.hpp
> +++ b/src/io_thread.hpp
> @@ -52,6 +52,9 @@ namespace zmq
>            //  Ask underlying thread to stop.
>            void stop ();
>
> +        //  True once the poller reports its loop is not running; call after stop () above.
> +        bool stopped ();
> +
>            //  Returns mailbox associated with this I/O thread.
>            mailbox_t *get_mailbox ();
>




More information about the zeromq-dev mailing list