[zeromq-dev] [PATCH] Re: ZMQ 2.1.0 and OpenPGM Memory Usage

Bob Beaty bobbeaty at comcast.net
Tue Dec 7 15:37:35 CET 2010


Martin, Steven, et al.,
  I've got a working patch that lets the Recovery Interval be set in milliseconds rather than seconds. It includes the documentation changes, and while the writing may not be professional grade, it's accurate and complete, so people can see what the new option is supposed to do.
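The precedence rule the patch introduces can be modeled in isolation. The following is a minimal standalone C sketch, not code from 0MQ: the struct `recovery_opts_t` and the functions `set_recovery_ivl_msec` and `effective_recovery_ms` are hypothetical stand-ins that mirror the validation in the patched `options_t::setsockopt` and the fallback to `ZMQ_RECOVERY_IVL` when the millisecond value is zero.

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the two fields the patch touches in
   zmq::options_t; field names mirror the patch, the struct is illustrative. */
typedef struct {
    uint32_t recovery_ivl;      /* seconds, default 10 */
    uint32_t recovery_ivl_msec; /* milliseconds, default 0 = not used */
} recovery_opts_t;

/* Mirrors the check in the patched setsockopt: the value must be passed as
   an int64_t and must not be negative. As in the patch, values above
   UINT32_MAX are silently truncated by the cast. */
static int set_recovery_ivl_msec (recovery_opts_t *opts,
                                  const void *optval, size_t optvallen)
{
    if (optvallen != sizeof (int64_t) || *(const int64_t *) optval < 0) {
        errno = EINVAL;
        return -1;
    }
    opts->recovery_ivl_msec = (uint32_t) *(const int64_t *) optval;
    return 0;
}

/* Effective interval in milliseconds: a non-zero ZMQ_RECOVERY_IVL_MSEC wins,
   otherwise ZMQ_RECOVERY_IVL (seconds) is used. Returns 0 when both are zero,
   the case pgm_socket_t::init rejects with EINVAL. */
static uint64_t effective_recovery_ms (const recovery_opts_t *opts)
{
    if (opts->recovery_ivl_msec > 0)
        return opts->recovery_ivl_msec;
    return (uint64_t) opts->recovery_ivl * 1000;
}
```

With the defaults (10 s, 0 ms) the effective interval is 10000 ms; setting 250 ms through the new option overrides it, while a wrongly sized or negative value is rejected and leaves the previous setting in place.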
  Of course, I've tried to follow all the guidelines on the Contributing to 0MQ page, but it's quite possible I missed a few. Sorry about that.
  Along those same lines, I'm including the patch inline as that was the requested method:



From 77f14013dab2c5e539cacc41ca5efe7b829b8b54 Mon Sep 17 00:00:00 2001
From: Bob Beaty <rbeaty at peak6.com>
Date: Tue, 7 Dec 2010 08:03:08 -0600
Subject: [PATCH] Added Recovery Interval in Milliseconds

The minimum allowable recovery interval was 1s using ZMQ_RECOVERY_IVL, but
after talking with Martin, it seemed reasonable to add an overriding value
measured in milliseconds. The changes cover the options, the docs, and the
socket control.

Signed-off-by: Bob Beaty <rbeaty at peak6.com>
---
 doc/zmq_getsockopt.txt |   20 ++++++++++++++++++++
 doc/zmq_setsockopt.txt |   24 ++++++++++++++++++++++++
 include/zmq.h          |    1 +
 src/options.cpp        |   18 ++++++++++++++++++
 src/options.hpp        |    2 ++
 src/pgm_socket.cpp     |   26 ++++++++++++++++++--------
 6 files changed, 83 insertions(+), 8 deletions(-)

diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 7f73e1c..fe70dcc 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -167,6 +167,26 @@ Default value:: 10
 Applicable socket types:: all, when using multicast transports
 
 
+ZMQ_RECOVERY_IVL_MSEC: Get multicast recovery interval in milliseconds
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_RECOVERY_IVL_MSEC' option shall retrieve the recovery interval, in
+milliseconds, for multicast transports using the specified 'socket'.  The
+recovery interval determines the maximum time in milliseconds that a
+receiver can be absent from a multicast group before unrecoverable data
+loss will occur.
+
+For backward compatibility, the default value of 'ZMQ_RECOVERY_IVL_MSEC' is
+zero, indicating that the recovery interval should be obtained from the
+'ZMQ_RECOVERY_IVL' option. However, if the 'ZMQ_RECOVERY_IVL_MSEC' value is
+not zero, it takes precedence over 'ZMQ_RECOVERY_IVL' and is used instead.
+
+[horizontal]
+Option value type:: int64_t
+Option value unit:: milliseconds
+Default value:: 0
+Applicable socket types:: all, when using multicast transports
+
+
 ZMQ_MCAST_LOOP: Control multicast loop-back
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_MCAST_LOOP' option controls whether data sent via multicast
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 86b01e4..88f6b3b 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -171,6 +171,30 @@ Default value:: 10
 Applicable socket types:: all, when using multicast transports
 
 
+ZMQ_RECOVERY_IVL_MSEC: Set multicast recovery interval in milliseconds
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_RECOVERY_IVL_MSEC' option shall set the recovery interval, specified
+in milliseconds (ms), for multicast transports using the specified 'socket'.
+The recovery interval determines the maximum time in milliseconds that a
+receiver can be absent from a multicast group before unrecoverable data loss
+will occur.
+
+A non-zero value of the 'ZMQ_RECOVERY_IVL_MSEC' option will take precedence
+over the 'ZMQ_RECOVERY_IVL' option. Since the default for
+'ZMQ_RECOVERY_IVL_MSEC' is zero, the 'ZMQ_RECOVERY_IVL' value is used
+unless this option is explicitly set.
+
+CAUTION: Exercise care when setting large recovery intervals as the data
+needed for recovery will be held in memory. For example, a 1 minute recovery
+interval at a data rate of 1Gbps requires a 7GB in-memory buffer.
+
+[horizontal]
+Option value type:: int64_t
+Option value unit:: milliseconds
+Default value:: 0
+Applicable socket types:: all, when using multicast transports
+
+
 ZMQ_MCAST_LOOP: Control multicast loop-back
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_MCAST_LOOP' option shall control whether data sent via multicast
diff --git a/include/zmq.h b/include/zmq.h
index 997595b..a773f45 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -202,6 +202,7 @@ ZMQ_EXPORT int zmq_term (void *context);
 #define ZMQ_LINGER 17
 #define ZMQ_RECONNECT_IVL 18
 #define ZMQ_BACKLOG 19
+#define ZMQ_RECOVERY_IVL_MSEC 20   /*  opt. recovery time, reconcile in 3.x   */
 
 /*  Send/recv options.                                                        */
 #define ZMQ_NOBLOCK 1
diff --git a/src/options.cpp b/src/options.cpp
index b4ca6b5..6b005af 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -30,6 +30,7 @@ zmq::options_t::options_t () :
     affinity (0),
     rate (100),
     recovery_ivl (10),
+    recovery_ivl_msec (0),
     use_multicast_loop (true),
     sndbuf (0),
     rcvbuf (0),
@@ -101,6 +102,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         recovery_ivl = (uint32_t) *((int64_t*) optval_);
         return 0;
 
+    case ZMQ_RECOVERY_IVL_MSEC:
+        if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) {
+            errno = EINVAL;
+            return -1;
+        }
+        recovery_ivl_msec = (uint32_t) *((int64_t*) optval_);
+        return 0;
+
     case ZMQ_MCAST_LOOP:
         if (optvallen_ != sizeof (int64_t)) {
             errno = EINVAL;
@@ -225,6 +234,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (int64_t);
         return 0;
 
+    case ZMQ_RECOVERY_IVL_MSEC:
+        if (*optvallen_ < sizeof (int64_t)) {
+            errno = EINVAL;
+            return -1;
+        }
+        *((int64_t*) optval_) = recovery_ivl_msec;
+        *optvallen_ = sizeof (int64_t);
+        return 0;
+
     case ZMQ_MCAST_LOOP:
         if (*optvallen_ < sizeof (int64_t)) {
             errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 2c6f65d..0603e4b 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -44,6 +44,8 @@ namespace zmq
 
         //  Reliability time interval [s]. Default 10s.
         uint32_t recovery_ivl;
+        //  Reliability time interval [ms]. Default 0 = not used.
+        uint32_t recovery_ivl_msec;
 
         //  Enable multicast loopback. Default disabled (false).
         bool use_multicast_loop;
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1c98edd..ab78418 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
         errno = EINVAL;
         return -1;
     }
-    //  Recovery interval [s]. 
-    if (options.recovery_ivl <= 0) {
+    //  Recovery interval [s] or [ms], depending on which option is set.
+    if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) {
         errno = EINVAL;
         return -1;
     }
@@ -200,7 +200,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
     if (receiver) {
         const int recv_only        = 1,
                   rxw_max_rte      = options.rate * 1000 / 8,
-                  rxw_secs         = options.recovery_ivl,
+                  rxw_max_tpdu     = (int) pgm_max_tpdu,
+                  rxw_sqns         = (options.recovery_ivl_msec > 0 ?
+                                      options.recovery_ivl_msec * rxw_max_rte /
+                                      (1000 * rxw_max_tpdu) :
+                                      options.recovery_ivl * rxw_max_rte /
+                                      rxw_max_tpdu),
                   peer_expiry      = pgm_secs (300),
                   spmr_expiry      = pgm_msecs (25),
                   nak_bo_ivl       = pgm_msecs (50),
@@ -213,8 +218,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
                 sizeof (recv_only)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte,
                 sizeof (rxw_max_rte)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs,
-                sizeof (rxw_secs)) ||
+            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
+                sizeof (rxw_sqns)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
                 sizeof (peer_expiry)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
@@ -233,7 +238,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
     } else {
         const int send_only        = 1,
                   txw_max_rte      = options.rate * 1000 / 8,
-                  txw_secs         = options.recovery_ivl,
+                  txw_max_tpdu     = (int) pgm_max_tpdu,
+                  txw_sqns         = (options.recovery_ivl_msec > 0 ?
+                                      options.recovery_ivl_msec * txw_max_rte /
+                                      (1000 * txw_max_tpdu) :
+                                      options.recovery_ivl * txw_max_rte /
+                                      txw_max_tpdu),
                   ambient_spm      = pgm_secs (30),
                   heartbeat_spm[]  = { pgm_msecs (100),
                                        pgm_msecs (100),
@@ -249,8 +259,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
                 &send_only, sizeof (send_only)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE,
                 &txw_max_rte, sizeof (txw_max_rte)) ||
-            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS,
-                &txw_secs, sizeof (txw_secs)) ||
+            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
+                &txw_sqns, sizeof (txw_sqns)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
                 &ambient_spm, sizeof (ambient_spm)) ||
             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
-- 
1.7.1
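The CAUTION added to zmq_setsockopt.txt is simple arithmetic: the sender must hold one recovery interval's worth of data in memory. A minimal C sketch of that estimate follows; the function name `recovery_buffer_bytes` is hypothetical, and it ignores PGM header overhead and rounding to whole TPDUs. At 1 Gb/s, 60000 ms works out to 7.5e9 bytes (about 7 GiB, matching the documented figure), while a 250 ms interval at the same rate needs only about 31 MB.

```c
#include <stdint.h>

/* Hypothetical estimate of the in-memory recovery buffer:
   bytes = (rate in bits/s / 8) * interval in ms / 1000.
   Ignores PGM header overhead and rounding to whole TPDUs. */
static uint64_t recovery_buffer_bytes (uint64_t rate_bps, uint64_t ivl_ms)
{
    /* 64-bit arithmetic: 1 Gb/s over a minute already exceeds 32 bits. */
    return rate_bps / 8 * ivl_ms / 1000;
}
```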




On Dec 6, 2010, at 6:49 PM, Steven McCoy wrote:

> On 7 December 2010 00:44, Bob Beaty <bobbeaty at comcast.net> wrote:
> Steven,
>   I am implementing this change to the ZeroMQ project, and have run into a slight snag... I can't find any reference to the variable 'tpdu_size' in the codebase. I can find a method called get_max_tsdu_size(), which is really a call to:
> 
> 
> It's pgm_max_tpdu in config.hpp, default 1500 bytes, assuming no sub-netting,
> 
> https://github.com/zeromq/zeromq2/blob/master/src/config.hpp
> 
> -- 
> Steve-o 
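Using the 1500-byte `pgm_max_tpdu` default Steven mentions, the conversion from a recovery interval to a window size in sequence numbers (the value the patch passes as `PGM_TXW_SQNS` / `PGM_RXW_SQNS`) can be checked with a small standalone C sketch; the helper `window_sqns` is hypothetical. In the diff, `recovery_ivl_msec` is a `uint32_t` and `rxw_max_rte` / `txw_max_rte` are `int`, so on typical platforms the product `recovery_ivl_msec * rxw_max_rte` is evaluated in 32-bit unsigned arithmetic and wraps at high rates (10000 ms at 1 Gb/s is 1.25e12). The sketch keeps the patch's formula but forms the product in 64 bits.

```c
#include <stdint.h>

/* Hypothetical helper: window size in sequence numbers (TPDUs) needed to
   cover ivl_ms of traffic at rate_kbps, following the patch's formula
   sqns = ivl_ms * max_rte / (1000 * max_tpdu), with max_rte in bytes/s.
   Intermediates are 64-bit so large rates and intervals do not overflow. */
static uint64_t window_sqns (uint32_t rate_kbps, uint32_t ivl_ms,
                             uint32_t max_tpdu)
{
    /* ZMQ_RATE is in kilobits per second; convert to bytes per second. */
    uint64_t max_rte = (uint64_t) rate_kbps * 1000 / 8;
    /* Integer division truncates, as in the patch. */
    return (uint64_t) ivl_ms * max_rte / (1000 * (uint64_t) max_tpdu);
}
```

At the default 100 kb/s, a 10000 ms interval gives 83 sequence numbers, the same result as the seconds-based formula (10 * 12500 / 1500). The truncating division also means that intervals under 120 ms at that rate produce a window of 0, which a caller may want to clamp or reject.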


    Thanks,
        Bob (drbob at TheManFromSPUD.com)
    The Man from S.P.U.D.
    We will write no code before it's designed.



