[PATCH] REQ socket discards duplicate replies

Martin Sustrik sustrik at 250bpm.com
Tue Jun 21 14:47:50 CEST 2011


Instead of empty message ("stack bottom") request ID
is sent. When reply arrives, the socket checks the ID
and if it doesn't match the ID of the ongoing operation
the reply is silently discarded.

Signed-off-by: Martin Sustrik <sustrik at 250bpm.com>
---
 .gitignore                   |    1 +
 src/rep.cpp                  |   52 ++++----------
 src/req.cpp                  |   36 ++++++++--
 src/req.hpp                  |    9 +++
 src/socket_base.cpp          |   18 +++---
 tests/Makefile.am            |    5 +-
 tests/test_reqrep_device.cpp |  160 ++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 229 insertions(+), 52 deletions(-)
 create mode 100644 tests/test_reqrep_device.cpp

diff --git a/.gitignore b/.gitignore
index 2adeb22..dc38b44 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,6 +29,7 @@ tests/test_reqrep_tcp
 tests/test_shutdown_stress
 tests/test_hwm
 tests/test_timeo
+tests/test_reqrep_device
 src/platform.hpp*
 src/stamp-h1
 devices/zmq_forwarder/zmq_forwarder
diff --git a/src/rep.cpp b/src/rep.cpp
index b987d9c..a5d1462 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -64,54 +64,32 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
         return -1;
     }
 
+    //  First thing to do when receiving a request is to copy all the labels
+    //  to the reply pipe.
     if (request_begins) {
-
-        //  Copy the backtrace stack to the reply pipe.
         while (true) {
-
-            //  TODO: If request can be read but reply pipe is not
-            //  ready for writing, we should drop the reply.
-
-            //  Get next part of the backtrace stack.
             int rc = xrep_t::xrecv (msg_, flags_);
             if (rc != 0)
                 return rc;
+            if (!(msg_->flags () & msg_t::label))
+                break;
 
-            if (msg_->flags () & (msg_t::more | msg_t::label)) {
-
-                //  Empty message part delimits the traceback stack.
-                bool bottom = (msg_->size () == 0);
-
-                //  Push it to the reply pipe.
-                rc = xrep_t::xsend (msg_, flags_);
-                zmq_assert (rc == 0);
-
-                //  The end of the traceback, move to processing message body.
-                if (bottom)
-                    break;
-            }
-            else {
-
-                //  If the traceback stack is malformed, discard anything
-                //  already sent to pipe (we're at end of invalid message)
-                //  and continue reading -- that'll switch us to the next pipe
-                //  and next request.
-                rc = xrep_t::rollback ();
-                zmq_assert (rc == 0);
-            }
+            //  TODO: If the reply cannot be sent to the peer because
+            //  od congestion, we should drop it.
+            rc = xrep_t::xsend (msg_, flags_);
+            zmq_assert (rc == 0);
         }
-
         request_begins = false;
     }
-
-    //  Now the routing info is safely stored. Get the first part
-    //  of the message payload and exit.
-    int rc = xrep_t::xrecv (msg_, flags_);
-    if (rc != 0)
-        return rc;
+    else {
+        int rc = xrep_t::xrecv (msg_, flags_);
+        if (rc != 0)
+            return rc;
+    }
+    zmq_assert (!(msg_->flags () & msg_t::label));
 
     //  If whole request is read, flip the FSM to reply-sending state.
-    if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
+    if (!(msg_->flags () & msg_t::more)) {
         sending_reply = true;
         request_begins = true;
     }
diff --git a/src/req.cpp b/src/req.cpp
index b0e58dc..e70fd90 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -21,13 +21,26 @@
 #include "req.hpp"
 #include "err.hpp"
 #include "msg.hpp"
+#include "uuid.hpp"
+#include "wire.hpp"
 
 zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
     xreq_t (parent_, tid_),
     receiving_reply (false),
-    message_begins (true)
+    message_begins (true),
+    request_id (0)
 {
     options.type = ZMQ_REQ;
+
+    //  Generate a random endpoint ID. To keep it short do so by
+    //  collapsing an UUID into 4 bytes.
+    //  TODO: In the future we want to have a real random number here.
+    uint32_t buff [4];
+    generate_uuid ((void*) buff);
+    buff [0] ^= buff [1];
+    buff [0] ^= buff [2];
+    buff [0] ^= buff [3];
+    endpoint_id = buff [0];
 }
 
 zmq::req_t::~req_t ()
@@ -43,12 +56,15 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
         return -1;
     }
 
