[zeromq-dev] zmq_poll performance question

Francis Le Bourse zno-reply-francis.lebourse at sfr-sh.fr
Fri Nov 28 11:35:21 CET 2014


On 11/24/2014 8:08 PM, Arnaud Kapp wrote:
>> Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq master
The first patch for 3.2.4 had an issue in zmq_poll(): I had tried a
slightly too aggressive optimization that bypassed the "first_pass"
processing. This is fixed in the current version.
The patch for the current head is clean.
Cheers,
Francis

> Oh okay. This is the commit that added the flag:
> https://github.com/zeromq/libzmq/commit/779c37abc433cb6595ddeedaf86b280317656bdd
>
> libzmq was at 4.1 at the time, I believe.
>
> I'll probably look at it this weekend then :)
>
> On Mon, Nov 24, 2014 at 12:10 PM, Francis Le Bourse
> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>> Hi,
>> On 11/24/2014 11:35 AM, Arnaud Kapp wrote:
>>> Hello,
>>>
>>> I recently added support for the POLLPRI flag.
>>> It looks like it's not handled in your patch
>> No, it isn't handled. In which version did you add this flag?
>> Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq
>> master.
>>>    and that it needs custom
>>> support. Since there is no test related to this flag, you wouldn't
>>> notice.
>>>
>>> I can give it a look if you want.
>> That would be nice.
>>
>> Cheers,
>> Francis
>>
>>
>>> On Sat, Nov 22, 2014 at 2:16 PM, Pieter Hintjens <ph at imatix.com> wrote:
>>>> I suggest you send the patch to libzmq master, and ensure all test
>>>> cases pass. Then we can get this into the next version.
>>>>
>>>> On Fri, Nov 21, 2014 at 2:50 PM, Francis Le Bourse
>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>> Hi,
>>>>>
>>>>> On 11/6/2014 3:18 PM, Pieter Hintjens wrote:
>>>>>> Oh, ok. Sounds like you have a good candidate for some before/after
>>>>>> measurement and optimization. Are you going to try to make a patch for
>>>>>> this?
>>>>> I have a patch candidate for this optimization. The performance
>>>>> improvement is very good and it doesn't seem to introduce any new
>>>>> instability.
>>>>> What is modified:
>>>>>       - zmq_poll(): there is now only one poll() call,
>>>>>       - and the epoll() event loop in epoll.cpp.
>>>>> Other calls to poll() and select() are left unmodified.
>>>>>
>>>>> I would be happy to have any feedback.
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Francis
>>>>>>
>>>>>> On Thu, Nov 6, 2014 at 2:09 PM, Francis Le Bourse
>>>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>>>> On 11/6/2014 11:47 AM, Pieter Hintjens wrote:
>>>>>>>> A simple optimization is, when you are polling sockets for input, to
>>>>>>>> continue reading from an active socket using a non-blocking read. So
>>>>>>>> you process all waiting messages on a socket and then only switch
>>>>>>>> back to poll when needed.
>>>>>>> Thank you for your quick reply.
>>>>>>>
>>>>>>> Yes, but the question was more about the zmq_poll() internals.
>>>>>>> With 600+ file descriptors, zmq_poll() calls poll() a huge number of
>>>>>>> times, although only a few of them will trigger a POLLIN and the
>>>>>>> relevant information is already present in the pollfds array. That
>>>>>>> is where the performance hit is.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Francis
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> On Thu, Nov 6, 2014 at 11:28 AM, Francis Le Bourse
>>>>>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am looking at a performance issue in zmq, when the number of
>>>>>>>>> zsockets / file descriptors becomes large.
>>>>>>>>> The relevant calls are:
>>>>>>>>>         poll+0x57
>>>>>>>>>         zmq_poll+0x2e3
>>>>>>>>>         zloop_start+0x1e8
>>>>>>>>>         main+0xb40
>>>>>>>>>         __libc_start_main+0xfd
>>>>>>>>> immediately followed by a loop of
>>>>>>>>>         poll+0x57
>>>>>>>>>         zmq::signaler_t::wait(int)+0x33
>>>>>>>>>         zmq::mailbox_t::recv(zmq::command_t*, int)+0x78
>>>>>>>>>         zmq::socket_base_t::process_commands(int, bool)+0xbe
>>>>>>>>>         zmq::socket_base_t::getsockopt(int, void*, unsigned long*)+0x135
>>>>>>>>>         zmq_getsockopt+0x75
>>>>>>>>>         zmq_poll+0x3da
>>>>>>>>>         zloop_start+0x1e8
>>>>>>>>>         main+0xb40
>>>>>>>>>         __libc_start_main+0xfd
>>>>>>>>>
>>>>>>>>> The code in the loop is executed once per file descriptor in the
>>>>>>>>> initial pollarray, redoing a poll() system call each time.
>>>>>>>>> Is there a reason to proceed that way?
>>>>>>>>> Would it be possible to reuse the results of the first poll() in
>>>>>>>>> order to bypass the second set of system calls?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Francis
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> _______________________________________________
>>>>>>>>> zeromq-dev mailing list
>>>>>>>>> zeromq-dev at lists.zeromq.org
>>>>>>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>>>>>>>
>>>
>>>
>>
>
>
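Pieter's suggestion earlier in the thread (keep reading an active socket with non-blocking reads until it is empty, and only then go back to poll) can be sketched with plain POSIX descriptors. This is a minimal illustration under stated assumptions, not libzmq code: an AF_UNIX SOCK_DGRAM socketpair stands in for a ØMQ socket so that each recv() returns exactly one message, MSG_DONTWAIT (available on Linux and the BSDs) provides the non-blocking read, and drain_messages() and demo_drain() are hypothetical names. With a real ØMQ socket, the equivalent loop would call zmq_recv() with ZMQ_DONTWAIT until it fails with EAGAIN.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

//  Hypothetical helper: once poll() has reported POLLIN on fd, keep reading
//  with non-blocking recv() calls until the kernel reports EAGAIN, then
//  return the number of messages consumed so the caller can go back to poll().
static int drain_messages (int fd)
{
    char buf [256];
    int count = 0;
    for (;;) {
        ssize_t n = recv (fd, buf, sizeof buf, MSG_DONTWAIT);
        if (n >= 0) {
            ++count;                //  one datagram is one message
            continue;
        }
        if (errno == EINTR)
            continue;               //  interrupted by a signal: retry
        assert (errno == EAGAIN || errno == EWOULDBLOCK);
        return count;               //  queue empty: switch back to poll()
    }
}

//  Hypothetical demo: queue three messages, wait once with poll(), then
//  drain all of them without any further poll() call.
static int demo_drain ()
{
    int sv [2];
    if (socketpair (AF_UNIX, SOCK_DGRAM, 0, sv) != 0)
        return -1;

    bool sent = true;
    for (int i = 0; i < 3; ++i)
        sent = sent && send (sv [0], "msg", 3, 0) == 3;

    pollfd pfd;
    pfd.fd = sv [1];
    pfd.events = POLLIN;
    pfd.revents = 0;
    int drained = -1;
    if (sent && poll (&pfd, 1, 1000) == 1 && (pfd.revents & POLLIN))
        drained = drain_messages (sv [1]);

    close (sv [0]);
    close (sv [1]);
    return drained;
}
```

Here demo_drain() returns 3: a single poll() wake-up is enough to consume all three queued messages.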

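Francis's question above (could zmq_poll() reuse the results of its first poll() instead of issuing another poll() per entry?) can be illustrated outside libzmq. In this sketch, pipes stand in for the descriptors returned by ZMQ_FD, and pipe_set, ready_by_repolling() and ready_from_revents() are hypothetical names. The first function roughly mimics the pattern in the stack trace, one extra zero-timeout poll() per entry; the second reads the answer from the revents that the single poll() already filled in.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>
#include <vector>

//  Hypothetical fixture: n pipes whose read ends are polled for POLLIN; the
//  pipes listed in `active` get one byte written so they become readable.
struct pipe_set {
    std::vector<pollfd> fds;
    std::vector<int> write_ends;

    pipe_set (int n, const std::vector<int> &active)
    {
        for (int i = 0; i < n; ++i) {
            int p [2];
            if (pipe (p) != 0)
                break;
            pollfd pfd;
            pfd.fd = p [0];
            pfd.events = POLLIN;
            pfd.revents = 0;
            fds.push_back (pfd);
            write_ends.push_back (p [1]);
        }
        for (int i : active)
            if (write (write_ends [i], "x", 1) != 1)
                break;
    }
    pipe_set (const pipe_set &) = delete;   //  owns descriptors: no copies
    ~pipe_set ()
    {
        for (const pollfd &p : fds)
            close (p.fd);
        for (int w : write_ends)
            close (w);
    }
};

//  Pattern from the stack trace: after the main poll(), every entry is checked
//  again with its own zero-timeout poll() system call.
static std::vector<int> ready_by_repolling (std::vector<pollfd> &fds, int &syscalls)
{
    std::vector<int> ready;
    poll (fds.data (), fds.size (), 0);
    syscalls = 1;
    for (size_t i = 0; i < fds.size (); ++i) {
        pollfd one;
        one.fd = fds [i].fd;
        one.events = POLLIN;
        one.revents = 0;
        poll (&one, 1, 0);          //  redundant: fds [i].revents already says it
        ++syscalls;
        if (one.revents & POLLIN)
            ready.push_back ((int) i);
    }
    return ready;
}

//  Optimised pattern: a single poll(), then reuse the revents it filled in.
static std::vector<int> ready_from_revents (std::vector<pollfd> &fds, int &syscalls)
{
    std::vector<int> ready;
    poll (fds.data (), fds.size (), 0);
    syscalls = 1;
    for (size_t i = 0; i < fds.size (); ++i)
        if (fds [i].revents & POLLIN)
            ready.push_back ((int) i);
    return ready;
}
```

With 8 pipes of which 2 are readable, both functions report the same ready set, but the repolling variant makes 9 poll() calls while the revents variant makes 1.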
-------------- next part --------------
diff -urwB zeromq-3.2.4-interm/src/ctx.cpp zeromq-3.2.4/src/ctx.cpp
--- zeromq-3.2.4-interm/src/ctx.cpp	2014-11-14 11:50:34.679645346 +0100
+++ zeromq-3.2.4/src/ctx.cpp	2014-11-10 14:46:33.092394320 +0100
@@ -29,6 +29,8 @@
 #include <new>
 #include <string.h>
 
+#include <poll.h>
+
 #include "ctx.hpp"
 #include "socket_base.hpp"
 #include "io_thread.hpp"
diff -urwB zeromq-3.2.4-interm/src/epoll.cpp zeromq-3.2.4/src/epoll.cpp
--- zeromq-3.2.4-interm/src/epoll.cpp	2012-11-13 13:39:41.000000000 +0100
+++ zeromq-3.2.4/src/epoll.cpp	2014-11-20 13:19:52.958137819 +0100
@@ -29,6 +29,8 @@
 #include <algorithm>
 #include <new>
 
+#include <poll.h>
+
 #include "epoll.hpp"
 #include "err.hpp"
 #include "config.hpp"
@@ -148,18 +150,38 @@
         for (int i = 0; i < n; i ++) {
             poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
 
+	    uint32_t events = ev_buf[i].events;
+	    ev_buf[i].events = 0;
+	    struct pollfd pfd;
+	    pfd.fd = pe->fd;
+	    pfd.events = 0;
+	    if(pe->ev.events & EPOLLIN)
+		pfd.events |= POLLIN;
+	    if(pe->ev.events & EPOLLOUT)
+		pfd.events |= POLLOUT;
+	    pfd.revents = 0;
+	    
+	    if(events & EPOLLIN)
+		pfd.revents |= POLLIN;
+	    if(events & EPOLLOUT)
+		pfd.revents |= POLLOUT;
+	    if(events & EPOLLERR)
+		pfd.revents |= POLLERR;
+	    if(events & EPOLLHUP)
+		pfd.revents |= POLLHUP;
+	    
             if (pe->fd == retired_fd)
                 continue;
-            if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
+            if (events & (EPOLLERR | EPOLLHUP))
                 pe->events->in_event ();
             if (pe->fd == retired_fd)
                continue;
-            if (ev_buf [i].events & EPOLLOUT)
+            if (events & EPOLLOUT)
                 pe->events->out_event ();
             if (pe->fd == retired_fd)
                 continue;
-            if (ev_buf [i].events & EPOLLIN)
-                pe->events->in_event ();
+            if (events & EPOLLIN)
+		pe->events->in_event_ex (&pfd);
         }
 
         //  Destroy retired event sources.
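The epoll.cpp hunk above converts each epoll_event into a struct pollfd so that in_event_ex() can pass the readiness that epoll_wait() already reported down to the mailbox, instead of having the signaler poll the descriptor again. A standalone version of that translation, mapping the same four flags as the patch, might look like the following sketch (to_pollfd() is a hypothetical name). On Linux the EPOLL* and POLL* values for these four flags happen to coincide, but the explicit mapping does not rely on that.

```cpp
#include <cassert>
#include <cstdint>
#include <poll.h>
#include <sys/epoll.h>

//  Hypothetical helper mirroring the epoll.cpp hunk: `interest` is the mask the
//  fd was registered with (pe->ev.events), `ready` is what epoll_wait()
//  reported for it (ev_buf [i].events).
static pollfd to_pollfd (int fd, uint32_t interest, uint32_t ready)
{
    pollfd pfd;
    pfd.fd = fd;

    //  Requested events, as poll() would have been asked for them.
    pfd.events = 0;
    if (interest & EPOLLIN)
        pfd.events |= POLLIN;
    if (interest & EPOLLOUT)
        pfd.events |= POLLOUT;

    //  Returned events, as poll() would have filled them in.
    pfd.revents = 0;
    if (ready & EPOLLIN)
        pfd.revents |= POLLIN;
    if (ready & EPOLLOUT)
        pfd.revents |= POLLOUT;
    if (ready & EPOLLERR)
        pfd.revents |= POLLERR;
    if (ready & EPOLLHUP)
        pfd.revents |= POLLHUP;
    return pfd;
}
```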
diff -urwB zeromq-3.2.4-interm/src/i_poll_events.hpp zeromq-3.2.4/src/i_poll_events.hpp
--- zeromq-3.2.4-interm/src/i_poll_events.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/i_poll_events.hpp	2014-11-13 11:45:22.509276876 +0100
@@ -22,6 +22,8 @@
 #ifndef __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
 #define __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
  
+#include <poll.h>
+
 namespace zmq
 {
  
@@ -34,6 +36,7 @@
  
         // Called by I/O thread when file descriptor is ready for reading.
         virtual void in_event () = 0;
+        virtual void in_event_ex (pollfd *) = 0;
  
         // Called by I/O thread when file descriptor is ready for writing.
         virtual void out_event () = 0;
diff -urwB zeromq-3.2.4-interm/src/io_object.cpp zeromq-3.2.4/src/io_object.cpp
--- zeromq-3.2.4-interm/src/io_object.cpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_object.cpp	2014-11-13 12:12:41.171976993 +0100
@@ -97,6 +97,11 @@
     zmq_assert (false);
 }
 
+void zmq::io_object_t::in_event_ex (pollfd *pfd)
+{
+    zmq_assert (false);
+}
+
 void zmq::io_object_t::out_event ()
 {
     zmq_assert (false);
diff -urwB zeromq-3.2.4-interm/src/io_object.hpp zeromq-3.2.4/src/io_object.hpp
--- zeromq-3.2.4-interm/src/io_object.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_object.hpp	2014-11-13 11:42:11.089118102 +0100
@@ -65,6 +65,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/io_thread.cpp zeromq-3.2.4/src/io_thread.cpp
--- zeromq-3.2.4-interm/src/io_thread.cpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_thread.cpp	2014-11-14 11:57:40.836245798 +0100
@@ -79,6 +79,23 @@
     errno_assert (rc != 0 && errno == EAGAIN);
 }
 
+void zmq::io_thread_t::in_event_ex (pollfd *pfd)
+{
+    //  TODO: Do we want to limit number of commands I/O thread can
+    //  process in a single go?
+
+    command_t cmd;
+    int rc = mailbox.recv_ex (&cmd, 0, pfd); // will clear POLLIN
+    
+    while (rc == 0 || errno == EINTR) {
+        if (rc == 0)
+            cmd.destination->process_command (cmd);
+        rc = mailbox.recv_ex (&cmd, 0, pfd);
+    }
+
+    errno_assert (rc != 0 && errno == EAGAIN);
+}
+
 void zmq::io_thread_t::out_event ()
 {
     //  We are never polling for POLLOUT here. This function is never called.
diff -urwB zeromq-3.2.4-interm/src/io_thread.hpp zeromq-3.2.4/src/io_thread.hpp
--- zeromq-3.2.4-interm/src/io_thread.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_thread.hpp	2014-11-13 13:03:40.680393409 +0100
@@ -59,6 +59,7 @@
 
         //  i_poll_events implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/ipc_connecter.cpp zeromq-3.2.4/src/ipc_connecter.cpp
--- zeromq-3.2.4-interm/src/ipc_connecter.cpp	2012-11-23 08:35:33.000000000 +0100
+++ zeromq-3.2.4/src/ipc_connecter.cpp	2014-11-13 12:57:10.186264608 +0100
@@ -100,6 +100,14 @@
     out_event ();
 }
 
+void zmq::ipc_connecter_t::in_event_ex (pollfd *pfd)
+{
+    //  We are not polling for incoming data, so we are actually called
+    //  because of error here. However, we can get error on out event as well
+    //  on some platforms, so we'll simply handle both events in the same way.
+    out_event ();
+}
+
 void zmq::ipc_connecter_t::out_event ()
 {
     fd_t fd = connect ();
diff -urwB zeromq-3.2.4-interm/src/ipc_connecter.hpp zeromq-3.2.4/src/ipc_connecter.hpp
--- zeromq-3.2.4-interm/src/ipc_connecter.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/ipc_connecter.hpp	2014-11-13 12:55:54.156407616 +0100
@@ -59,6 +59,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/ipc_listener.cpp zeromq-3.2.4/src/ipc_listener.cpp
--- zeromq-3.2.4-interm/src/ipc_listener.cpp	2012-11-23 08:35:52.000000000 +0100
+++ zeromq-3.2.4/src/ipc_listener.cpp	2014-11-13 12:59:05.723526546 +0100
@@ -99,6 +99,11 @@
     socket->event_accepted (endpoint, fd);
 }
 
+void zmq::ipc_listener_t::in_event_ex (pollfd *pfd)
+{
+    zmq::ipc_listener_t::in_event();
+}
+
 int zmq::ipc_listener_t::get_address (std::string &addr_)
 {
     struct sockaddr_storage ss;
diff -urwB zeromq-3.2.4-interm/src/ipc_listener.hpp zeromq-3.2.4/src/ipc_listener.hpp
--- zeromq-3.2.4-interm/src/ipc_listener.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/ipc_listener.hpp	2014-11-13 12:57:44.364745769 +0100
@@ -60,6 +60,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
 
         //  Close the listening socket.
         int close ();
diff -urwB zeromq-3.2.4-interm/src/mailbox.cpp zeromq-3.2.4/src/mailbox.cpp
--- zeromq-3.2.4-interm/src/mailbox.cpp	2013-05-01 05:30:36.000000000 +0200
+++ zeromq-3.2.4/src/mailbox.cpp	2014-11-13 11:41:41.849549736 +0100
@@ -19,6 +19,10 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
+#if defined ZMQ_POLL_BASED_ON_POLL
+#include <poll.h>
+#endif
+
 #include "mailbox.hpp"
 #include "err.hpp"
 
@@ -85,3 +89,31 @@
     return 0;
 }
 
+int zmq::mailbox_t::recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_)
+{
+    //  Try to get the command straight away.
+    if (active) {
+        bool ok = cpipe.read (cmd_);
+        if (ok)
+            return 0;
+
+        //  If there are no more commands available, switch into passive state.
+        active = false;
+        signaler.recv ();
+    }
+
+    //  Wait for signal from the command sender.
+    int rc = signaler.wait_ex (timeout_, pfd_);
+    if (rc != 0 && (errno == EAGAIN || errno == EINTR))
+        return -1;
+
+    //  We've got the signal. Now we can switch into active state.
+    active = true;
+
+    //  Get a command.
+    errno_assert (rc == 0);
+    bool ok = cpipe.read (cmd_);
+    zmq_assert (ok);
+    return 0;
+}
+
diff -urwB zeromq-3.2.4-interm/src/mailbox.hpp zeromq-3.2.4/src/mailbox.hpp
--- zeromq-3.2.4-interm/src/mailbox.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/mailbox.hpp	2014-11-13 11:41:16.399927840 +0100
@@ -24,6 +24,8 @@
 
 #include <stddef.h>
 
+#include <poll.h>
+
 #include "platform.hpp"
 #include "signaler.hpp"
 #include "fd.hpp"
@@ -45,6 +47,7 @@
         fd_t get_fd ();
         void send (const command_t &cmd_);
         int recv (command_t *cmd_, int timeout_);
+        int recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_);
         
     private:
 
diff -urwB zeromq-3.2.4-interm/src/pgm_receiver.cpp zeromq-3.2.4/src/pgm_receiver.cpp
--- zeromq-3.2.4-interm/src/pgm_receiver.cpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_receiver.cpp	2014-11-13 13:01:34.993282854 +0100
@@ -262,6 +262,11 @@
     session->flush ();
 }
 
+void zmq::pgm_receiver_t::in_event_ex (pollfd *pfd)
+{
+    zmq::pgm_receiver_t::in_event();
+}
+
 void zmq::pgm_receiver_t::timer_event (int token)
 {
     zmq_assert (token == rx_timer_id);
diff -urwB zeromq-3.2.4-interm/src/pgm_receiver.hpp zeromq-3.2.4/src/pgm_receiver.hpp
--- zeromq-3.2.4-interm/src/pgm_receiver.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_receiver.hpp	2014-11-13 12:59:45.474928619 +0100
@@ -65,6 +65,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void timer_event (int token);
 
     private:
diff -urwB zeromq-3.2.4-interm/src/pgm_sender.cpp zeromq-3.2.4/src/pgm_sender.cpp
--- zeromq-3.2.4-interm/src/pgm_sender.cpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_sender.cpp	2014-11-13 13:03:08.792872122 +0100
@@ -153,6 +153,11 @@
     }
 }
 
+void zmq::pgm_sender_t::in_event_ex (pollfd *pfd)
+{
+    zmq::pgm_sender_t::in_event ();
+}
+
 void zmq::pgm_sender_t::out_event ()
 {
     //  POLLOUT event from send socket. If write buffer is empty, 
diff -urwB zeromq-3.2.4-interm/src/pgm_sender.hpp zeromq-3.2.4/src/pgm_sender.hpp
--- zeromq-3.2.4-interm/src/pgm_sender.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_sender.hpp	2014-11-13 13:02:07.983786701 +0100
@@ -63,6 +63,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int token);
 
diff -urwB zeromq-3.2.4-interm/src/reaper.cpp zeromq-3.2.4/src/reaper.cpp
--- zeromq-3.2.4-interm/src/reaper.cpp	2014-03-11 12:09:15.000000000 +0100
+++ zeromq-3.2.4/src/reaper.cpp	2014-11-13 11:44:15.547267347 +0100
@@ -73,6 +73,24 @@
     }
 }
 
+void zmq::reaper_t::in_event_ex (pollfd *pfd)
+{
+    while (true) {
+
+        //  Get the next command. If there is none, exit.
+        command_t cmd;
+        int rc = mailbox.recv_ex (&cmd, 0, pfd);
+        if (rc != 0 && errno == EINTR)
+            continue;
+        if (rc != 0 && errno == EAGAIN)
+            break;
+        errno_assert (rc == 0);
+
+        //  Process the command.
+        cmd.destination->process_command (cmd);
+    }
+}
+
 void zmq::reaper_t::out_event ()
 {
     zmq_assert (false);
diff -urwB zeromq-3.2.4-interm/src/reaper.hpp zeromq-3.2.4/src/reaper.hpp
--- zeromq-3.2.4-interm/src/reaper.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/reaper.hpp	2014-11-13 11:44:26.167111581 +0100
@@ -46,6 +46,7 @@
 
         //  i_poll_events implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/signaler.cpp zeromq-3.2.4/src/signaler.cpp
--- zeromq-3.2.4-interm/src/signaler.cpp	2014-03-12 15:51:34.000000000 +0100
+++ zeromq-3.2.4/src/signaler.cpp	2014-11-10 15:54:40.453118275 +0100
@@ -193,6 +193,76 @@
 #endif
 }
 
+int zmq::signaler_t::wait_ex (int timeout_, pollfd *pfd_)
+{
+#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+
+    struct pollfd pfd;
+    pfd.fd = r;
+    pfd.events = POLLIN;
+    if(timeout_)
+    {
+	int rc = poll (&pfd, 1, timeout_);
+	if (unlikely (rc < 0)) {
+	    errno_assert (errno == EINTR);
+	    return -1;
+	}
+	else if (unlikely (rc == 0)) {
+	    errno = EAGAIN;
+	    return -1;
+	}
+	zmq_assert (rc == 1);
+	zmq_assert (pfd.revents & POLLIN);
+	return 0;
+    }
+    else
+    {
+	if((pfd_->revents & POLLIN) == 0)
+	{
+	    // simulate EAGAIN
+	    errno = EAGAIN;
+	    return -1;
+	}
+	zmq_assert (pfd_->revents & POLLIN);
+	// MUST clear POLLIN
+	pfd_->revents &= ~POLLIN;
+	return 0;
+    }
+    
+#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+
+    fd_set fds;
+    FD_ZERO (&fds);
+    FD_SET (r, &fds);
+    struct timeval timeout;
+    if (timeout_ >= 0) {
+        timeout.tv_sec = timeout_ / 1000;
+        timeout.tv_usec = timeout_ % 1000 * 1000;
+    }
+#ifdef ZMQ_HAVE_WINDOWS
+    int rc = select (0, &fds, NULL, NULL,
+        timeout_ >= 0 ? &timeout : NULL);
+    wsa_assert (rc != SOCKET_ERROR);
+#else
+    int rc = select (r + 1, &fds, NULL, NULL,
+        timeout_ >= 0 ? &timeout : NULL);
+    if (unlikely (rc < 0)) {
+        errno_assert (errno == EINTR);
+        return -1;
+    }
+#endif
+    if (unlikely (rc == 0)) {
+        errno = EAGAIN;
+        return -1;
+    }
+    zmq_assert (rc == 1);
+    return 0;
+
+#else
+#error
+#endif
+}
+
 void zmq::signaler_t::recv ()
 {
     //  Attempt to read a signal.
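With a zero timeout, the poll-based branch of wait_ex() makes no system call: it consumes the POLLIN bit that an earlier poll() (or the epoll translation) left in pfd_->revents, and then clears it. The select-based branch ignores pfd_ and still calls select(). Clearing the bit matters because mailbox_t::recv_ex() calls wait_ex() again once it has drained the command pipe; a stale POLLIN would switch the mailbox back to the active state and trip zmq_assert (ok) on an empty pipe. The toy model below illustrates that interaction under simplifying assumptions: toy_mailbox, wait_cached() and drain() are hypothetical, a std::deque stands in for cpipe, and the signaler.recv() step is left out.

```cpp
#include <cassert>
#include <cerrno>
#include <deque>
#include <poll.h>

//  Zero-timeout path of the patched wait_ex(): no system call, just consume
//  the readiness already recorded in pfd->revents.
static int wait_cached (pollfd *pfd)
{
    if ((pfd->revents & POLLIN) == 0) {
        errno = EAGAIN;             //  nothing was signalled
        return -1;
    }
    pfd->revents &= ~POLLIN;        //  consume it, so it is seen only once
    return 0;
}

//  Hypothetical toy model of mailbox_t::recv_ex(): `commands` stands in for
//  cpipe and `active` mirrors mailbox_t::active.
struct toy_mailbox {
    std::deque<int> commands;
    bool active = false;

    int recv (int *cmd, pollfd *pfd)
    {
        if (active) {
            if (!commands.empty ()) {
                *cmd = commands.front ();
                commands.pop_front ();
                return 0;
            }
            active = false;             //  pipe drained: back to passive state
        }
        if (wait_cached (pfd) != 0)
            return -1;                  //  errno is EAGAIN
        active = true;
        assert (!commands.empty ());    //  plays the role of zmq_assert (ok)
        *cmd = commands.front ();
        commands.pop_front ();
        return 0;
    }
};

//  Process every pending command, as io_thread_t::in_event_ex() does.
static int drain (toy_mailbox &mb, pollfd *pfd)
{
    int cmd, n = 0;
    while (mb.recv (&cmd, pfd) == 0)
        ++n;
    return n;
}
```

Removing the `pfd->revents &= ~POLLIN;` line makes the fourth recv() call in the test reach the assert, which is the failure the "MUST clear POLLIN" comment guards against.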
diff -urwB zeromq-3.2.4-interm/src/signaler.hpp zeromq-3.2.4/src/signaler.hpp
--- zeromq-3.2.4-interm/src/signaler.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/signaler.hpp	2014-11-10 14:46:59.063018398 +0100
@@ -21,6 +21,8 @@
 #ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
 #define __ZMQ_SIGNALER_HPP_INCLUDED__
 
+#include <poll.h>
+
 #include "fd.hpp"
 
 namespace zmq
@@ -41,6 +43,7 @@
         fd_t get_fd ();
         void send ();
         int wait (int timeout_);
+        int wait_ex (int timeout_, pollfd *pfd_);
         void recv ();
         
     private:
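The select()-based branch of wait_ex() converts its millisecond timeout into a struct timeval. Because % and * have the same precedence and associate left to right, `timeout_ % 1000 * 1000` means `(timeout_ % 1000) * 1000`, so 2500 ms becomes 2 s plus 500000 microseconds. A small check of that arithmetic follows; ms_to_timeval() is a hypothetical name, and negative timeouts, which the patch maps to a NULL timeout pointer, are not handled here.

```cpp
#include <cassert>
#include <sys/time.h>

//  Same conversion as the select()-based branch of wait_ex(): a non-negative
//  millisecond timeout is split into whole seconds and remaining microseconds.
static timeval ms_to_timeval (int timeout_ms)
{
    timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = timeout_ms % 1000 * 1000;   //  (timeout_ms % 1000) * 1000
    return tv;
}
```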
diff -urwB zeromq-3.2.4-interm/src/socket_base.cpp zeromq-3.2.4/src/socket_base.cpp
--- zeromq-3.2.4-interm/src/socket_base.cpp	2014-03-11 12:08:49.000000000 +0100
+++ zeromq-3.2.4/src/socket_base.cpp	2014-11-14 12:03:58.872633507 +0100
@@ -39,6 +39,8 @@
 #include <unistd.h>
 #endif
 
+#include <poll.h>
+
 #include "socket_base.hpp"
 #include "tcp_listener.hpp"
 #include "ipc_listener.hpp"
@@ -303,6 +305,38 @@
     return options.getsockopt (option_, optval_, optvallen_);
 }
 
+// only for ZMQ_EVENTS
+int zmq::socket_base_t::getsockopt_ex (int option_, void *optval_,
+				       size_t *optvallen_, pollfd *pfd_)
+{
+    if (unlikely (ctx_terminated)) {
+        errno = ETERM;
+        return -1;
+    }
+
+    zmq_assert (option_ == ZMQ_EVENTS);
+
+    if (option_ == ZMQ_EVENTS) {
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        int rc = process_commands_ex (0, false, pfd_);
+        if (rc != 0 && (errno == EINTR || errno == ETERM))
+            return -1;
+        errno_assert (rc == 0);
+        *((int*) optval_) = 0;
+        if (has_out ())
+            *((int*) optval_) |= ZMQ_POLLOUT;
+        if (has_in ())
+            *((int*) optval_) |= ZMQ_POLLIN;
+        *optvallen_ = sizeof (int);
+        return 0;
+    }
+
+    return 0;
+}
+
 int zmq::socket_base_t::bind (const char *addr_)
 {
     if (unlikely (ctx_terminated)) {
@@ -872,6 +906,62 @@
     return 0;
 }
 
+int zmq::socket_base_t::process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_)
+{
+    int rc;
+    command_t cmd;
+    if (timeout_ != 0) {
+
+        //  If we are asked to wait, simply ask mailbox to wait.
+        rc = mailbox.recv_ex (&cmd, timeout_, pfd_);
+    }
+    else {
+
+        //  If we are asked not to wait, check whether we haven't processed
+        //  commands recently, so that we can throttle the new commands.
+
+        //  Get the CPU's tick counter. If 0, the counter is not available.
+        uint64_t tsc = zmq::clock_t::rdtsc ();
+
+        //  Optimised version of command processing - it doesn't have to check
+        //  for incoming commands each time. It does so only if certain time
+        //  elapsed since last command processing. Command delay varies
+        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
+        //  etc. The optimisation makes sense only on platforms where getting
+        //  a timestamp is a very cheap operation (tens of nanoseconds).
+        if (tsc && throttle_) {
+
+            //  Check whether TSC haven't jumped backwards (in case of migration
+            //  between CPU cores) and whether certain time have elapsed since
+            //  last command processing. If it didn't do nothing.
+            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
+                return 0;
+            last_tsc = tsc;
+        }
+
+        //  Check whether there are any commands pending for this thread.
+        rc = mailbox.recv_ex (&cmd, 0, pfd_);
+    }
+
+    //  Process all available commands.
+    while (rc == 0) {
+        cmd.destination->process_command (cmd);
+        rc = mailbox.recv_ex (&cmd, 0, pfd_);
+    }
+
+    if (errno == EINTR)
+        return -1;
+
+    zmq_assert (errno == EAGAIN);
+
+    if (ctx_terminated) {
+        errno = ETERM;
+        return -1;
+    }
+
+    return 0;
+}
+
 void zmq::socket_base_t::process_stop ()
 {
     //  Here, someone have called zmq_term while the socket was still alive.
@@ -960,6 +1050,16 @@
     check_destroy ();
 }
 
+void zmq::socket_base_t::in_event_ex (pollfd *pfd)
+{
+    //  This function is invoked only once the socket is running in the context
+    //  of the reaper thread. Process any commands from other threads/sockets
+    //  that may be available at the moment. Ultimately, the socket will
+    //  be destroyed.
+    process_commands (0, false);
+    check_destroy ();
+}
+
 void zmq::socket_base_t::out_event ()
 {
     zmq_assert (false);
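process_commands_ex() keeps the throttling rule of process_commands(): with throttle_ set and a cycle counter available, the mailbox is checked only when more than max_command_delay ticks have elapsed since the last check, and a counter that moved backwards (for example after migrating between cores) also triggers a check. The zmq_poll() path reaches it through getsockopt_ex() with throttle_ set to false, so the rule does not apply there. The decision can be isolated in a pure function; should_process_commands() is a hypothetical name and the tick values in the test are arbitrary.

```cpp
#include <cassert>
#include <cstdint>

//  Returns true if the caller should look at its mailbox now. Mirrors the
//  timeout_ == 0 branch of process_commands_ex(): tsc == 0 means "no cycle
//  counter available", in which case commands are always processed.
static bool should_process_commands (uint64_t tsc, uint64_t &last_tsc,
                                     uint64_t max_command_delay, bool throttle)
{
    if (tsc && throttle) {
        //  Skip only if the counter moved forward by at most the delay.
        if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
            return false;
        last_tsc = tsc;             //  remember when we last checked
    }
    return true;
}
```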
diff -urwB zeromq-3.2.4-interm/src/socket_base.hpp zeromq-3.2.4/src/socket_base.hpp
--- zeromq-3.2.4-interm/src/socket_base.hpp	2014-04-09 11:14:26.000000000 +0200
+++ zeromq-3.2.4/src/socket_base.hpp	2014-11-13 11:42:15.699047646 +0100
@@ -77,6 +77,7 @@
         //  Interface for communication with the API layer.
         int setsockopt (int option_, const void *optval_, size_t optvallen_);
         int getsockopt (int option_, void *optval_, size_t *optvallen_);
+        int getsockopt_ex (int option_, void *optval_, size_t *optvallen_, pollfd *pfd_);
         int bind (const char *addr_);
         int connect (const char *addr_);
         int term_endpoint (const char *addr_);
@@ -96,6 +97,7 @@
         //  i_poll_events implementation. This interface is used when socket
         //  is handled by the poller in the reaper thread.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
@@ -210,6 +212,7 @@
         //  If throttle argument is true, commands are processed at most once
         //  in a predefined time period.
         int process_commands (int timeout_, bool throttle_);
+        int process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_);
 
         //  Handlers for incoming commands.
         void process_stop ();
diff -urwB zeromq-3.2.4-interm/src/stream_engine.cpp zeromq-3.2.4/src/stream_engine.cpp
--- zeromq-3.2.4-interm/src/stream_engine.cpp	2014-04-25 11:53:51.000000000 +0200
+++ zeromq-3.2.4/src/stream_engine.cpp	2014-11-13 15:29:55.984958203 +0100
@@ -244,6 +245,11 @@
     }
 }
 
+void zmq::stream_engine_t::in_event_ex (pollfd *pfd)
+{
+    zmq::stream_engine_t::in_event();
+}
+
 void zmq::stream_engine_t::out_event ()
 {
     //  If write buffer is empty, try to read new data from the encoder.
diff -urwB zeromq-3.2.4-interm/src/stream_engine.hpp zeromq-3.2.4/src/stream_engine.hpp
--- zeromq-3.2.4-interm/src/stream_engine.hpp	2013-02-01 10:03:55.000000000 +0100
+++ zeromq-3.2.4/src/stream_engine.hpp	2014-11-13 15:27:26.236236302 +0100
@@ -62,6 +62,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
 
     private:
diff -urwB zeromq-3.2.4-interm/src/tcp_connecter.cpp zeromq-3.2.4/src/tcp_connecter.cpp
--- zeromq-3.2.4-interm/src/tcp_connecter.cpp	2013-05-02 20:41:25.000000000 +0200
+++ zeromq-3.2.4/src/tcp_connecter.cpp	2014-11-13 13:05:15.161971274 +0100
@@ -110,6 +110,14 @@
     out_event ();
 }
 
+void zmq::tcp_connecter_t::in_event_ex (pollfd *pfd)
+{
+    //  We are not polling for incoming data, so we are actually called
+    //  because of error here. However, we can get error on out event as well
+    //  on some platforms, so we'll simply handle both events in the same way.
+    out_event ();
+}
+
 void zmq::tcp_connecter_t::out_event ()
 {
     fd_t fd = connect ();
diff -urwB zeromq-3.2.4-interm/src/tcp_connecter.hpp zeromq-3.2.4/src/tcp_connecter.hpp
--- zeromq-3.2.4-interm/src/tcp_connecter.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/tcp_connecter.hpp	2014-11-13 13:04:30.041651724 +0100
@@ -57,6 +57,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/tcp_listener.cpp zeromq-3.2.4/src/tcp_listener.cpp
--- zeromq-3.2.4-interm/src/tcp_listener.cpp	2012-11-23 08:38:26.000000000 +0100
+++ zeromq-3.2.4/src/tcp_listener.cpp	2014-11-13 13:07:09.469256737 +0100
@@ -111,6 +111,11 @@
     socket->event_accepted (endpoint, fd);
 }
 
+void zmq::tcp_listener_t::in_event_ex (pollfd *pfd)
+{
+    zmq::tcp_listener_t::in_event ();
+}
+
 void zmq::tcp_listener_t::close ()
 {
     zmq_assert (s != retired_fd);
diff -urwB zeromq-3.2.4-interm/src/tcp_listener.hpp zeromq-3.2.4/src/tcp_listener.hpp
--- zeromq-3.2.4-interm/src/tcp_listener.hpp	2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/tcp_listener.hpp	2014-11-13 13:05:37.060644451 +0100
@@ -57,6 +57,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
 
         //  Close the listening socket.
         void close ();
diff -urwB zeromq-3.2.4-interm/src/zmq.cpp zeromq-3.2.4/src/zmq.cpp
--- zeromq-3.2.4-interm/src/zmq.cpp	2014-04-09 14:39:35.000000000 +0200
+++ zeromq-3.2.4/src/zmq.cpp	2014-11-20 13:21:23.465769599 +0100
@@ -264,6 +265,17 @@
     return result;
 }
 
+int zmq_getsockopt_ex (void *s_, int option_, void *optval_, size_t *optvallen_, pollfd *pfd_)
+{
+    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+        errno = ENOTSOCK;
+        return -1;
+    }
+    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
+    int result = s->getsockopt_ex (option_, optval_, optvallen_, pfd_);
+    return result;
+}
+
 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
 {
     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
@@ -703,12 +720,15 @@
             //  using the ZMQ_EVENTS socket option.
             if (items_ [i].socket) {
                 size_t zmq_events_size = sizeof (uint32_t);
-                uint32_t zmq_events;
-                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
-                    &zmq_events_size) == -1) {
+                uint32_t zmq_events = 0;
+
+		if (zmq_getsockopt_ex (items_ [i].socket, ZMQ_EVENTS,
+				       &zmq_events,
+				       &zmq_events_size, &pollfds [i]) == -1) {
                     free (pollfds);
                     return -1;
                 }
+		
                 if ((items_ [i].events & ZMQ_POLLOUT) &&
                       (zmq_events & ZMQ_POLLOUT))
                     items_ [i].revents |= ZMQ_POLLOUT;