[zeromq-dev] [PATCH] Re: assertion failure with master

Martin Lucina mato at kotelna.sk
Mon Nov 8 13:39:00 CET 2010


Chuck,

Can you try the preliminary patch below against master and let me know if
it fixes your problem?

Martin, please review. The code is getting decidedly ugly; so much for
trying to emulate atomic message queueing over a SOCK_STREAM socket. The
alternative, as discussed on IRC, is to use SOCK_DGRAM sockets instead,
which are atomic by definition.

-mato

mailbox: Retry send/recv if a partial write occurred.

Signed-off-by: Martin Lucina <mato at kotelna.sk>
---

diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index c186007..7c55ce5 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -156,13 +156,15 @@ zmq::mailbox_t::~mailbox_t ()
 void zmq::mailbox_t::send (const command_t &cmd_)
 {
     //  Attempt to write an entire command without blocking.
+    ssize_t want_nbytes = sizeof (command_t);
     ssize_t nbytes;
     do {
-        nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
+        nbytes = ::send (w, &cmd_, want_nbytes, 0);
     } while (nbytes == -1 && errno == EINTR);
 
     //  Attempt to increase mailbox SNDBUF if the send failed.
-    if (nbytes == -1 && errno == EAGAIN) {
+    if ((nbytes > 0 && nbytes < want_nbytes) ||
+        (nbytes == -1 && errno == EAGAIN)) {
         int old_sndbuf, new_sndbuf;
         socklen_t sndbuf_size = sizeof old_sndbuf;
 
@@ -181,25 +183,44 @@ void zmq::mailbox_t::send (const command_t &cmd_)
         errno_assert (rc == 0);
         zmq_assert (new_sndbuf > old_sndbuf);
 
+        //  The failed send may have written data; if so we only wish to
+        //  write the remainder of the command.
+        const char *retry_data = (const char *) &cmd_;
+        size_t retry_size = want_nbytes;
+        if (nbytes > 0) {
+            retry_data += nbytes;
+            retry_size -= nbytes;
+        }
+        else
+            nbytes = 0;
+
         //  Retry the sending operation; at this point it must succeed.
+        ssize_t retry_nbytes;
         do {
-            nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
-        } while (nbytes == -1 && errno == EINTR);
+            retry_nbytes = ::send (w, retry_data, retry_size, 0);
+        } while (retry_nbytes == -1 && errno == EINTR);
+        errno_assert (retry_nbytes != -1);
+
+        //  Update total number of bytes written.
+        nbytes += retry_nbytes;
+    }
+    else {
+        errno_assert (nbytes != -1);
     }
-    errno_assert (nbytes != -1);
 
-    //  This should never happen as we've already checked that command size is
-    //  less than PIPE_BUF.
-    zmq_assert (nbytes == sizeof (command_t));
+    //  This should never happen as either the initial send was atomic
+    //  or the retry send must have written the remainder of the command.
+    zmq_assert (nbytes == want_nbytes);
 }

 int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
 {
+    ssize_t want_nbytes = sizeof (command_t);
 #ifdef MSG_DONTWAIT
 
     //  Attempt to read an entire command. Returns EAGAIN if non-blocking
     //  mode is requested and a command is not available.
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
+    ssize_t nbytes = ::recv (r, cmd_, want_nbytes,
         block_ ? 0 : MSG_DONTWAIT);
     if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
         return -1;
@@ -217,7 +238,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
     //  and a command is not available. Save value of errno if we wish to pass
     //  it to caller.
     int err = 0;
-    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
+    ssize_t nbytes = ::recv (r, cmd_, want_nbytes, 0);
     if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))  
         err = errno;
 
@@ -240,8 +261,50 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
     //  Sanity check for success.
     errno_assert (nbytes != -1);
 
-    //  Check whether we haven't got half of command.
-    zmq_assert (nbytes == sizeof (command_t));
+    //  If we got fewer bytes than we wanted, this means the sender
+    //  had to retry the send. Retrieve the remainder of the command.
+    if (nbytes < want_nbytes) {
+        ssize_t retry_nbytes;
+        char *retry_data = (char *)cmd_;
+        size_t retry_size = want_nbytes - nbytes;
+        retry_data += nbytes;
+
+#ifndef MSG_DONTWAIT
+        //  Set the reader to blocking mode.
+        int flags = fcntl (r, F_GETFL, 0);
+        errno_assert (flags >= 0);
+        int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
+        errno_assert (rc == 0);
+#endif
+
+        //  A stream socket may deliver the remainder in several pieces,
+        //  so keep reading (ignoring EINTR) until all of it has arrived.
+        while (retry_size > 0) {
+            retry_nbytes = ::recv (r, retry_data, retry_size, 0);
+            if (retry_nbytes == -1 && errno == EINTR)
+                continue;
+            errno_assert (retry_nbytes != -1);
+
+            //  recv () returns 0 if the peer shut down mid-command; without
+            //  this check the loop would spin forever.
+            zmq_assert (retry_nbytes != 0);
+
+            retry_data += retry_nbytes;
+            retry_size -= retry_nbytes;
+            nbytes += retry_nbytes;
+        }
+
+#ifndef MSG_DONTWAIT
+        //  Re-set the reader to non-blocking mode.
+        flags = fcntl (r, F_GETFL, 0);
+        errno_assert (flags >= 0);
+        rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
+        errno_assert (rc == 0);
+#endif
+    }
+
+    //  Sanity check that we now have the entire command.
+    zmq_assert (nbytes == want_nbytes);
 
     return 0;
 }



More information about the zeromq-dev mailing list