[zeromq-dev] subscribe a lot of topic , make the subscriber get data latency?

Francesco francesco.montorsi at gmail.com
Sat Mar 11 17:32:47 CET 2017


Hi,
I think you are setting the HWM to 0, making ZMQ queues of infinite
size (limited by the RAM you have)... this may explain your issue? I
mean: the queues get full and they start growing leading to 1)
increased memory usage, 2) increasing latency...

HTH,
Francesco


2017-03-11 13:07 GMT+01:00 LN <58315149 at qq.com>:
> hi all !
>
> I have written a program to imitate the pub/sub of the securities market
> quotation data. and i found that when i subscribe a lot of topics, the
> subscriber get data latency. when subscirbe 50 topics , some tick data slow
> more than 100ms from the server, and i don't know why. Below is my code, and
> i user the black box pattern.
>
> the problem is that: (Windows Server 2008, 8 cores, sizeof(TDF_MARKET_DATA)
> = 300bytes)
> A: when i subscribe one topic from console(s 0600008,), the latency from
> server to client is 0 or 1 milliseconds
> , it is ok!(log compare from client to server 0600008)
>
> B: when i subscribe 100 topics from console(like s 0600002, 0600004,
> 0600006...), the latency sometimes 1ms and sometimes 10 ~ 60 milliseconds.
>
> C: when i subscribe 300 topics from console, the latency sometimes 1 ~ 20
> seconds!!!
> And i found that the memory of client increase fast.
>
> and i don't know what's wrong with my program.
>
> Looking forward to your reply!! Thanks a lot!
>
> -------------------------Server(modify from
> wuserver.cpp)---------------------------
>
> //
> //  Weather update server in C++
> //  Binds PUB socket to tcp://*:5556
> //  Publishes random weather updates
> //
> //  Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
> //
>
> #include <stdio.h>
> #include <stdlib.h>
> #include <time.h>
> #include <zmq.hpp>
>
> #include <boost/asio.hpp>
> #include <boost/bind.hpp>
> #include <boost/random.hpp>
> #include <boost/thread.hpp>
> #include <boost/lexical_cast.hpp>
> #include <boost/tokenizer.hpp>
> #include <boost/filesystem.hpp>
> #include <boost/date_time/gregorian/gregorian.hpp>
> #include <boost/date_time/posix_time/posix_time.hpp>
>
> #include <log4cpp/Category.hh>
> #include <log4cpp/Appender.hh>
> #include <log4cpp/Priority.hh>
> #include <log4cpp/FileAppender.hh>
> #include <log4cpp/PatternLayout.hh>
> #include <log4cpp/OstreamAppender.hh>
>
> #include "../TDFPub/src/TDF/TDFAPIStruct.h"
>
> #if (defined (WIN32))
> //#include <zhelpers.hpp>
> #endif
>
> #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
>
> log4cpp::Category& glog = log4cpp::Category::getRoot();
>
> int main ()
> {
> boost::filesystem::path log_path("./log/");
> if (!boost::filesystem::exists(log_path))
> {
> create_directory(log_path);
> }
> std::string local_day =
> boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
> boost::filesystem::path path_local_day("");
> path_local_day += log_path;
> path_local_day += local_day;
> if (!boost::filesystem::exists(path_local_day))
> {
> create_directory(path_local_day);
> }
>
> log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
> layout_c->setConversionPattern("%d: %p %c %x: %m%n");
> log4cpp::Appender *console_appender = new
> log4cpp::OstreamAppender("console", &std::cout);
> console_appender->setLayout(layout_c);
>
> const std::string str_log(path_local_day.string() + "/wuserver.log");
> log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
> layout_f->setConversionPattern("%d: %p %c %x: %m%n");
> log4cpp::Appender* file_appender = new log4cpp::FileAppender("main",
> str_log);
> file_appender->setLayout(layout_f);
>
> glog.addAppender(file_appender);
> glog.addAppender(console_appender);
> glog.setPriority(log4cpp::Priority::DEBUG);
>
>     //  Prepare our context and publisher
>     zmq::context_t context (1);
>     zmq::socket_t publisher (context, ZMQ_PUB);
> int hwm = 0;
> publisher.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
>     publisher.bind("tcp://*:5556");
>
> boost::uniform_int<> distribution(1, 3000);
> boost::mt19937 engine;
> boost::variate_generator<boost::mt19937, boost::uniform_int<> >
> myrandom(engine, distribution);
>     while (1)
> {
> int code;
> char prefix[8] = {0};
> TDF_MARKET_DATA tick;
>
> zmq::message_t message(7 + sizeof(TDF_MARKET_DATA));
> code = myrandom();
> code % 2 == 0 ? sprintf(prefix, "0600%03d", code) : sprintf(prefix,
> "1000%03d", code);
> //sprintf(data.szCode, "%07d", prefix);
> //memcpy(tick.szCode, prefix, 7);
> strcpy(tick.szCode, prefix);
> memcpy((char*)message.data(), prefix, 7);
> memcpy((char*)message.data() + 7, &tick, sizeof(TDF_MARKET_DATA));
>
> publisher.send(message);
>
> //if (strncmp(prefix, "1001271", 7) == 0)
> if (strncmp(prefix, "0600008", 7) == 0)
> {
> glog.info("send %s", prefix);
> }
>
> boost::this_thread::sleep(boost::posix_time::microseconds(1000));// 1ms
> //Sleep(1);
>     }
>     return 0;
> }
>
>
> -------------------------Client(Black box
> pattern)---------------------------
> //
> //  Weather update client in C++
> //  Connects SUB socket to tcp://localhost:5556
> //  Collects weather updates and finds avg temp in zipcode
> //
> //  Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
> //
> #include <zmq.hpp>
> #include <string>
> #include <iostream>
> #include <sstream>
> #include <boost/thread.hpp>
> #include <boost/function.hpp>
> #include "zhelpers.hpp"
> #include <stdio.h>
> #include <stdlib.h>
> #include <time.h>
> #include "zhelpers.hpp"
> #include <boost/asio.hpp>
> #include <boost/bind.hpp>
> #include <boost/thread.hpp>
> #include <boost/lexical_cast.hpp>
> #include <boost/tokenizer.hpp>
> #include <boost/filesystem.hpp>
> #include <boost/date_time/gregorian/gregorian.hpp>
> #include <log4cpp/Category.hh>
> #include <log4cpp/Appender.hh>
> #include <log4cpp/Priority.hh>
> #include <log4cpp/FileAppender.hh>
> #include <log4cpp/PatternLayout.hh>
> #include <log4cpp/OstreamAppender.hh>
> #include "../TDFPub/src/TDF/TDFAPIStruct.h"
> log4cpp::Category& glog = log4cpp::Category::getRoot();
>
> class client_task
> {
> public:
>  client_task()
>   : context_(NULL)
>  {
>   context_ = zmq_ctx_new();
>  }
>  void subscribe()
>  {
>   void* sender = zmq_socket(context_, ZMQ_PAIR);
>   int rc = zmq_connect(sender, "inproc://input");
>   assert(rc == 0);
>   std::string msg;
>   while (getline(std::cin, msg) != "")
>   {
>    int size = zmq_send(sender, msg.c_str(), msg.length(), 0);
>    assert(size != 0);
>   }
>  }
>  void poll()
>  {
>   void* receiver = zmq_socket(context_, ZMQ_PAIR);
>   int rc = zmq_bind(receiver, "inproc://input");
>   assert(rc == 0);
>   void* subscriber = zmq_socket(context_, ZMQ_SUB);
>   int hwm = 0;
>   rc = zmq_setsockopt(subscriber, ZMQ_RCVHWM, &hwm, sizeof(hwm));
>   assert(rc == 0);
>   rc = zmq_setsockopt(subscriber, ZMQ_SNDHWM, &hwm, sizeof(hwm));
>   assert(rc == 0);
>   rc = zmq_connect(subscriber, "tcp://localhost:5556");
>   void* task_sender = zmq_socket(context_, ZMQ_PUSH);
>   rc = zmq_bind(task_sender, "inproc://task_send");
>   zmq_pollitem_t items [] = {
>    {receiver, 0, ZMQ_POLLIN, 0},
>    {subscriber, 0, ZMQ_POLLIN, 0}
>   };
>   while (true)
>   {
>    zmq_poll(&items[0], 2, -1);
>    if (items[0].revents & ZMQ_POLLIN)
>    {
>     zmq_msg_t msg;
>     zmq_msg_init(&msg);
>     zmq_recvmsg(receiver, &msg, 0);
>     char cmd = (static_cast<char*>(zmq_msg_data(&msg)))[0];
>     std::string list(static_cast<char*>(zmq_msg_data(&msg)) + 2,
> zmq_msg_size(&msg) - 2);
>     std::vector<std::string> vec_stocks;
>     const char* sep = ",";
>     const char* begin = list.c_str();
>     char* p = NULL;
>     p = strtok((char*)begin, sep);
>     if (p)
>     {
>      vec_stocks.push_back(p);
>     }
>     else
>     {
>      continue;
>     }
>     while ((p = strtok(NULL, sep)) != NULL)
>     {
>      vec_stocks.push_back(p);
>     }
>     if (vec_stocks.size() == 0)
>     {
>      continue;
>     }
>     if (cmd == 's')
>     {
>      for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
>       it != vec_stocks.end(); it++)
>      {
>       rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, it->c_str(),
> it->length());
>       assert(rc == 0);
>      }
>     }
>     else if (cmd == 'u')
>     {
>      for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
>       it != vec_stocks.end(); it++)
>      {
>       rc = zmq_setsockopt(subscriber, ZMQ_UNSUBSCRIBE, it->c_str(),
> it->length());
>       assert(rc == 0);
>      }
>     }
>     zmq_msg_close(&msg);
>    }
>    if (items[1].revents & ZMQ_POLLIN)
>    {
>     zmq_msg_t msg;
>     zmq_msg_init(&msg);
>     zmq_recvmsg(subscriber, &msg, 0);
>     /*if (memcmp(static_cast<char*>(zmq_msg_data(&msg)), "0600008", 7) == 0)
>     {
>      glog.info("recv");
>     }*/
>
>     //zmq_send(task_sender, &msg, zmq_msg_size(&msg), 0);
>     zmq_sendmsg(task_sender, &msg, 0);
>     zmq_msg_close(&msg);
>    }
>   }
>  }
>  void task_handle()
>  {
>   void* receiver = zmq_socket(context_, ZMQ_PULL);
>   int rc = zmq_connect(receiver, "inproc://task_send");
>   assert(rc == 0);
>   while (true)
>   {
>    zmq_msg_t msg;
>    zmq_msg_init(&msg);
>    zmq_recvmsg(receiver, &msg, 0);
>
>    char prefix[8] = {0};
>    memcpy(prefix, static_cast<char*>(zmq_msg_data(&msg)), 7);
>    if (strncmp(prefix, "0600008", 7) == 0)
>    {
>     TDF_MARKET_DATA dst;
>     memset(&dst, 0, sizeof(dst));
>     memcpy(&dst, static_cast<char*>(zmq_msg_data(&msg)) + 7, sizeof(dst));
>     glog.info("[%d] recv %s, szCode = %s",
>      boost::this_thread::get_id(), prefix, dst.szCode);
>    }
>    zmq_msg_close(&msg);
>   }
>  }
>
> private:
>  void* context_;
> };
> int main()
> {
>  boost::filesystem::path log_path("./log/");
>  if (!boost::filesystem::exists(log_path))
>  {
>   create_directory(log_path);
>  }
>  std::string local_day =
> boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
>  boost::filesystem::path path_local_day("");
>  path_local_day += log_path;
>  path_local_day += local_day;
>  if (!boost::filesystem::exists(path_local_day))
>  {
>   create_directory(path_local_day);
>  }
>  log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
>  layout_c->setConversionPattern("%d: %p %c %x: %m%n");
>  log4cpp::Appender *console_appender = new
> log4cpp::OstreamAppender("console", &std::cout);
>  console_appender->setLayout(layout_c);
>  const std::string str_log(path_local_day.string() + "/wuclient.log");
>  log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
>  layout_f->setConversionPattern("%d: %p %c %x: %m%n");
>  log4cpp::Appender* file_appender = new log4cpp::FileAppender("main",
> str_log);
>  file_appender->setLayout(layout_f);
>  glog.addAppender(file_appender);
>  glog.addAppender(console_appender);
>  glog.setPriority(log4cpp::Priority::DEBUG);
>  client_task ct;
>  for (int i = 0; i < 2; i++)
>  {
>   boost::thread t_handle(boost::bind(&client_task::task_handle, &ct));
>  }
>  Sleep(2000);
>  boost::thread t_poll(boost::bind(&client_task::poll, &ct));
>  Sleep(500);
>  boost::thread t_subscribe(boost::bind(&client_task::subscribe, &ct));
>  t_poll.join();
>  t_subscribe.join();
>  return 0;
> }
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev



More information about the zeromq-dev mailing list