[PATCH 1/2] added queue class to make zmq_queue device actually useful

jon (none) jon at ubiq.
Fri Mar 12 00:24:38 CET 2010


---
 devices/zmq_queue/zmq_queue.cpp |  116 +++++++++++++++++++++++++++++++++++++--
 1 files changed, 111 insertions(+), 5 deletions(-)

diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp
index a90aac7..9c45efe 100644
--- a/devices/zmq_queue/zmq_queue.cpp
+++ b/devices/zmq_queue/zmq_queue.cpp
@@ -20,6 +20,114 @@
 #include "../../bindings/cpp/zmq.hpp"
 #include "../../foreign/xmlParser/xmlParser.cpp"
 
+class queue
+{
+public:
+  queue (zmq::socket_t& reply, zmq::socket_t& request)
+    : xrep(reply), xreq(request)
+  {
+    items[0].socket = reply; // socket_t has operator void*
+    items[0].fd = 0;
+    items[0].events = ZMQ_POLLIN;
+    items[0].revents = 0;
+
+    items[1].socket = request; // socket_t has operator void*
+    items[1].fd = 0;
+    items[1].events = ZMQ_POLLIN;
+    items[1].revents = 0;
+
+    m_next_request_method = &queue::get_request;
+    m_next_response_method = &queue::get_response;
+        
+  }
+  
+  void run()
+  {
+    while (true) {
+      int rc = zmq::poll(&items[0],2,-1);
+      if (rc < 0) break;
+      next_request();
+      next_response();
+    }
+  }
+
+
+private:
+  void next_request()
+  {
+    (this->*m_next_request_method)();
+  }
+
+  void next_response()
+  {
+    (this->*m_next_response_method)();
+  }
+
+
+  void get_request()
+  {
+    if ( items[0].revents & ZMQ_POLLIN ) {
+      int rc = xrep.recv(&request_msg, ZMQ_NOBLOCK);
+      if (!rc) return;
+      items[0].events &= ~ZMQ_POLLIN;
+      items[1].events |= ZMQ_POLLOUT;
+      m_next_request_method = &queue::send_request;
+    }
+  }
+
+  void send_request()
+  {
+    if ( items[1].revents & ZMQ_POLLOUT) {
+	int rc = xreq.send(request_msg, ZMQ_NOBLOCK);
+	if (!rc) return;
+	items[1].events &= ~ZMQ_POLLOUT;
+	items[0].events |= ZMQ_POLLIN;
+	m_next_request_method = &queue::get_request;
+    }
+  }
+
+  void get_response()
+  {
+    if ( items[1].revents & ZMQ_POLLIN ) {
+      int rc = xreq.recv(&response_msg, ZMQ_NOBLOCK);
+      if (!rc) return;
+      items[1].events &= ~ZMQ_POLLIN;
+      items[0].events |= ZMQ_POLLOUT;
+      m_next_response_method = &queue::send_response;
+    }
+  }
+
+  void send_response()
+  {
+    if ( items[0].revents & ZMQ_POLLOUT) {
+      int rc = xrep.send(response_msg, ZMQ_NOBLOCK);
+      if (!rc) return;
+      items[0].events &= ~ZMQ_POLLOUT;
+      items[1].events |= ZMQ_POLLIN;
+      m_next_response_method = &queue::get_response;
+    }
+  }
+
+
+
+private:
+  zmq::socket_t & xrep;
+  zmq::socket_t & xreq;
+  zmq_pollitem_t items[2];
+  zmq::message_t request_msg;
+  zmq::message_t response_msg;
+        
+  typedef void (queue::*next_method)();
+  
+  next_method m_next_request_method;
+  next_method m_next_response_method;
+
+  queue (queue const &);
+  void operator = (queue const &);
+
+};
+
+
 int main (int argc, char *argv [])
 {
     if (argc != 2) {
@@ -112,11 +220,9 @@ int main (int argc, char *argv [])
         n++;
     }
 
-    zmq::message_t msg;
-    while (true) {
-        in_socket.recv (&msg);
-        out_socket.send (msg);
-    }
+    queue q(in_socket, out_socket);
+
+    q.run();
 
     return 0;
 }
-- 
1.6.3.3


--------------000303070906040308080600
Content-Type: text/x-patch;
 name="0002-tidy-up-zmq_queue-and-adjust-include-path-since-the-.patch"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
 filename*0="0002-tidy-up-zmq_queue-and-adjust-include-path-since-the-.pa";
 filename*1="tch"



More information about the zeromq-dev mailing list