[zeromq-dev] zmq_poll performance question
Francis Le Bourse
zno-reply-francis.lebourse at sfr-sh.fr
Fri Nov 28 11:35:21 CET 2014
On 11/24/2014 8:08 PM, Arnaud Kapp wrote:
>> Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq master
The first patch for 3.2.4 had an issue in zmq_poll(): I had tried a
slightly too aggressive optimization that bypassed the "first_pass"
processing. This is fixed in the current one.
The patch for the current head is clean.
Cheers,
Francis
> Oh okay. This is the commit that added the flag:
> https://github.com/zeromq/libzmq/commit/779c37abc433cb6595ddeedaf86b280317656bdd
>
> libzmq was at 4.1 at the time, I believe.
>
> I'll probably look at it this week-end then :)
>
> On Mon, Nov 24, 2014 at 12:10 PM, Francis Le Bourse
> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>> Hi,
>> On 11/24/2014 11:35 AM, Arnaud Kapp wrote:
>>> Hello,
>>>
>>> I recently added support for the POLLPRI flag.
>>> It looks like it's not handled in your patch
>> No, it isn't handled. In which version did you add this flag?
>> Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq
>> master.
>>> and that it needs custom
>>> support. Since there is no test related to this flag, you wouldn't
>>> have noticed.
>>>
>>> I can give it a look if you want.
>> That would be nice.
>>
>> Cheers,
>> Francis
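As a hedged illustration of the kind of custom support that might be needed: the epoll.cpp hunk in the attached patch rebuilds a pollfd from the epoll event mask, but it only maps EPOLLIN, EPOLLOUT, EPOLLERR and EPOLLHUP. The sketch below shows that translation with EPOLLPRI carried over as POLLPRI. The function names are hypothetical, the code is Linux-only (it relies on <sys/epoll.h>), and it is not taken from Arnaud's commit; where POLLPRI actually has to be threaded through (epoll.cpp, zmq_poll(), or both) depends on how that commit handles the flag.

```cpp
#include <cassert>
#include <cstdint>
#include <poll.h>
#include <sys/epoll.h>

// Translate the mask returned by epoll_wait() into poll()-style revents,
// as the epoll.cpp hunk does for pfd.revents, plus EPOLLPRI -> POLLPRI.
// Hypothetical helper, Linux-only.
short epoll_to_poll_revents (uint32_t events)
{
    short revents = 0;
    if (events & EPOLLIN)
        revents |= POLLIN;
    if (events & EPOLLPRI)      // not mapped in the posted patch
        revents |= POLLPRI;
    if (events & EPOLLOUT)
        revents |= POLLOUT;
    if (events & EPOLLERR)
        revents |= POLLERR;
    if (events & EPOLLHUP)
        revents |= POLLHUP;
    return revents;
}

// Same idea for the interest mask (what the patch stores in pfd.events).
short epoll_to_poll_events (uint32_t requested)
{
    short events = 0;
    if (requested & EPOLLIN)
        events |= POLLIN;
    if (requested & EPOLLPRI)
        events |= POLLPRI;
    if (requested & EPOLLOUT)
        events |= POLLOUT;
    return events;
}
```

The mapping runs once per ready descriptor returned by epoll_wait(), so carrying the extra flag costs one more bit test per event.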
>>
>>
>>> On Sat, Nov 22, 2014 at 2:16 PM, Pieter Hintjens <ph at imatix.com> wrote:
>>>> I suggest you send the patch to libzmq master, and ensure all test
>>>> cases pass. Then we can get this into the next version.
>>>>
>>>> On Fri, Nov 21, 2014 at 2:50 PM, Francis Le Bourse
>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>> Hi,
>>>>>
>>>>> On 11/6/2014 3:18 PM, Pieter Hintjens wrote:
>>>>>> Oh, ok. Sounds like you have a good candidate for some before/after
>>>>>> measurement and optimization. Are you going to try to make a patch for
>>>>>> this?
>>>>> I have a candidate patch for this optimization. The performance
>>>>> improvement is very good, and it doesn't seem to introduce any new
>>>>> instability.
>>>>> What is modified:
>>>>> - zmq_poll(): there is now only one poll() call,
>>>>> - the epoll-based poller in epoll.cpp.
>>>>> Other calls to poll() and select() are left unmodified.
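The core of the change can be modelled in a few lines. In the attached patch, signaler_t::wait_ex() with a zero timeout no longer calls poll() on the mailbox descriptor: it looks at the revents that zmq_poll()'s single poll() already stored in the pollfd, and clears POLLIN once it has been consumed. The following is a simplified, standalone sketch of that branch only. The function name is hypothetical, and the real method also handles non-zero timeouts and the select()-based build.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>

// Zero-timeout "wait" that reuses the result of an earlier poll() instead
// of issuing a new system call (hypothetical model of wait_ex (0, pfd_)).
// Returns 0 if a signal is pending, or -1 with errno = EAGAIN otherwise.
int wait_nonblocking_from_revents (pollfd *pfd)
{
    if ((pfd->revents & POLLIN) == 0) {
        errno = EAGAIN;     // same result as a zero-timeout poll() with no event
        return -1;
    }
    // Consume the readiness: later calls in the same drain loop must see
    // EAGAIN rather than a signal that has already been handled.
    pfd->revents &= ~POLLIN;
    return 0;
}
```

Clearing POLLIN matters because mailbox_t::recv_ex() is called in a loop: once the command pipe is empty, it reads the signal with signaler_t::recv() and waits again. With a stale POLLIN, that second wait would succeed and the following cpipe.read() could fail its zmq_assert. A signal that arrives after poll() returned is not lost in this model, since the descriptor stays readable and the next poll() reports it.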
>>>>>
>>>>> I would be happy to get any feedback.
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Francis
>>>>>>
>>>>>> On Thu, Nov 6, 2014 at 2:09 PM, Francis Le Bourse
>>>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>>>> On 11/6/2014 11:47 AM, Pieter Hintjens wrote:
>>>>>>>> A simple optimization is, when you are polling sockets for input, to
>>>>>>>> continue reading from an active socket using a non-blocking read. So
>>>>>>>> you process all waiting messages on a socket and then only switch
>>>>>>>> back
>>>>>>>> to poll when needed.
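A minimal sketch of the pattern Pieter describes, using a plain POSIX pipe instead of a zmq socket so it compiles on its own: wait once with poll(), then keep reading with non-blocking reads until EAGAIN, and only then return to poll(). With libzmq the inner read would be a receive with the ZMQ_DONTWAIT flag; the function name and buffer size here are illustrative.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

// Wait once for readability, then drain the descriptor with non-blocking
// reads. Returns the number of bytes drained, 0 if poll() timed out,
// or -1 on error. Assumes fd is already in O_NONBLOCK mode.
long drain_after_poll (int fd, int timeout_ms)
{
    pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int rc = poll (&pfd, 1, timeout_ms);
    if (rc <= 0)
        return rc;              // 0: timed out, -1: poll() failed

    long total = 0;
    char buf [256];
    for (;;) {
        ssize_t n = read (fd, buf, sizeof buf);
        if (n > 0) {
            total += n;         // a real loop would process buf here
            continue;
        }
        if (n == 0)
            break;              // writer closed the pipe
        if (errno == EINTR)
            continue;
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            break;              // drained: only now go back to poll()
        return -1;
    }
    return total;
}
```

The caller must set O_NONBLOCK first (for example with fcntl (fd, F_SETFL, flags | O_NONBLOCK)); otherwise the final read() would block instead of returning EAGAIN.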
>>>>>>> Thank you for your quick reply.
>>>>>>>
>>>>>>> Yes, but the question was more about the zmq_poll() internals.
>>>>>>> For 600+ file descriptors, zmq_poll() calls poll() a huge number of
>>>>>>> times, although only a few descriptors will trigger a POLLIN, and the
>>>>>>> relevant information is already present in the pollfds array. That is
>>>>>>> where the performance hit is.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Francis
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> On Thu, Nov 6, 2014 at 11:28 AM, Francis Le Bourse
>>>>>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am looking at a performance issue in zmq when the number of
>>>>>>>>> zsockets / file descriptors becomes large.
>>>>>>>>> The relevant calls are:
>>>>>>>>> poll+0x57
>>>>>>>>> zmq_poll+0x2e3
>>>>>>>>> zloop_start+0x1e8
>>>>>>>>> main+0xb40
>>>>>>>>> __libc_start_main+0xfd
>>>>>>>>> immediately followed by a loop of
>>>>>>>>> poll+0x57
>>>>>>>>> zmq::signaler_t::wait(int)+0x33
>>>>>>>>> zmq::mailbox_t::recv(zmq::command_t*, int)+0x78
>>>>>>>>> zmq::socket_base_t::process_commands(int, bool)+0xbe
>>>>>>>>> zmq::socket_base_t::getsockopt(int, void*, unsigned long*)+0x135
>>>>>>>>> zmq_getsockopt+0x75
>>>>>>>>> zmq_poll+0x3da
>>>>>>>>> zloop_start+0x1e8
>>>>>>>>> main+0xb40
>>>>>>>>> __libc_start_main+0xfd
>>>>>>>>>
>>>>>>>>> The code in the loop is executed once per file descriptor in the
>>>>>>>>> initial poll array, redoing a poll() system call each time.
>>>>>>>>> Is there a reason to proceed that way?
>>>>>>>>> Would it be possible to reuse the results of the first poll() in
>>>>>>>>> order to bypass the second set of system calls?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Francis
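To make the question concrete, here is a hedged sketch that contrasts the two strategies on plain pipes. ready_count_repoll() mirrors the pattern in the second backtrace (one poll() over the whole array, then one more zero-timeout poll() per descriptor, as signaler_t::wait() ends up doing); ready_count_reuse() only reads the revents the first call already produced. All names are hypothetical, make_pipes() is just a fixture, and the counter exists only to show the number of system calls.

```cpp
#include <cassert>
#include <cstddef>
#include <poll.h>
#include <unistd.h>
#include <vector>

static int poll_calls = 0;      // number of poll() system calls issued

static int counted_poll (pollfd *fds, nfds_t n, int timeout_ms)
{
    ++poll_calls;
    return poll (fds, n, timeout_ms);
}

// Pattern from the backtrace: poll the array, then re-poll each fd.
std::size_t ready_count_repoll (std::vector<pollfd> &fds)
{
    if (counted_poll (fds.data (), fds.size (), 0) < 0)
        return 0;
    std::size_t ready = 0;
    for (const pollfd &p : fds) {
        pollfd one = p;
        one.revents = 0;
        if (counted_poll (&one, 1, 0) == 1 && (one.revents & POLLIN))
            ++ready;
    }
    return ready;
}

// Alternative asked about above: reuse the revents of the first poll().
std::size_t ready_count_reuse (std::vector<pollfd> &fds)
{
    if (counted_poll (fds.data (), fds.size (), 0) < 0)
        return 0;
    std::size_t ready = 0;
    for (const pollfd &p : fds)
        if (p.revents & POLLIN)
            ++ready;
    return ready;
}

// Fixture: n pipes polled for POLLIN; the first `readable` get one byte.
std::vector<pollfd> make_pipes (int n, int readable, std::vector<int> &write_ends)
{
    std::vector<pollfd> fds;
    for (int i = 0; i < n; ++i) {
        int p [2];
        if (pipe (p) != 0)
            break;
        if (i < readable && write (p [1], "x", 1) != 1)
            break;
        pollfd q;
        q.fd = p [0];
        q.events = POLLIN;
        q.revents = 0;
        fds.push_back (q);
        write_ends.push_back (p [1]);
    }
    return fds;
}
```

With 8 pipes of which 3 hold data, both functions report 3 ready descriptors, but the first issues 9 poll() calls and the second issues 1. The gap grows linearly with the number of items, which is consistent with the 600+ descriptor case described in this thread.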
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> _______________________________________________
>>>>>>>>> zeromq-dev mailing list
>>>>>>>>> zeromq-dev at lists.zeromq.org
>>>>>>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>>>>>>>
-------------- next part --------------
diff -urwB zeromq-3.2.4-interm/src/ctx.cpp zeromq-3.2.4/src/ctx.cpp
--- zeromq-3.2.4-interm/src/ctx.cpp 2014-11-14 11:50:34.679645346 +0100
+++ zeromq-3.2.4/src/ctx.cpp 2014-11-10 14:46:33.092394320 +0100
@@ -29,6 +29,8 @@
#include <new>
#include <string.h>
+#include <poll.h>
+
#include "ctx.hpp"
#include "socket_base.hpp"
#include "io_thread.hpp"
diff -urwB zeromq-3.2.4-interm/src/epoll.cpp zeromq-3.2.4/src/epoll.cpp
--- zeromq-3.2.4-interm/src/epoll.cpp 2012-11-13 13:39:41.000000000 +0100
+++ zeromq-3.2.4/src/epoll.cpp 2014-11-20 13:19:52.958137819 +0100
@@ -29,6 +29,8 @@
#include <algorithm>
#include <new>
+#include <poll.h>
+
#include "epoll.hpp"
#include "err.hpp"
#include "config.hpp"
@@ -148,18 +150,38 @@
for (int i = 0; i < n; i ++) {
poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
+ uint32_t events = ev_buf[i].events;
+ ev_buf[i].events = 0;
+ struct pollfd pfd;
+ pfd.fd = pe->fd;
+ pfd.events = 0;
+ if(pe->ev.events & EPOLLIN)
+ pfd.events |= POLLIN;
+ if(pe->ev.events & EPOLLOUT)
+ pfd.events |= POLLOUT;
+ pfd.revents = 0;
+
+ if(events & EPOLLIN)
+ pfd.revents |= POLLIN;
+ if(events & EPOLLOUT)
+ pfd.revents |= POLLOUT;
+ if(events & EPOLLERR)
+ pfd.revents |= POLLERR;
+ if(events & EPOLLHUP)
+ pfd.revents |= POLLHUP;
+
if (pe->fd == retired_fd)
continue;
- if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
+ if (events & (EPOLLERR | EPOLLHUP))
pe->events->in_event ();
if (pe->fd == retired_fd)
continue;
- if (ev_buf [i].events & EPOLLOUT)
+ if (events & EPOLLOUT)
pe->events->out_event ();
if (pe->fd == retired_fd)
continue;
- if (ev_buf [i].events & EPOLLIN)
- pe->events->in_event ();
+ if (events & EPOLLIN)
+ pe->events->in_event_ex (&pfd);
}
// Destroy retired event sources.
diff -urwB zeromq-3.2.4-interm/src/i_poll_events.hpp zeromq-3.2.4/src/i_poll_events.hpp
--- zeromq-3.2.4-interm/src/i_poll_events.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/i_poll_events.hpp 2014-11-13 11:45:22.509276876 +0100
@@ -22,6 +22,8 @@
#ifndef __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
#define __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
+#include <poll.h>
+
namespace zmq
{
@@ -34,6 +36,7 @@
// Called by I/O thread when file descriptor is ready for reading.
virtual void in_event () = 0;
+ virtual void in_event_ex (pollfd *) = 0;
// Called by I/O thread when file descriptor is ready for writing.
virtual void out_event () = 0;
diff -urwB zeromq-3.2.4-interm/src/io_object.cpp zeromq-3.2.4/src/io_object.cpp
--- zeromq-3.2.4-interm/src/io_object.cpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_object.cpp 2014-11-13 12:12:41.171976993 +0100
@@ -97,6 +97,11 @@
zmq_assert (false);
}
+void zmq::io_object_t::in_event_ex (pollfd *pfd)
+{
+ zmq_assert (false);
+}
+
void zmq::io_object_t::out_event ()
{
zmq_assert (false);
diff -urwB zeromq-3.2.4-interm/src/io_object.hpp zeromq-3.2.4/src/io_object.hpp
--- zeromq-3.2.4-interm/src/io_object.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_object.hpp 2014-11-13 11:42:11.089118102 +0100
@@ -65,6 +65,7 @@
// i_poll_events interface implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB zeromq-3.2.4-interm/src/io_thread.cpp zeromq-3.2.4/src/io_thread.cpp
--- zeromq-3.2.4-interm/src/io_thread.cpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_thread.cpp 2014-11-14 11:57:40.836245798 +0100
@@ -79,6 +79,23 @@
errno_assert (rc != 0 && errno == EAGAIN);
}
+void zmq::io_thread_t::in_event_ex (pollfd *pfd)
+{
+ // TODO: Do we want to limit number of commands I/O thread can
+ // process in a single go?
+
+ command_t cmd;
+ int rc = mailbox.recv_ex (&cmd, 0, pfd); // will clear POLLIN
+
+ while (rc == 0 || errno == EINTR) {
+ if (rc == 0)
+ cmd.destination->process_command (cmd);
+ rc = mailbox.recv_ex (&cmd, 0, pfd);
+ }
+
+ errno_assert (rc != 0 && errno == EAGAIN);
+}
+
void zmq::io_thread_t::out_event ()
{
// We are never polling for POLLOUT here. This function is never called.
diff -urwB zeromq-3.2.4-interm/src/io_thread.hpp zeromq-3.2.4/src/io_thread.hpp
--- zeromq-3.2.4-interm/src/io_thread.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_thread.hpp 2014-11-13 13:03:40.680393409 +0100
@@ -59,6 +59,7 @@
// i_poll_events implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB zeromq-3.2.4-interm/src/ipc_connecter.cpp zeromq-3.2.4/src/ipc_connecter.cpp
--- zeromq-3.2.4-interm/src/ipc_connecter.cpp 2012-11-23 08:35:33.000000000 +0100
+++ zeromq-3.2.4/src/ipc_connecter.cpp 2014-11-13 12:57:10.186264608 +0100
@@ -100,6 +100,14 @@
out_event ();
}
+void zmq::ipc_connecter_t::in_event_ex (pollfd *pfd)
+{
+ // We are not polling for incoming data, so we are actually called
+ // because of error here. However, we can get error on out event as well
+ // on some platforms, so we'll simply handle both events in the same way.
+ out_event ();
+}
+
void zmq::ipc_connecter_t::out_event ()
{
fd_t fd = connect ();
diff -urwB zeromq-3.2.4-interm/src/ipc_connecter.hpp zeromq-3.2.4/src/ipc_connecter.hpp
--- zeromq-3.2.4-interm/src/ipc_connecter.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/ipc_connecter.hpp 2014-11-13 12:55:54.156407616 +0100
@@ -59,6 +59,7 @@
// Handlers for I/O events.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB zeromq-3.2.4-interm/src/ipc_listener.cpp zeromq-3.2.4/src/ipc_listener.cpp
--- zeromq-3.2.4-interm/src/ipc_listener.cpp 2012-11-23 08:35:52.000000000 +0100
+++ zeromq-3.2.4/src/ipc_listener.cpp 2014-11-13 12:59:05.723526546 +0100
@@ -99,6 +99,11 @@
socket->event_accepted (endpoint, fd);
}
+void zmq::ipc_listener_t::in_event_ex (pollfd *pfd)
+{
+ zmq::ipc_listener_t::in_event();
+}
+
int zmq::ipc_listener_t::get_address (std::string &addr_)
{
struct sockaddr_storage ss;
diff -urwB zeromq-3.2.4-interm/src/ipc_listener.hpp zeromq-3.2.4/src/ipc_listener.hpp
--- zeromq-3.2.4-interm/src/ipc_listener.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/ipc_listener.hpp 2014-11-13 12:57:44.364745769 +0100
@@ -60,6 +60,7 @@
// Handlers for I/O events.
void in_event ();
+ void in_event_ex (pollfd *);
// Close the listening socket.
int close ();
diff -urwB zeromq-3.2.4-interm/src/mailbox.cpp zeromq-3.2.4/src/mailbox.cpp
--- zeromq-3.2.4-interm/src/mailbox.cpp 2013-05-01 05:30:36.000000000 +0200
+++ zeromq-3.2.4/src/mailbox.cpp 2014-11-13 11:41:41.849549736 +0100
@@ -19,6 +19,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#if defined ZMQ_POLL_BASED_ON_POLL
+#include <poll.h>
+#endif
+
#include "mailbox.hpp"
#include "err.hpp"
@@ -85,3 +89,31 @@
return 0;
}
+int zmq::mailbox_t::recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_)
+{
+ // Try to get the command straight away.
+ if (active) {
+ bool ok = cpipe.read (cmd_);
+ if (ok)
+ return 0;
+
+ // If there are no more commands available, switch into passive state.
+ active = false;
+ signaler.recv ();
+ }
+
+ // Wait for signal from the command sender.
+ int rc = signaler.wait_ex (timeout_, pfd_);
+ if (rc != 0 && (errno == EAGAIN || errno == EINTR))
+ return -1;
+
+ // We've got the signal. Now we can switch into active state.
+ active = true;
+
+ // Get a command.
+ errno_assert (rc == 0);
+ bool ok = cpipe.read (cmd_);
+ zmq_assert (ok);
+ return 0;
+}
+
diff -urwB zeromq-3.2.4-interm/src/mailbox.hpp zeromq-3.2.4/src/mailbox.hpp
--- zeromq-3.2.4-interm/src/mailbox.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/mailbox.hpp 2014-11-13 11:41:16.399927840 +0100
@@ -24,6 +24,8 @@
#include <stddef.h>
+#include <poll.h>
+
#include "platform.hpp"
#include "signaler.hpp"
#include "fd.hpp"
@@ -45,6 +47,7 @@
fd_t get_fd ();
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
+ int recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_);
private:
diff -urwB zeromq-3.2.4-interm/src/pgm_receiver.cpp zeromq-3.2.4/src/pgm_receiver.cpp
--- zeromq-3.2.4-interm/src/pgm_receiver.cpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_receiver.cpp 2014-11-13 13:01:34.993282854 +0100
@@ -262,6 +262,11 @@
session->flush ();
}
+void zmq::pgm_receiver_t::in_event_ex (pollfd *pfd)
+{
+ zmq::pgm_receiver_t::in_event();
+}
+
void zmq::pgm_receiver_t::timer_event (int token)
{
zmq_assert (token == rx_timer_id);
diff -urwB zeromq-3.2.4-interm/src/pgm_receiver.hpp zeromq-3.2.4/src/pgm_receiver.hpp
--- zeromq-3.2.4-interm/src/pgm_receiver.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_receiver.hpp 2014-11-13 12:59:45.474928619 +0100
@@ -65,6 +65,7 @@
// i_poll_events interface implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void timer_event (int token);
private:
diff -urwB zeromq-3.2.4-interm/src/pgm_sender.cpp zeromq-3.2.4/src/pgm_sender.cpp
--- zeromq-3.2.4-interm/src/pgm_sender.cpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_sender.cpp 2014-11-13 13:03:08.792872122 +0100
@@ -153,6 +153,11 @@
}
}
+void zmq::pgm_sender_t::in_event_ex (pollfd *pfd)
+{
+ zmq::pgm_sender_t::in_event ();
+}
+
void zmq::pgm_sender_t::out_event ()
{
// POLLOUT event from send socket. If write buffer is empty,
diff -urwB zeromq-3.2.4-interm/src/pgm_sender.hpp zeromq-3.2.4/src/pgm_sender.hpp
--- zeromq-3.2.4-interm/src/pgm_sender.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_sender.hpp 2014-11-13 13:02:07.983786701 +0100
@@ -63,6 +63,7 @@
// i_poll_events interface implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int token);
diff -urwB zeromq-3.2.4-interm/src/reaper.cpp zeromq-3.2.4/src/reaper.cpp
--- zeromq-3.2.4-interm/src/reaper.cpp 2014-03-11 12:09:15.000000000 +0100
+++ zeromq-3.2.4/src/reaper.cpp 2014-11-13 11:44:15.547267347 +0100
@@ -73,6 +73,24 @@
}
}
+void zmq::reaper_t::in_event_ex (pollfd *pfd)
+{
+ while (true) {
+
+ // Get the next command. If there is none, exit.
+ command_t cmd;
+ int rc = mailbox.recv_ex (&cmd, 0, pfd);
+ if (rc != 0 && errno == EINTR)
+ continue;
+ if (rc != 0 && errno == EAGAIN)
+ break;
+ errno_assert (rc == 0);
+
+ // Process the command.
+ cmd.destination->process_command (cmd);
+ }
+}
+
void zmq::reaper_t::out_event ()
{
zmq_assert (false);
diff -urwB zeromq-3.2.4-interm/src/reaper.hpp zeromq-3.2.4/src/reaper.hpp
--- zeromq-3.2.4-interm/src/reaper.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/reaper.hpp 2014-11-13 11:44:26.167111581 +0100
@@ -46,6 +46,7 @@
// i_poll_events implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB zeromq-3.2.4-interm/src/signaler.cpp zeromq-3.2.4/src/signaler.cpp
--- zeromq-3.2.4-interm/src/signaler.cpp 2014-03-12 15:51:34.000000000 +0100
+++ zeromq-3.2.4/src/signaler.cpp 2014-11-10 15:54:40.453118275 +0100
@@ -193,6 +193,76 @@
#endif
}
+int zmq::signaler_t::wait_ex (int timeout_, pollfd *pfd_)
+{
+#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+
+ struct pollfd pfd;
+ pfd.fd = r;
+ pfd.events = POLLIN;
+ if(timeout_)
+ {
+ int rc = poll (&pfd, 1, timeout_);
+ if (unlikely (rc < 0)) {
+ errno_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ zmq_assert (pfd.revents & POLLIN);
+ return 0;
+ }
+ else
+ {
+ if((pfd_->revents & POLLIN) == 0)
+ {
+ // simulate EAGAIN
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (pfd_->revents & POLLIN);
+ // MUST clear POLLIN
+ pfd_->revents &= ~POLLIN;
+ return 0;
+ }
+
+#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+
+ fd_set fds;
+ FD_ZERO (&fds);
+ FD_SET (r, &fds);
+ struct timeval timeout;
+ if (timeout_ >= 0) {
+ timeout.tv_sec = timeout_ / 1000;
+ timeout.tv_usec = timeout_ % 1000 * 1000;
+ }
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = select (0, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = select (r + 1, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ if (unlikely (rc < 0)) {
+ errno_assert (errno == EINTR);
+ return -1;
+ }
+#endif
+ if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ return 0;
+
+#else
+#error
+#endif
+}
+
void zmq::signaler_t::recv ()
{
// Attempt to read a signal.
diff -urwB zeromq-3.2.4-interm/src/signaler.hpp zeromq-3.2.4/src/signaler.hpp
--- zeromq-3.2.4-interm/src/signaler.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/signaler.hpp 2014-11-10 14:46:59.063018398 +0100
@@ -21,6 +21,8 @@
#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
#define __ZMQ_SIGNALER_HPP_INCLUDED__
+#include <poll.h>
+
#include "fd.hpp"
namespace zmq
@@ -41,6 +43,7 @@
fd_t get_fd ();
void send ();
int wait (int timeout_);
+ int wait_ex (int timeout_, pollfd *pfd_);
void recv ();
private:
diff -urwB zeromq-3.2.4-interm/src/socket_base.cpp zeromq-3.2.4/src/socket_base.cpp
--- zeromq-3.2.4-interm/src/socket_base.cpp 2014-03-11 12:08:49.000000000 +0100
+++ zeromq-3.2.4/src/socket_base.cpp 2014-11-14 12:03:58.872633507 +0100
@@ -39,6 +39,8 @@
#include <unistd.h>
#endif
+#include <poll.h>
+
#include "socket_base.hpp"
#include "tcp_listener.hpp"
#include "ipc_listener.hpp"
@@ -303,6 +305,38 @@
return options.getsockopt (option_, optval_, optvallen_);
}
+// only for ZMQ_EVENTS
+int zmq::socket_base_t::getsockopt_ex (int option_, void *optval_,
+ size_t *optvallen_, pollfd *pfd_)
+{
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ assert (option_ == ZMQ_EVENTS);
+
+ if (option_ == ZMQ_EVENTS) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ int rc = process_commands_ex (0, false, pfd_);
+ if (rc != 0 && (errno == EINTR || errno == ETERM))
+ return -1;
+ errno_assert (rc == 0);
+ *((int*) optval_) = 0;
+ if (has_out ())
+ *((int*) optval_) |= ZMQ_POLLOUT;
+ if (has_in ())
+ *((int*) optval_) |= ZMQ_POLLIN;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
+ return 0;
+}
+
int zmq::socket_base_t::bind (const char *addr_)
{
if (unlikely (ctx_terminated)) {
@@ -872,6 +906,62 @@
return 0;
}
+int zmq::socket_base_t::process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_)
+{
+ int rc;
+ command_t cmd;
+ if (timeout_ != 0) {
+
+ // If we are asked to wait, simply ask mailbox to wait.
+ rc = mailbox.recv_ex (&cmd, timeout_, pfd_);
+ }
+ else {
+
+ // If we are asked not to wait, check whether we haven't processed
+ // commands recently, so that we can throttle the new commands.
+
+ // Get the CPU's tick counter. If 0, the counter is not available.
+ uint64_t tsc = zmq::clock_t::rdtsc ();
+
+ // Optimised version of command processing - it doesn't have to check
+ // for incoming commands each time. It does so only if certain time
+ // elapsed since last command processing. Command delay varies
+ // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
+ // etc. The optimisation makes sense only on platforms where getting
+ // a timestamp is a very cheap operation (tens of nanoseconds).
+ if (tsc && throttle_) {
+
+ // Check whether TSC haven't jumped backwards (in case of migration
+ // between CPU cores) and whether certain time have elapsed since
+ // last command processing. If it didn't do nothing.
+ if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
+ return 0;
+ last_tsc = tsc;
+ }
+
+ // Check whether there are any commands pending for this thread.
+ rc = mailbox.recv_ex (&cmd, 0, pfd_);
+ }
+
+ // Process all available commands.
+ while (rc == 0) {
+ cmd.destination->process_command (cmd);
+ rc = mailbox.recv_ex (&cmd, 0, pfd_);
+ }
+
+ if (errno == EINTR)
+ return -1;
+
+ zmq_assert (errno == EAGAIN);
+
+ if (ctx_terminated) {
+ errno = ETERM;
+ return -1;
+ }
+
+ return 0;
+}
+
void zmq::socket_base_t::process_stop ()
{
// Here, someone have called zmq_term while the socket was still alive.
@@ -960,6 +1050,16 @@
check_destroy ();
}
+void zmq::socket_base_t::in_event_ex (pollfd *pfd)
+{
+ // This function is invoked only once the socket is running in the context
+ // of the reaper thread. Process any commands from other threads/sockets
+ // that may be available at the moment. Ultimately, the socket will
+ // be destroyed.
+ process_commands (0, false);
+ check_destroy ();
+}
+
void zmq::socket_base_t::out_event ()
{
zmq_assert (false);
diff -urwB zeromq-3.2.4-interm/src/socket_base.hpp zeromq-3.2.4/src/socket_base.hpp
--- zeromq-3.2.4-interm/src/socket_base.hpp 2014-04-09 11:14:26.000000000 +0200
+++ zeromq-3.2.4/src/socket_base.hpp 2014-11-13 11:42:15.699047646 +0100
@@ -77,6 +77,7 @@
// Interface for communication with the API layer.
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_);
+ int getsockopt_ex (int option_, void *optval_, size_t *optvallen_, pollfd *pfd_);
int bind (const char *addr_);
int connect (const char *addr_);
int term_endpoint (const char *addr_);
@@ -96,6 +97,7 @@
// i_poll_events implementation. This interface is used when socket
// is handled by the poller in the reaper thread.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
@@ -210,6 +212,7 @@
// If throttle argument is true, commands are processed at most once
// in a predefined time period.
int process_commands (int timeout_, bool throttle_);
+ int process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_);
// Handlers for incoming commands.
void process_stop ();
diff -urwB zeromq-3.2.4-interm/src/stream_engine.cpp zeromq-3.2.4/src/stream_engine.cpp
--- zeromq-3.2.4-interm/src/stream_engine.cpp 2014-04-25 11:53:51.000000000 +0200
+++ zeromq-3.2.4/src/stream_engine.cpp 2014-11-13 15:29:55.984958203 +0100
@@ -244,6 +245,11 @@
}
}
+void zmq::stream_engine_t::in_event_ex (pollfd *pfd)
+{
+ zmq::stream_engine_t::in_event();
+}
+
void zmq::stream_engine_t::out_event ()
{
// If write buffer is empty, try to read new data from the encoder.
diff -urwB zeromq-3.2.4-interm/src/stream_engine.hpp zeromq-3.2.4/src/stream_engine.hpp
--- zeromq-3.2.4-interm/src/stream_engine.hpp 2013-02-01 10:03:55.000000000 +0100
+++ zeromq-3.2.4/src/stream_engine.hpp 2014-11-13 15:27:26.236236302 +0100
@@ -62,6 +62,7 @@
// i_poll_events interface implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
private:
diff -urwB zeromq-3.2.4-interm/src/tcp_connecter.cpp zeromq-3.2.4/src/tcp_connecter.cpp
--- zeromq-3.2.4-interm/src/tcp_connecter.cpp 2013-05-02 20:41:25.000000000 +0200
+++ zeromq-3.2.4/src/tcp_connecter.cpp 2014-11-13 13:05:15.161971274 +0100
@@ -110,6 +110,14 @@
out_event ();
}
+void zmq::tcp_connecter_t::in_event_ex (pollfd *pfd)
+{
+ // We are not polling for incoming data, so we are actually called
+ // because of error here. However, we can get error on out event as well
+ // on some platforms, so we'll simply handle both events in the same way.
+ out_event ();
+}
+
void zmq::tcp_connecter_t::out_event ()
{
fd_t fd = connect ();
diff -urwB zeromq-3.2.4-interm/src/tcp_connecter.hpp zeromq-3.2.4/src/tcp_connecter.hpp
--- zeromq-3.2.4-interm/src/tcp_connecter.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/tcp_connecter.hpp 2014-11-13 13:04:30.041651724 +0100
@@ -57,6 +57,7 @@
// Handlers for I/O events.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB zeromq-3.2.4-interm/src/tcp_listener.cpp zeromq-3.2.4/src/tcp_listener.cpp
--- zeromq-3.2.4-interm/src/tcp_listener.cpp 2012-11-23 08:38:26.000000000 +0100
+++ zeromq-3.2.4/src/tcp_listener.cpp 2014-11-13 13:07:09.469256737 +0100
@@ -111,6 +111,11 @@
socket->event_accepted (endpoint, fd);
}
+void zmq::tcp_listener_t::in_event_ex (pollfd *pfd)
+{
+ zmq::tcp_listener_t::in_event ();
+}
+
void zmq::tcp_listener_t::close ()
{
zmq_assert (s != retired_fd);
diff -urwB zeromq-3.2.4-interm/src/tcp_listener.hpp zeromq-3.2.4/src/tcp_listener.hpp
--- zeromq-3.2.4-interm/src/tcp_listener.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/tcp_listener.hpp 2014-11-13 13:05:37.060644451 +0100
@@ -57,6 +57,7 @@
// Handlers for I/O events.
void in_event ();
+ void in_event_ex (pollfd *);
// Close the listening socket.
void close ();
diff -urwB zeromq-3.2.4-interm/src/zmq.cpp zeromq-3.2.4/src/zmq.cpp
--- zeromq-3.2.4-interm/src/zmq.cpp 2014-04-09 14:39:35.000000000 +0200
+++ zeromq-3.2.4/src/zmq.cpp 2014-11-20 13:21:23.465769599 +0100
@@ -264,6 +265,17 @@
return result;
}
+int zmq_getsockopt_ex (void *s_, int option_, void *optval_, size_t *optvallen_, pollfd *pfd_)
+{
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
+ return -1;
+ }
+ zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
+ int result = s->getsockopt_ex (option_, optval_, optvallen_, pfd_);
+ return result;
+}
+
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
@@ -703,12 +720,15 @@
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
- uint32_t zmq_events;
- if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
- &zmq_events_size) == -1) {
+ uint32_t zmq_events = 0;
+
+ if (zmq_getsockopt_ex (items_ [i].socket, ZMQ_EVENTS,
+ &zmq_events,
+ &zmq_events_size, &pollfds [i]) == -1) {
free (pollfds);
return -1;
}
+
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
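The throttling step that process_commands_ex() keeps from process_commands() (skip the mailbox check when the TSC shows that little time has passed since the previous check) can be isolated as a small predicate. This is a standalone sketch with a hypothetical name; the delay is a parameter, and per the comment in the patch the real value corresponds to roughly 1 ms on a 3 GHz CPU. Note that the zmq_poll() path in the patch calls process_commands_ex() with throttle_ set to false, so this check does not apply there.

```cpp
#include <cassert>
#include <cstdint>

// Returns true if the mailbox should be checked now. A tsc of 0 means the
// tick counter is unavailable, in which case commands are always checked.
// last_tsc is updated only when a throttled check is allowed through.
bool should_process_commands (uint64_t tsc, uint64_t &last_tsc,
                              uint64_t max_delay, bool throttle)
{
    if (tsc && throttle) {
        // Counter did not jump backwards and the delay has not elapsed:
        // commands were processed recently, so do nothing.
        if (tsc >= last_tsc && tsc - last_tsc <= max_delay)
            return false;
        last_tsc = tsc;
    }
    return true;
}
```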