[zeromq-dev] [PATCH] Updated Recovery Interval in Milliseconds

Bob Beaty drbobbeaty at gmail.com
Tue Dec 7 22:24:56 CET 2010


Martin,
  I have been able to completely verify this on both the transmitter and receiver. Whew!
  There were a few additional changes, but the bulk is the original patch. In any event, the complete "patch file" is included below, as requested on the Contributing to 0MQ page.
  The jzmq patch is still correct - nothing changed there.



From bc4736508a62303dc6b4334e95fd1d1633aff7dd Mon Sep 17 00:00:00 2001
From: Bob Beaty <rbeaty at peak6.com>
Date: Tue, 7 Dec 2010 15:14:23 -0600
Subject: [PATCH] Added Recovery Interval in Milliseconds

The lower limit on ZMQ_RECOVERY_IVL is 1 (sec), so if you needed something
smaller than that, say for a very fast feed, you had no options. With this
change, you can specify the recovery interval in msec by setting the
ZMQ_RECOVERY_IVL_MSEC option to the integer number of msec. A non-zero value
overrides ZMQ_RECOVERY_IVL; if ZMQ_RECOVERY_IVL_MSEC is left unset (zero),
ZMQ_RECOVERY_IVL is still used.

Signed-off-by: Bob Beaty <rbeaty at peak6.com>
---
 doc/zmq_getsockopt.txt |   20 ++++++++++++++++++++
 doc/zmq_setsockopt.txt |   24 ++++++++++++++++++++++++
 include/zmq.h          |    1 +
 src/options.cpp        |   18 ++++++++++++++++++
 src/options.hpp        |    2 ++
 src/pgm_socket.cpp     |   32 ++++++++++++++++++--------------
 6 files changed, 83 insertions(+), 14 deletions(-)
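
Note on precedence: the rule described in the commit message can be sketched
as a small stand-alone function. This is only an illustration; the helper
name effective_recovery_msec is hypothetical and does not exist in 0MQ. It
assumes, as the documentation hunks below state, that a value of zero for
ZMQ_RECOVERY_IVL_MSEC means "not set".

```cpp
#include <cassert>
#include <cstdint>

//  Hypothetical helper, not part of 0MQ: returns the effective recovery
//  interval in milliseconds given the two option values.
std::uint64_t effective_recovery_msec (std::uint32_t recovery_ivl,
                                       std::uint32_t recovery_ivl_msec)
{
    //  A non-zero millisecond value takes precedence over the seconds value.
    if (recovery_ivl_msec > 0)
        return recovery_ivl_msec;

    //  Otherwise fall back to the seconds-based option, converted to ms.
    return static_cast<std::uint64_t> (recovery_ivl) * 1000;
}
```

With the defaults (recovery_ivl = 10, recovery_ivl_msec = 0) this yields
10000 ms; setting the millisecond option to 250 yields 250 ms. A result of
zero corresponds to the case pgm_socket_t::init rejects with EINVAL.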

diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 7f73e1c..fe70dcc 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -167,6 +167,26 @@ Default value:: 10
 Applicable socket types:: all, when using multicast transports
 
 
+ZMQ_RECOVERY_IVL_MSEC: Get multicast recovery interval in milliseconds
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_RECOVERY_IVL_MSEC' option shall retrieve the recovery interval, in
+milliseconds, for multicast transports using the specified 'socket'.  The
+recovery interval determines the maximum time in milliseconds that a receiver
+can be absent from a multicast group before unrecoverable data loss will
+occur.
+
+For backward compatibility, the default value of 'ZMQ_RECOVERY_IVL_MSEC' is
+zero indicating that the recovery interval should be obtained from the
+'ZMQ_RECOVERY_IVL' option. However, if the 'ZMQ_RECOVERY_IVL_MSEC' value is
+not zero, then it will take precedence, and be used.
+
+[horizontal]
+Option value type:: int64_t
+Option value unit:: milliseconds
+Default value:: 0
+Applicable socket types:: all, when using multicast transports
+
+
 ZMQ_MCAST_LOOP: Control multicast loop-back
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_MCAST_LOOP' option controls whether data sent via multicast
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 86b01e4..88f6b3b 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -171,6 +171,30 @@ Default value:: 10
 Applicable socket types:: all, when using multicast transports
 
 
+ZMQ_RECOVERY_IVL_MSEC: Set multicast recovery interval in milliseconds
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_RECOVERY_IVL_MSEC' option shall set the recovery interval, specified
+in milliseconds (ms) for multicast transports using the specified 'socket'.
+The recovery interval determines the maximum time in milliseconds that a
+receiver can be absent from a multicast group before unrecoverable data loss
+will occur.
+
+A non-zero value of the 'ZMQ_RECOVERY_IVL_MSEC' option will take precedence
+over the 'ZMQ_RECOVERY_IVL' option, but since the default for the
+'ZMQ_RECOVERY_IVL_MSEC' is zero, the default is to use the 'ZMQ_RECOVERY_IVL'
+option value.
+
+CAUTION: Exercise care when setting large recovery intervals as the data
+needed for recovery will be held in memory. For example, a 1 minute recovery
+interval at a data rate of 1Gbps requires a 7GB in-memory buffer.
+
+[horizontal]
+Option value type:: int64_t
+Option value unit:: milliseconds
+Default value:: 0
+Applicable socket types:: all, when using multicast transports
+
+
 ZMQ_MCAST_LOOP: Control multicast loop-back
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_MCAST_LOOP' option shall control whether data sent via multicast
diff --git a/include/zmq.h b/include/zmq.h
index 997595b..a773f45 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -202,6 +202,7 @@ ZMQ_EXPORT int zmq_term (void *context);
 #define ZMQ_LINGER 17
 #define ZMQ_RECONNECT_IVL 18
 #define ZMQ_BACKLOG 19
+#define ZMQ_RECOVERY_IVL_MSEC 20   /*  opt. recovery time, reconcile in 3.x   */
 
 /*  Send/recv options.                                                        */
 #define ZMQ_NOBLOCK 1
diff --git a/src/options.cpp b/src/options.cpp
index b4ca6b5..6b005af 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -30,6 +30,7 @@ zmq::options_t::options_t () :
     affinity (0),
     rate (100),
     recovery_ivl (10),
+    recovery_ivl_msec (0),
     use_multicast_loop (true),
     sndbuf (0),
     rcvbuf (0),
@@ -101,6 +102,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         recovery_ivl = (uint32_t) *((int64_t*) optval_);
         return 0;
 
+    case ZMQ_RECOVERY_IVL_MSEC:
+        if (optvallen_ != sizeof (int64_t)  || *((int64_t*) optval_) < 0) {
+            errno = EINVAL;
+            return -1;
+        }
+        recovery_ivl_msec = (uint32_t) *((int64_t*) optval_);
+        return 0;
+
     case ZMQ_MCAST_LOOP:
         if (optvallen_ != sizeof (int64_t)) {
             errno = EINVAL;
@@ -225,6 +234,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (int64_t);
         return 0;
 
+    case ZMQ_RECOVERY_IVL_MSEC:
+        if (*optvallen_ < sizeof (int64_t)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int64_t*) optval_) = recovery_ivl_msec;
+        *optvallen_ = sizeof (int64_t);
+        return 0;
+
     case ZMQ_MCAST_LOOP:
         if (*optvallen_ < sizeof (int64_t)) {
             errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 2c6f65d..0603e4b 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -44,6 +44,8 @@ namespace zmq
 
         //  Reliability time interval [s]. Default 10s.
         uint32_t recovery_ivl;
+        //  Reliability time interval [ms]. Default 0 = not used.
+        uint32_t recovery_ivl_msec;
 
         //  Enable multicast loopback. Default disabled (false).
         bool use_multicast_loop;
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1c98edd..762ad7b 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
         errno = EINVAL;
         return -1;
     }
-    //  Recovery interval [s]. 
-    if (options.recovery_ivl <= 0) {
+    //  Recovery interval [s] or [ms] - based on the user's call 
+    if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) {
         errno = EINVAL;
         return -1;
     }
@@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
 
     if (receiver) {
         const int recv_only        = 1,
-                  rxw_max_rte      = options.rate * 1000 / 8,
-                  rxw_secs         = options.recovery_ivl,
+                  rxw_max_tpdu     = (int) pgm_max_tpdu,
+                  rxw_sqns         = (options.recovery_ivl_msec > 0 ?
+                                      options.recovery_ivl_msec * options.rate /
+                                      (1000 * rxw_max_tpdu) :
+                                      options.recovery_ivl * options.rate /
+                                      rxw_max_tpdu),
                   peer_expiry      = pgm_secs (300),
                   spmr_expiry      = pgm_msecs (25),
                   nak_bo_ivl       = pgm_msecs (50),
@@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
 
         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
                 sizeof (recv_only)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte,
-                sizeof (rxw_max_rte)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs,
-                sizeof (rxw_secs)) ||
+            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
+                sizeof (rxw_sqns)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
                 sizeof (peer_expiry)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
@@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
             goto err_abort;
     } else {
         const int send_only        = 1,
-                  txw_max_rte      = options.rate * 1000 / 8,
-                  txw_secs         = options.recovery_ivl,
+                  txw_max_tpdu     = (int) pgm_max_tpdu,
+                  txw_sqns         = (options.recovery_ivl_msec > 0 ?
+                                      options.recovery_ivl_msec * options.rate /
+                                      (1000 * txw_max_tpdu) :
+                                      options.recovery_ivl * options.rate /
+                                      txw_max_tpdu),
                   ambient_spm      = pgm_secs (30),
                   heartbeat_spm[]  = { pgm_msecs (100),
                                        pgm_msecs (100),
@@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
 
         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
                 &send_only, sizeof (send_only)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE,
-                &txw_max_rte, sizeof (txw_max_rte)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS,
-                &txw_secs, sizeof (txw_secs)) ||
+            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
+                &txw_sqns, sizeof (txw_sqns)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
                 &ambient_spm, sizeof (ambient_spm)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
-- 
1.7.1
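
Note on window sizing: the patch replaces the rate/seconds pair of PGM
options with a window expressed in sequence numbers. The relationship between
the two can be sketched from units alone. This is a minimal sketch and not a
copy of the expressions in the diff: it assumes the rate is in kilobits per
second (the removed lines convert it to bytes per second with
rate * 1000 / 8), that one sequence number corresponds to one TPDU, and that
the window should hold at least one packet. The helper names and the
1500-byte TPDU in the usage note are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

//  Hypothetical helper: bytes that must be buffered to cover ivl_msec
//  milliseconds of traffic at rate_kbps kilobits per second.
std::uint64_t window_bytes (std::uint64_t rate_kbps, std::uint64_t ivl_msec)
{
    //  kbit/s * 1000 / 8 is bytes per second; scale by milliseconds / 1000.
    return rate_kbps * 1000 / 8 * ivl_msec / 1000;
}

//  Hypothetical helper: the same window as a count of TPDU-sized packets.
std::uint64_t window_sqns (std::uint64_t rate_kbps, std::uint64_t ivl_msec,
                           std::uint64_t max_tpdu)
{
    const std::uint64_t sqns = window_bytes (rate_kbps, ivl_msec) / max_tpdu;

    //  Assumption: never report an empty window.
    return sqns == 0 ? 1 : sqns;
}
```

For the case in the CAUTION text (1 Gbps for one minute),
window_bytes (1000000, 60000) gives 7500000000 bytes. At the default rate of
100 kbps with a 500 ms interval and an assumed 1500-byte TPDU,
window_sqns (100, 500, 1500) gives 4.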




    Thanks,
        Bob (drbobbeaty at gmail.com)
    The Man from S.P.U.D.
    We will write no code before it's designed.



