[zeromq-dev] patch: handle idle connections
Dhammika Pathirana
dhammika at gmail.com
Tue Mar 24 09:21:08 CET 2009
Hi,
A patch to improve handling idle connections.
I did bit of testing on linux.
Dhammika
-------------- next part --------------
Index: libzmq/pipe.cpp
===================================================================
--- libzmq/pipe.cpp (revision 1175)
+++ libzmq/pipe.cpp (working copy)
@@ -27,7 +27,6 @@
source_engine (source_engine_),
destination_thread (destination_thread_),
destination_engine (destination_engine_),
- alive (true),
head (0),
last_head_position (0),
delayed_gap (false),
@@ -140,12 +139,6 @@
flush ();
}
-void zmq::pipe_t::revive ()
-{
- assert (!alive);
- alive = true;
-}
-
void zmq::pipe_t::set_head (uint64_t position_)
{
// This may cause the next write to succeed.
@@ -178,13 +171,8 @@
bool zmq::pipe_t::read (raw_message_t *msg_)
{
- // If the pipe is dead, there's nothing we can do.
- if (!alive)
- return false;
-
// Get next message, if it's not there, die.
if (!pipe.read (msg_)) {
- alive = false;
return false;
}
Index: libzmq/mux.cpp
===================================================================
--- libzmq/mux.cpp (revision 1175)
+++ libzmq/mux.cpp (working copy)
@@ -19,9 +19,10 @@
#include <zmq/mux.hpp>
#include <zmq/raw_message.hpp>
+#include <algorithm>
zmq::mux_t::mux_t () :
- current (0)
+ current (active_pipes.end ())
{
}
@@ -33,8 +34,17 @@
{
// Associate new pipe with the mux object.
pipes.push_back (pipe_);
+ active_pipes.push_back (pipe_);
+ if (current == active_pipes.end ())
+ current = active_pipes.begin ();
}
+void zmq::mux_t::revive (pipe_t *pipe_)
+{
+ // Revive a stalled pipe.
+ active_pipes.push_back (pipe_);
+}
+
bool zmq::mux_t::read (message_t *msg_)
{
// Underlying layers work with raw_message_t, layers above use message_t.
@@ -44,17 +54,20 @@
// Deallocate old content of the message.
raw_message_destroy (msg);
- // Round-robin over the pipes to get next message.
- for (int to_process = pipes.size (); to_process != 0; to_process --) {
-
- bool retrieved = pipes [current]->read ((raw_message_t*) msg_);
-
- current ++;
- if (current == pipes.size ())
- current = 0;
-
- if (retrieved)
+ // Round-robin over the active pipes to get next message.
+ while (!active_pipes.empty ()) {
+ if (current == active_pipes.end ())
+ current = active_pipes.begin ();
+
+ bool retrieved = (*current)->read ((raw_message_t*) msg_);
+
+ if (retrieved) {
+ current++;
return true;
+ }
+
+ // Couldn't retrieve message, remove from active list.
+ current = active_pipes.erase (current);
}
// No message is available. Initialise the output parameter
@@ -65,23 +78,32 @@
bool zmq::mux_t::empty ()
{
- return pipes.empty ();
+ return (pipes.empty ());
}
void zmq::mux_t::release_pipe (pipe_t *pipe_)
{
- for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
- if (*it == pipe_) {
+ pipes_t::iterator it;
+
+ // Remove the pipe from active list.
+ it = find (active_pipes.begin (), active_pipes.end (), pipe_);
+ if (it != active_pipes.end ()) {
+ // Update current.
+ if (current == it)
+ current = active_pipes.erase (current);
+ else
+ active_pipes.erase (it);
+ }
- // At this point pipe is physically destroyed.
- delete *it;
+ // Remove the pipe from inbound list.
+ it = find (pipes.begin (), pipes.end (), pipe_);
+ if (it != pipes.end ()) {
+ // At this point pipe is physically destroyed.
+ delete *it;
- // Remove the pipe from the list.
- pipes.erase (it);
- if (current == pipes.size ())
- current = 0;
- return;
- }
+ pipes.erase (it);
+ return;
+ }
// There's a bug in shut down mechanism!
assert (false);
Index: libzmq/zmq/engine_base.hpp
===================================================================
--- libzmq/zmq/engine_base.hpp (revision 1175)
+++ libzmq/zmq/engine_base.hpp (working copy)
@@ -61,7 +61,7 @@
// Notify the reader of the pipe that there are messages
// available in the pipe.
assert (HAS_OUT);
- pipe_->revive ();
+ mux.revive (pipe_);
}
void head (pipe_t *pipe_, int64_t position_)
Index: libzmq/zmq/pipe.hpp
===================================================================
--- libzmq/zmq/pipe.hpp (revision 1175)
+++ libzmq/zmq/pipe.hpp (working copy)
@@ -100,9 +100,6 @@
i_thread *destination_thread;
i_engine *destination_engine;
- // If true we can read messages from the underlying ypipe.
- bool alive;
-
// If hwm is non-zero, the size of pipe is limited. In that case hwm
// is the high water mark for the pipe and lwm is the low water mark.
int64_t hwm;
Index: libzmq/zmq/mux.hpp
===================================================================
--- libzmq/zmq/mux.hpp (revision 1175)
+++ libzmq/zmq/mux.hpp (working copy)
@@ -21,7 +21,7 @@
#define __ZMQ_MUX_HPP_INCLUDED__
#include <assert.h>
-#include <vector>
+#include <list>
#include <zmq/message.hpp>
#include <zmq/pipe.hpp>
@@ -41,6 +41,9 @@
// Adds a pipe to receive messages from.
void receive_from (pipe_t *pipe_);
+ // Revives a stalled pipe.
+ void revive (pipe_t *pipe_);
+
// Returns a message, if available. If not, returns false.
bool read (message_t *msg_);
@@ -56,12 +59,15 @@
private:
// The list of inbound pipes.
- typedef std::vector <pipe_t*> pipes_t;
+ typedef std::list <pipe_t*> pipes_t;
pipes_t pipes;
+
+ // The list of active pipes.
+ pipes_t active_pipes;
// Pipe to retrieve next message from. The messages are retrieved
// from the pipes in round-robin fashion (a.k.a. fair queueing).
- pipes_t::size_type current;
+ pipes_t::iterator current;
mux_t (const mux_t&);
void operator = (const mux_t&);
More information about the zeromq-dev
mailing list