-    //  First part of the request is empty message part (stack bottom).
+    //  First part of the request is the request identity.
     if (message_begins) {
         msg_t prefix;
-        int rc = prefix.init ();
+        int rc = prefix.init_size (8);
         errno_assert (rc == 0);
         prefix.set_flags (msg_t::label);
+        unsigned char *data = (unsigned char*) prefix.data ();
+        put_uint32 (data, endpoint_id);
+        put_uint32 (data + 4, request_id);
         rc = xreq_t::xsend (&prefix, flags_);
         if (rc != 0)
             return rc;
@@ -78,13 +94,20 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
         return -1;
     }
 
-    //  First part of the reply should be empty message part (stack bottom).
+    //  First part of the reply should be the original request identity.
+    //  If it's different it's a stray reply and it will be dropped.
     if (message_begins) {
         int rc = xreq_t::xrecv (msg_, flags_);
         if (rc != 0)
             return rc;
         zmq_assert (msg_->flags () & msg_t::label);
-        zmq_assert (msg_->size () == 0);
+        zmq_assert (msg_->size () == 8);
+        unsigned char *data = (unsigned char*) msg_->data ();
+        if (get_uint32 (data) != endpoint_id ||
+              get_uint32 (data + 4) != request_id) {
+            errno = EAGAIN;
+            return -1;
+        }
         message_begins = false;
     }
 
@@ -94,6 +117,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
 
     //  If the reply is fully received, flip the FSM into request-sending state.
     if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
+        request_id++;
         receiving_reply = false;
         message_begins = true;
     }
@@ -103,6 +127,8 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
 
 bool zmq::req_t::xhas_in ()
 {
+    //  TODO: Duplicates should be removed here.
+
     if (!receiving_reply)
         return false;
 
diff --git a/src/req.hpp b/src/req.hpp
index e0554ac..25b1a46 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -22,6 +22,7 @@
 #define __ZMQ_REQ_HPP_INCLUDED__
 
 #include "xreq.hpp"
+#include "stdint.hpp"
 
 namespace zmq
 {
@@ -49,6 +50,14 @@ namespace zmq
         //  of the message must be empty message part (backtrace stack bottom).
         bool message_begins;
 
+        //  Endpoint ID. Randomly generated. Identifies this particular instance
+        //  of the socket.
+        uint32_t endpoint_id;
+
+        //  Request ID. Request numbers gradually increase (and wrap over)
+        //  so that we don't have to generate random ID for each request.
+        uint32_t request_id;
+
         req_t (const req_t&);
         const req_t &operator = (const req_t&);
     };
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index eaf1776..804ec46 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -598,15 +598,15 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
         ticks = 0;
 
         rc = xrecv (msg_, flags_);
-        if (rc == 0) {
-            rcvlabel = msg_->flags () & msg_t::label;
-            if (rcvlabel)
-                msg_->reset_flags (msg_t::label);
-            rcvmore = msg_->flags () & msg_t::more;
-            if (rcvmore)
-                msg_->reset_flags (msg_t::more);
-        }
-        return rc;
+        if (rc < 0)
+            return rc;
+        rcvlabel = msg_->flags () & msg_t::label;
+        if (rcvlabel)
+            msg_->reset_flags (msg_t::label);
+        rcvmore = msg_->flags () & msg_t::more;
+        if (rcvmore)
+            msg_->reset_flags (msg_t::more);
+        return 0;
     }
 
     //  Compute the time when the timeout should occur.
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 785b7c5..9238850 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \
                   test_pair_tcp \
                   test_reqrep_inproc \
                   test_reqrep_tcp \
-                  test_hwm
+                  test_hwm \
+                  test_reqrep_device
 
 if !ON_MINGW
 noinst_PROGRAMS += test_shutdown_stress \
@@ -22,6 +23,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
 
 test_hwm_SOURCES = test_hwm.cpp
 
+test_reqrep_device_SOURCES = test_reqrep_device.cpp
+
 if !ON_MINGW
 test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
 test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
