[zeromq-dev] patch: handle idle connections

Dhammika Pathirana dhammika at gmail.com
Sun Mar 29 04:40:06 CEST 2009


Hi,

Updated patch.
Builds and works on Linux; I don't have access to other platforms.

Dhammika



On Fri, Mar 27, 2009 at 1:41 AM, Dhammika Pathirana <dhammika at gmail.com> wrote:
>>
>> What about removing connection x like this:
>>
>> if (x < active) {
>>    std::swap (pipes [x], pipes [A-1]);
>>    x = A = A - 1;
>> }
>> std::swap (pipes [x], pipes [pipes.size () - 1]);
>> pipes.resize (pipes.size () - 1);
>>
>
>
> This'll work, but can we guarantee fairness?
> I'll update the patch and post it during the weekend.
>
>
> Dhammika
>
-------------- next part --------------
Index: pipe.cpp
===================================================================
--- pipe.cpp	(revision 1175)
+++ pipe.cpp	(working copy)
@@ -27,7 +27,6 @@
     source_engine (source_engine_),
     destination_thread (destination_thread_),
     destination_engine (destination_engine_),
-    alive (true),
     head (0),
     last_head_position (0),
     delayed_gap (false),
@@ -140,12 +139,6 @@
     flush ();
 }
 
-void zmq::pipe_t::revive ()
-{
-    assert (!alive);
-    alive = true;
-}
-
 void zmq::pipe_t::set_head (uint64_t position_)
 {
     //  This may cause the next write to succeed.
@@ -178,13 +171,8 @@
 
 bool zmq::pipe_t::read (raw_message_t *msg_)
 {
-    //  If the pipe is dead, there's nothing we can do.
-    if (!alive)
-        return false;
-
     //  Get next message, if it's not there, die.
     if (!pipe.read (msg_)) {
-        alive = false;
         return false;
     }
 
Index: mux.cpp
===================================================================
--- mux.cpp	(revision 1175)
+++ mux.cpp	(working copy)
@@ -21,6 +21,7 @@
 #include <zmq/raw_message.hpp>
 
 zmq::mux_t::mux_t () :
+    active (0),
     current (0)
 {
 }
@@ -33,8 +34,26 @@
 {
     //  Associate new pipe with the mux object.
     pipes.push_back (pipe_);
+    ++active;
+    if (pipes.size () > active)
+        std::swap (pipes [pipes.size () - 1], pipes [active - 1]);
 }
 
+void zmq::mux_t::revive (pipe_t *pipe_)
+{
+    //  Revive an idle pipe.
+    for (pipes_t::size_type i = pipes.size (); i-- > active; ) {
+        if (pipes [i] == pipe_) {
+            std::swap(pipes [i], pipes [active]);
+            ++active;
+            return;
+        }
+    }
+
+    //  There's a bug in revive mechanism!
+    assert (false);
+}
+
 bool zmq::mux_t::read (message_t *msg_)
 {
     //  Underlying layers work with raw_message_t, layers above use message_t.
@@ -44,17 +63,21 @@
     //  Deallocate old content of the message.
     raw_message_destroy (msg);
 
-    //  Round-robin over the pipes to get next message.
-    for (int to_process = pipes.size (); to_process != 0; to_process --) {
+    //  Round-robin over the active pipes to get next message.
+    for (pipes_t::size_type i = active; i > 0; --i) {
 
+        //  Update current.
+        current = (current + 1) % active;
+        
+        //  Try to read from current.
         bool retrieved = pipes [current]->read ((raw_message_t*) msg_);
-
-        current ++;
-        if (current == pipes.size ())
-            current = 0;
-
+        
         if (retrieved)
             return true;
+
+        //  Move to idle list.
+        std::swap (pipes [current], pipes [active - 1]);
+        --active;
     }
 
     //  No message is available. Initialise the output parameter
@@ -70,18 +93,24 @@
 
 void zmq::mux_t::release_pipe (pipe_t *pipe_)
 {
-    for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
-        if (*it == pipe_) {
+    for (pipes_t::size_type i = pipes.size (); i-- > 0; ) {
+        if (pipes [i] == pipe_) {
 
             //  At this point pipe is physically destroyed.
-            delete *it;
+            delete pipes [i];
 
-            //  Remove the pipe from the list.
-            pipes.erase (it);
-            if (current == pipes.size ())
-                current = 0;
+            //  Remove from active.
+            if (i < active) {
+                std::swap (pipes [i], pipes [active - 1]);
+                i = --active;
+            }
+            
+            //  Move to end of idle list and remove.
+            std::swap (pipes [i], pipes [pipes.size () - 1]);
+            pipes.pop_back ();
             return;
         }
+    }
 
     //  There's a bug in shut down mechanism!
     assert (false);
Index: zmq/engine_base.hpp
===================================================================
--- zmq/engine_base.hpp	(revision 1175)
+++ zmq/engine_base.hpp	(working copy)
@@ -61,7 +61,7 @@
             //  Notify the reader of the pipe that there are messages
             //  available in the pipe.
             assert (HAS_OUT);
-            pipe_->revive ();
+            mux.revive (pipe_);
         }
 
         void head (pipe_t *pipe_, int64_t position_)
Index: zmq/pipe.hpp
===================================================================
--- zmq/pipe.hpp	(revision 1175)
+++ zmq/pipe.hpp	(working copy)
@@ -100,9 +100,6 @@
         i_thread *destination_thread;
         i_engine *destination_engine;
 
-        //  If true we can read messages from the underlying ypipe.
-        bool alive;
-
         //  If hwm is non-zero, the size of pipe is limited. In that case hwm
         //  is the high water mark for the pipe and lwm is the low water mark.
         int64_t hwm;
Index: zmq/mux.hpp
===================================================================
--- zmq/mux.hpp	(revision 1175)
+++ zmq/mux.hpp	(working copy)
@@ -41,6 +41,9 @@
         //  Adds a pipe to receive messages from.
         void receive_from (pipe_t *pipe_);
 
+        //  Revives a stalled pipe.
+        void revive (pipe_t *pipe_);
+
         //  Returns a message, if available. If not, returns false.
         bool read (message_t *msg_);
 
@@ -56,8 +59,11 @@
     private:
 
         //  The list of inbound pipes.
-        typedef std::vector <pipe_t*> pipes_t;
+        typedef std::vector <pipe_t *> pipes_t;
         pipes_t pipes;
+        
+        //  The number of active pipes.
+        pipes_t::size_type active;
 
         //  Pipe to retrieve next message from. The messages are retrieved
         //  from the pipes in round-robin fashion (a.k.a. fair queueing).


More information about the zeromq-dev mailing list