[zeromq-dev] patch: non-blocking tcp accept

Dhammika Pathirana dhammika at gmail.com
Wed Apr 29 10:05:02 CEST 2009


Hi Martin,

Here's the patch. I added an #if/#elif block in accept to check for
platform-specific errno values.
I didn't add errnos for OpenVMS, since it's not defined in the top-level config file.
Patch is released under MIT license.

Dhammika



On Tue, Apr 28, 2009 at 3:35 AM, Martin Sustrik <sustrik at fastmq.com> wrote:
> Hi Dhammika,
>
>> Patch to add non-blocking tcp accept.
>>
>> 1. sets non-blocking flag in tcp listener socket
>> 2. moves accept to tcp_listener.in_event()
>> 3. passes native socket descriptor to tcp_socket constructor
>>
>> Built and tested on Linux.
>> I would appreciate it if someone could run a quick build on a win32 box.
>> Hopefully it'll build without any drama.
>>
>> Requires a few changes in zmq_server, perf and examples (mainly in
>> tcp_socket constructor).
>> If we're happy with this, I'll send a full patch.
>
> The patch looks OK. Please, do send the full patch (and state it's licensed
> under MIT license).
>
> The one thing I'm unsure of is EAGAIN/EWOULDBLOCK distinction. It seems that
> both Linux and Win32 assign the same numeric value to both errors. I am not
> sure of other platforms.
>
> POSIX says:
>
> [EAGAIN]
>    Resource unavailable, try again (may be the same value as [EWOULDBLOCK]).
>
> [EWOULDBLOCK]
>    Operation would block (may be the same value as [EAGAIN]).
>
> Pretty confusing...
>
> Martin
>
-------------- next part --------------
Index: perf/transports/tcp_transport.hpp
===================================================================
--- perf/transports/tcp_transport.hpp	(revision 1409)
+++ perf/transports/tcp_transport.hpp	(working copy)
@@ -61,10 +61,10 @@
                 //  initiated by the other party.
 
                 //  Create listening socket.
-                tcp_listener = new zmq::tcp_listener_t (iface_or_host_);
+                tcp_listener = new zmq::tcp_listener_t (iface_or_host_, true);
 
                 //  Wait for and accept first incoming connection.
-                tcp_socket = new zmq::tcp_socket_t (*tcp_listener, true);
+                tcp_socket = new zmq::tcp_socket_t (tcp_listener->accept (), true);
             } else {
 
                 //  If 'listen' flag is not set, object actively creates
Index: zmq_server/zmq_server.cpp
===================================================================
--- zmq_server/zmq_server.cpp	(revision 1409)
+++ zmq_server/zmq_server.cpp	(working copy)
@@ -163,7 +163,7 @@
 #ifdef ZMQ_TRACE
             printf ("Opening connection.\n");
 #endif
-	    socket_list.push_back (new tcp_socket_t (listening_socket, true));           
+	    socket_list.push_back (new tcp_socket_t (listening_socket.accept (), true));           
             fd_t s = socket_list.back ()->get_fd ();
             FD_SET (s, &source_set_fds);
             
Index: libzmq/tcp_socket.cpp
===================================================================
--- libzmq/tcp_socket.cpp	(revision 1409)
+++ libzmq/tcp_socket.cpp	(working copy)
@@ -48,14 +48,12 @@
     reopen ();
 }
 