diff --git a/tests/test_reqrep_device.cpp b/tests/test_reqrep_device.cpp
new file mode 100644
index 0000000..f6f06c9
--- /dev/null
+++ b/tests/test_reqrep_device.cpp
@@ -0,0 +1,160 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+
+int main (int argc, char *argv [])
+{
+    void *ctx = zmq_init (1);
+    assert (ctx);
+
+    //  Create a req/rep device.
+    void *xreq = zmq_socket (ctx, ZMQ_XREQ);
+    assert (xreq);
+    int rc = zmq_bind (xreq, "tcp://127.0.0.1:5560");
+    assert (rc == 0);
+    void *xrep = zmq_socket (ctx, ZMQ_XREP);
+    assert (xrep);
+    rc = zmq_bind (xrep, "tcp://127.0.0.1:5561");
+    assert (rc == 0);
+
+    //  Create a worker.
+    void *rep = zmq_socket (ctx, ZMQ_REP);
+    assert (rep);
+    rc = zmq_connect (rep, "tcp://127.0.0.1:5560");
+    assert (rc == 0);
+
+    //  Create a client.
+    void *req = zmq_socket (ctx, ZMQ_REQ);
+    assert (req);
+    rc = zmq_connect (req, "tcp://127.0.0.1:5561");
+    assert (rc == 0);
+
+    //  Send a request.
+    rc = zmq_send (req, "ABC", 3, ZMQ_SNDMORE);
+    assert (rc == 3);
+    rc = zmq_send (req, "DEF", 3, 0);
+    assert (rc == 3);
+
+    //  Pass the request through the device.
+    for (int i = 0; i != 4; i++) {
+        zmq_msg_t msg;
+        rc = zmq_msg_init (&msg);
+        assert (rc == 0);
+        rc = zmq_recvmsg (xrep, &msg, 0);
+        assert (rc >= 0);
+        int rcvlabel;
+        size_t sz = sizeof (rcvlabel);
+        rc = zmq_getsockopt (xrep, ZMQ_RCVLABEL, &rcvlabel, &sz);
+        assert (rc == 0);
+        int rcvmore;
+        rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz);
+        assert (rc == 0);
+        rc = zmq_sendmsg (xreq, &msg,
+            (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0));
+        assert (rc >= 0);
+    }
+
+    //  Receive the request.
+    char buff [3];
+    rc = zmq_recv (rep, buff, 3, 0);
+    assert (rc == 3);
+    assert (memcmp (buff, "ABC", 3) == 0);
+    int rcvlabel;
+    size_t sz = sizeof (rcvlabel);
+    rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz);
+    assert (rc == 0);
+    assert (!rcvlabel);
+    int rcvmore;
+    rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
+    assert (rc == 0);
+    assert (rcvmore);
+    rc = zmq_recv (rep, buff, 3, 0);
+    assert (rc == 3);
+    assert (memcmp (buff, "DEF", 3) == 0);
+    rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz);
+    assert (rc == 0);
+    assert (!rcvlabel);
+    rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
+    assert (rc == 0);
+    assert (!rcvmore);
+
+    //  Send the reply.
+    rc = zmq_send (rep, "GHI", 3, ZMQ_SNDMORE);
+    assert (rc == 3);
+    rc = zmq_send (rep, "JKL", 3, 0);
+    assert (rc == 3);
+
+    //  Pass the reply through the device.
+    for (int i = 0; i != 4; i++) {
+        zmq_msg_t msg;
+        rc = zmq_msg_init (&msg);
+        assert (rc == 0);
+        rc = zmq_recvmsg (xreq, &msg, 0);
+        assert (rc >= 0);
+        int rcvlabel;
+        size_t sz = sizeof (rcvlabel);
+        rc = zmq_getsockopt (xreq, ZMQ_RCVLABEL, &rcvlabel, &sz);
+        assert (rc == 0);
+        int rcvmore;
+        rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz);
+        assert (rc == 0);
+        rc = zmq_sendmsg (xrep, &msg,
+            (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0));
+        assert (rc >= 0);
+    }
+
+    //  Receive the reply.
+    rc = zmq_recv (req, buff, 3, 0);
+    assert (rc == 3);
+    assert (memcmp (buff, "GHI", 3) == 0);
+    rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz);
+    assert (rc == 0);
+    assert (!rcvlabel);
+    rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
+    assert (rc == 0);
+    assert (rcvmore);
+    rc = zmq_recv (req, buff, 3, 0);
+    assert (rc == 3);
+    assert (memcmp (buff, "JKL", 3) == 0);
+    rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz);
+    assert (rc == 0);
+    assert (!rcvlabel);
+    rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
+    assert (rc == 0);
+    assert (!rcvmore);
+
+    //  Clean up.
+    rc = zmq_close (req);
+    assert (rc == 0);
+    rc = zmq_close (rep);
+    assert (rc == 0);
+    rc = zmq_close (xrep);
+    assert (rc == 0);
+    rc = zmq_close (xreq);
+    assert (rc == 0);
+    rc = zmq_term (ctx);
+    assert (rc == 0);
+
+    return 0 ;
+}
-- 
1.7.0.4


--------------040805060206040203050705--


More information about the zeromq-dev mailing list