[PATCH 1/2] added queue class to make zmq_queue device actually useful
jon (none)
jon at ubiq.
Fri Mar 12 00:24:38 CET 2010
---
devices/zmq_queue/zmq_queue.cpp | 116 +++++++++++++++++++++++++++++++++++++--
1 files changed, 111 insertions(+), 5 deletions(-)
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp
index a90aac7..9c45efe 100644
--- a/devices/zmq_queue/zmq_queue.cpp
+++ b/devices/zmq_queue/zmq_queue.cpp
@@ -20,6 +20,114 @@
#include "../../bindings/cpp/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
+class queue
+{
+public:
+ queue (zmq::socket_t& reply, zmq::socket_t& request)
+ : xrep(reply), xreq(request)
+ {
+ items[0].socket = reply; // socket_t has operator void*
+ items[0].fd = 0;
+ items[0].events = ZMQ_POLLIN;
+ items[0].revents = 0;
+
+ items[1].socket = request; // socket_t has operator void*
+ items[1].fd = 0;
+ items[1].events = ZMQ_POLLIN;
+ items[1].revents = 0;
+
+ m_next_request_method = &queue::get_request;
+ m_next_response_method = &queue::get_response;
+
+ }
+
+ void run()
+ {
+ while (true) {
+ int rc = zmq::poll(&items[0],2,-1);
+ if (rc < 0) break;
+ next_request();
+ next_response();
+ }
+ }
+
+
+private:
+ void next_request()
+ {
+ (this->*m_next_request_method)();
+ }
+
+ void next_response()
+ {
+ (this->*m_next_response_method)();
+ }
+
+
+ void get_request()
+ {
+ if ( items[0].revents & ZMQ_POLLIN ) {
+ int rc = xrep.recv(&request_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items[0].events &= ~ZMQ_POLLIN;
+ items[1].events |= ZMQ_POLLOUT;
+ m_next_request_method = &queue::send_request;
+ }
+ }
+
+ void send_request()
+ {
+ if ( items[1].revents & ZMQ_POLLOUT) {
+ int rc = xreq.send(request_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items[1].events &= ~ZMQ_POLLOUT;
+ items[0].events |= ZMQ_POLLIN;
+ m_next_request_method = &queue::get_request;
+ }
+ }
+
+ void get_response()
+ {
+ if ( items[1].revents & ZMQ_POLLIN ) {
+ int rc = xreq.recv(&response_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items[1].events &= ~ZMQ_POLLIN;
+ items[0].events |= ZMQ_POLLOUT;
+ m_next_response_method = &queue::send_response;
+ }
+ }
+
+ void send_response()
+ {
+ if ( items[0].revents & ZMQ_POLLOUT) {
+ int rc = xrep.send(response_msg, ZMQ_NOBLOCK);
+ if (!rc) return;
+ items[0].events &= ~ZMQ_POLLOUT;
+ items[1].events |= ZMQ_POLLIN;
+ m_next_response_method = &queue::get_response;
+ }
+ }
+
+
+
+private:
+ zmq::socket_t & xrep;
+ zmq::socket_t & xreq;
+ zmq_pollitem_t items[2];
+ zmq::message_t request_msg;
+ zmq::message_t response_msg;
+
+ typedef void (queue::*next_method)();
+
+ next_method m_next_request_method;
+ next_method m_next_response_method;
+
+ queue (queue const &);
+ void operator = (queue const &);
+
+};
+
+
int main (int argc, char *argv [])
{
if (argc != 2) {
@@ -112,11 +220,9 @@ int main (int argc, char *argv [])
n++;
}
- zmq::message_t msg;
- while (true) {
- in_socket.recv (&msg);
- out_socket.send (msg);
- }
+ queue q(in_socket, out_socket);
+
+ q.run();
return 0;
}
--
1.6.3.3
--------------000303070906040308080600
Content-Type: text/x-patch;
name="0002-tidy-up-zmq_queue-and-adjust-include-path-since-the-.patch"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
filename*0="0002-tidy-up-zmq_queue-and-adjust-include-path-since-the-.pa";
filename*1="tch"
More information about the zeromq-dev
mailing list