[zeromq-dev] adding timeout in zmq_poll

Pavel Gushcha pavimus at gmail.com
Tue Dec 29 16:33:50 CET 2009


>> I found discrepancy in zmq_poll manpage. in zmq_pollitem_t section
>> ZMQ_POLLIN and ZMQ_POLLOUT is discussed, but in example section only
>> POLLIN is used.
>
> The problem is that until watermarks are ported to 0MQ/2.0 you can _always_
> write to a socket. Thus polling for ZMQ_POLLOUT never blocks at the moment.
>

I think the example in the man page:

zmq_pollitem_t items [2];
items [0].socket = s;
items [0].events = POLLIN;
items [1].socket = NULL;
items [1].fd = my_fd;
items [1].events = POLLIN;

should be written as:

zmq_pollitem_t items [2];
items [0].socket = s;
items [0].events = ZMQ_POLLIN;
items [1].socket = NULL;
items [1].fd = my_fd;
items [1].events = POLLIN;

A patch adding a timeout to zmq_poll is attached. I added a long timeout
parameter instead of unsigned long, so that developers can specify a
negative value to get the commonly used behaviour (block indefinitely).
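
As a rough illustration of the timeout convention described above
(microseconds in, a negative value meaning "wait forever"), the
conversion to the millisecond argument of POSIX poll() might look like
the sketch below. The helper name usec_to_poll_ms is hypothetical and is
not part of the attached patch. Unlike the patch's expression
"timeout > 0 ? timeout / 1000 : -1", this sketch treats 0 as "check once
and return" and rounds sub-millisecond values up; both choices are
assumptions about the desired behaviour, not something the patch does.

```c
#include <limits.h>

/* Hypothetical helper, not part of the patch: map a zmq_poll-style
   timeout in microseconds to the millisecond timeout expected by
   POSIX poll(). */
static int usec_to_poll_ms (long timeout_usec)
{
    if (timeout_usec < 0)
        return -1;                 /* negative: block indefinitely */
    if (timeout_usec == 0)
        return 0;                  /* zero: check once, do not wait */

    /* Round up so that a short positive timeout does not become 0 ms. */
    long ms = timeout_usec / 1000 + (timeout_usec % 1000 != 0);

    /* Clamp to the range of poll()'s int argument. */
    return ms > INT_MAX ? INT_MAX : (int) ms;
}
```

With this mapping, a caller passing 1500 (microseconds) would wait up to
2 ms, and a caller passing -1 would block until an event arrives.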

I tested the patch under Linux and it works. The test programs are
attached too (they are modified sources of the chat example). To run them:
./display tcp://127.0.0.1:5000
./prompt tcp://127.0.0.1:5000 nickname

I updated the Common Lisp bindings, but I can't check them (I'm not
familiar with Lisp). Maybe Vitaly could do this?

I tried to test the code under Windows, but without success: I can't
compile the original version of the ZMQ2 sources with VS 2008 Express
Edition as described in http://www.zeromq.org/area:download-v20-alpha3#toc10.
I get errors like:

x:\temp\_1\zeromq2\src\socket_base.cpp(334) : error C2065:
'EINPROGRESS' : undeclared identifier
x:\temp\_1\zeromq2\src\sub.cpp(147) : error C2065: 'EINPROGRESS' :
undeclared identifier

But the modifications in the Linux and Windows parts of zmq_poll() are
similar, so I hope all will be OK.
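
For the select()-based branch, the same convention can be sketched as a
small conversion from microseconds to a struct timeval, with NULL
standing for "no timeout". The helper name usec_to_timeval is
hypothetical and does not appear in the patch; the header shown is the
POSIX one, and on Windows struct timeval would come from winsock2.h
instead (an assumption, since I could not build there).

```c
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical helper, not part of the patch: prepare the timeout
   argument for select() from a timeout given in microseconds.
   Returns NULL for a negative timeout, which makes select() block
   until an event arrives; otherwise fills *tv and returns it. */
static struct timeval *usec_to_timeval (long timeout_usec,
    struct timeval *tv)
{
    if (timeout_usec < 0)
        return NULL;

    tv->tv_sec = timeout_usec / 1000000;    /* whole seconds */
    tv->tv_usec = timeout_usec % 1000000;   /* remaining microseconds */
    return tv;
}
```

The returned pointer can be passed directly as the last argument of
select(); a timeout of 0 yields {0, 0}, which makes select() return
immediately.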

Martin, please review the patch and commit it to trunk (under the
required MIT license) if there are no warnings.
-------------- next part --------------
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index bda6c01..75d1671 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -199,7 +199,7 @@ typedef struct
     short revents;
 } zmq_pollitem_t;
 
-ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems);
+ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
 
 ////////////////////////////////////////////////////////////////////////////////
 //  Helper functions.
diff --git a/bindings/cl/zeromq.lisp b/bindings/cl/zeromq.lisp
index 90b42da..0bff8b2 100644
--- a/bindings/cl/zeromq.lisp
+++ b/bindings/cl/zeromq.lisp
@@ -231,7 +231,9 @@
 
 (defcfun ("zmq_poll" %poll) :int
   (items	:pointer)
-  (nitems	:int))
+  (nitems	:int)
+  (timeout	:long))
+
 
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ;;  Helper functions.
diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp
index 4349f0b..11ec0b3 100644
--- a/bindings/cpp/zmq.hpp
+++ b/bindings/cpp/zmq.hpp
@@ -33,9 +33,9 @@ namespace zmq
     typedef zmq_free_fn free_fn;
     typedef zmq_pollitem_t pollitem_t;
 
-    inline int poll (zmq_pollitem_t *items_, int nitems_)
+    inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
     {
-        return zmq_poll (items_, nitems_);
+        return zmq_poll (items_, nitems_, timeout_);
     }
 
     class error_t : public std::exception
diff --git a/man/man3/zmq_poll.3 b/man/man3/zmq_poll.3
index 5ce5055..7fe86bd 100644
--- a/man/man3/zmq_poll.3
+++ b/man/man3/zmq_poll.3
@@ -2,7 +2,7 @@
 .SH NAME
 zmq_poll \- polls for events on a set of 0MQ and POSIX sockets
 .SH SYNOPSIS
-.B int zmq_poll (zmq_pollitem_t *items, int nitems);
+.B int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
 .SH DESCRIPTION
 Waits for the events specified by 
 .IR items
@@ -39,7 +39,9 @@ poll for incoming messages.
 wait while message can be set socket. Poll will return if a message of at least
 one byte can be written to the socket. However, there is no guarantee that
 arbitrarily large message can be sent.
-
+.IR timeout
+The timeout argument specifies an upper limit on the time for which zmq_poll() will block, 
+in microseconds. Specifying a negative value in timeout means an infinite timeout.  
 .SH RETURN VALUE
 Function returns number of items signaled or -1 in the case of error.
 .SH ERRORS
diff --git a/src/zmq.cpp b/src/zmq.cpp
index cce07af..dc3adfa 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -264,13 +264,12 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
     return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
 }
 
-int zmq_poll (zmq_pollitem_t *items_, int nitems_)
+int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout)
 {
 #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
     defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
     defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
     defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
-
     pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
     zmq_assert (pollfds);
     int npollfds = 0;
@@ -322,17 +321,21 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
     }
 
     int nevents = 0;
-    bool initial = true;
-    while (!nevents) {
+    int loop_number = 0;
+    int poll_timeout = timeout > 0 ? timeout / 1000 : -1;
+    while (!nevents && (loop_number < 2 || timeout < 0) ) {
 
         //  Wait for activity. In the first iteration just check for events,
         //  don't wait. Waiting would prevent exiting on any events that may
         //  already be signaled on 0MQ sockets.
-        int rc = poll (pollfds, npollfds, initial ? 0 : -1);
-        if (rc == -1 && errno == EINTR)
+        int rc = poll (pollfds, npollfds, loop_number==0 ? 0 : poll_timeout);
+        if (rc == -1 && errno == EINTR) {
+            if (loop_number > 0) loop_number++;
             continue;
+        }
         errno_assert (rc >= 0);
-        initial = false;
+	
+        loop_number++;
 
         //  Process 0MQ commands if needed.
         if (nsockets && pollfds [npollfds -1].revents & POLLIN)
@@ -427,24 +430,28 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
     }
 
     int nevents = 0;
-    bool initial = true;
-    while (!nevents) {
+    int loop_number = 0;
+    timeval poll_timeout = {timeout / 1000000, timeout % 1000000};
+    timeval zero_timeout = {0, 0};
+    
+    while (!nevents && (loop_number < 2 || timeout < 0) ) {
 
         //  Wait for activity. In the first iteration just check for events,
         //  don't wait. Waiting would prevent exiting on any events that may
         //  already be signaled on 0MQ sockets.
-        timeval timeout = {0, 0};
         int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err,
-            initial ? &timeout : NULL);
+            loop_number==0 ? &zero_timeout : (timeout >=0 ? &poll_timeout :NULL) );
 #if defined ZMQ_HAVE_WINDOWS
         wsa_assert (rc != SOCKET_ERROR);
 #else
-        if (rc == -1 && errno == EINTR)
+        if (rc == -1 && errno == EINTR) {
+            if (loop_number > 0) loop_number++;
             continue;
+        }
 #endif
 
         errno_assert (rc >= 0);
-        initial = false;
+        loop_number++;
 
         //  Process 0MQ commands if needed.
         if (nsockets && FD_ISSET (notify_fd, &pollset_in))
-------------- next part --------------
A non-text attachment was scrubbed...
Name: display.cpp
Type: text/x-c++src
Size: 1694 bytes
Desc: not available
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20091229/e97756f1/attachment.cpp>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: prompt.cpp
Type: text/x-c++src
Size: 2015 bytes
Desc: not available
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20091229/e97756f1/attachment-0001.cpp>

