[zeromq-dev] Only first X messages received with PGM/0MQ c++ setup

Luca Boccassi luca.boccassi at gmail.com
Mon Apr 16 14:35:43 CEST 2018


On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote:
> 2018-04-02 15:30 GMT+02:00 Johnny Berentsen <johnny.berentsen at gmail.c
> om>:
> 
> > Hi all
> > <snip initial explanation>
> > 
> 
> So I made a small example showing my problem. Attached server.cc and
> client.cc at the end.
> Now I have a vector with size 2500 of std::strings filled with 4096 #
> characters. Every ten seconds I loop through the vector and send the
> data
> over a EPGM socket. If I go full throttle, only the first 1000
> messages are
> received, but if I add a usleep(1) after each send, all messages are
> received.
> I'm assuming there's something really simple missing in my setup, but
> I am
> unable to figure it out.

Default high water mark is 1000 messages, most likely the receiving
side is too slow to process and so it starts dropping messages

> server.cc:
> ------------
> #include <iostream>
> #include <sstream>
> #include <unistd.h>
> #include <zmq.hpp>
> 
> int main(int argc, char *argv[])
> {
>     std::stringstream connstr;
>     connstr << "epgm://";
>     if( argc > 0 )   connstr << argv[1] << ";239.10.10.10:50000";
>     else             { std::cout << "Missing argument" << std::endl;
> exit(1); }
> 
>     std::cout << "Initializing network to " << connstr.str() <<
> std::endl;
>     zmq::context_t *context   = new zmq::context_t(1);
>     zmq::socket_t  *fplsocket = new zmq::socket_t(*context, ZMQ_PUB);
>     const int rate = 1000000;                              // 1Gb TX-
> and
> RX- rate
>     fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
>     fplsocket->bind(connstr.str());
>     std::cout << "Network initialized" << std::endl;
> 
>     std::vector<std::string> blobs;
>     int len=4096;
>     for( auto i=0 ; i < 2500 ; i++ )
>     {
>         std::string str(len, '#');
>         blobs.push_back(str);
>     }
> 
>     while( 1 )
>     {
>         sleep(10);
>         std::cout << "Sending " << blobs.size() << " messages" <<
> std::endl;
>         for( std::vector<std::string>::iterator it = blobs.begin() ;
> it !=
> blobs.end() ; it++ )
>         {
>             zmq::message_t fplmsg(it->length());
>             memcpy(fplmsg.data(), it->data(), it->length());
>             fplsocket->send(fplmsg);
>             // usleep(1);
>         }
>     }
> }
> 
> client.cc:
> -----------
> #include <iostream>
> #include <sstream>
> #include <unistd.h>
> #include <zmq.hpp>
> 
> int main(int argc, char *argv[])
> {
>     std::stringstream connstr;
>     connstr << "epgm://";
>     if( argc > 0 )   connstr << argv[1] << ";239.10.10.10:50000";
>     else             { std::cout << "Missing argument" << std::endl;
> exit(1); }
> 
>     std::cout << "Initializing network to " << connstr.str() <<
> std::endl;
>     zmq::context_t *context   = new zmq::context_t(1);
>     zmq::socket_t  *fplsocket = new zmq::socket_t(*context, ZMQ_SUB);
>     fplsocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
>     fplsocket->connect(connstr.str());
>     std::cout << "Network initialized" << std::endl;
> 
>     int i=0;
>     int rc;
>     zmq::message_t resultset;
>     while( 1 )
>     {
>         if( (rc = fplsocket->recv(&resultset)) == true)
>         {
>             i++;
>             std::string msg_str(static_cast<char*>(resultset.data()),
> resultset.size());
>             std::cout << "Received msg " << i << " with size: " <<
> msg_str.length() << std::endl;
>         }
>     }
> }
> 
> Compilation:
> g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC
> -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o server.o
> server.cc
> g++ -m64 -o server server.o -lzmq -lpgm -lprotobuf
> g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC
> -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o client.o
> client.cc
> g++ -m64 -o client client.o -lzmq -lpgm -lprotobuf
> 
> # Start on machine 1
> ./server <interface name>
> # Start on machine 2
> ./client <interface name>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev

-- 
Kind regards,
Luca Boccassi
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20180416/9d33b98f/attachment.sig>


More information about the zeromq-dev mailing list