[zeromq-dev] zmq_poll performance question
Francis Le Bourse
zno-reply-francis.lebourse at sfr-sh.fr
Tue Nov 25 13:45:09 CET 2014
On 11/24/2014 8:08 PM, Arnaud Kapp wrote:
>> Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq master
> Oh okay. This is the commit that added the flag:
> https://github.com/zeromq/libzmq/commit/779c37abc433cb6595ddeedaf86b280317656bdd
I have rewritten the patch to take the POLLPRI flag into account and to
apply cleanly to the current master (Tue 11/25).
Cheers,
Francis
>
> libzmq was 4.1 at the time I believe.
>
> I'll probably look at it this week-end then :)
>
> On Mon, Nov 24, 2014 at 12:10 PM, Francis Le Bourse
> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>> Hi,
>> On 11/24/2014 11:35 AM, Arnaud Kapp wrote:
>>> Hello,
>>>
>>> I recently added support for the POLLPRI flag.
>>> It looks like it's not handled in your patch
>> No, it isn't handled. In which version did you add this flag?
>> Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq
>> master.
>>> and that it needs custom
>>> support. Since there is no test related to this flag, you wouldn't
>>> notice.
>>>
>>> I can give it a look if you want.
>> That would be nice.
>>
>> Cheers,
>> Francis
>>
>>
>>> On Sat, Nov 22, 2014 at 2:16 PM, Pieter Hintjens <ph at imatix.com> wrote:
>>>> I suggest you send the patch to libzmq master, and ensure all test
>>>> cases pass. Then we can get this into the next version.
>>>>
>>>> On Fri, Nov 21, 2014 at 2:50 PM, Francis Le Bourse
>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>> Hi,
>>>>>
>>>>> On 11/6/2014 3:18 PM, Pieter Hintjens wrote:
>>>>>> Oh, ok. Sounds like you have a good candidate for some before/after
>>>>>> measurement and optimization. Are you going to try to make a patch for
>>>>>> this?
>>>>> I have a candidate patch for this optimization. The performance
>>>>> improvement is very good, and it doesn't seem to introduce any new
>>>>> instability.
>>>>> What is modified:
>>>>> - zmq_poll(): there is only one poll() call now;
>>>>> - the epoll() event loop in epoll.cpp.
>>>>> Other calls to poll() and select() are left unmodified.
>>>>>
>>>>> I would be happy to get any feedback.
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Francis
>>>>>>
>>>>>> On Thu, Nov 6, 2014 at 2:09 PM, Francis Le Bourse
>>>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>>>> On 11/6/2014 11:47 AM, Pieter Hintjens wrote:
>>>>>>>> A simple optimization is, when you are polling sockets for input, to
>>>>>>>> continue reading from an active socket using a non-blocking read. So
>>>>>>>> you process all waiting messages on a socket and then only switch back
>>>>>>>> to poll when needed.
>>>>>>> Thank you for your quick reply.
>>>>>>>
>>>>>>> Yes, but the question was more about the zmq_poll() internals.
>>>>>>> For 600+ file descriptors, zmq_poll() calls poll() a huge number of
>>>>>>> times, although only a few descriptors will trigger a POLLIN and the
>>>>>>> relevant information is already present in the pollfds array. That is
>>>>>>> where the performance hit is.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Francis
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> On Thu, Nov 6, 2014 at 11:28 AM, Francis Le Bourse
>>>>>>>> <zno-reply-francis.lebourse at sfr-sh.fr> wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am looking at a performance issue in zmq when the number of
>>>>>>>>> zsockets / file descriptors becomes large.
>>>>>>>>> The relevant calls are:
>>>>>>>>> poll+0x57
>>>>>>>>> zmq_poll+0x2e3
>>>>>>>>> zloop_start+0x1e8
>>>>>>>>> main+0xb40
>>>>>>>>> __libc_start_main+0xfd
>>>>>>>>> immediately followed by a loop of
>>>>>>>>> poll+0x57
>>>>>>>>> zmq::signaler_t::wait(int)+0x33
>>>>>>>>> zmq::mailbox_t::recv(zmq::command_t*, int)+0x78
>>>>>>>>> zmq::socket_base_t::process_commands(int, bool)+0xbe
>>>>>>>>> zmq::socket_base_t::getsockopt(int, void*, unsigned long*)+0x135
>>>>>>>>> zmq_getsockopt+0x75
>>>>>>>>> zmq_poll+0x3da
>>>>>>>>> zloop_start+0x1e8
>>>>>>>>> main+0xb40
>>>>>>>>> __libc_start_main+0xfd
>>>>>>>>>
>>>>>>>>> The code in the loop is executed once per file descriptor in the
>>>>>>>>> initial pollarray, redoing a poll() system call each time.
>>>>>>>>> Is there a reason to proceed that way?
>>>>>>>>> Would it be possible to reuse the results of the first poll() in
>>>>>>>>> order to bypass the second set of system calls?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Francis
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> _______________________________________________
>>>>>>>>> zeromq-dev mailing list
>>>>>>>>> zeromq-dev at lists.zeromq.org
>>>>>>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>>>>>>>
>>>
>>>
>>
>
>
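The question at the bottom of this thread (reuse the results of the first poll() instead of re-polling each descriptor) can be illustrated outside libzmq. Below is a minimal, hypothetical C++ sketch that uses plain POSIX pipes rather than ZeroMQ sockets: it issues one poll() over all descriptors and then answers every per-item readiness query from the revents that single call already filled in. The function name readable_after_one_poll and the pipe setup are assumptions for illustration only; this is not libzmq code.

```cpp
#include <poll.h>
#include <unistd.h>
#include <cassert>
#include <vector>

// Hypothetical illustration, not libzmq code: create n pipes, make the ones
// listed in make_readable readable, then run ONE poll () over all read ends
// and answer every per-item readiness query from the cached revents.
std::vector<bool> readable_after_one_poll (int n,
                                           const std::vector<int> &make_readable)
{
    std::vector<int> rd (n, -1), wr (n, -1);
    std::vector<pollfd> pfds (n);
    std::vector<bool> readable;

    // Close every descriptor opened so far (also used on error paths).
    auto close_all = [&] () {
        for (int i = 0; i < n; ++i) {
            if (rd [i] != -1) close (rd [i]);
            if (wr [i] != -1) close (wr [i]);
        }
    };

    for (int i = 0; i < n; ++i) {
        int p [2];
        if (pipe (p) != 0) {
            close_all ();
            return readable;          // empty vector signals failure
        }
        rd [i] = p [0];
        wr [i] = p [1];
        pfds [i].fd = rd [i];
        pfds [i].events = POLLIN;
        pfds [i].revents = 0;
    }

    const char byte = 'x';
    for (int idx : make_readable) {
        assert (idx >= 0 && idx < n);
        if (write (wr [idx], &byte, 1) != 1) {
            close_all ();
            return readable;
        }
    }

    // A single system call covers every descriptor (timeout 0: never block).
    if (poll (pfds.data (), pfds.size (), 0) >= 0) {
        // Per-item checks reuse the revents poll () filled in: no more syscalls.
        for (int i = 0; i < n; ++i)
            readable.push_back ((pfds [i].revents & POLLIN) != 0);
    }

    close_all ();
    return readable;
}
```

For example, readable_after_one_poll (4, {1, 3}) reports pipes 1 and 3 as readable at the cost of a single poll() call. In the stack traces above, the unpatched path instead reaches poll() once more per socket item through zmq_getsockopt (ZMQ_EVENTS).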
-------------- next part --------------
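The non-blocking branch of signaler_t::wait_ex() in the patch below replaces a poll() call with a check of the revents cached by the caller, and clears POLLIN once it has been consumed. The following simplified C++ model of that branch is a sketch only (wait_from_cached_revents and drain_count are hypothetical names, not libzmq functions). It shows why the "MUST clear POLLIN" comment matters: the drain loops in io_thread_t::in_event_ex() and process_commands_ex() stop only when the wait reports EAGAIN, so an uncleared POLLIN would report the same, already consumed, signal again.

```cpp
#include <poll.h>
#include <cassert>
#include <cerrno>

// Hypothetical model (not libzmq code) of the timeout_ == 0 branch of
// signaler_t::wait_ex () in the patch: no poll () call, just a look at the
// revents produced earlier by the caller's poll ().
int wait_from_cached_revents (pollfd *pfd_)
{
    assert (pfd_ != nullptr);
    if ((pfd_->revents & POLLIN) == 0) {
        // Nothing was signalled: behave like a poll () that timed out.
        errno = EAGAIN;
        return -1;
    }
    // Consume the readiness so the same signal is reported only once.
    // Without this line the loop in drain_count () would never terminate.
    pfd_->revents &= ~POLLIN;
    return 0;
}

// Counts how many successful waits a drain loop gets before it sees EAGAIN,
// mirroring the "while (rc == 0) ... recv_ex (...)" loops in the patch.
int drain_count (pollfd *pfd_)
{
    int n = 0;
    while (wait_from_cached_revents (pfd_) == 0)
        ++n;
    assert (errno == EAGAIN);
    return n;
}
```

As far as this model shows, one consequence of the design is that a signal arriving after the first poll() is not seen until the next zmq_poll() call, since the cached revents are never refreshed in between.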
diff -urwB libzmq-original/src/ctx.cpp libzmq/src/ctx.cpp
--- libzmq-original/src/ctx.cpp 2014-11-25 11:01:35.704930597 +0100
+++ libzmq/src/ctx.cpp 2014-11-25 11:03:41.041081315 +0100
@@ -28,6 +28,8 @@
#include <new>
#include <string.h>
+#include <poll.h>
+
#include "ctx.hpp"
#include "socket_base.hpp"
#include "io_thread.hpp"
diff -urwB libzmq-original/src/epoll.cpp libzmq/src/epoll.cpp
--- libzmq-original/src/epoll.cpp 2014-11-25 11:01:35.707930552 +0100
+++ libzmq/src/epoll.cpp 2014-11-25 12:31:57.856918921 +0100
@@ -27,6 +27,8 @@
#include <algorithm>
#include <new>
+#include <poll.h>
+
#include "epoll.hpp"
#include "err.hpp"
#include "config.hpp"
@@ -152,18 +154,43 @@
for (int i = 0; i < n; i ++) {
poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
+ uint32_t events = ev_buf[i].events;
+ ev_buf[i].events = 0;
+
+ // build a pollfd structure with the events returned by
+ // epoll and call in_events_ex in order to bypass poll()
+ // if EPOLLIN is present in events
+
+ struct pollfd pfd;
+ pfd.fd = pe->fd;
+ pfd.events = 0;
+ if(pe->ev.events & EPOLLIN)
+ pfd.events |= POLLIN;
+ if(pe->ev.events & EPOLLOUT)
+ pfd.events |= POLLOUT;
+ pfd.revents = 0;
+
+ if(events & EPOLLIN)
+ pfd.revents |= POLLIN;
+ if(events & EPOLLOUT)
+ pfd.revents |= POLLOUT;
+ if(events & EPOLLERR)
+ pfd.revents |= POLLERR;
+ if(events & EPOLLHUP)
+ pfd.revents |= POLLHUP;
+
if (pe->fd == retired_fd)
continue;
- if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
+ if (events & (EPOLLERR | EPOLLHUP))
pe->events->in_event ();
if (pe->fd == retired_fd)
continue;
- if (ev_buf [i].events & EPOLLOUT)
+ if (events & EPOLLOUT)
pe->events->out_event ();
if (pe->fd == retired_fd)
continue;
- if (ev_buf [i].events & EPOLLIN)
- pe->events->in_event ();
+ if (events & EPOLLIN)
+ pe->events->in_event_ex (&pfd);
}
// Destroy retired event sources.
diff -urwB libzmq-original/src/i_poll_events.hpp libzmq/src/i_poll_events.hpp
--- libzmq-original/src/i_poll_events.hpp 2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/i_poll_events.hpp 2014-11-25 11:03:41.042081411 +0100
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
#define __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
+#include <poll.h>
+
namespace zmq
{
@@ -32,6 +34,7 @@
// Called by I/O thread when file descriptor is ready for reading.
virtual void in_event () = 0;
+ virtual void in_event_ex (pollfd *) = 0;
// Called by I/O thread when file descriptor is ready for writing.
virtual void out_event () = 0;
diff -urwB libzmq-original/src/io_object.cpp libzmq/src/io_object.cpp
--- libzmq-original/src/io_object.cpp 2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_object.cpp 2014-11-25 11:03:41.042081411 +0100
@@ -95,6 +95,11 @@
zmq_assert (false);
}
+void zmq::io_object_t::in_event_ex (pollfd *pfd)
+{
+ zmq_assert (false);
+}
+
void zmq::io_object_t::out_event ()
{
zmq_assert (false);
diff -urwB libzmq-original/src/io_object.hpp libzmq/src/io_object.hpp
--- libzmq-original/src/io_object.hpp 2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_object.hpp 2014-11-25 11:03:41.043081508 +0100
@@ -63,6 +63,7 @@
// i_poll_events interface implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB libzmq-original/src/io_thread.cpp libzmq/src/io_thread.cpp
--- libzmq-original/src/io_thread.cpp 2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_thread.cpp 2014-11-25 11:03:41.043081508 +0100
@@ -77,6 +77,23 @@
errno_assert (rc != 0 && errno == EAGAIN);
}
+void zmq::io_thread_t::in_event_ex (pollfd *pfd)
+{
+ // TODO: Do we want to limit number of commands I/O thread can
+ // process in a single go?
+
+ command_t cmd;
+ int rc = mailbox.recv_ex (&cmd, 0, pfd); // will clear POLLIN
+
+ while (rc == 0 || errno == EINTR) {
+ if (rc == 0)
+ cmd.destination->process_command (cmd);
+ rc = mailbox.recv_ex (&cmd, 0, pfd);
+ }
+
+ errno_assert (rc != 0 && errno == EAGAIN);
+}
+
void zmq::io_thread_t::out_event ()
{
// We are never polling for POLLOUT here. This function is never called.
diff -urwB libzmq-original/src/io_thread.hpp libzmq/src/io_thread.hpp
--- libzmq-original/src/io_thread.hpp 2014-11-25 11:01:35.709930522 +0100
+++ libzmq/src/io_thread.hpp 2014-11-25 11:03:41.043081508 +0100
@@ -57,6 +57,7 @@
// i_poll_events implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB libzmq-original/src/ipc_connecter.cpp libzmq/src/ipc_connecter.cpp
--- libzmq-original/src/ipc_connecter.cpp 2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_connecter.cpp 2014-11-25 11:03:41.043081508 +0100
@@ -99,6 +99,14 @@
out_event ();
}
+void zmq::ipc_connecter_t::in_event_ex (pollfd *pfd)
+{
+ // We are not polling for incoming data, so we are actually called
+ // because of error here. However, we can get error on out event as well
+ // on some platforms, so we'll simply handle both events in the same way.
+ out_event ();
+}
+
void zmq::ipc_connecter_t::out_event ()
{
fd_t fd = connect ();
diff -urwB libzmq-original/src/ipc_connecter.hpp libzmq/src/ipc_connecter.hpp
--- libzmq-original/src/ipc_connecter.hpp 2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_connecter.hpp 2014-11-25 11:03:41.044081604 +0100
@@ -58,6 +58,7 @@
// Handlers for I/O events.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB libzmq-original/src/ipc_listener.cpp libzmq/src/ipc_listener.cpp
--- libzmq-original/src/ipc_listener.cpp 2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_listener.cpp 2014-11-25 11:03:41.044081604 +0100
@@ -110,6 +110,11 @@
socket->event_accepted (endpoint, fd);
}
+void zmq::ipc_listener_t::in_event_ex (pollfd *pfd)
+{
+ zmq::ipc_listener_t::in_event();
+}
+
int zmq::ipc_listener_t::get_address (std::string &addr_)
{
struct sockaddr_storage ss;
diff -urwB libzmq-original/src/ipc_listener.hpp libzmq/src/ipc_listener.hpp
--- libzmq-original/src/ipc_listener.hpp 2014-11-25 11:01:35.710930507 +0100
+++ libzmq/src/ipc_listener.hpp 2014-11-25 11:03:41.044081604 +0100
@@ -59,6 +59,7 @@
// Handlers for I/O events.
void in_event ();
+ void in_event_ex (pollfd *);
// Close the listening socket.
int close ();
diff -urwB libzmq-original/src/mailbox.cpp libzmq/src/mailbox.cpp
--- libzmq-original/src/mailbox.cpp 2014-11-25 11:01:35.711930493 +0100
+++ libzmq/src/mailbox.cpp 2014-11-25 11:03:41.045081698 +0100
@@ -17,6 +17,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#if defined ZMQ_POLL_BASED_ON_POLL
+#include <poll.h>
+#endif
+
#include "mailbox.hpp"
#include "err.hpp"
@@ -84,3 +88,32 @@
zmq_assert (ok);
return 0;
}
+
+int zmq::mailbox_t::recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_)
+{
+ // Try to get the command straight away.
+ if (active) {
+ bool ok = cpipe.read (cmd_);
+ if (ok)
+ return 0;
+
+ // If there are no more commands available, switch into passive state.
+ active = false;
+ signaler.recv ();
+ }
+
+ // Wait for signal from the command sender.
+ int rc = signaler.wait_ex (timeout_, pfd_);
+ if (rc != 0 && (errno == EAGAIN || errno == EINTR))
+ return -1;
+
+ // We've got the signal. Now we can switch into active state.
+ active = true;
+
+ // Get a command.
+ errno_assert (rc == 0);
+ bool ok = cpipe.read (cmd_);
+ zmq_assert (ok);
+ return 0;
+}
+
diff -urwB libzmq-original/src/mailbox.hpp libzmq/src/mailbox.hpp
--- libzmq-original/src/mailbox.hpp 2014-11-25 11:01:35.712930479 +0100
+++ libzmq/src/mailbox.hpp 2014-11-25 11:03:41.045081698 +0100
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include <poll.h>
+
#include "platform.hpp"
#include "signaler.hpp"
#include "fd.hpp"
@@ -43,6 +45,7 @@
fd_t get_fd () const;
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
+ int recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_);
#ifdef HAVE_FORK
// close the file descriptors in the signaller. This is used in a forked
diff -urwB libzmq-original/src/pgm_receiver.cpp libzmq/src/pgm_receiver.cpp
--- libzmq-original/src/pgm_receiver.cpp 2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_receiver.cpp 2014-11-25 11:03:41.045081698 +0100
@@ -275,6 +275,11 @@
}
+void zmq::pgm_receiver_t::in_event_ex (pollfd *pfd)
+{
+ zmq::pgm_receiver_t::in_event();
+}
+
void zmq::pgm_receiver_t::timer_event (int token)
{
zmq_assert (token == rx_timer_id);
diff -urwB libzmq-original/src/pgm_receiver.hpp libzmq/src/pgm_receiver.hpp
--- libzmq-original/src/pgm_receiver.hpp 2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_receiver.hpp 2014-11-25 11:03:41.045081698 +0100
@@ -63,6 +63,7 @@
// i_poll_events interface implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void timer_event (int token);
private:
diff -urwB libzmq-original/src/pgm_sender.cpp libzmq/src/pgm_sender.cpp
--- libzmq-original/src/pgm_sender.cpp 2014-11-25 11:01:35.715930435 +0100
+++ libzmq/src/pgm_sender.cpp 2014-11-25 11:03:41.046081791 +0100
@@ -157,6 +157,11 @@
}
}
+void zmq::pgm_sender_t::in_event_ex (pollfd *pfd)
+{
+ zmq::pgm_sender_t::in_event ();
+}
+
void zmq::pgm_sender_t::out_event ()
{
// POLLOUT event from send socket. If write buffer is empty,
diff -urwB libzmq-original/src/pgm_sender.hpp libzmq/src/pgm_sender.hpp
--- libzmq-original/src/pgm_sender.hpp 2014-11-25 11:01:35.716930420 +0100
+++ libzmq/src/pgm_sender.hpp 2014-11-25 11:03:41.046081791 +0100
@@ -62,6 +62,7 @@
// i_poll_events interface implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int token);
diff -urwB libzmq-original/src/reaper.cpp libzmq/src/reaper.cpp
--- libzmq-original/src/reaper.cpp 2014-11-25 11:01:35.720930360 +0100
+++ libzmq/src/reaper.cpp 2014-11-25 11:03:41.046081791 +0100
@@ -83,6 +83,24 @@
}
}
+void zmq::reaper_t::in_event_ex (pollfd *pfd)
+{
+ while (true) {
+
+ // Get the next command. If there is none, exit.
+ command_t cmd;
+ int rc = mailbox.recv_ex (&cmd, 0, pfd);
+ if (rc != 0 && errno == EINTR)
+ continue;
+ if (rc != 0 && errno == EAGAIN)
+ break;
+ errno_assert (rc == 0);
+
+ // Process the command.
+ cmd.destination->process_command (cmd);
+ }
+}
+
void zmq::reaper_t::out_event ()
{
zmq_assert (false);
diff -urwB libzmq-original/src/reaper.hpp libzmq/src/reaper.hpp
--- libzmq-original/src/reaper.hpp 2014-11-25 11:01:35.720930360 +0100
+++ libzmq/src/reaper.hpp 2014-11-25 11:03:41.047081882 +0100
@@ -45,6 +45,7 @@
// i_poll_events implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB libzmq-original/src/signaler.cpp libzmq/src/signaler.cpp
--- libzmq-original/src/signaler.cpp 2014-11-25 11:01:35.721930346 +0100
+++ libzmq/src/signaler.cpp 2014-11-25 12:19:39.843830861 +0100
@@ -260,6 +260,94 @@
#endif
}
+int zmq::signaler_t::wait_ex (int timeout_, pollfd *pfd_)
+{
+#ifdef HAVE_FORK
+ if (unlikely (pid != getpid ())) {
+ // we have forked and the file descriptor is closed. Emulate an interrupt
+ // response.
+ //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
+ errno = EINTR;
+ return -1;
+ }
+#endif
+
+#ifdef ZMQ_POLL_BASED_ON_POLL
+ struct pollfd pfd;
+ pfd.fd = r;
+ pfd.events = POLLIN;
+ if(timeout_)
+ {
+ int rc = poll (&pfd, 1, timeout_);
+ if (unlikely (rc < 0)) {
+ errno_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+#ifdef HAVE_FORK
+ else
+ if (unlikely (pid != getpid ())) {
+ // we have forked and the file descriptor is closed. Emulate an interrupt
+ // response.
+ //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
+ errno = EINTR;
+ return -1;
+ }
+#endif
+ zmq_assert (rc == 1);
+ zmq_assert (pfd.revents & POLLIN);
+ return 0;
+ }
+ else
+ {
+ if((pfd_->revents & POLLIN) == 0)
+ {
+ // simulate EAGAIN
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (pfd_->revents & POLLIN);
+ // MUST clear POLLIN
+ pfd_->revents &= ~POLLIN;
+ return 0;
+ }
+
+#elif defined ZMQ_POLL_BASED_ON_SELECT
+ fd_set fds;
+ FD_ZERO (&fds);
+ FD_SET (r, &fds);
+ struct timeval timeout;
+ if (timeout_ >= 0) {
+ timeout.tv_sec = timeout_ / 1000;
+ timeout.tv_usec = timeout_ % 1000 * 1000;
+ }
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = select (0, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = select (r + 1, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ if (unlikely (rc < 0)) {
+ errno_assert (errno == EINTR);
+ return -1;
+ }
+#endif
+ if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ return 0;
+
+#else
+#error
+#endif
+}
+
void zmq::signaler_t::recv ()
{
// Attempt to read a signal.
diff -urwB libzmq-original/src/signaler.hpp libzmq/src/signaler.hpp
--- libzmq-original/src/signaler.hpp 2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/signaler.hpp 2014-11-25 11:03:41.047081882 +0100
@@ -24,6 +24,8 @@
#include <unistd.h>
#endif
+#include <poll.h>
+
#include "fd.hpp"
namespace zmq
@@ -44,6 +46,7 @@
fd_t get_fd () const;
void send ();
int wait (int timeout_);
+ int wait_ex (int timeout_, pollfd *pfd_);
void recv ();
#ifdef HAVE_FORK
diff -urwB libzmq-original/src/socket_base.cpp libzmq/src/socket_base.cpp
--- libzmq-original/src/socket_base.cpp 2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/socket_base.cpp 2014-11-25 11:03:41.048081973 +0100
@@ -36,6 +36,8 @@
#include <unistd.h>
#endif
+#include <poll.h>
+
#include "socket_base.hpp"
#include "tcp_listener.hpp"
#include "ipc_listener.hpp"
@@ -346,6 +348,38 @@
return options.getsockopt (option_, optval_, optvallen_);
}
+// only for ZMQ_EVENTS
+int zmq::socket_base_t::getsockopt_ex (int option_, void *optval_,
+ size_t *optvallen_, pollfd *pfd_)
+{
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+
+ zmq_assert (option_ == ZMQ_EVENTS);
+
+ if (option_ == ZMQ_EVENTS) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ int rc = process_commands_ex (0, false, pfd_);
+ if (rc != 0 && (errno == EINTR || errno == ETERM))
+ return -1;
+ errno_assert (rc == 0);
+ *((int*) optval_) = 0;
+ if (has_out ())
+ *((int*) optval_) |= ZMQ_POLLOUT;
+ if (has_in ())
+ *((int*) optval_) |= ZMQ_POLLIN;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
+ return 0;
+}
+
int zmq::socket_base_t::bind (const char *addr_)
{
if (unlikely (ctx_terminated)) {
@@ -1026,6 +1060,62 @@
return 0;
}
+int zmq::socket_base_t::process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_)
+{
+ int rc;
+ command_t cmd;
+ if (timeout_ != 0) {
+
+ // If we are asked to wait, simply ask mailbox to wait.
+ rc = mailbox.recv_ex (&cmd, timeout_, pfd_);
+ }
+ else {
+
+ // If we are asked not to wait, check whether we haven't processed
+ // commands recently, so that we can throttle the new commands.
+
+ // Get the CPU's tick counter. If 0, the counter is not available.
+ uint64_t tsc = zmq::clock_t::rdtsc ();
+
+ // Optimised version of command processing - it doesn't have to check
+ // for incoming commands each time. It does so only if certain time
+ // elapsed since last command processing. Command delay varies
+ // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
+ // etc. The optimisation makes sense only on platforms where getting
+ // a timestamp is a very cheap operation (tens of nanoseconds).
+ if (tsc && throttle_) {
+
+ // Check whether TSC hasn't jumped backwards (in case of migration
+ // between CPU cores) and whether a certain time has elapsed since
+ // last command processing. If it didn't, do nothing.
+ if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
+ return 0;
+ last_tsc = tsc;
+ }
+
+ // Check whether there are any commands pending for this thread.
+ rc = mailbox.recv_ex (&cmd, 0, pfd_);
+ }
+
+ // Process all available commands.
+ while (rc == 0) {
+ cmd.destination->process_command (cmd);
+ rc = mailbox.recv_ex (&cmd, 0, pfd_);
+ }
+
+ if (errno == EINTR)
+ return -1;
+
+ zmq_assert (errno == EAGAIN);
+
+ if (ctx_terminated) {
+ errno = ETERM;
+ return -1;
+ }
+
+ return 0;
+}
+
void zmq::socket_base_t::process_stop ()
{
// Here, someone have called zmq_term while the socket was still alive.
@@ -1124,6 +1214,16 @@
check_destroy ();
}
+void zmq::socket_base_t::in_event_ex (pollfd *pfd)
+{
+ // This function is invoked only once the socket is running in the context
+ // of the reaper thread. Process any commands from other threads/sockets
+ // that may be available at the moment. Ultimately, the socket will
+ // be destroyed.
+ process_commands (0, false);
+ check_destroy ();
+}
+
void zmq::socket_base_t::out_event ()
{
zmq_assert (false);
diff -urwB libzmq-original/src/socket_base.hpp libzmq/src/socket_base.hpp
--- libzmq-original/src/socket_base.hpp 2014-11-25 11:01:35.722930332 +0100
+++ libzmq/src/socket_base.hpp 2014-11-25 11:03:41.048081973 +0100
@@ -75,6 +75,7 @@
// Interface for communication with the API layer.
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_);
+ int getsockopt_ex (int option_, void *optval_, size_t *optvallen_, pollfd *pfd_);
int bind (const char *addr_);
int connect (const char *addr_);
int term_endpoint (const char *addr_);
@@ -94,6 +95,7 @@
// i_poll_events implementation. This interface is used when socket
// is handled by the poller in the reaper thread.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
@@ -219,6 +221,7 @@
// If throttle argument is true, commands are processed at most once
// in a predefined time period.
int process_commands (int timeout_, bool throttle_);
+ int process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_);
// Handlers for incoming commands.
void process_stop ();
diff -urwB libzmq-original/src/stream_engine.cpp libzmq/src/stream_engine.cpp
--- libzmq-original/src/stream_engine.cpp 2014-11-25 11:01:35.723930318 +0100
+++ libzmq/src/stream_engine.cpp 2014-11-25 11:03:41.049082060 +0100
@@ -320,6 +320,11 @@
session->flush ();
}
+void zmq::stream_engine_t::in_event_ex (pollfd *pfd)
+{
+ zmq::stream_engine_t::in_event();
+}
+
void zmq::stream_engine_t::out_event ()
{
zmq_assert (!io_error);
diff -urwB libzmq-original/src/stream_engine.hpp libzmq/src/stream_engine.hpp
--- libzmq-original/src/stream_engine.hpp 2014-11-25 11:01:35.724930303 +0100
+++ libzmq/src/stream_engine.hpp 2014-11-25 11:03:41.049082060 +0100
@@ -73,6 +73,7 @@
// i_poll_events interface implementation.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB libzmq-original/src/tcp_connecter.cpp libzmq/src/tcp_connecter.cpp
--- libzmq-original/src/tcp_connecter.cpp 2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_connecter.cpp 2014-11-25 11:03:41.049082060 +0100
@@ -108,6 +108,14 @@
out_event ();
}
+void zmq::tcp_connecter_t::in_event_ex (pollfd *pfd)
+{
+ // We are not polling for incoming data, so we are actually called
+ // because of error here. However, we can get error on out event as well
+ // on some platforms, so we'll simply handle both events in the same way.
+ out_event ();
+}
+
void zmq::tcp_connecter_t::out_event ()
{
rm_fd (handle);
diff -urwB libzmq-original/src/tcp_connecter.hpp libzmq/src/tcp_connecter.hpp
--- libzmq-original/src/tcp_connecter.hpp 2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_connecter.hpp 2014-11-25 11:03:41.050082146 +0100
@@ -55,6 +55,7 @@
// Handlers for I/O events.
void in_event ();
+ void in_event_ex (pollfd *);
void out_event ();
void timer_event (int id_);
diff -urwB libzmq-original/src/tcp_listener.cpp libzmq/src/tcp_listener.cpp
--- libzmq-original/src/tcp_listener.cpp 2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_listener.cpp 2014-11-25 11:03:41.050082146 +0100
@@ -114,6 +114,11 @@
socket->event_accepted (endpoint, fd);
}
+void zmq::tcp_listener_t::in_event_ex (pollfd *pfd)
+{
+ zmq::tcp_listener_t::in_event ();
+}
+
void zmq::tcp_listener_t::close ()
{
zmq_assert (s != retired_fd);
diff -urwB libzmq-original/src/tcp_listener.hpp libzmq/src/tcp_listener.hpp
--- libzmq-original/src/tcp_listener.hpp 2014-11-25 11:01:35.725930288 +0100
+++ libzmq/src/tcp_listener.hpp 2014-11-25 11:03:41.050082146 +0100
@@ -55,6 +55,7 @@
// Handlers for I/O events.
void in_event ();
+ void in_event_ex (pollfd *);
// Close the listening socket.
void close ();
diff -urwB libzmq-original/src/zmq.cpp libzmq/src/zmq.cpp
--- libzmq-original/src/zmq.cpp 2014-11-25 11:01:35.729930228 +0100
+++ libzmq/src/zmq.cpp 2014-11-25 13:41:42.364243253 +0100
@@ -263,6 +263,17 @@
return result;
}
+int zmq_getsockopt_ex (void *s_, int option_, void *optval_, size_t *optvallen_, pollfd *pfd_)
+{
+ if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+ errno = ENOTSOCK;
+ return -1;
+ }
+ zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
+ int result = s->getsockopt_ex (option_, optval_, optvallen_, pfd_);
+ return result;
+}
+
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
@@ -761,10 +772,12 @@
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
- uint32_t zmq_events;
- if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
- &zmq_events_size) == -1) {
- if (pollfds != spollfds)
+ uint32_t zmq_events = 0;
+
+ if (zmq_getsockopt_ex (items_ [i].socket, ZMQ_EVENTS,
+ &zmq_events,
+ &zmq_events_size, &pollfds [i]) == -1) {
+ if (pollfds != spollfds)
free (pollfds);
return -1;
}
@@ -774,6 +787,9 @@
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
+ if ((items_ [i].events & ZMQ_POLLPRI) &&
+ (zmq_events & ZMQ_POLLPRI))
+ items_ [i].revents |= ZMQ_POLLPRI;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
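In the epoll.cpp hunk above, the patch builds a struct pollfd from the interest set (pe->ev.events) and from the events returned by epoll_wait(), so that in_event_ex() can treat them like poll() output. The same translation, pulled out into a stand-alone helper, looks roughly like this in C++ for Linux; epoll_to_pollfd is a hypothetical name used only for illustration and is not part of libzmq.

```cpp
#include <sys/epoll.h>
#include <poll.h>
#include <cstdint>

// Hypothetical helper (not libzmq code) mirroring the inline translation in
// the patched epoll loop: interest bits become pollfd.events, the bits
// reported by epoll_wait () become pollfd.revents.
pollfd epoll_to_pollfd (int fd, uint32_t interest, uint32_t ready)
{
    pollfd pfd;
    pfd.fd = fd;

    // What the descriptor was registered for.
    pfd.events = 0;
    if (interest & EPOLLIN)
        pfd.events |= POLLIN;
    if (interest & EPOLLOUT)
        pfd.events |= POLLOUT;

    // What epoll actually reported as ready.
    pfd.revents = 0;
    if (ready & EPOLLIN)
        pfd.revents |= POLLIN;
    if (ready & EPOLLOUT)
        pfd.revents |= POLLOUT;
    if (ready & EPOLLERR)
        pfd.revents |= POLLERR;
    if (ready & EPOLLHUP)
        pfd.revents |= POLLHUP;
    return pfd;
}
```

On Linux the EPOLL* and POLL* bit values happen to coincide for these four flags, but spelling out the mapping, as the patch does, avoids relying on that.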