[zeromq-dev] [PATCH] Recovery Interval in Milliseconds (-1 default)

Bob Beaty drbobbeaty at gmail.com
Thu Dec 9 14:37:47 CET 2010


Martin,
  Enclosed is the patch that adds the ZMQ_RECOVERY_IVL_MSEC option. It now defaults to -1, and the logic is otherwise unchanged: a positive ZMQ_RECOVERY_IVL_MSEC overrides ZMQ_RECOVERY_IVL, and if ZMQ_RECOVERY_IVL_MSEC is left unset, the default behavior is to use ZMQ_RECOVERY_IVL.
  I've run the tests, and it's all working. I've even made RPMs for CentOS 5 (but I think these are distributed on the server). Looks good.
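
  For reference, the precedence rule can be sketched as a small standalone function. This is only an illustration of the rule described above, not code from the patch: the helper names `effective_recovery_ms` and `check_effective_recovery_ms` are hypothetical, and the sketch assumes "overrides" means any value greater than zero, which is what the `recovery_ivl_msec > 0` test in the pgm_socket.cpp hunk checks.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper, not part of the patch: returns the recovery interval
// in milliseconds that would take effect for a given pair of option values.
constexpr int64_t effective_recovery_ms (uint32_t recovery_ivl,
                                         int32_t recovery_ivl_msec)
{
    // A positive millisecond value wins; otherwise fall back to the
    // seconds-based option, converted to milliseconds.
    return recovery_ivl_msec > 0
        ? static_cast<int64_t> (recovery_ivl_msec)
        : static_cast<int64_t> (recovery_ivl) * 1000;
}

// Small self-check covering the defaults, an override, and an explicit zero.
inline void check_effective_recovery_ms ()
{
    assert (effective_recovery_ms (10, -1) == 10000);  // defaults: 10 s
    assert (effective_recovery_ms (10, 250) == 250);   // msec override
    assert (effective_recovery_ms (10, 0) == 10000);   // zero falls back
}
```

  With the defaults from options.cpp (recovery_ivl 10, recovery_ivl_msec -1) this yields 10000 ms, so existing callers should see no change.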



From 54d05b44f27f269241e8752cd568df78196a6136 Mon Sep 17 00:00:00 2001
From: Bob Beaty <rbeaty at peak6.com>
Date: Thu, 9 Dec 2010 07:20:19 -0600
Subject: [PATCH] Added Recovery Interval in Milliseconds

For very high-speed message systems, the memory used for recovery can get to
be very large. The current limit on reducing it is the minimum ZMQ_RECOVERY_IVL
of 1 sec. I added an additional option, ZMQ_RECOVERY_IVL_MSEC, which is the
Recovery Interval in milliseconds. If used, this will override the previous
one, and allow you to set a sub-second recovery interval. If not set, the
default behavior is to use ZMQ_RECOVERY_IVL.

Signed-off-by: Bob Beaty <rbeaty at peak6.com>
---
 doc/zmq_getsockopt.txt |   20 ++++++++++++++++++++
 doc/zmq_setsockopt.txt |   24 ++++++++++++++++++++++++
 include/zmq.h          |    1 +
 src/options.cpp        |   18 ++++++++++++++++++
 src/options.hpp        |    2 ++
 src/pgm_socket.cpp     |   32 ++++++++++++++++++--------------
 6 files changed, 83 insertions(+), 14 deletions(-)

diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 7f73e1c..132e7b3 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -167,6 +167,26 @@ Default value:: 10
 Applicable socket types:: all, when using multicast transports
 
 
+ZMQ_RECOVERY_IVL_MSEC: Get multicast recovery interval in milliseconds
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_RECOVERY_IVL_MSEC' option shall retrieve the recovery interval, in
+milliseconds, for multicast transports using the specified 'socket'.  The
+recovery interval determines the maximum time in milliseconds that a receiver
+can be absent from a multicast group before unrecoverable data loss will
+occur.
+
+For backward compatibility, the default value of 'ZMQ_RECOVERY_IVL_MSEC' is
+-1, indicating that the recovery interval should be obtained from the
+'ZMQ_RECOVERY_IVL' option. However, if the 'ZMQ_RECOVERY_IVL_MSEC' value is
+greater than zero, then it will take precedence, and be used.
+
+[horizontal]
+Option value type:: int64_t
+Option value unit:: milliseconds
+Default value:: -1
+Applicable socket types:: all, when using multicast transports
+
+
 ZMQ_MCAST_LOOP: Control multicast loop-back
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_MCAST_LOOP' option controls whether data sent via multicast
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 86b01e4..58f04b3 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -171,6 +171,30 @@ Default value:: 10
 Applicable socket types:: all, when using multicast transports
 
 
+ZMQ_RECOVERY_IVL_MSEC: Set multicast recovery interval in milliseconds
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_RECOVERY_IVL_MSEC' option shall set the recovery interval, specified
+in milliseconds (ms) for multicast transports using the specified 'socket'.
+The recovery interval determines the maximum time in milliseconds that a
+receiver can be absent from a multicast group before unrecoverable data loss
+will occur.
+
+A 'ZMQ_RECOVERY_IVL_MSEC' value greater than zero will take precedence
+over the 'ZMQ_RECOVERY_IVL' option, but since the default for the
+'ZMQ_RECOVERY_IVL_MSEC' is -1, the default is to use the 'ZMQ_RECOVERY_IVL'
+option value.
+
+CAUTION: Exercise care when setting large recovery intervals as the data
+needed for recovery will be held in memory. For example, a 1 minute recovery
+interval at a data rate of 1Gbps requires a 7.5GB in-memory buffer.
+
+[horizontal]
+Option value type:: int64_t
+Option value unit:: milliseconds
+Default value:: -1
+Applicable socket types:: all, when using multicast transports
+
+
 ZMQ_MCAST_LOOP: Control multicast loop-back
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_MCAST_LOOP' option shall control whether data sent via multicast
diff --git a/include/zmq.h b/include/zmq.h
index 997595b..a773f45 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -202,6 +202,7 @@ ZMQ_EXPORT int zmq_term (void *context);
 #define ZMQ_LINGER 17
 #define ZMQ_RECONNECT_IVL 18
 #define ZMQ_BACKLOG 19
+#define ZMQ_RECOVERY_IVL_MSEC 20   /*  opt. recovery time, reconcile in 3.x   */
 
 /*  Send/recv options.                                                        */
 #define ZMQ_NOBLOCK 1
diff --git a/src/options.cpp b/src/options.cpp
index b4ca6b5..ae35059 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -30,6 +30,7 @@ zmq::options_t::options_t () :
     affinity (0),
     rate (100),
     recovery_ivl (10),
+    recovery_ivl_msec (-1),
     use_multicast_loop (true),
     sndbuf (0),
     rcvbuf (0),
@@ -101,6 +102,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         recovery_ivl = (uint32_t) *((int64_t*) optval_);
         return 0;
 
+    case ZMQ_RECOVERY_IVL_MSEC:
+        if (optvallen_ != sizeof (int64_t)  || *((int64_t*) optval_) < 0) {
+            errno = EINVAL;
+            return -1;
+        }
+        recovery_ivl_msec = (int32_t) *((int64_t*) optval_);
+        return 0;
+
     case ZMQ_MCAST_LOOP:
         if (optvallen_ != sizeof (int64_t)) {
             errno = EINVAL;
@@ -225,6 +234,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (int64_t);
         return 0;
 
+    case ZMQ_RECOVERY_IVL_MSEC:
+        if (*optvallen_ < sizeof (int64_t)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int64_t*) optval_) = recovery_ivl_msec;
+        *optvallen_ = sizeof (int64_t);
+        return 0;
+
     case ZMQ_MCAST_LOOP:
         if (*optvallen_ < sizeof (int64_t)) {
             errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 2c6f65d..e6df505 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -44,6 +44,8 @@ namespace zmq
 
         //  Reliability time interval [s]. Default 10s.
         uint32_t recovery_ivl;
+        //  Reliability time interval [ms]. Default -1 = not used.
+        int32_t recovery_ivl_msec;
 
         //  Enable multicast loopback. Default disabled (false).
         bool use_multicast_loop;
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1c98edd..762ad7b 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
         errno = EINVAL;
         return -1;
     }
-    //  Recovery interval [s]. 
-    if (options.recovery_ivl <= 0) {
+    //  Recovery interval [s] or [ms] - based on the user's call 
+    if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) {
         errno = EINVAL;
         return -1;
     }
@@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
 
     if (receiver) {
         const int recv_only        = 1,
-                  rxw_max_rte      = options.rate * 1000 / 8,
-                  rxw_secs         = options.recovery_ivl,
+                  rxw_max_tpdu     = (int) pgm_max_tpdu,
+                  rxw_sqns         = (options.recovery_ivl_msec > 0 ?
+                                      options.recovery_ivl_msec * options.rate /
+                                      (1000 * rxw_max_tpdu) :
+                                      options.recovery_ivl * options.rate /
+                                      rxw_max_tpdu),
                   peer_expiry      = pgm_secs (300),
                   spmr_expiry      = pgm_msecs (25),
                   nak_bo_ivl       = pgm_msecs (50),
@@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
 
         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
                 sizeof (recv_only)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte,
-                sizeof (rxw_max_rte)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs,
-                sizeof (rxw_secs)) ||
+            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
+                sizeof (rxw_sqns)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
                 sizeof (peer_expiry)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
@@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
             goto err_abort;
     } else {
         const int send_only        = 1,
-                  txw_max_rte      = options.rate * 1000 / 8,
-                  txw_secs         = options.recovery_ivl,
+                  txw_max_tpdu     = (int) pgm_max_tpdu,
+                  txw_sqns         = (options.recovery_ivl_msec > 0 ?
+                                      options.recovery_ivl_msec * options.rate /
+                                      (1000 * txw_max_tpdu) :
+                                      options.recovery_ivl * options.rate /
+                                      txw_max_tpdu),
                   ambient_spm      = pgm_secs (30),
                   heartbeat_spm[]  = { pgm_msecs (100),
                                        pgm_msecs (100),
@@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
 
         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
                 &send_only, sizeof (send_only)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE,
-                &txw_max_rte, sizeof (txw_max_rte)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS,
-                &txw_secs, sizeof (txw_secs)) ||
+            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
+                &txw_sqns, sizeof (txw_sqns)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
                 &ambient_spm, sizeof (ambient_spm)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
-- 
1.7.1
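
The pgm_socket.cpp hunks size the PGM windows in sequence numbers (PGM_RXW_SQNS / PGM_TXW_SQNS) instead of seconds plus a maximum rate, so the unit conversion has to be spelled out by hand. Below is a minimal sketch of that arithmetic, not the patch's own expression. It assumes ZMQ_RATE is in kilobits per second, as the removed `options.rate * 1000 / 8` lines suggest, and it keeps that divide-by-8 from bits to bytes. The helper names `window_bytes` and `window_sqns` and the 1500-byte TPDU in the self-check are illustrative assumptions. 64-bit intermediates are used so that high rates do not overflow.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: bytes of traffic produced in `interval_ms`
// milliseconds at `rate_kbps` kilobits per second.
constexpr int64_t window_bytes (int64_t interval_ms, int64_t rate_kbps)
{
    // 1 kbit/s equals 1 bit/ms, so ms * kbps gives bits; / 8 gives bytes.
    return interval_ms * rate_kbps / 8;
}

// Hypothetical helper: number of TPDU-sized packets (sequence numbers)
// needed to cover that many bytes, rounded down.
constexpr int64_t window_sqns (int64_t interval_ms, int64_t rate_kbps,
                               int64_t max_tpdu)
{
    return window_bytes (interval_ms, rate_kbps) / max_tpdu;
}

// Self-check against figures that can be worked out by hand.
inline void check_window_sizes ()
{
    // Defaults: 10 s at 100 kbps is 125000 bytes, or 83 packets of 1500.
    assert (window_bytes (10000, 100) == 125000);
    assert (window_sqns (10000, 100, 1500) == 83);
    // 1 minute at 1 Gbps is 7.5e9 bytes.
    assert (window_bytes (60000, 1000000) == 7500000000LL);
    // 250 ms at 1 Gbps is 31250000 bytes, or 20833 packets of 1500.
    assert (window_sqns (250, 1000000, 1500) == 20833);
}
```

For the default options (10 s, 100 kbps) this gives the same 83-packet window as the old PGM_RXW_SECS / PGM_RXW_MAX_RTE pair would for a 1500-byte TPDU, which is a convenient sanity check for any expression written in terms of sequence numbers.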




    Thanks,
        Bob (drbobbeaty at gmail.com)
    The Man from S.P.U.D.
    We will write no code before it's designed.
