[zeromq-dev] zmq_poll performance question

Francis Le Bourse zno-reply-francis.lebourse at sfr-sh.fr
Tue Nov 25 13:45:09 CET 2014


On 11/24/2014 8:08 PM, Arnaud Kapp wrote:
>> Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq master
> Oh okay. This is the commit that added the flag:
> https://github.com/zeromq/libzmq/commit/779c37abc433cb6595ddeedaf86b280317656bdd
I have rewritten the patch to take the POLLPRI flag into account, and to 
apply cleanly to the current master (Tue 11/25).
Cheers,
Francis
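To illustrate the idea behind the patch, here is a minimal, self-contained sketch. It is not the patch code itself: plain POSIX pipes stand in for the signaler file descriptors, and `wait_cached` and `count_ready` are hypothetical names used only for this example. `wait_cached` mirrors what `wait_ex` does with a zero timeout: it consults the `revents` already filled in by the single poll() call and clears POLLIN, instead of issuing one more poll() per descriptor.

```cpp
#include <poll.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <vector>

//  Hypothetical helper mirroring wait_ex() with a zero timeout: consult the
//  revents cached by an earlier poll() instead of polling again.
static int wait_cached (pollfd *pfd_)
{
    if ((pfd_->revents & POLLIN) == 0) {
        errno = EAGAIN;
        return -1;
    }
    //  Clear POLLIN so that a second call on the same entry reports EAGAIN.
    pfd_->revents &= ~POLLIN;
    return 0;
}

//  Creates n_ pipes, makes the first active_ of them readable (active_ must
//  not exceed n_), issues a single poll() and counts the readable ones from
//  the cached revents.
static int count_ready (int n_, int active_)
{
    std::vector <pollfd> pfds;
    std::vector <int> writers;
    for (int i = 0; i < n_; i++) {
        int fds [2];
        int rc = pipe (fds);
        assert (rc == 0);
        (void) rc;
        pollfd pfd;
        pfd.fd = fds [0];
        pfd.events = POLLIN;
        pfd.revents = 0;
        pfds.push_back (pfd);
        writers.push_back (fds [1]);
    }
    for (int i = 0; i < active_; i++) {
        char c = 'x';
        ssize_t nbytes = write (writers [i], &c, 1);
        assert (nbytes == 1);
        (void) nbytes;
    }

    //  The only poll() system call, regardless of n_.
    int rc = poll (pfds.data (), pfds.size (), 0);
    assert (rc == active_);
    (void) rc;

    int ready = 0;
    for (size_t i = 0; i != pfds.size (); i++)
        if (wait_cached (&pfds [i]) == 0)
            ready++;

    for (size_t i = 0; i != pfds.size (); i++) {
        close (pfds [i].fd);
        close (writers [i]);
    }
    return ready;
}
```

For example, `count_ready (16, 3)` polls 16 descriptors once and then classifies all of them without any further system call, which is the saving the patch aims for in zmq_poll().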

>
> libzmq was 4.1 at the time I believe.
>
> I'll probably look at it this week-end then :)
>
> On Mon, Nov 24, 2014 at 12:10 PM, Francis Le Bourse
> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>> Hi,
>> On 11/24/2014 11:35 AM, Arnaud Kapp wrote:
>>> Hello,
>>>
>>> I recently added support for POLLPRI flag.
>>> It looks like it's not handled in your patch
>> No, it isn't handled. In which version did you add this flag?
>> Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq
>> master.
>>>    and that it needs custom
>>> support. Since there is no test related to this flag you wouldn't
>>> notice.
>>>
>>> I can give it a look if you want.
>> That would be nice.
>>
>> Cheers,
>> Francis
>>
>>
>>> On Sat, Nov 22, 2014 at 2:16 PM, Pieter Hintjens <ph at imatix.com> wrote:
>>>> I suggest you send the patch to libzmq master, and ensure all test
>>>> cases pass. Then we can get this into the next version.
>>>>
>>>> On Fri, Nov 21, 2014 at 2:50 PM, Francis Le Bourse
>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>> Hi,
>>>>>
>>>>> On 11/6/2014 3:18 PM, Pieter Hintjens wrote:
>>>>>> Oh, ok. Sounds like you have a good candidate for some before/after
>>>>>> measurement and optimization. Are you going to try to make a patch for
>>>>>> this?
>>>>> I have a patch candidate for this optimization; the performance
>>>>> improvement is very good and it doesn't seem to introduce any new
>>>>> instability.
>>>>> What is modified:
>>>>>       - zmq_poll(), there is only one poll() now,
>>>>>       - and epoll() from epoll.cpp
>>>>> Other calls to poll() and select() are left unmodified.
>>>>>
>>>>> I would be happy to have any feedback.
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Francis
>>>>>>
>>>>>> On Thu, Nov 6, 2014 at 2:09 PM, Francis Le Bourse
>>>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>>>> On 11/6/2014 11:47 AM, Pieter Hintjens wrote:
>>>>>>>> A simple optimization is, when you are polling sockets for input, to
>>>>>>>> continue reading from an active socket using a non-blocking read. So
>>>>>>>> you process all waiting messages on a socket and then only switch
>>>>>>>> back to poll when needed.
>>>>>>> Thank you for your quick reply.
>>>>>>>
>>>>>>> Yes, but the question was more about the zmq_poll() internals.
>>>>>>> For 600+ file descriptors, zmq_poll() calls poll() a huge number of
>>>>>>> times for only a few that will trigger a POLLIN, and the relevant
>>>>>>> information is already known / present in the pollfds array. The
>>>>>>> performance hit is there.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Francis
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> On Thu, Nov 6, 2014 at 11:28 AM, Francis Le Bourse
>>>>>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am looking at a performance issue in zmq, when the number of
>>>>>>>>> zsockets / file descriptors becomes large.
>>>>>>>>> The relevant calls are:
>>>>>>>>>         poll+0x57
>>>>>>>>>         zmq_poll+0x2e3
>>>>>>>>>         zloop_start+0x1e8
>>>>>>>>>         main+0xb40
>>>>>>>>>         __libc_start_main+0xfd
>>>>>>>>> immediately followed by a loop of
>>>>>>>>>         poll+0x57
>>>>>>>>>         zmq::signaler_t::wait(int)+0x33
>>>>>>>>>         zmq::mailbox_t::recv(zmq::command_t*, int)+0x78
>>>>>>>>>         zmq::socket_base_t::process_commands(int, bool)+0xbe
>>>>>>>>>         zmq::socket_base_t::getsockopt(int, void*, unsigned long*)+0x135
>>>>>>>>>         zmq_getsockopt+0x75
>>>>>>>>>         zmq_poll+0x3da
>>>>>>>>>         zloop_start+0x1e8
>>>>>>>>>         main+0xb40
>>>>>>>>>         __libc_start_main+0xfd
>>>>>>>>>
>>>>>>>>> The code in the loop is executed once per file descriptor in the
>>>>>>>>> initial pollarray, redoing a poll() system call each time.
>>>>>>>>> Is there a reason to proceed that way?
>>>>>>>>> Would it be possible to reuse the results of the first poll() in
>>>>>>>>> order to bypass the second set of system calls?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Francis
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> _______________________________________________
>>>>>>>>> zeromq-dev mailing list
>>>>>>>>> zeromq-dev at lists.zeromq.org
>>>>>>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>>>>>>>
>>>
>>>
>>
>
>
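Pieter's suggestion earlier in the thread (keep reading from an active socket with non-blocking reads, and go back to poll only once it is drained) can be sketched as follows. This is only an illustration under stated assumptions: a non-blocking POSIX pipe carrying one-byte "messages" stands in for a ZeroMQ socket, and `drain` and `poll_then_drain` are hypothetical helper names. With libzmq, the read inside the loop would be zmq_recv() with the ZMQ_DONTWAIT flag.

```cpp
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>

//  Hypothetical helper: consume one-byte "messages" from a non-blocking
//  descriptor until a read would block. Returns the number consumed.
static int drain (int fd_)
{
    int count = 0;
    char msg;
    while (true) {
        ssize_t rc = read (fd_, &msg, 1);
        if (rc == 1) {
            count++;
            continue;
        }
        if (rc == -1 && errno == EINTR)
            continue;
        //  rc == -1 with EAGAIN: nothing left to read right now.
        //  rc == 0: the writer closed the pipe. Either way, stop.
        break;
    }
    return count;
}

//  Queues pending_ messages on a pipe, polls once, then drains the pipe
//  without going back to poll() between messages.
static int poll_then_drain (int pending_)
{
    int fds [2];
    int rc = pipe (fds);
    assert (rc == 0);
    int flags = fcntl (fds [0], F_GETFL, 0);
    assert (flags != -1);
    rc = fcntl (fds [0], F_SETFL, flags | O_NONBLOCK);
    assert (rc == 0);
    (void) rc;

    for (int i = 0; i < pending_; i++) {
        char c = 'm';
        ssize_t nbytes = write (fds [1], &c, 1);
        assert (nbytes == 1);
        (void) nbytes;
    }

    pollfd pfd;
    pfd.fd = fds [0];
    pfd.events = POLLIN;
    pfd.revents = 0;

    int consumed = 0;
    if (poll (&pfd, 1, 0) == 1 && (pfd.revents & POLLIN))
        consumed = drain (fds [0]);

    close (fds [0]);
    close (fds [1]);
    return consumed;
}
```

With five messages queued, one poll() is followed by five successful reads and a sixth that reports EAGAIN, so the cost of polling is paid once per burst rather than once per message.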

-------------- next part --------------
diff -urwB libzmq-original/src/ctx.cpp libzmq/src/ctx.cpp
--- libzmq-original/src/ctx.cpp	2014-11-25 11:01:35.704930597 +0100
+++ libzmq/src/ctx.cpp	2014-11-25 11:03:41.041081315 +0100
@@ -28,6 +28,8 @@
 #include <new>
 #include <string.h>
 
+#include <poll.h>
+
 #include "ctx.hpp"
 #include "socket_base.hpp"
 #include "io_thread.hpp"
diff -urwB libzmq-original/src/epoll.cpp libzmq/src/epoll.cpp
--- libzmq-original/src/epoll.cpp	2014-11-25 11:01:35.707930552 +0100
+++ libzmq/src/epoll.cpp	2014-11-25 12:31:57.856918921 +0100
@@ -27,6 +27,8 @@
 #include <algorithm>
 #include <new>
 
+#include <poll.h>
+
 #include "epoll.hpp"
 #include "err.hpp"
 #include "config.hpp"
@@ -152,18 +154,43 @@
         for (int i = 0; i < n; i ++) {
             poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
 
+	    uint32_t events = ev_buf[i].events;
+	    ev_buf[i].events = 0;
+
+	    // build a pollfd structure with the events returned by
+	    // epoll and call in_events_ex in order to bypass poll()
+	    // if EPOLLIN is present in events
+
+	    struct pollfd pfd;
+	    pfd.fd = pe->fd;
+	    pfd.events = 0;
+	    if(pe->ev.events & EPOLLIN)
+		pfd.events |= POLLIN;
+	    if(pe->ev.events & EPOLLOUT)
+		pfd.events |= POLLOUT;
+	    pfd.revents = 0;
+	    
+	    if(events & EPOLLIN)
+		pfd.revents |= POLLIN;
+	    if(events & EPOLLOUT)
+		pfd.revents |= POLLOUT;
+	    if(events & EPOLLERR)
+		pfd.revents |= POLLERR;
+	    if(events & EPOLLHUP)
+		pfd.revents |= POLLHUP;
+	    
             if (pe->fd == retired_fd)
                 continue;
-            if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
+            if (events & (EPOLLERR | EPOLLHUP))
                 pe->events->in_event ();
             if (pe->fd == retired_fd)
                continue;
-            if (ev_buf [i].events & EPOLLOUT)
+            if (events & EPOLLOUT)
                 pe->events->out_event ();
             if (pe->fd == retired_fd)
                 continue;
-            if (ev_buf [i].events & EPOLLIN)
-                pe->events->in_event ();
+            if (events & EPOLLIN)
+		pe->events->in_event_ex (&pfd);
         }
 
         //  Destroy retired event sources.
diff -urwB libzmq-original/src/i_poll_events.hpp libzmq/src/i_poll_events.hpp
--- libzmq-original/src/i_poll_events.hpp	2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/i_poll_events.hpp	2014-11-25 11:03:41.042081411 +0100
@@ -20,6 +20,8 @@
 #ifndef __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
 #define __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
  
+#include <poll.h>
+
 namespace zmq
 {
  
@@ -32,6 +34,7 @@
  
         // Called by I/O thread when file descriptor is ready for reading.
         virtual void in_event () = 0;
+        virtual void in_event_ex (pollfd *) = 0;
  
         // Called by I/O thread when file descriptor is ready for writing.
         virtual void out_event () = 0;
diff -urwB libzmq-original/src/io_object.cpp libzmq/src/io_object.cpp
--- libzmq-original/src/io_object.cpp	2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_object.cpp	2014-11-25 11:03:41.042081411 +0100
@@ -95,6 +95,11 @@
     zmq_assert (false);
 }
 
+void zmq::io_object_t::in_event_ex (pollfd *pfd)
+{
+    zmq_assert (false);
+}
+
 void zmq::io_object_t::out_event ()
 {
     zmq_assert (false);
diff -urwB libzmq-original/src/io_object.hpp libzmq/src/io_object.hpp
--- libzmq-original/src/io_object.hpp	2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_object.hpp	2014-11-25 11:03:41.043081508 +0100
@@ -63,6 +63,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/io_thread.cpp libzmq/src/io_thread.cpp
--- libzmq-original/src/io_thread.cpp	2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_thread.cpp	2014-11-25 11:03:41.043081508 +0100
@@ -77,6 +77,23 @@
     errno_assert (rc != 0 && errno == EAGAIN);
 }
 
+void zmq::io_thread_t::in_event_ex (pollfd *pfd)
+{
+    //  TODO: Do we want to limit number of commands I/O thread can
+    //  process in a single go?
+
+    command_t cmd;
+    int rc = mailbox.recv_ex (&cmd, 0, pfd); // will clear POLLIN
+    
+    while (rc == 0 || errno == EINTR) {
+        if (rc == 0)
+            cmd.destination->process_command (cmd);
+        rc = mailbox.recv_ex (&cmd, 0, pfd);
+    }
+
+    errno_assert (rc != 0 && errno == EAGAIN);
+}
+
 void zmq::io_thread_t::out_event ()
 {
     //  We are never polling for POLLOUT here. This function is never called.
diff -urwB libzmq-original/src/io_thread.hpp libzmq/src/io_thread.hpp
--- libzmq-original/src/io_thread.hpp	2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_thread.hpp	2014-11-25 11:03:41.043081508 +0100
@@ -57,6 +57,7 @@
 
         //  i_poll_events implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/ipc_connecter.cpp libzmq/src/ipc_connecter.cpp
--- libzmq-original/src/ipc_connecter.cpp	2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_connecter.cpp	2014-11-25 11:03:41.043081508 +0100
@@ -99,6 +99,14 @@
     out_event ();
 }
 
+void zmq::ipc_connecter_t::in_event_ex (pollfd *pfd)
+{
+    //  We are not polling for incoming data, so we are actually called
+    //  because of error here. However, we can get error on out event as well
+    //  on some platforms, so we'll simply handle both events in the same way.
+    out_event ();
+}
+
 void zmq::ipc_connecter_t::out_event ()
 {
     fd_t fd = connect ();
diff -urwB libzmq-original/src/ipc_connecter.hpp libzmq/src/ipc_connecter.hpp
--- libzmq-original/src/ipc_connecter.hpp	2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_connecter.hpp	2014-11-25 11:03:41.044081604 +0100
@@ -58,6 +58,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/ipc_listener.cpp libzmq/src/ipc_listener.cpp
--- libzmq-original/src/ipc_listener.cpp	2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_listener.cpp	2014-11-25 11:03:41.044081604 +0100
@@ -110,6 +110,11 @@
     socket->event_accepted (endpoint, fd);
 }
 
+void zmq::ipc_listener_t::in_event_ex (pollfd *pfd)
+{
+    zmq::ipc_listener_t::in_event();
+}
+
 int zmq::ipc_listener_t::get_address (std::string &addr_)
 {
     struct sockaddr_storage ss;
diff -urwB libzmq-original/src/ipc_listener.hpp libzmq/src/ipc_listener.hpp
--- libzmq-original/src/ipc_listener.hpp	2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_listener.hpp	2014-11-25 11:03:41.044081604 +0100
@@ -59,6 +59,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
 
         //  Close the listening socket.
         int close ();
diff -urwB libzmq-original/src/mailbox.cpp libzmq/src/mailbox.cpp
--- libzmq-original/src/mailbox.cpp	2014-11-25 11:01:35.711930493 +0100
+++ libzmq/src/mailbox.cpp	2014-11-25 11:03:41.045081698 +0100
@@ -17,6 +17,10 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
+#if defined ZMQ_POLL_BASED_ON_POLL
+#include <poll.h>
+#endif
+
 #include "mailbox.hpp"
 #include "err.hpp"
 
@@ -84,3 +88,32 @@
     zmq_assert (ok);
     return 0;
 }
+
+int zmq::mailbox_t::recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_)
+{
+    //  Try to get the command straight away.
+    if (active) {
+        bool ok = cpipe.read (cmd_);
+        if (ok)
+            return 0;
+
+        //  If there are no more commands available, switch into passive state.
+        active = false;
+        signaler.recv ();
+    }
+
+    //  Wait for signal from the command sender.
+    int rc = signaler.wait_ex (timeout_, pfd_);
+    if (rc != 0 && (errno == EAGAIN || errno == EINTR))
+        return -1;
+
+    //  We've got the signal. Now we can switch into active state.
+    active = true;
+
+    //  Get a command.
+    errno_assert (rc == 0);
+    bool ok = cpipe.read (cmd_);
+    zmq_assert (ok);
+    return 0;
+}
+
diff -urwB libzmq-original/src/mailbox.hpp libzmq/src/mailbox.hpp
--- libzmq-original/src/mailbox.hpp	2014-11-25 11:01:35.712930479 +0100
+++ libzmq/src/mailbox.hpp	2014-11-25 11:03:41.045081698 +0100
@@ -22,6 +22,8 @@
 
 #include <stddef.h>
 
+#include <poll.h>
+
 #include "platform.hpp"
 #include "signaler.hpp"
 #include "fd.hpp"
@@ -43,6 +45,7 @@
         fd_t get_fd () const;
         void send (const command_t &cmd_);
         int recv (command_t *cmd_, int timeout_);
+        int recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_);
 
 #ifdef HAVE_FORK
         // close the file descriptors in the signaller. This is used in a forked
diff -urwB libzmq-original/src/pgm_receiver.cpp libzmq/src/pgm_receiver.cpp
--- libzmq-original/src/pgm_receiver.cpp	2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_receiver.cpp	2014-11-25 11:03:41.045081698 +0100
@@ -275,6 +275,11 @@
 }
 
 
+void zmq::pgm_receiver_t::in_event_ex (pollfd *pfd)
+{
+    zmq::pgm_receiver_t::in_event();
+}
+
 void zmq::pgm_receiver_t::timer_event (int token)
 {
     zmq_assert (token == rx_timer_id);
diff -urwB libzmq-original/src/pgm_receiver.hpp libzmq/src/pgm_receiver.hpp
--- libzmq-original/src/pgm_receiver.hpp	2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_receiver.hpp	2014-11-25 11:03:41.045081698 +0100
@@ -63,6 +63,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void timer_event (int token);
 
     private:
diff -urwB libzmq-original/src/pgm_sender.cpp libzmq/src/pgm_sender.cpp
--- libzmq-original/src/pgm_sender.cpp	2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_sender.cpp	2014-11-25 11:03:41.046081791 +0100
@@ -157,6 +157,11 @@
     }
 }
 
+void zmq::pgm_sender_t::in_event_ex (pollfd *pfd)
+{
+    zmq::pgm_sender_t::in_event ();
+}
+
 void zmq::pgm_sender_t::out_event ()
 {
     //  POLLOUT event from send socket. If write buffer is empty, 
diff -urwB libzmq-original/src/pgm_sender.hpp libzmq/src/pgm_sender.hpp
--- libzmq-original/src/pgm_sender.hpp	2014-11-25 11:01:35.716930420 +0100
+++ libzmq/src/pgm_sender.hpp	2014-11-25 11:03:41.046081791 +0100
@@ -62,6 +62,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int token);
 
diff -urwB libzmq-original/src/reaper.cpp libzmq/src/reaper.cpp
--- libzmq-original/src/reaper.cpp	2014-11-25 11:01:35.720930360 +0100
+++ libzmq/src/reaper.cpp	2014-11-25 11:03:41.046081791 +0100
@@ -83,6 +83,24 @@
     }
 }
 
+void zmq::reaper_t::in_event_ex (pollfd *pfd)
+{
+    while (true) {
+
+        //  Get the next command. If there is none, exit.
+        command_t cmd;
+        int rc = mailbox.recv_ex (&cmd, 0, pfd);
+        if (rc != 0 && errno == EINTR)
+            continue;
+        if (rc != 0 && errno == EAGAIN)
+            break;
+        errno_assert (rc == 0);
+
+        //  Process the command.
+        cmd.destination->process_command (cmd);
+    }
+}
+
 void zmq::reaper_t::out_event ()
 {
     zmq_assert (false);
diff -urwB libzmq-original/src/reaper.hpp libzmq/src/reaper.hpp
--- libzmq-original/src/reaper.hpp	2014-11-25 11:01:35.720930360 +0100
+++ libzmq/src/reaper.hpp	2014-11-25 11:03:41.047081882 +0100
@@ -45,6 +45,7 @@
 
         //  i_poll_events implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/signaler.cpp libzmq/src/signaler.cpp
--- libzmq-original/src/signaler.cpp	2014-11-25 11:01:35.721930346 +0100
+++ libzmq/src/signaler.cpp	2014-11-25 12:19:39.843830861 +0100
@@ -260,6 +260,94 @@
 #endif
 }
 
+int zmq::signaler_t::wait_ex (int timeout_, pollfd *pfd_)
+{
+#ifdef HAVE_FORK
+    if (unlikely (pid != getpid ())) {
+        // we have forked and the file descriptor is closed. Emulate an interrupt
+        // response.
+        //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
+        errno = EINTR;
+        return -1;
+    }
+#endif
+
+#ifdef ZMQ_POLL_BASED_ON_POLL
+    struct pollfd pfd;
+    pfd.fd = r;
+    pfd.events = POLLIN;
+    if(timeout_)
+    {
+	int rc = poll (&pfd, 1, timeout_);
+	if (unlikely (rc < 0)) {
+	    errno_assert (errno == EINTR);
+	    return -1;
+	}
+	else if (unlikely (rc == 0)) {
+	    errno = EAGAIN;
+	    return -1;
+	}
+#ifdef HAVE_FORK
+	else
+	    if (unlikely (pid != getpid ())) {
+		// we have forked and the file descriptor is closed. Emulate an interrupt
+		// response.
+		//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
+		errno = EINTR;
+		return -1;
+	    }
+#endif
+	zmq_assert (rc == 1);
+	zmq_assert (pfd.revents & POLLIN);
+	return 0;
+    }
+    else
+    {
+	if((pfd_->revents & POLLIN) == 0)
+	{
+	    // simulate EAGAIN
+	    errno = EAGAIN;
+	    return -1;
+	}
+	zmq_assert (pfd_->revents & POLLIN);
+	// MUST clear POLLIN
+	pfd_->revents &= ~POLLIN;
+	return 0;
+    }
+    
+#elif defined ZMQ_POLL_BASED_ON_SELECT
+    fd_set fds;
+    FD_ZERO (&fds);
+    FD_SET (r, &fds);
+    struct timeval timeout;
+    if (timeout_ >= 0) {
+        timeout.tv_sec = timeout_ / 1000;
+        timeout.tv_usec = timeout_ % 1000 * 1000;
+    }
+#ifdef ZMQ_HAVE_WINDOWS
+    int rc = select (0, &fds, NULL, NULL,
+        timeout_ >= 0 ? &timeout : NULL);
+    wsa_assert (rc != SOCKET_ERROR);
+#else
+    int rc = select (r + 1, &fds, NULL, NULL,
+        timeout_ >= 0 ? &timeout : NULL);
+    if (unlikely (rc < 0)) {
+        errno_assert (errno == EINTR);
+        return -1;
+    }
+#endif
+    if (unlikely (rc == 0)) {
+        errno = EAGAIN;
+        return -1;
+    }
+    zmq_assert (rc == 1);
+    return 0;
+
+#else
+#error
+#endif
+}
+
 void zmq::signaler_t::recv ()
 {
     //  Attempt to read a signal.
diff -urwB libzmq-original/src/signaler.hpp libzmq/src/signaler.hpp
--- libzmq-original/src/signaler.hpp	2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/signaler.hpp	2014-11-25 11:03:41.047081882 +0100
@@ -24,6 +24,8 @@
 #include <unistd.h>
 #endif
 
+#include <poll.h>
+
 #include "fd.hpp"
 
 namespace zmq
@@ -44,6 +46,7 @@
         fd_t get_fd () const;
         void send ();
         int wait (int timeout_);
+        int wait_ex (int timeout_, pollfd *pfd_);
         void recv ();
 
 #ifdef HAVE_FORK
diff -urwB libzmq-original/src/socket_base.cpp libzmq/src/socket_base.cpp
--- libzmq-original/src/socket_base.cpp	2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/socket_base.cpp	2014-11-25 11:03:41.048081973 +0100
@@ -36,6 +36,8 @@
 #include <unistd.h>
 #endif
 
+#include <poll.h>
+
 #include "socket_base.hpp"
 #include "tcp_listener.hpp"
 #include "ipc_listener.hpp"
@@ -346,6 +348,38 @@
     return options.getsockopt (option_, optval_, optvallen_);
 }
 
+// only for ZMQ_EVENTS
+int zmq::socket_base_t::getsockopt_ex (int option_, void *optval_,
+				       size_t *optvallen_, pollfd *pfd_)
+{
+    if (unlikely (ctx_terminated)) {
+        errno = ETERM;
+        return -1;
+    }
+
+    assert (option_ == ZMQ_EVENTS);
+
+    if (option_ == ZMQ_EVENTS) {
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        int rc = process_commands_ex (0, false, pfd_);
+        if (rc != 0 && (errno == EINTR || errno == ETERM))
+            return -1;
+        errno_assert (rc == 0);
+        *((int*) optval_) = 0;
+        if (has_out ())
+            *((int*) optval_) |= ZMQ_POLLOUT;
+        if (has_in ())
+            *((int*) optval_) |= ZMQ_POLLIN;
+        *optvallen_ = sizeof (int);
+        return 0;
+    }
+
+    return 0;
+}
+
 int zmq::socket_base_t::bind (const char *addr_)
 {
     if (unlikely (ctx_terminated)) {
@@ -1026,6 +1060,62 @@
     return 0;
 }
 
+int zmq::socket_base_t::process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_)
+{
+    int rc;
+    command_t cmd;
+    if (timeout_ != 0) {
+
+        //  If we are asked to wait, simply ask mailbox to wait.
+        rc = mailbox.recv_ex (&cmd, timeout_, pfd_);
+    }
+    else {
+
+        //  If we are asked not to wait, check whether we haven't processed
+        //  commands recently, so that we can throttle the new commands.
+
+        //  Get the CPU's tick counter. If 0, the counter is not available.
+        uint64_t tsc = zmq::clock_t::rdtsc ();
+
+        //  Optimised version of command processing - it doesn't have to check
+        //  for incoming commands each time. It does so only if certain time
+        //  elapsed since last command processing. Command delay varies
+        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
+        //  etc. The optimisation makes sense only on platforms where getting
+        //  a timestamp is a very cheap operation (tens of nanoseconds).
+        if (tsc && throttle_) {
+
+            //  Check whether TSC hasn't jumped backwards (in case of migration
+            //  between CPU cores) and whether a certain time has elapsed since
+            //  last command processing. If it hasn't, do nothing.
+            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
+                return 0;
+            last_tsc = tsc;
+        }
+
+        //  Check whether there are any commands pending for this thread.
+        rc = mailbox.recv_ex (&cmd, 0, pfd_);
+    }
+
+    //  Process all available commands.
+    while (rc == 0) {
+        cmd.destination->process_command (cmd);
+        rc = mailbox.recv_ex (&cmd, 0, pfd_);
+    }
+
+    if (errno == EINTR)
+        return -1;
+
+    zmq_assert (errno == EAGAIN);
+
+    if (ctx_terminated) {
+        errno = ETERM;
+        return -1;
+    }
+
+    return 0;
+}
+
 void zmq::socket_base_t::process_stop ()
 {
     //  Here, someone have called zmq_term while the socket was still alive.
@@ -1124,6 +1214,16 @@
     check_destroy ();
 }
 
+void zmq::socket_base_t::in_event_ex (pollfd *pfd)
+{
+    //  This function is invoked only once the socket is running in the context
+    //  of the reaper thread. Process any commands from other threads/sockets
+    //  that may be available at the moment. Ultimately, the socket will
+    //  be destroyed.
+    process_commands (0, false);
+    check_destroy ();
+}
+
 void zmq::socket_base_t::out_event ()
 {
     zmq_assert (false);
diff -urwB libzmq-original/src/socket_base.hpp libzmq/src/socket_base.hpp
--- libzmq-original/src/socket_base.hpp	2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/socket_base.hpp	2014-11-25 11:03:41.048081973 +0100
@@ -75,6 +75,7 @@
         //  Interface for communication with the API layer.
         int setsockopt (int option_, const void *optval_, size_t optvallen_);
         int getsockopt (int option_, void *optval_, size_t *optvallen_);
+        int getsockopt_ex (int option_, void *optval_, size_t *optvallen_, pollfd *pfd_);
         int bind (const char *addr_);
         int connect (const char *addr_);
         int term_endpoint (const char *addr_);
@@ -94,6 +95,7 @@
         //  i_poll_events implementation. This interface is used when socket
         //  is handled by the poller in the reaper thread.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
@@ -219,6 +221,7 @@
         //  If throttle argument is true, commands are processed at most once
         //  in a predefined time period.
         int process_commands (int timeout_, bool throttle_);
+        int process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_);
 
         //  Handlers for incoming commands.
         void process_stop ();
diff -urwB libzmq-original/src/stream_engine.cpp libzmq/src/stream_engine.cpp
--- libzmq-original/src/stream_engine.cpp	2014-11-25 11:01:35.723930318 +0100
+++ libzmq/src/stream_engine.cpp	2014-11-25 11:03:41.049082060 +0100
@@ -320,6 +320,11 @@
     session->flush ();
 }
 
+void zmq::stream_engine_t::in_event_ex (pollfd *pfd)
+{
+    zmq::stream_engine_t::in_event();
+}
+
 void zmq::stream_engine_t::out_event ()
 {
     zmq_assert (!io_error);
diff -urwB libzmq-original/src/stream_engine.hpp libzmq/src/stream_engine.hpp
--- libzmq-original/src/stream_engine.hpp	2014-11-25 11:01:35.724930303 +0100
+++ libzmq/src/stream_engine.hpp	2014-11-25 11:03:41.049082060 +0100
@@ -73,6 +73,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/tcp_connecter.cpp libzmq/src/tcp_connecter.cpp
--- libzmq-original/src/tcp_connecter.cpp	2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_connecter.cpp	2014-11-25 11:03:41.049082060 +0100
@@ -108,6 +108,14 @@
     out_event ();
 }
 
+void zmq::tcp_connecter_t::in_event_ex (pollfd *pfd)
+{
+    //  We are not polling for incoming data, so we are actually called
+    //  because of error here. However, we can get error on out event as well
+    //  on some platforms, so we'll simply handle both events in the same way.
+    out_event ();
+}
+
 void zmq::tcp_connecter_t::out_event ()
 {
     rm_fd (handle);
diff -urwB libzmq-original/src/tcp_connecter.hpp libzmq/src/tcp_connecter.hpp
--- libzmq-original/src/tcp_connecter.hpp	2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_connecter.hpp	2014-11-25 11:03:41.050082146 +0100
@@ -55,6 +55,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB libzmq-original/src/tcp_listener.cpp libzmq/src/tcp_listener.cpp
--- libzmq-original/src/tcp_listener.cpp	2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_listener.cpp	2014-11-25 11:03:41.050082146 +0100
@@ -114,6 +114,11 @@
     socket->event_accepted (endpoint, fd);
 }
 
+void zmq::tcp_listener_t::in_event_ex (pollfd *pfd)
+{
+    zmq::tcp_listener_t::in_event ();
+}
+
 void zmq::tcp_listener_t::close ()
 {
     zmq_assert (s != retired_fd);
diff -urwB libzmq-original/src/tcp_listener.hpp libzmq/src/tcp_listener.hpp
--- libzmq-original/src/tcp_listener.hpp	2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_listener.hpp	2014-11-25 11:03:41.050082146 +0100
@@ -55,6 +55,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
 
         //  Close the listening socket.
         void close ();
diff -urwB libzmq-original/src/zmq.cpp libzmq/src/zmq.cpp
--- libzmq-original/src/zmq.cpp	2014-11-25 11:01:35.729930228 +0100
+++ libzmq/src/zmq.cpp	2014-11-25 13:41:42.364243253 +0100
@@ -263,6 +263,17 @@
     return result;
 }
 
+int zmq_getsockopt_ex (void *s_, int option_, void *optval_, size_t *optvallen_, pollfd *pfd_)
+{
+    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+        errno = ENOTSOCK;
+        return -1;
+    }
+    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
+    int result = s->getsockopt_ex (option_, optval_, optvallen_, pfd_);
+    return result;
+}
+
 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
 {
     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
@@ -761,10 +772,12 @@
             //  using the ZMQ_EVENTS socket option.
             if (items_ [i].socket) {
                 size_t zmq_events_size = sizeof (uint32_t);
-                uint32_t zmq_events;
-                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
-                    &zmq_events_size) == -1) {
+                uint32_t zmq_events = 0;
+
+		if (zmq_getsockopt_ex (items_ [i].socket, ZMQ_EVENTS,
+				       &zmq_events,
+				       &zmq_events_size, &pollfds [i]) == -1) {
                     if (pollfds != spollfds)
                         free (pollfds);
                     return -1;
                 }
@@ -774,6 +787,9 @@
                 if ((items_ [i].events & ZMQ_POLLIN) &&
                       (zmq_events & ZMQ_POLLIN))
                     items_ [i].revents |= ZMQ_POLLIN;
+                if ((items_ [i].events & ZMQ_POLLPRI) &&
+                      (zmq_events & ZMQ_POLLPRI))
+                    items_ [i].revents |= ZMQ_POLLPRI;
             }
             //  Else, the poll item is a raw file descriptor, simply convert
             //  the events to zmq_pollitem_t-style format.
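
The epoll.cpp hunk of the patch builds a pollfd from the events reported by epoll so that the mailbox code can reuse them. Taken in isolation, the bit translation looks roughly like the sketch below. `epoll_to_poll` is a hypothetical helper name used only for illustration (the patch does the translation inline), and the example is Linux-only because it needs <sys/epoll.h>.

```cpp
#include <poll.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <cassert>

//  Hypothetical helper: map the epoll event bits handled by the patch
//  onto the corresponding poll() revents bits.
static short epoll_to_poll (uint32_t events_)
{
    short revents = 0;
    if (events_ & EPOLLIN)
        revents |= POLLIN;
    if (events_ & EPOLLOUT)
        revents |= POLLOUT;
    if (events_ & EPOLLERR)
        revents |= POLLERR;
    if (events_ & EPOLLHUP)
        revents |= POLLHUP;
    return revents;
}
```

Bits that are not listed, such as EPOLLPRI, are dropped by this sketch, as they are in the hunk.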