-zmq::tcp_socket_t::tcp_socket_t (tcp_listener_t &listener, bool block_) :
-    s (retired_fd),
+zmq::tcp_socket_t::tcp_socket_t (fd_t fd_, bool block_) :
+    s (fd_),
     hostname (""),
     block (block_)
 {
-    //  Accept the socket.
-    s = listener.accept ();
-    wsa_assert (s != INVALID_SOCKET);
+    wsa_assert (s != retired_fd);
  
     //  Set socket properties to non-blocking mode. 
     if (! block) {
@@ -201,12 +199,12 @@
     reopen ();
 }
 
-zmq::tcp_socket_t::tcp_socket_t (tcp_listener_t &listener, bool block_) :
+zmq::tcp_socket_t::tcp_socket_t (fd_t fd_, bool block_) :
+    s(fd_),
     hostname (""),
     block (block_)
 {
-    //  Accept the socket.
-    s = listener.accept ();
+    assert (s != retired_fd);
     
     if (! block) {
 
Index: libzmq/bp_tcp_sender.cpp
===================================================================
--- libzmq/bp_tcp_sender.cpp	(revision 1409)
+++ libzmq/bp_tcp_sender.cpp	(working copy)
@@ -46,7 +46,7 @@
 }
 
 zmq::bp_tcp_sender_t::bp_tcp_sender_t (i_thread *calling_thread_,
-      i_thread *thread_, tcp_listener_t &listener_,
+      i_thread *thread_, fd_t fd_,
       const char *local_object_) :
     writebuf_size (bp_out_batch_size),
     write_size (0),
@@ -56,7 +56,7 @@
     local_object (local_object_),
     reconnect_flag (false),
     state (engine_connected),
-    socket (listener_)
+    socket (fd_)
 {
     //  Allocate write buffer.
     writebuf = new unsigned char [writebuf_size];
Index: libzmq/bp_tcp_listener.cpp
===================================================================
--- libzmq/bp_tcp_listener.cpp	(revision 1409)
+++ libzmq/bp_tcp_listener.cpp	(working copy)
@@ -84,12 +84,16 @@
 
 void zmq::bp_tcp_listener_t::in_event ()
 {
+    fd_t fd = listener.accept ();
+    if (fd == retired_fd)
+        return;
+
     if (!sender) {
 
         //  Create the engine to take care of the connection.
         //  TODO: make buffer size configurable by user
         bp_tcp_receiver_t *engine = new bp_tcp_receiver_t (poller,
-            handler_threads [current_handler_thread], listener, peer_name);
+            handler_threads [current_handler_thread], fd, peer_name);
         zmq_assert (engine);
 
         //  The newly created engine serves as a local source of messages
@@ -118,7 +122,7 @@
         //  Create the engine to take care of the connection.
         //  TODO: make buffer size configurable by user
         bp_tcp_sender_t *engine = new bp_tcp_sender_t (poller,
-            handler_threads [current_handler_thread], listener, peer_name);
+            handler_threads [current_handler_thread], fd, peer_name);
         zmq_assert (engine);
 
         //  The newly created engine serves as a local destination of messages
Index: libzmq/tcp_listener.cpp
===================================================================
--- libzmq/tcp_listener.cpp	(revision 1409)
+++ libzmq/tcp_listener.cpp	(working copy)
@@ -37,11 +37,12 @@
 #include <netinet/tcp.h>
 #include <netinet/in.h>
 #include <netdb.h>
+#include <fcntl.h>
 #endif
 
 #ifdef ZMQ_HAVE_WINDOWS
 
-zmq::tcp_listener_t::tcp_listener_t (const char *iface_)
+zmq::tcp_listener_t::tcp_listener_t (const char *iface_, bool block_)
 {
     //  Convert the hostname into sockaddr_in structure.
     sockaddr_in ip_address;
@@ -57,6 +58,13 @@
         (const char*) &flag, sizeof (int));
     wsa_assert (rc != SOCKET_ERROR);
 
+    //  Set non-blocking flag.
+    if (! block_) {
+        flag = 1;
+        rc = ioctlsocket (s, FIONBIO, &flag);
+        wsa_assert (rc != SOCKET_ERROR);
+    }
+
     //  Bind the socket to the network interface_i and port.
     rc = bind (s, (struct sockaddr*) &ip_address, sizeof (ip_address));
     wsa_assert (rc != SOCKET_ERROR);
@@ -102,6 +110,10 @@
 {
     //  Accept one incoming connection.
     fd_t sock = ::accept (s, NULL, NULL);
+    if (sock == INVALID_SOCKET && 
+        (WSAGetLastError () == WSAEWOULDBLOCK || WSAGetLastError () == WSAECONNRESET))
+        return retired_fd;
+
     wsa_assert (sock != INVALID_SOCKET);
     return sock;
 }
@@ -116,7 +128,7 @@
 
 #else
 
-zmq::tcp_listener_t::tcp_listener_t (const char *iface_)
+zmq::tcp_listener_t::tcp_listener_t (const char *iface_, bool block_)
 {
     //  Convert the hostname into sockaddr_in structure.
     sockaddr_in ip_address;
@@ -131,6 +143,15 @@
     int rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
     errno_assert (rc == 0);
 
+    //  Set non-blocking flag.
+    if (! block_) {
+        flag = fcntl (s, F_GETFL, 0);
+        if (flag == -1) 
+            flag = 0;
+        rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+        errno_assert (rc != -1);
+    }
+
     //  Bind the socket to the network interface_i and port.
     rc = bind (s, (struct sockaddr*) &ip_address, sizeof (ip_address));
     errno_assert (rc == 0);
@@ -177,6 +198,41 @@
 {
     //  Accept one incoming connection.
     fd_t sock = ::accept (s, NULL, NULL);
+
+#if (defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD || \
+     defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX)
+
+    if (sock == -1 && 
+        (errno == EAGAIN || errno == EWOULDBLOCK || 
+         errno == EINTR || errno == ECONNABORTED))
+        return retired_fd;
+
+#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_AIX)
+
+    if (sock == -1 && 
+        (errno == EWOULDBLOCK || errno == EINTR || 
+         errno == ECONNABORTED || errno == EPROTO))
+        return retired_fd;
+
+#elif defined ZMQ_HAVE_HPUX
+
+    if (sock == -1 && 
+        (errno == EAGAIN || errno == EWOULDBLOCK || 
+         errno == EINTR || errno == ECONNABORTED || errno == ENOBUFS))
+        return retired_fd;
+
+#elif defined ZMQ_HAVE_QNXNTO 
+
+    if (sock == -1 && 
+        (errno == EWOULDBLOCK || errno == EINTR || errno == ECONNABORTED))
+        return retired_fd;
+
+#elif defined ZMQ_HAVE_OPENVMS
+#error "Undefined non-blocking tcp errno values"
+#else
+#error "Undefined non-blocking tcp errno values"
+#endif
+
     errno_assert (sock != -1); 
     return sock;
 }
Index: libzmq/zmq/bp_tcp_receiver.hpp
===================================================================
--- libzmq/zmq/bp_tcp_receiver.hpp	(revision 1409)
+++ libzmq/zmq/bp_tcp_receiver.hpp	(working copy)
@@ -91,7 +91,7 @@
             const char *hostname_, const char *local_object_,
             const char * /* options_*/);
         bp_tcp_receiver_t (i_thread *calling_thread_, i_thread *thread_,
-            tcp_listener_t &listener_, const char *local_object_);
+            fd_t fd_, const char *local_object_);
 
         ~bp_tcp_receiver_t ();
 
Index: libzmq/zmq/tcp_socket.hpp
===================================================================
--- libzmq/zmq/tcp_socket.hpp	(revision 1409)
+++ libzmq/zmq/tcp_socket.hpp	(working copy)
@@ -44,9 +44,8 @@
         //  state.
         ZMQ_EXPORT tcp_socket_t (const char *hostname_,  bool block_ = false);
 
-        //  Opens a socket by accepting a connection from TCP listener object
-        ZMQ_EXPORT tcp_socket_t (tcp_listener_t &listener,
-            bool block_ = false);
+        //  Associates a socket with a native socket descriptor from TCP listener
+        ZMQ_EXPORT tcp_socket_t (fd_t fd_, bool block_ = false);
          
         //  Closes the socket.
         ZMQ_EXPORT ~tcp_socket_t ();
Index: libzmq/zmq/tcp_listener.hpp
===================================================================
--- libzmq/zmq/tcp_listener.hpp	(revision 1409)
+++ libzmq/zmq/tcp_listener.hpp	(working copy)
@@ -35,7 +35,7 @@
         //  Create TCP listining socket. Interface is either interface name,
         //  in that case port number is chosen by OS and can be retrieved
         //  by get_port method, or <interface-name>:<port-number>.
-        ZMQ_EXPORT tcp_listener_t (const char *interface_);
+        ZMQ_EXPORT tcp_listener_t (const char *interface_, bool block_ = false);
 
         //  Closes the socket.
         ZMQ_EXPORT ~tcp_listener_t ();
Index: libzmq/zmq/bp_tcp_sender.hpp
===================================================================
--- libzmq/zmq/bp_tcp_sender.hpp	(revision 1409)
+++ libzmq/zmq/bp_tcp_sender.hpp	(working copy)
@@ -91,7 +91,7 @@
             const char *hostname_, const char *local_object_,
             const char * /* options_*/);
         bp_tcp_sender_t (i_thread *calling_thread_, i_thread *thread_,
-            tcp_listener_t &listener_, const char *local_object_);
+            fd_t fd_, const char *local_object_);
 
         ~bp_tcp_sender_t ();
 
Index: libzmq/bp_tcp_receiver.cpp
===================================================================
--- libzmq/bp_tcp_receiver.cpp	(revision 1409)
+++ libzmq/bp_tcp_receiver.cpp	(working copy)
@@ -46,7 +46,7 @@
 }
 
 zmq::bp_tcp_receiver_t::bp_tcp_receiver_t (i_thread *calling_thread_,
-      i_thread *thread_, tcp_listener_t &listener_,
+      i_thread *thread_, fd_t fd_,
       const char *local_object_) :
     readbuf_size (bp_in_batch_size),
     read_size (0),
@@ -56,7 +56,7 @@
     local_object (local_object_),
     reconnect_flag (false),
     state (engine_connected),
-    socket (listener_)
+    socket (fd_)
 {
     //  Allocate read buffer.
     readbuf = new unsigned char [readbuf_size];


More information about the zeromq-dev mailing list