[zeromq-dev] Mac OS X: test_shutdown_stress sometimes fails
Dhammika Pathirana
dhammika at gmail.com
Mon Dec 6 08:57:20 CET 2010
On Fri, Dec 3, 2010 at 2:02 AM, Martin Sustrik <sustrik at 250bpm.com> wrote:
> Hi Dhammika,
>
>> Patch attached.
>
> I've checked the patch. Shouldn't the 'dispatch' be called when the engine
> is actually being moved to another thread (inside finalise_initialisation
> function) rather than in 'read' function?
>
New patch attached.
I separated it into two functions, should be more clear now.
Engine is dispatched in zmq_init:flush().
>From c549459dcc4cd796f8d65ff5c5e1f40715c4fb13 Mon Sep 17 00:00:00 2001
From: dhammika <dhammika at gmail.com>
Date: Sun, 5 Dec 2010 22:57:00 -0800
Subject: [PATCH] fix race condition in session init
Signed-off-by: dhammika <dhammika at gmail.com>
---
src/zmq_engine.cpp | 12 +++++++++++-
src/zmq_engine.hpp | 2 ++
src/zmq_init.cpp | 40 +++++++++++++++++++++++-----------------
src/zmq_init.hpp | 3 +++
4 files changed, 39 insertions(+), 18 deletions(-)
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 0c1070d..746d60f 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const
options_t &options_) :
outsize (0),
encoder (out_batch_size),
inout (NULL),
+ ephemeral_inout (NULL),
options (options_),
plugged (false)
{
@@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t
*io_thread_, i_inout *inout_)
{
zmq_assert (!plugged);
plugged = true;
+ ephemeral_inout = NULL;
- // Conncet to session/init object.
+ // Connect to session/init object.
zmq_assert (!inout);
zmq_assert (inout_);
encoder.set_inout (inout_);
@@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug ()
// Disconnect from init/session object.
encoder.set_inout (NULL);
decoder.set_inout (NULL);
+ ephemeral_inout = inout;
inout = NULL;
}
@@ -152,6 +155,13 @@ void zmq::zmq_engine_t::out_event ()
outpos = NULL;
encoder.get_data (&outpos, &outsize);
+
+ // If IO handler has detached engine, flush any pending IO.
+ if (!plugged) {
+ zmq_assert (ephemeral_inout);
+ ephemeral_inout->flush ();
+ return;
+ }
// If there is no data to send, stop polling for output.
if (outsize == 0) {
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index c5f95dc..4847324 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -69,6 +69,8 @@ namespace zmq
encoder_t encoder;
i_inout *inout;
+ // Detached transient inout handler.
+ i_inout *ephemeral_inout;
options_t options;
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index a796faa..8b8f110 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
own_t (io_thread_, options_),
+ ephemeral_engine (NULL),
sent (false),
received (false),
socket (socket_),
@@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
options.identity.size ());
sent = true;
- // If initialisation is done, pass the engine to the session and
- // destroy the init object.
+ // Initialization is done, prepare to dispatch engine.
finalise_initialisation ();
return true;
@@ -101,9 +101,9 @@ void zmq::zmq_init_t::flush ()
if (!received)
return;
- // If initialisation is done, pass the engine to the session and
- // destroy the init object.
- finalise_initialisation ();
+ // Initialization is done, dispatch engine.
+ if (ephemeral_engine)
+ dispatch_engine ();
}
void zmq::zmq_init_t::detach ()
@@ -136,6 +136,20 @@ void zmq::zmq_init_t::process_unplug ()
void zmq::zmq_init_t::finalise_initialisation ()
{
if (sent && received) {
+ // Unplug and prepare to dispatch engine.
+ ephemeral_engine = engine;
+ engine = NULL;
+ ephemeral_engine->unplug ();
+ return;
+ }
+}
+
+void zmq::zmq_init_t::dispatch_engine ()
+{
+ if (sent && received) {
+ // Engine must be detached.
+ zmq_assert (!engine);
+ zmq_assert (ephemeral_engine);
// If we know what session we belong to, it's easy, just send the
// engine to that session and destroy the init object. Note that we
@@ -143,9 +157,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
// lifetime of this object in contained in the lifetime of the session
// so the pointer cannot become invalid without notice.
if (session) {
- engine->unplug ();
- send_attach (session, engine, peer_identity, true);
- engine = NULL;
+ send_attach (session, ephemeral_engine, peer_identity, true);
terminate ();
return;
}
@@ -165,9 +177,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
zmq_assert (session);
session->inc_seqnum ();
launch_sibling (session);
- engine->unplug ();
- send_attach (session, engine, peer_identity, false);
- engine = NULL;
+ send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
@@ -178,9 +188,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
// than by send_attach.
session = socket->find_session (peer_identity);
if (session) {
- engine->unplug ();
- send_attach (session, engine, peer_identity, false);
- engine = NULL;
+ send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
@@ -194,9 +202,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
zmq_assert (session);
session->inc_seqnum ();
launch_sibling (session);
- engine->unplug ();
- send_attach (session, engine, peer_identity, false);
- engine = NULL;
+ send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 511f141..c5aa459 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -44,6 +44,7 @@ namespace zmq
private:
void finalise_initialisation ();
+ void dispatch_engine ();
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
@@ -57,6 +58,8 @@ namespace zmq
// Associated wire-protocol engine.
i_engine *engine;
+ // Detached transient engine.
+ i_engine *ephemeral_engine;
// True if our own identity was already sent to the peer.
bool sent;
--
1.7.0.4
Dhammika
More information about the zeromq-dev
mailing list