[zeromq-dev] Mac OS X: test_shutdown_stress sometimes fails

Dhammika Pathirana dhammika at gmail.com
Mon Dec 6 08:57:20 CET 2010


On Fri, Dec 3, 2010 at 2:02 AM, Martin Sustrik <sustrik at 250bpm.com> wrote:
> Hi Dhammika,
>
>> Patch attached.
>
> I've checked the patch. Shouldn't the 'dispatch' be called when the engine
> is actually being moved to another thread (inside finalise_initialisation
> function) rather than in 'read' function?
>


New patch attached.
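I split the logic into two functions, which should make it clearer:
finalise_initialisation () now only unplugs the engine, and the new
dispatch_engine () hands it over to the session.
The engine is now dispatched from zmq_init_t::flush ().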


>From c549459dcc4cd796f8d65ff5c5e1f40715c4fb13 Mon Sep 17 00:00:00 2001
From: dhammika <dhammika at gmail.com>
Date: Sun, 5 Dec 2010 22:57:00 -0800
Subject: [PATCH] fix race condition in session init


Signed-off-by: dhammika <dhammika at gmail.com>
---
 src/zmq_engine.cpp |   12 +++++++++++-
 src/zmq_engine.hpp |    2 ++
 src/zmq_init.cpp   |   40 +++++++++++++++++++++++-----------------
 src/zmq_init.hpp   |    3 +++
 4 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 0c1070d..746d60f 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
     outsize (0),
     encoder (out_batch_size),
     inout (NULL),
+    ephemeral_inout (NULL),
     options (options_),
     plugged (false)
 {
@@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
 {
     zmq_assert (!plugged);
     plugged = true;
+    ephemeral_inout = NULL;

-    //  Conncet to session/init object.
+    //  Connect to session/init object.
     zmq_assert (!inout);
     zmq_assert (inout_);
     encoder.set_inout (inout_);
@@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug ()
     //  Disconnect from init/session object.
     encoder.set_inout (NULL);
     decoder.set_inout (NULL);
+    ephemeral_inout = inout;
     inout = NULL;
 }

@@ -152,6 +155,13 @@ void zmq::zmq_engine_t::out_event ()

         outpos = NULL;
         encoder.get_data (&outpos, &outsize);
+
+        //  If IO handler has detached engine, flush any pending IO.
+        if (!plugged) {
+            zmq_assert (ephemeral_inout);
+            ephemeral_inout->flush ();
+            return;
+        }

         //  If there is no data to send, stop polling for output.
         if (outsize == 0) {
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index c5f95dc..4847324 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -69,6 +69,8 @@ namespace zmq
         encoder_t encoder;

         i_inout *inout;
+        //  Detached transient inout handler.
+        i_inout *ephemeral_inout;

         options_t options;

diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index a796faa..8b8f110 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
       socket_base_t *socket_, session_t *session_, fd_t fd_,
       const options_t &options_) :
     own_t (io_thread_, options_),
+    ephemeral_engine (NULL),
     sent (false),
     received (false),
     socket (socket_),
@@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
         options.identity.size ());
     sent = true;

-    //  If initialisation is done, pass the engine to the session and
-    //  destroy the init object.
+    //  Initialization is done, prepare to dispatch engine.
     finalise_initialisation ();

     return true;
@@ -101,9 +101,9 @@ void zmq::zmq_init_t::flush ()
     if (!received)
         return;

-    //  If initialisation is done, pass the engine to the session and
-    //  destroy the init object.
-    finalise_initialisation ();
+    //  Initialization is done, dispatch engine.
+    if (ephemeral_engine)
+        dispatch_engine ();
 }

 void zmq::zmq_init_t::detach ()
@@ -136,6 +136,20 @@ void zmq::zmq_init_t::process_unplug ()
 void zmq::zmq_init_t::finalise_initialisation ()
 {
     if (sent && received) {
+        //  Unplug and prepare to dispatch engine.
+        ephemeral_engine = engine;
+        engine = NULL;
+        ephemeral_engine->unplug ();
+        return;
+    }
+}
+
+void zmq::zmq_init_t::dispatch_engine ()
+{
+    if (sent && received) {
+        //  Engine must be detached.
+        zmq_assert (!engine);
+        zmq_assert (ephemeral_engine);

         //  If we know what session we belong to, it's easy, just send the
         //  engine to that session and destroy the init object. Note that we
@@ -143,9 +157,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
         //  lifetime of this object in contained in the lifetime of the session
         //  so the pointer cannot become invalid without notice.
         if (session) {
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, true);
-            engine = NULL;
+            send_attach (session, ephemeral_engine, peer_identity, true);
             terminate ();
             return;
         }
@@ -165,9 +177,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
             zmq_assert (session);
             session->inc_seqnum ();
             launch_sibling (session);
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, false);
-            engine = NULL;
+            send_attach (session, ephemeral_engine, peer_identity, false);
             terminate ();
             return;
         }
@@ -178,9 +188,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
         //  than by send_attach.
         session = socket->find_session (peer_identity);
         if (session) {
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, false);
-            engine = NULL;
+            send_attach (session, ephemeral_engine, peer_identity, false);
             terminate ();
             return;
         }
@@ -194,9 +202,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
         zmq_assert (session);
         session->inc_seqnum ();
         launch_sibling (session);
-        engine->unplug ();
-        send_attach (session, engine, peer_identity, false);
-        engine = NULL;
+        send_attach (session, ephemeral_engine, peer_identity, false);
         terminate ();
         return;
     }
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 511f141..c5aa459 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -44,6 +44,7 @@ namespace zmq
     private:

         void finalise_initialisation ();
+        void dispatch_engine ();

         //  i_inout interface implementation.
         bool read (::zmq_msg_t *msg_);
@@ -57,6 +58,8 @@ namespace zmq

         //  Associated wire-protocol engine.
         i_engine *engine;
+        //  Detached transient engine.
+        i_engine *ephemeral_engine;

         //  True if our own identity was already sent to the peer.
         bool sent;
-- 
1.7.0.4



Dhammika



More information about the zeromq-dev mailing list