[zeromq-dev] [PATCH] Fix potential race when terminating io threads in ctx_t destructor.
Martin Townsend
martin.townsend at xsilon.com
Wed Jul 2 21:37:43 CEST 2014
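I've just remembered that there are pollers other than epoll, so this patch
will break them, since io_thread_t::stopped () relies on a stopped () method
that the patch adds only to epoll_t. I would be interested to know whether the
patch fixes Issue #795, so I've cc'ed the author; if it does, I'll put together
a patch that caters for all pollers.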
Best Regards,
Martin.
On 02/07/14 10:05, Martin Townsend wrote:
> After stopping the IO threads, wait for the associated poller to stop
> before destroying them, to avoid the get_load () == 0 assertion.
> Potentially a fix for Issue #795.
>
> ---
> src/ctx.cpp | 12 ++++++++++--
> src/epoll.cpp | 20 +++++++++++++++++++-
> src/epoll.hpp | 4 ++++
> src/io_thread.cpp | 5 +++++
> src/io_thread.hpp | 3 +++
> 5 files changed, 41 insertions(+), 3 deletions(-)
>
> diff --git a/src/ctx.cpp b/src/ctx.cpp
> index 0155ef2..72db140 100644
> --- a/src/ctx.cpp
> +++ b/src/ctx.cpp
> @@ -80,8 +80,16 @@ zmq::ctx_t::~ctx_t ()
> io_threads [i]->stop ();
>
> // Wait till I/O threads actually terminate.
> - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
> - delete io_threads [i];
> + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
> + int j = 0;
> +
> + while (!io_threads [i]->stopped ()) {
> + sleep (1);
> + if (++j == 5)
> + break;
> + }
> + delete io_threads [i];
> + }
>
> // Deallocate the reaper thread object.
> delete reaper;
> diff --git a/src/epoll.cpp b/src/epoll.cpp
> index 324d4bd..2c3a1b9 100644
> --- a/src/epoll.cpp
> +++ b/src/epoll.cpp
> @@ -33,7 +33,8 @@
> #include "i_poll_events.hpp"
>
> zmq::epoll_t::epoll_t () :
> - stopping (false)
> + stopping (false),
> + poller_running (false)
> {
> epoll_fd = epoll_create (1);
> errno_assert (epoll_fd != -1);
> @@ -41,12 +42,22 @@ zmq::epoll_t::epoll_t () :
>
> zmq::epoll_t::~epoll_t ()
> {
> + int i;
> +
> // Wait till the worker thread exits.
> worker.stop ();
>
> close (epoll_fd);
> for (retired_t::iterator it = retired.begin (); it != retired.end
> (); ++it)
> delete *it;
> +
> + // Allow 5 seconds for the poller worker thread to exit.
> + i = 0;
> + while (poller_running) {
> + sleep (1);
> + if (++i == 5)
> + break;
> + }
> }
>
> zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events
> *events_)
> @@ -126,6 +137,11 @@ void zmq::epoll_t::stop ()
> stopping = true;
> }
>
> +bool zmq::epoll_t::stopped ()
> +{
> + return !poller_running;
> +}
> +
> int zmq::epoll_t::max_fds ()
> {
> return -1;
> @@ -135,6 +151,7 @@ void zmq::epoll_t::loop ()
> {
> epoll_event ev_buf [max_io_events];
>
> + poller_running = true;
> while (!stopping) {
>
> // Execute any due timers.
> @@ -171,6 +188,7 @@ void zmq::epoll_t::loop ()
> delete *it;
> retired.clear ();
> }
> + poller_running = false;
> }
>
> void zmq::epoll_t::worker_routine (void *arg_)
> diff --git a/src/epoll.hpp b/src/epoll.hpp
> index fbf871c..4a29c48 100644
> --- a/src/epoll.hpp
> +++ b/src/epoll.hpp
> @@ -57,6 +57,7 @@ namespace zmq
> void reset_pollout (handle_t handle_);
> void start ();
> void stop ();
> + bool stopped ();
>
> static int max_fds ();
>
> @@ -85,6 +86,9 @@ namespace zmq
> // If true, thread is in the process of shutting down.
> bool stopping;
>
> + // If true, poller worker thread is running.
> + bool poller_running;
> +
> // Handle of the physical thread doing the I/O work.
> thread_t worker;
>
> diff --git a/src/io_thread.cpp b/src/io_thread.cpp
> index b05b56b..59295a0 100644
> --- a/src/io_thread.cpp
> +++ b/src/io_thread.cpp
> @@ -50,6 +50,11 @@ void zmq::io_thread_t::stop ()
> send_stop ();
> }
>
> +bool zmq::io_thread_t::stopped ()
> +{
> + return poller->stopped ();
> +}
> +
> zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
> {
> return &mailbox;
> diff --git a/src/io_thread.hpp b/src/io_thread.hpp
> index 49a2e80..e4c7797 100644
> --- a/src/io_thread.hpp
> +++ b/src/io_thread.hpp
> @@ -52,6 +52,9 @@ namespace zmq
> // Ask underlying thread to stop.
> void stop ();
>
> + // Has it stopped, used after calling stop above.
> + bool stopped ();
> +
> // Returns mailbox associated with this I/O thread.
> mailbox_t *get_mailbox ();
>
More information about the zeromq-dev mailing list