[zeromq-dev] patch: handle idle connections
Dhammika Pathirana
dhammika at gmail.com
Sun Mar 29 04:40:06 CEST 2009
Hi,
Updated patch.
Builds and works on Linux; I don't have access to other platforms.
Dhammika
On Fri, Mar 27, 2009 at 1:41 AM, Dhammika Pathirana <dhammika at gmail.com> wrote:
>>
>> What about removing connection x like this:
>>
>> if (x < active) {
>> std::swap (pipes [x], pipes [A-1]);
>> x = A = A - 1;
>> }
>> std::swap (pipes [x], pipes [pipes.size () - 1]);
>> pipes.resize (pipes.size () - 1);
>>
>
>
> This'll work, but can we guarantee fairness?
> I'll update the patch and will post it during the weekend.
>
>
> Dhammika
>
-------------- next part --------------
Index: pipe.cpp
===================================================================
--- pipe.cpp (revision 1175)
+++ pipe.cpp (working copy)
@@ -27,7 +27,6 @@
source_engine (source_engine_),
destination_thread (destination_thread_),
destination_engine (destination_engine_),
- alive (true),
head (0),
last_head_position (0),
delayed_gap (false),
@@ -140,12 +139,6 @@
flush ();
}
-void zmq::pipe_t::revive ()
-{
- assert (!alive);
- alive = true;
-}
-
void zmq::pipe_t::set_head (uint64_t position_)
{
// This may cause the next write to succeed.
@@ -178,13 +171,8 @@
bool zmq::pipe_t::read (raw_message_t *msg_)
{
- // If the pipe is dead, there's nothing we can do.
- if (!alive)
- return false;
-
// Get next message, if it's not there, die.
if (!pipe.read (msg_)) {
- alive = false;
return false;
}
Index: mux.cpp
===================================================================
--- mux.cpp (revision 1175)
+++ mux.cpp (working copy)
@@ -21,6 +21,7 @@
#include <zmq/raw_message.hpp>
zmq::mux_t::mux_t () :
+ active (0),
current (0)
{
}
@@ -33,8 +34,26 @@
{
// Associate new pipe with the mux object.
pipes.push_back (pipe_);
+ ++active;
+ if (pipes.size () > active)
+ std::swap (pipes [pipes.size () - 1], pipes [active - 1]);
}
+void zmq::mux_t::revive (pipe_t *pipe_)
+{
+ // Revive an idle pipe.
+ for (pipes_t::size_type i = pipes.size (); i-- > active; ) {
+ if (pipes [i] == pipe_) {
+ std::swap(pipes [i], pipes [active]);
+ ++active;
+ return;
+ }
+ }
+
+ // There's a bug in revive mechanism!
+ assert (false);
+}
+
bool zmq::mux_t::read (message_t *msg_)
{
// Underlying layers work with raw_message_t, layers above use message_t.
@@ -44,17 +63,21 @@
// Deallocate old content of the message.
raw_message_destroy (msg);
- // Round-robin over the pipes to get next message.
- for (int to_process = pipes.size (); to_process != 0; to_process --) {
+ // Round-robin over the active pipes to get next message.
+ for (pipes_t::size_type i = active; i > 0; --i) {
+ // Update current.
+ current = (current + 1) % active;
+
+ // Try to read from current.
bool retrieved = pipes [current]->read ((raw_message_t*) msg_);
-
- current ++;
- if (current == pipes.size ())
- current = 0;
-
+
if (retrieved)
return true;
+
+ // Move to idle list.
+ std::swap (pipes [current], pipes [active - 1]);
+ --active;
}
// No message is available. Initialise the output parameter
@@ -70,18 +93,24 @@
void zmq::mux_t::release_pipe (pipe_t *pipe_)
{
- for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
- if (*it == pipe_) {
+ for (pipes_t::size_type i = pipes.size (); i-- > 0; ) {
+ if (pipes [i] == pipe_) {
// At this point pipe is physically destroyed.
- delete *it;
+ delete pipes [i];
- // Remove the pipe from the list.
- pipes.erase (it);
- if (current == pipes.size ())
- current = 0;
+ // Remove from active.
+ if (i < active) {
+ std::swap (pipes [i], pipes [active - 1]);
+ i = --active;
+ }
+
+ // Move to end of idle list and remove.
+ std::swap (pipes [i], pipes [pipes.size () - 1]);
+ pipes.pop_back ();
return;
}
+ }
// There's a bug in shut down mechanism!
assert (false);
Index: zmq/engine_base.hpp
===================================================================
--- zmq/engine_base.hpp (revision 1175)
+++ zmq/engine_base.hpp (working copy)
@@ -61,7 +61,7 @@
// Notify the reader of the pipe that there are messages
// available in the pipe.
assert (HAS_OUT);
- pipe_->revive ();
+ mux.revive (pipe_);
}
void head (pipe_t *pipe_, int64_t position_)
Index: zmq/pipe.hpp
===================================================================
--- zmq/pipe.hpp (revision 1175)
+++ zmq/pipe.hpp (working copy)
@@ -100,9 +100,6 @@
i_thread *destination_thread;
i_engine *destination_engine;
- // If true we can read messages from the underlying ypipe.
- bool alive;
-
// If hwm is non-zero, the size of pipe is limited. In that case hwm
// is the high water mark for the pipe and lwm is the low water mark.
int64_t hwm;
Index: zmq/mux.hpp
===================================================================
--- zmq/mux.hpp (revision 1175)
+++ zmq/mux.hpp (working copy)
@@ -41,6 +41,9 @@
// Adds a pipe to receive messages from.
void receive_from (pipe_t *pipe_);
+ // Revives a stalled pipe.
+ void revive (pipe_t *pipe_);
+
// Returns a message, if available. If not, returns false.
bool read (message_t *msg_);
@@ -56,8 +59,11 @@
private:
// The list of inbound pipes.
- typedef std::vector <pipe_t*> pipes_t;
+ typedef std::vector <pipe_t *> pipes_t;
pipes_t pipes;
+
+ // The number of active pipes.
+ pipes_t::size_type active;
// Pipe to retrieve next message from. The messages are retrieved
// from the pipes in round-robin fashion (a.k.a. fair queueing).
More information about the zeromq-dev
mailing list