[zeromq-dev] Only first X messages received with PGM/0MQ c++ setup

Johnny Berentsen johnny.berentsen at gmail.com
Mon Apr 2 15:30:26 CEST 2018


Hi all

I've just started dabbling with a setup (c++) of OpenPGM/ZeroMQ + protobuf
so be gentle :)

Everything seems to work as expected, except that only the first X
messages get through when Y messages are sent in rapid succession.

I don't have a small test example ready, but here is the important stuff:
Server:
m_cmdsocket = new zmq::socket_t(*m_context, ZMQ_PULL);
m_cmdsocket->setsockopt( ZMQ_RCVTIMEO, 100 );
m_cmdsocket->bind("tcp://*:5557");

m_fplsocket  = new zmq::socket_t(*m_context, ZMQ_PUB);
const int rate = 1000000;  // 1Gb TX- and RX-rate
m_fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
m_fplsocket->bind("epgm://vboxnet0;239.10.10.10:50002");
m_fplsocket->bind("ipc:///tmp/InNOVA/3105");


Client:
m_cmdsocket = new zmq::socket_t(*m_context, ZMQ_PUSH);
m_cmdsocket->connect("tcp://192.168.56.1:5557");

m_fplsocket  = new zmq::socket_t(*m_context, ZMQ_SUB);
m_fplsocket->setsockopt( ZMQ_RCVTIMEO, 100 );
const int rate = 1000000;  // 1Gb TX- and RX-rate
m_fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
m_fplsocket->bind("epgm://lanB;239.10.10.10:50002");

I'll try to put together a small compilable version if necessary.

So this is how it currently works:
A server process and a client process are started and exchange some messages
on both channels (PUSH to PULL and PUB to SUB). Once communication is stable,
the server tries to transmit 2500 protobuf messages. The first ~1995 messages
are received by the client, but the rest are lost.
If I add a one millisecond delay
(std::this_thread::sleep_for(std::chrono::milliseconds(1));) all messages
are received. This is when running on the same host (i.e. using IPC). When
sending between the host and a virtual machine, the version without the delay
drops to ~30 received messages, but with the delay all messages arrive.
The protobuf message consists of some short strings and numbers and a large
random bitmask, with a total size of 6160 bytes.
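
For a sense of scale, the figures above can be turned into a rough bandwidth
estimate. This is only back-of-the-envelope arithmetic: the helper names are
made up for illustration, and it assumes the 1 ms sleep is the only thing
limiting the send rate (so at most 1000 messages per second) and that
ZMQ_RATE is given in kilobits per second, as the ZeroMQ documentation states.

```cpp
#include <cstdint>

// Hypothetical helpers for illustration; not part of ZeroMQ or of the code in this post.

// Total payload, in bytes, of a burst of `count` messages of `bytes_each` bytes.
constexpr std::uint64_t burst_bytes(std::uint64_t count, std::uint64_t bytes_each) {
    return count * bytes_each;
}

// Upper bound on payload bandwidth in kbit/s when one message of `bytes_each`
// bytes is sent every `interval_us` microseconds.
constexpr std::uint64_t paced_kbit_per_s(std::uint64_t bytes_each, std::uint64_t interval_us) {
    // bits per message * (1e6 / interval_us) messages per second / 1000 bits per kbit
    return bytes_each * 8 * 1000 / interval_us;
}
```

With the numbers from this post, burst_bytes(2500, 6160) is 15,400,000 bytes
and paced_kbit_per_s(6160, 1000) is 49,280 kbit/s, roughly 5% of the
configured ZMQ_RATE of 1,000,000 kbit/s.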

I'm assuming some kind of queue buffer on the sender side is full and the
remaining messages are discarded, but I am not sure how I should go about
handling this. This might be somewhere in the docs in big letters, but it
has still slipped past me :/ The discussions I've found concern TCP
pub/sub and HWM, so I'm not sure how that relates to IPC/PGM pub/sub.
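
The "full queue" guess can be made concrete with a toy model. This is a
sketch of the hypothesis, not of libzmq internals: it assumes the sender
bursts faster than the receiver drains, that the pipe then behaves like a
fixed buffer of send HWM plus receive HWM slots, and that anything beyond
that is dropped (a PUB socket drops rather than blocks once a subscriber's
high water mark is reached). The default of 1000 for both ZMQ_SNDHWM and
ZMQ_RCVHWM is taken from the ZeroMQ documentation; the struct and function
names are hypothetical.

```cpp
#include <algorithm>
#include <cstddef>

// Toy model only; this is NOT how libzmq is implemented.
struct BurstResult {
    std::size_t delivered;  // messages that fit into the modelled buffer
    std::size_t dropped;    // messages beyond the modelled buffer
};

// Models an instantaneous burst of `sent` messages into a pipe that can hold
// send_hwm + recv_hwm messages, with no draining during the burst.
inline BurstResult simulate_burst(std::size_t sent,
                                  std::size_t send_hwm,
                                  std::size_t recv_hwm) {
    const std::size_t capacity = send_hwm + recv_hwm;
    const std::size_t delivered = std::min(sent, capacity);
    return {delivered, sent - delivered};
}
```

For simulate_burst(2500, 1000, 1000) the model delivers 2000 messages and
drops 500, which is close to (though not exactly) the ~1995 seen over IPC.
Whether the same explanation holds for the epgm case is not established by
this model. If the high water marks are the cause, ZMQ_SNDHWM and ZMQ_RCVHWM
are the socket options that control them.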

Also note that everything happens in one thread, so the receive call is not
blocking indefinitely: it times out after 100 ms, after which the loop prints
a line, sets a timer, and then starts receiving again.

The sending loop is like this:
for each object:
    std::string protomsg;
    fpl.SerializeToString(&protomsg);
    int msgsize = protomsg.length();

    zmq::message_t fplmsg(msgsize);
    memcpy(fplmsg.data(), protomsg.data(), msgsize);
    m_fplsocket->send(fplmsg);
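
The one millisecond delay mentioned earlier can be generalised into a small
pacing helper. This is a minimal sketch of a workaround, not a fix for the
underlying cause; paced_for_each and its parameters are hypothetical names,
and the callback stands in for the serialize-and-send body of the loop.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

// Hypothetical helper: calls send_one(i) for i in [0, count).
// Call i is scheduled at start + i * interval. If a call overruns its slot,
// later calls catch up instead of pushing the whole schedule back.
// Returns the number of calls made.
inline std::size_t paced_for_each(std::size_t count,
                                  std::chrono::microseconds interval,
                                  const std::function<void(std::size_t)>& send_one) {
    auto next = std::chrono::steady_clock::now();
    std::size_t calls = 0;
    for (std::size_t i = 0; i < count; ++i) {
        std::this_thread::sleep_until(next);  // returns at once if `next` has passed
        send_one(i);
        ++calls;
        next += interval;
    }
    return calls;
}
```

Called as paced_for_each(2500, std::chrono::microseconds(1000), ...), this
spreads the burst over about 2.5 seconds, the same pacing as the sleep_for
variant but without accumulating the time spent inside each send.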

Thanks
Johnny