[zeromq-dev] patch: handle idle connections

Dhammika Pathirana dhammika at gmail.com
Tue Mar 24 09:21:08 CET 2009


Hi,

A patch to improve the handling of idle connections.
I did a bit of testing on Linux.

Dhammika
-------------- next part --------------
Index: libzmq/pipe.cpp
===================================================================
--- libzmq/pipe.cpp	(revision 1175)
+++ libzmq/pipe.cpp	(working copy)
@@ -27,7 +27,6 @@
     source_engine (source_engine_),
     destination_thread (destination_thread_),
     destination_engine (destination_engine_),
-    alive (true),
     head (0),
     last_head_position (0),
     delayed_gap (false),
@@ -140,12 +139,6 @@
     flush ();
 }
 
-void zmq::pipe_t::revive ()
-{
-    assert (!alive);
-    alive = true;
-}
-
 void zmq::pipe_t::set_head (uint64_t position_)
 {
     //  This may cause the next write to succeed.
@@ -178,13 +171,8 @@
 
 bool zmq::pipe_t::read (raw_message_t *msg_)
 {
-    //  If the pipe is dead, there's nothing we can do.
-    if (!alive)
-        return false;
-
     //  Get next message, if it's not there, die.
     if (!pipe.read (msg_)) {
-        alive = false;
         return false;
     }
 
Index: libzmq/mux.cpp
===================================================================
--- libzmq/mux.cpp	(revision 1175)
+++ libzmq/mux.cpp	(working copy)
@@ -19,9 +19,10 @@
 
 #include <zmq/mux.hpp>
 #include <zmq/raw_message.hpp>
+#include <algorithm>
 
 zmq::mux_t::mux_t () :
-    current (0)
+    current (active_pipes.end ())
 {
 }
 
@@ -33,8 +34,17 @@
 {
     //  Associate new pipe with the mux object.
     pipes.push_back (pipe_);
+    active_pipes.push_back (pipe_);
+    if (current == active_pipes.end ())
+        current = active_pipes.begin ();
 }
 
+void zmq::mux_t::revive (pipe_t *pipe_)
+{
+    //  Revive a stalled pipe.
+    active_pipes.push_back (pipe_);
+}
+
 bool zmq::mux_t::read (message_t *msg_)
 {
     //  Underlying layers work with raw_message_t, layers above use message_t.
@@ -44,17 +54,20 @@
     //  Deallocate old content of the message.
     raw_message_destroy (msg);
 
-    //  Round-robin over the pipes to get next message.
-    for (int to_process = pipes.size (); to_process != 0; to_process --) {
-
-        bool retrieved = pipes [current]->read ((raw_message_t*) msg_);
-
-        current ++;
-        if (current == pipes.size ())
-            current = 0;
-
-        if (retrieved)
+    //  Round-robin over the active pipes to get next message.
+    while (!active_pipes.empty ()) {
+        if (current == active_pipes.end ())
+            current = active_pipes.begin ();
+        
+        bool retrieved = (*current)->read ((raw_message_t*) msg_);
+        
+        if (retrieved) {
+            current++;
             return true;
+        }
+        
+        //  Couldn't retrieve message, remove from active list.
+        current = active_pipes.erase (current);
     }
 
     //  No message is available. Initialise the output parameter
@@ -65,23 +78,32 @@
 
 bool zmq::mux_t::empty ()
 {
-    return pipes.empty ();
+    return (pipes.empty ());
 }
 
 void zmq::mux_t::release_pipe (pipe_t *pipe_)
 {
-    for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
-        if (*it == pipe_) {
+    pipes_t::iterator it;
+    
+    //  Remove the pipe from active list.
+    it = find (active_pipes.begin (), active_pipes.end (), pipe_);
+    if (it != active_pipes.end ()) {
+        //  Update current.
+        if (current == it)
+            current = active_pipes.erase (current);
+        else 
+            active_pipes.erase (it);
+    }
 
-            //  At this point pipe is physically destroyed.
-            delete *it;
+    //  Remove the pipe from inbound list.
+    it = find (pipes.begin (), pipes.end (), pipe_);
+    if (it != pipes.end ()) {
+        //  At this point pipe is physically destroyed.
+        delete *it;
 
-            //  Remove the pipe from the list.
-            pipes.erase (it);
-            if (current == pipes.size ())
-                current = 0;
-            return;
-        }
+        pipes.erase (it);
+        return;
+    }
 
     //  There's a bug in shut down mechanism!
     assert (false);
Index: libzmq/zmq/engine_base.hpp
===================================================================
--- libzmq/zmq/engine_base.hpp	(revision 1175)
+++ libzmq/zmq/engine_base.hpp	(working copy)
@@ -61,7 +61,7 @@
             //  Notify the reader of the pipe that there are messages
             //  available in the pipe.
             assert (HAS_OUT);
-            pipe_->revive ();
+            mux.revive (pipe_);
         }
 
         void head (pipe_t *pipe_, int64_t position_)
Index: libzmq/zmq/pipe.hpp
===================================================================
--- libzmq/zmq/pipe.hpp	(revision 1175)
+++ libzmq/zmq/pipe.hpp	(working copy)
@@ -100,9 +100,6 @@
         i_thread *destination_thread;
         i_engine *destination_engine;
 
-        //  If true we can read messages from the underlying ypipe.
-        bool alive;
-
         //  If hwm is non-zero, the size of pipe is limited. In that case hwm
         //  is the high water mark for the pipe and lwm is the low water mark.
         int64_t hwm;
Index: libzmq/zmq/mux.hpp
===================================================================
--- libzmq/zmq/mux.hpp	(revision 1175)
+++ libzmq/zmq/mux.hpp	(working copy)
@@ -21,7 +21,7 @@
 #define __ZMQ_MUX_HPP_INCLUDED__
 
 #include <assert.h>
-#include <vector>
+#include <list>
 
 #include <zmq/message.hpp>
 #include <zmq/pipe.hpp>
@@ -41,6 +41,9 @@
         //  Adds a pipe to receive messages from.
         void receive_from (pipe_t *pipe_);
 
+        //  Revives a stalled pipe.
+        void revive (pipe_t *pipe_);
+
         //  Returns a message, if available. If not, returns false.
         bool read (message_t *msg_);
 
@@ -56,12 +59,15 @@
     private:
 
         //  The list of inbound pipes.
-        typedef std::vector <pipe_t*> pipes_t;
+        typedef std::list <pipe_t*> pipes_t;
         pipes_t pipes;
+        
+        //  The list of active pipes.
+        pipes_t active_pipes;
 
         //  Pipe to retrieve next message from. The messages are retrieved
         //  from the pipes in round-robin fashion (a.k.a. fair queueing).
-        pipes_t::size_type current;
+        pipes_t::iterator current;
 
         mux_t (const mux_t&);
         void operator = (const mux_t&);


More information about the zeromq-dev mailing list