[zeromq-dev] patch: non-blocking tcp accept
Dhammika Pathirana
dhammika at gmail.com
Wed Apr 29 10:05:02 CEST 2009
Hi Martin,
Here's the patch, I added a #define block in accept to check for
platform specific errno values.
I didn't add errnos for OpenVMS, it's not defined in top level config file.
Patch is released under MIT license.
Dhammika
On Tue, Apr 28, 2009 at 3:35 AM, Martin Sustrik <sustrik at fastmq.com> wrote:
> Hi Dhammika,
>
>> Patch to add non-blocking tcp accept.
>>
>> 1. sets non-blocking flag in tcp listener socket
>> 2. moves accept to tcp_listener.in_event()
>> 3. passes native socket descriptor to tcp_socket constructor
>>
>> Builds and tested on linux.
>> Would appreciate if someone can run a quick build on a win32 box.
>> Hopefully it'll build without any drama.
>>
>> Requires few changes in zmq_server, perf and examples (mainly in
>> tcp_socket constructor).
>> If we're happy with this, I'll send a full patch.
>
> The patch looks OK. Please, do send the full patch (and state it's licensed
> under MIT license).
>
> The one thing I'm unsure of is EAGAIN/EWOULDBLOCK distinction. It seems that
> both Linux and Win32 assign the same numeric value to both errors. I am not
> sure of other platforms.
>
> POSIX says:
>
> [EAGAIN]
> Resource unavailable, try again (may be the same value as [EWOULDBLOCK]).
>
> [EWOULDBLOCK]
> Operation would block (may be the same value as [EAGAIN]).
>
> Pretty confusing...
>
> Martin
>
-------------- next part --------------
Index: perf/transports/tcp_transport.hpp
===================================================================
--- perf/transports/tcp_transport.hpp (revision 1409)
+++ perf/transports/tcp_transport.hpp (working copy)
@@ -61,10 +61,10 @@
// initiated by the other party.
// Create listening socket.
- tcp_listener = new zmq::tcp_listener_t (iface_or_host_);
+ tcp_listener = new zmq::tcp_listener_t (iface_or_host_, true);
// Wait for and accept first incoming connection.
- tcp_socket = new zmq::tcp_socket_t (*tcp_listener, true);
+ tcp_socket = new zmq::tcp_socket_t (tcp_listener->accept (), true);
} else {
// If 'listen' flag is not set, object actively creates
Index: zmq_server/zmq_server.cpp
===================================================================
--- zmq_server/zmq_server.cpp (revision 1409)
+++ zmq_server/zmq_server.cpp (working copy)
@@ -163,7 +163,7 @@
#ifdef ZMQ_TRACE
printf ("Opening connection.\n");
#endif
- socket_list.push_back (new tcp_socket_t (listening_socket, true));
+ socket_list.push_back (new tcp_socket_t (listening_socket.accept (), true));
fd_t s = socket_list.back ()->get_fd ();
FD_SET (s, &source_set_fds);
Index: libzmq/tcp_socket.cpp
===================================================================
--- libzmq/tcp_socket.cpp (revision 1409)
+++ libzmq/tcp_socket.cpp (working copy)
@@ -48,14 +48,12 @@
reopen ();
}
-zmq::tcp_socket_t::tcp_socket_t (tcp_listener_t &listener, bool block_) :
- s (retired_fd),
+zmq::tcp_socket_t::tcp_socket_t (fd_t fd_, bool block_) :
+ s (fd_),
hostname (""),
block (block_)
{
- // Accept the socket.
- s = listener.accept ();
- wsa_assert (s != INVALID_SOCKET);
+ wsa_assert (s != retired_fd);
// Set socket properties to non-blocking mode.
if (! block) {
@@ -201,12 +199,12 @@
reopen ();
}
-zmq::tcp_socket_t::tcp_socket_t (tcp_listener_t &listener, bool block_) :
+zmq::tcp_socket_t::tcp_socket_t (fd_t fd_, bool block_) :
+ s(fd_),
hostname (""),
block (block_)
{
- // Accept the socket.
- s = listener.accept ();
+ assert (s != retired_fd);
if (! block) {
Index: libzmq/bp_tcp_sender.cpp
===================================================================
--- libzmq/bp_tcp_sender.cpp (revision 1409)
+++ libzmq/bp_tcp_sender.cpp (working copy)
@@ -46,7 +46,7 @@
}
zmq::bp_tcp_sender_t::bp_tcp_sender_t (i_thread *calling_thread_,
- i_thread *thread_, tcp_listener_t &listener_,
+ i_thread *thread_, fd_t fd_,
const char *local_object_) :
writebuf_size (bp_out_batch_size),
write_size (0),
@@ -56,7 +56,7 @@
local_object (local_object_),
reconnect_flag (false),
state (engine_connected),
- socket (listener_)
+ socket (fd_)
{
// Allocate write buffer.
writebuf = new unsigned char [writebuf_size];
Index: libzmq/bp_tcp_listener.cpp
===================================================================
--- libzmq/bp_tcp_listener.cpp (revision 1409)
+++ libzmq/bp_tcp_listener.cpp (working copy)
@@ -84,12 +84,16 @@
void zmq::bp_tcp_listener_t::in_event ()
{
+ fd_t fd = listener.accept ();
+ if (fd == retired_fd)
+ return;
+
if (!sender) {
// Create the engine to take care of the connection.
// TODO: make buffer size configurable by user
bp_tcp_receiver_t *engine = new bp_tcp_receiver_t (poller,
- handler_threads [current_handler_thread], listener, peer_name);
+ handler_threads [current_handler_thread], fd, peer_name);
zmq_assert (engine);
// The newly created engine serves as a local source of messages
@@ -118,7 +122,7 @@
// Create the engine to take care of the connection.
// TODO: make buffer size configurable by user
bp_tcp_sender_t *engine = new bp_tcp_sender_t (poller,
- handler_threads [current_handler_thread], listener, peer_name);
+ handler_threads [current_handler_thread], fd, peer_name);
zmq_assert (engine);
// The newly created engine serves as a local destination of messages
Index: libzmq/tcp_listener.cpp
===================================================================
--- libzmq/tcp_listener.cpp (revision 1409)
+++ libzmq/tcp_listener.cpp (working copy)
@@ -37,11 +37,12 @@
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
+#include <fcntl.h>
#endif
#ifdef ZMQ_HAVE_WINDOWS
-zmq::tcp_listener_t::tcp_listener_t (const char *iface_)
+zmq::tcp_listener_t::tcp_listener_t (const char *iface_, bool block_)
{
// Convert the hostname into sockaddr_in structure.
sockaddr_in ip_address;
@@ -57,6 +58,13 @@
(const char*) &flag, sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
+ // Set non-blocking flag.
+ if (! block_) {
+ flag = 1;
+ rc = ioctlsocket (s, FIONBIO, &flag);
+ wsa_assert (rc != SOCKET_ERROR);
+ }
+
// Bind the socket to the network interface_i and port.
rc = bind (s, (struct sockaddr*) &ip_address, sizeof (ip_address));
wsa_assert (rc != SOCKET_ERROR);
@@ -102,6 +110,10 @@
{
// Accept one incoming connection.
fd_t sock = ::accept (s, NULL, NULL);
+ if (sock == INVALID_SOCKET &&
+ (WSAGetLastError () == WSAEWOULDBLOCK || WSAGetLastError () == WSAECONNRESET))
+ return retired_fd;
+
wsa_assert (sock != INVALID_SOCKET);
return sock;
}
@@ -116,7 +128,7 @@
#else
-zmq::tcp_listener_t::tcp_listener_t (const char *iface_)
+zmq::tcp_listener_t::tcp_listener_t (const char *iface_, bool block_)
{
// Convert the hostname into sockaddr_in structure.
sockaddr_in ip_address;
@@ -131,6 +143,15 @@
int rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
+ // Set non-blocking flag.
+ if (! block_) {
+ flag = fcntl (s, F_GETFL, 0);
+ if (flag == -1)
+ flag = 0;
+ rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+ errno_assert (rc != -1);
+ }
+
// Bind the socket to the network interface_i and port.
rc = bind (s, (struct sockaddr*) &ip_address, sizeof (ip_address));
errno_assert (rc == 0);
@@ -177,6 +198,41 @@
{
// Accept one incoming connection.
fd_t sock = ::accept (s, NULL, NULL);
+
+#if (defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD || \
+ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX)
+
+ if (sock == -1 &&
+ (errno == EAGAIN || errno == EWOULDBLOCK ||
+ errno == EINTR || errno == ECONNABORTED))
+ return retired_fd;
+
+#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_AIX)
+
+ if (sock == -1 &&
+ (errno == EWOULDBLOCK || errno == EINTR ||
+ errno == ECONNABORTED || errno == EPROTO))
+ return retired_fd;
+
+#elif defined ZMQ_HAVE_HPUX
+
+ if (sock == -1 &&
+ (errno == EAGAIN || errno == EWOULDBLOCK ||
+ errno == EINTR || errno == ECONNABORTED || errno == ENOBUFS))
+ return retired_fd;
+
+#elif defined ZMQ_HAVE_QNXNTO
+
+ if (sock == -1 &&
+ (errno == EWOULDBLOCK || errno == EINTR || errno == ECONNABORTED))
+ return retired_fd;
+
+#elif defined ZMQ_HAVE_OPENVMS
+#error "Undefined non-blocking tcp errno values"
+#else
+#error "Undefined non-blocking tcp errno values"
+#endif
+
errno_assert (sock != -1);
return sock;
}
Index: libzmq/zmq/bp_tcp_receiver.hpp
===================================================================
--- libzmq/zmq/bp_tcp_receiver.hpp (revision 1409)
+++ libzmq/zmq/bp_tcp_receiver.hpp (working copy)
@@ -91,7 +91,7 @@
const char *hostname_, const char *local_object_,
const char * /* options_*/);
bp_tcp_receiver_t (i_thread *calling_thread_, i_thread *thread_,
- tcp_listener_t &listener_, const char *local_object_);
+ fd_t fd_, const char *local_object_);
~bp_tcp_receiver_t ();
Index: libzmq/zmq/tcp_socket.hpp
===================================================================
--- libzmq/zmq/tcp_socket.hpp (revision 1409)
+++ libzmq/zmq/tcp_socket.hpp (working copy)
@@ -44,9 +44,8 @@
// state.
ZMQ_EXPORT tcp_socket_t (const char *hostname_, bool block_ = false);
- // Opens a socket by accepting a connection from TCP listener object
- ZMQ_EXPORT tcp_socket_t (tcp_listener_t &listener,
- bool block_ = false);
+ // Associates a socket with a native socket descriptor from TCP listener
+ ZMQ_EXPORT tcp_socket_t (fd_t fd_, bool block_ = false);
// Closes the socket.
ZMQ_EXPORT ~tcp_socket_t ();
Index: libzmq/zmq/tcp_listener.hpp
===================================================================
--- libzmq/zmq/tcp_listener.hpp (revision 1409)
+++ libzmq/zmq/tcp_listener.hpp (working copy)
@@ -35,7 +35,7 @@
// Create TCP listining socket. Interface is either interface name,
// in that case port number is chosen by OS and can be retrieved
// by get_port method, or <interface-name>:<port-number>.
- ZMQ_EXPORT tcp_listener_t (const char *interface_);
+ ZMQ_EXPORT tcp_listener_t (const char *interface_, bool block_ = false);
// Closes the socket.
ZMQ_EXPORT ~tcp_listener_t ();
Index: libzmq/zmq/bp_tcp_sender.hpp
===================================================================
--- libzmq/zmq/bp_tcp_sender.hpp (revision 1409)
+++ libzmq/zmq/bp_tcp_sender.hpp (working copy)
@@ -91,7 +91,7 @@
const char *hostname_, const char *local_object_,
const char * /* options_*/);
bp_tcp_sender_t (i_thread *calling_thread_, i_thread *thread_,
- tcp_listener_t &listener_, const char *local_object_);
+ fd_t fd_, const char *local_object_);
~bp_tcp_sender_t ();
Index: libzmq/bp_tcp_receiver.cpp
===================================================================
--- libzmq/bp_tcp_receiver.cpp (revision 1409)
+++ libzmq/bp_tcp_receiver.cpp (working copy)
@@ -46,7 +46,7 @@
}
zmq::bp_tcp_receiver_t::bp_tcp_receiver_t (i_thread *calling_thread_,
- i_thread *thread_, tcp_listener_t &listener_,
+ i_thread *thread_, fd_t fd_,
const char *local_object_) :
readbuf_size (bp_in_batch_size),
read_size (0),
@@ -56,7 +56,7 @@
local_object (local_object_),
reconnect_flag (false),
state (engine_connected),
- socket (listener_)
+ socket (fd_)
{
// Allocate read buffer.
readbuf = new unsigned char [readbuf_size];
More information about the zeromq-dev
mailing list