[zeromq-dev] vector io for TCP sockets

Auer, Jens jens.auer at cgi.com
Tue May 24 15:22:07 CEST 2016


Hi,

I have been thinking about how to change the stream_engine_t class to use vector I/O, so that the message data no longer has to be copied into the send buffer. The basic idea is to send the data directly from the messages in the queue. To do this, the headers must be created in a separate array, and the lock-free queue needs to provide an iterator to traverse its elements. I think this should be possible with the current implementation because it is a producer/consumer queue where only one side, the stream_engine, can remove items. The other side can only add new elements, and that should not invalidate the iterator (which is probably a pointer to the current element).

Every message then uses two io_vec elements, the first pointing to the header data and the second one pointing to the message data.
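
As a minimal illustration of the two-element layout, the sketch below sends one header and one payload with a single writev call. It uses the POSIX struct iovec; send_frame and roundtrip_frame are hypothetical names, the two-byte header content in the usage note is made up, and a pipe stands in for the TCP socket.

```cpp
#include <sys/uio.h>   // writev, struct iovec
#include <unistd.h>    // pipe, read, close
#include <cassert>
#include <cstddef>
#include <string>

// Sends one frame as two iovec entries (header, then payload) with a single
// writev call. Neither buffer is copied into an intermediate send buffer.
inline ssize_t send_frame(int fd, const void *header, std::size_t hlen,
                          const void *payload, std::size_t plen)
{
    iovec v[2];
    v[0].iov_base = const_cast<void *>(header);   // iov_base is a non-const void*
    v[0].iov_len = hlen;
    v[1].iov_base = const_cast<void *>(payload);
    v[1].iov_len = plen;
    return writev(fd, v, 2);
}

// Hypothetical demo helper: pushes one frame through a pipe and returns the
// bytes the reading side sees.
inline std::string roundtrip_frame(const std::string &header,
                                   const std::string &payload)
{
    // Keep the frame small so the write cannot block on a full pipe.
    assert(header.size() + payload.size() <= 512);

    int fds[2];
    if (pipe(fds) != 0)
        return std::string();

    ssize_t sent = send_frame(fds[1], header.data(), header.size(),
                              payload.data(), payload.size());
    close(fds[1]);   // closing the write end lets the reader see end-of-file

    std::string out;
    char buf[256];
    ssize_t n = 0;
    while (sent > 0 && (n = read(fds[0], buf, sizeof buf)) > 0)
        out.append(buf, static_cast<std::size_t>(n));
    close(fds[0]);
    return out;
}
```

Calling roundtrip_frame("\x01\x05", "hello") should hand back the header bytes followed directly by the payload, which is what the receiving peer would see on the wire.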

I've tried to write down a pseudo-code implementation of the algorithm:
- The write buffer in stream_engine_t is removed.
- stream_engine_t gets an array iovecs of IOV_MAX io_vec elements
- stream_engine_t gets an array headerbuf of IOV_MAX/2 * 10 bytes to hold the headers 
- stream_engine_t stores an offset counting the already sent bytes of the first message to be sent

When called, out_event will check if the mailbox Q is not empty. If so:
char* h = &headerbuf[0];
io_vec* v = &iovecs[0];
msg = Q.cbegin();
last_msg = Q.cend();

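bool firstMessage = true;
while (number of processed messages < IOV_MAX/2 && msg != last_msg) {
    hlen = create_header_at(h, *msg); // hlen == 2 || hlen == 10

    // send header (set iov_base before advancing h past the header)
    // note: the header of a partially sent first message must not be sent again
    v->iov_base = h;
    v->iov_len = hlen;
    v += 1;
    h += hlen;

    // send data; only the first message can be partially sent already
    skip = firstMessage ? offset : 0;
    v->iov_base = msg->data() + skip;
    v->iov_len = msg->size() - skip;
    v += 1;

    firstMessage = false;
    ++msg;
}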

// v - iovecs is the number of created io_vec elements 
ssize_t sent = writev(socket, iovecs, (v - iovecs) );

// error checking ...

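// now pop the fully sent messages from the queue and check if there is a remainder
// note: sent also includes the header bytes and must be reduced by them as well
while (!Q.empty() && sent >= Q.front().size()) {
    msg_t msg = Q.pop();
    sent -= msg.size();
}
// whatever is left was sent from the message that is now at the front
offset = sent;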
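
The bookkeeping for partial writes can be sketched in compilable form as follows. This is only a sketch under stated assumptions, not the libzmq implementation: frame_t, gather, consume and flush_once are hypothetical names, a std::deque stands in for the lock-free queue, each queued frame already carries its encoded header, and offset counts the header plus payload bytes of the front frame that earlier calls have sent.

```cpp
#include <sys/uio.h>   // writev, struct iovec
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a queued message: encoded header plus payload.
struct frame_t {
    std::string header;
    std::string payload;
    std::size_t wire_size() const { return header.size() + payload.size(); }
};

// Builds the iovec list for at most max_frames frames. The first `offset`
// wire bytes of the front frame are skipped because they were already sent.
inline std::vector<iovec> gather(const std::deque<frame_t> &q,
                                 std::size_t offset, std::size_t max_frames)
{
    std::vector<iovec> v;
    std::size_t skip = offset;
    for (std::size_t i = 0; i < q.size() && i < max_frames; ++i) {
        const std::string *parts[2] = { &q[i].header, &q[i].payload };
        for (const std::string *p : parts) {
            std::size_t s = skip < p->size() ? skip : p->size();
            skip -= s;
            if (s < p->size()) {   // something of this part is still unsent
                iovec e;
                e.iov_base = const_cast<char *>(p->data()) + s;
                e.iov_len = p->size() - s;
                v.push_back(e);
            }
        }
    }
    return v;
}

// Accounts for `sent` bytes reported by writev: pops every fully sent frame
// and returns the new offset into the frame that is now at the front.
inline std::size_t consume(std::deque<frame_t> &q, std::size_t offset,
                           std::size_t sent)
{
    std::size_t done = offset + sent;
    while (!q.empty() && done >= q.front().wire_size()) {
        done -= q.front().wire_size();
        q.pop_front();
    }
    assert(!q.empty() || done == 0);   // cannot send more than was queued
    return done;
}

// One out_event-style step: gather, write, then update queue and offset.
// max_frames should be at most IOV_MAX/2 so the iovec count stays legal.
inline ssize_t flush_once(int fd, std::deque<frame_t> &q, std::size_t &offset,
                          std::size_t max_frames)
{
    std::vector<iovec> v = gather(q, offset, max_frames);
    if (v.empty())
        return 0;
    ssize_t sent = writev(fd, v.data(), static_cast<int>(v.size()));
    if (sent > 0)
        offset = consume(q, offset, static_cast<std::size_t>(sent));
    return sent;   // the caller handles -1 (e.g. EAGAIN) as before
}
```

Counting the offset in wire bytes rather than payload bytes means a write that stops in the middle of a header needs no special case, at the price that a resumed frame may occupy one iovec entry instead of two.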

It looks reasonable to me, but I am not sure about the implications, especially for different message sizes. For very small messages, this will probably be slower than the original stream_engine. IOV_MAX is 1024 on my Linux machine, so for one-byte messages this would send 512 messages (512 bytes of message data) per syscall, compared to 8192/3 = 2730 messages with the old implementation. There should be a message size above which it gets faster, but I do not have enough experience with vector I/O to make a guess. Maybe somebody could provide some feedback?
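
To put rough numbers on the messages-per-syscall comparison, the sketch below computes both batch sizes and the payload size at which they meet. It assumes the 8192-byte send buffer and 2-byte header implied by the 8192/3 figure, and it only counts messages per call; it says nothing about the cost of copying or of the syscall itself. All function names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// Messages per syscall when frames are copied into a fixed-size send buffer.
inline std::size_t per_call_buffered(std::size_t buf, std::size_t hdr,
                                     std::size_t payload)
{
    assert(hdr + payload > 0);
    return buf / (hdr + payload);
}

// Messages per syscall with vector I/O: two iovec entries per message.
inline std::size_t per_call_vectored(std::size_t iov_max)
{
    return iov_max / 2;
}

// Smallest payload size for which the vectored variant sends at least as
// many messages per call as the buffered one.
inline std::size_t break_even_payload(std::size_t buf, std::size_t hdr,
                                      std::size_t iov_max)
{
    assert(hdr > 0 && iov_max >= 2);
    std::size_t p = 0;
    while (per_call_buffered(buf, hdr, p) > per_call_vectored(iov_max))
        ++p;
    return p;
}
```

With buf = 8192, hdr = 2 and iov_max = 1024, break_even_payload returns 14, so by this count alone the vectored variant stops sending fewer messages per call once payloads reach 14 bytes. Whether it is actually faster from there on would have to be measured.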

Best wishes,
  Jens

--
Dr. Jens Auer | CGI | Software Engineer
CGI Deutschland Ltd. & Co. KG
Rheinstraße 95 | 64295 Darmstadt | Germany
T: +49 6151 36860 154
jens.auer at cgi.com




