[zeromq-dev] [PATCH] make streamer and forwarder honour the more flag
Jon Dyte
jon at totient.co.uk
Thu Aug 26 22:14:53 CEST 2010
Submitted under the MIT/X11 License
Please review, but it looks straightforward enough.
Jon
diff --git a/src/forwarder.cpp b/src/forwarder.cpp
index 503868b..e7f1eb9 100644
--- a/src/forwarder.cpp
+++ b/src/forwarder.cpp
@@ -29,6 +29,9 @@ int zmq::forwarder (socket_base_t *insocket_,
socket_base_t *outsocket_)
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
+ int64_t more;
+ size_t more_sz = sizeof(more);
+
while (true) {
rc = insocket_->recv (&msg, 0);
if (rc < 0) {
@@ -37,7 +40,14 @@ int zmq::forwarder (socket_base_t *insocket_,
socket_base_t *outsocket_)
errno_assert (false);
}
- rc = outsocket_->send (&msg, 0);
+ rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
+ if (rc < 0) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
+
+ rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
if (rc < 0) {
if (errno == ETERM)
return -1;
diff --git a/src/streamer.cpp b/src/streamer.cpp
index 9799007..22c8d42 100644
--- a/src/streamer.cpp
+++ b/src/streamer.cpp
@@ -29,6 +29,9 @@ int zmq::streamer (socket_base_t *insocket_,
socket_base_t *outsocket_)
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
+ int64_t more;
+ size_t more_sz = sizeof(more);
+
while (true) {
rc = insocket_->recv (&msg, 0);
if (rc < 0) {
@@ -36,8 +39,15 @@ int zmq::streamer (socket_base_t *insocket_,
socket_base_t *outsocket_)
return -1;
errno_assert (false);
}
+
+ rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
+ if (rc < 0) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
- rc = outsocket_->send (&msg, 0);
+ rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
if (rc < 0) {
if (errno == ETERM)
return -1;
More information about the zeromq-dev mailing list