[zeromq-dev] Subscribing to a lot of topics makes the subscriber receive data with latency?

LN 58315149 at qq.com
Sat Mar 11 13:07:10 CET 2017


hi all !
 

 I have written a program that imitates the pub/sub distribution of securities market quotation data, and I found that when I subscribe to a lot of topics, the subscriber receives the data with noticeable latency. When subscribing to 50 topics, some tick data arrives more than 100 ms after the server sent it, and I don't know why. Below is my code; I use the black box pattern.
  
 The problem is as follows (Windows Server 2008, 8 cores, sizeof(TDF_MARKET_DATA) = 300 bytes):
 A: When I subscribe to one topic from the console (s 0600008,), the latency from server to client is 0 or 1 milliseconds, which is OK (measured by comparing the client and server logs for 0600008).
  
 B: When I subscribe to 100 topics from the console (like s 0600002, 0600004, 0600006...), the latency is sometimes 1 ms and sometimes 10 ~ 60 milliseconds.
  
 C: When I subscribe to 300 topics from the console, the latency is sometimes 1 ~ 20 seconds!!!
 I also found that the memory usage of the client increases quickly.
  
 I don't know what is wrong with my program.
  
 Looking forward to your reply!! Thanks a lot!
    


    

 
 
 

   
 -------------------------Server(modify from wuserver.cpp)---------------------------


 

  //
 //  Market data publish server in C++ (modified from the weather update server example)
 //  Binds PUB socket to tcp://*:5556
 //  Publishes randomly generated market data ticks
 //
 //  Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
 //
 

 #include <stdio.h>
 #include <stdlib.h>
 #include <time.h>
 #include <zmq.hpp>
 

 #include <boost/asio.hpp>
 #include <boost/bind.hpp>
 #include <boost/random.hpp>
 #include <boost/thread.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/tokenizer.hpp>
 #include <boost/filesystem.hpp>
 #include <boost/date_time/gregorian/gregorian.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>  
 

 #include <log4cpp/Category.hh>
 #include <log4cpp/Appender.hh>
 #include <log4cpp/Priority.hh>
 #include <log4cpp/FileAppender.hh>
 #include <log4cpp/PatternLayout.hh>
 #include <log4cpp/OstreamAppender.hh>
 

 #include "../TDFPub/src/TDF/TDFAPIStruct.h"
 

 #if (defined (WIN32))
 //#include <zhelpers.hpp>
 #endif
 

 // Leftover helper from the original wuserver example; nothing in this listing
 // references it. Presumably scales random() into [0, num) -- note that
 // random() is a POSIX function, so expanding this macro would need it to exist
 // on the target platform.
 #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
 

 // Root log4cpp category; main() attaches the appenders and logs through it.
 log4cpp::Category& glog = log4cpp::Category::getRoot();
 

 /**
  * Market-data publisher entry point (adapted from the ZeroMQ wuserver example).
  *
  * Configures log4cpp to write to the console and to
  * ./log/<YYYYMMDD>/wuserver.log, binds a PUB socket to tcp://*:5556 and then
  * publishes one message roughly every millisecond, forever.
  *
  * Wire format of every message: a 7-character ASCII topic (stock code)
  * immediately followed by the raw bytes of a TDF_MARKET_DATA record whose
  * szCode field carries the same code.
  *
  * @return 0; in practice the publish loop never terminates.
  */
 int main ()
 {
     // --- logging: ./log/<YYYYMMDD>/wuserver.log plus console -------------
     boost::filesystem::path log_path("./log/");
     if (!boost::filesystem::exists(log_path))
     {
         create_directory(log_path);
     }
     std::string local_day = boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
     boost::filesystem::path path_local_day("");
     path_local_day += log_path;
     path_local_day += local_day;
     if (!boost::filesystem::exists(path_local_day))
     {
         create_directory(path_local_day);
     }

     // The layouts/appenders are handed over to log4cpp below, hence raw new.
     log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
     layout_c->setConversionPattern("%d: %p %c %x: %m%n");
     log4cpp::Appender* console_appender = new log4cpp::OstreamAppender("console", &std::cout);
     console_appender->setLayout(layout_c);

     const std::string str_log(path_local_day.string() + "/wuserver.log");
     log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
     layout_f->setConversionPattern("%d: %p %c %x: %m%n");
     log4cpp::Appender* file_appender = new log4cpp::FileAppender("main", str_log);
     file_appender->setLayout(layout_f);

     glog.addAppender(file_appender);
     glog.addAppender(console_appender);
     glog.setPriority(log4cpp::Priority::DEBUG);

     //  Prepare our context and publisher
     zmq::context_t context (1);
     zmq::socket_t publisher (context, ZMQ_PUB);
     // A high-water mark of 0 means "no limit": outgoing messages are queued
     // in memory without bound if a subscriber cannot keep up.
     int hwm = 0;
     publisher.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
     publisher.bind("tcp://*:5556");

     boost::uniform_int<> distribution(1, 3000);
     boost::mt19937 engine;
     boost::variate_generator<boost::mt19937, boost::uniform_int<> > myrandom(engine, distribution);

     // Every topic is exactly this many ASCII characters.
     const size_t topic_len = 7;

     while (1)
     {
         const int code = myrandom();

         // Topic = 3-digit market prefix + 4-digit code, e.g. "0600008".
         // The previous "0600%03d" format produced 8 characters for any
         // code >= 1000, overflowing the 8-byte buffer and yielding topics
         // that no 7-byte comparison could match. For codes < 1000 the
         // generated topic is unchanged.
         char prefix[16] = {0};
         const char* const format = (code % 2 == 0) ? "060%04d" : "100%04d";
         sprintf(prefix, format, code);

         // Zero the record so no uninitialised memory goes on the wire.
         TDF_MARKET_DATA tick;
         memset(&tick, 0, sizeof(tick));
         strcpy(tick.szCode, prefix);

         zmq::message_t message(topic_len + sizeof(TDF_MARKET_DATA));
         char* const payload = static_cast<char*>(message.data());
         memcpy(payload, prefix, topic_len);
         memcpy(payload + topic_len, &tick, sizeof(TDF_MARKET_DATA));

         publisher.send(message);

         // Log only the reference code so client and server logs can be compared.
         if (strncmp(prefix, "0600008", topic_len) == 0)
         {
             glog.info("send %s", prefix);
         }

         boost::this_thread::sleep(boost::posix_time::microseconds(1000));// 1ms
     }
     return 0;
 }
  
  

 -------------------------Client(Black box pattern)---------------------------
 //
//  Market data subscriber client in C++ (modified from the weather update client example)
//  Connects SUB socket to tcp://localhost:5556
//  Subscribes to the topics typed on the console and logs received ticks for code 0600008
//
//  Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <sstream>
#include <boost/thread.hpp>
#include <boost/function.hpp>
#include "zhelpers.hpp"
 #include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "zhelpers.hpp"
 #include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/tokenizer.hpp>
#include <boost/filesystem.hpp>
#include <boost/date_time/gregorian/gregorian.hpp>
 #include <log4cpp/Category.hh>
#include <log4cpp/Appender.hh>
#include <log4cpp/Priority.hh>
#include <log4cpp/FileAppender.hh>
#include <log4cpp/PatternLayout.hh>
#include <log4cpp/OstreamAppender.hh>
 #include "../TDFPub/src/TDF/TDFAPIStruct.h"
 // Root log4cpp category; main() attaches the appenders and the worker threads log through it.
 log4cpp::Category& glog = log4cpp::Category::getRoot();
 
class client_task
{
public:
 client_task()
  : context_(NULL)
 {
  context_ = zmq_ctx_new();
 } 
  void subscribe()
 {
  void* sender = zmq_socket(context_, ZMQ_PAIR);
  int rc = zmq_connect(sender, "inproc://input");
  assert(rc == 0);
   std::string msg;
  while (getline(std::cin, msg) != "")
  {   
   int size = zmq_send(sender, msg.c_str(), msg.length(), 0);
   assert(size != 0);
  }
 }
  void poll()
 {
  void* receiver = zmq_socket(context_, ZMQ_PAIR);
  int rc = zmq_bind(receiver, "inproc://input");
  assert(rc == 0);
   void* subscriber = zmq_socket(context_, ZMQ_SUB);
  int hwm = 0;
  rc = zmq_setsockopt(subscriber, ZMQ_RCVHWM, &hwm, sizeof(hwm));
  assert(rc == 0);
  rc = zmq_setsockopt(subscriber, ZMQ_SNDHWM, &hwm, sizeof(hwm));
  assert(rc == 0);
  rc = zmq_connect(subscriber, "tcp://localhost:5556");
   void* task_sender = zmq_socket(context_, ZMQ_PUSH);
  rc = zmq_bind(task_sender, "inproc://task_send");
   zmq_pollitem_t items [] = {
   {receiver, 0, ZMQ_POLLIN, 0},
   {subscriber, 0, ZMQ_POLLIN, 0}
  };
   while (true)
  {
   zmq_poll(&items[0], 2, -1);
    if (items[0].revents & ZMQ_POLLIN)
   {
    zmq_msg_t msg;
    zmq_msg_init(&msg);
     zmq_recvmsg(receiver, &msg, 0);
    char cmd = (static_cast<char*>(zmq_msg_data(&msg)))[0];
    std::string list(static_cast<char*>(zmq_msg_data(&msg)) + 2, zmq_msg_size(&msg) - 2);
     std::vector<std::string> vec_stocks;
    const char* sep = ",";
    const char* begin = list.c_str();
    char* p = NULL;
    p = strtok((char*)begin, sep);
    if (p)
    {
     vec_stocks.push_back(p);
    }
    else
    {
     continue;
    }
    while ((p = strtok(NULL, sep)) != NULL)
    {
     vec_stocks.push_back(p);
    }
     if (vec_stocks.size() == 0)
    {
     continue;
    }
     if (cmd == 's')
    {
     for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
      it != vec_stocks.end(); it++)
     {
      rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, it->c_str(), it->length());
      assert(rc == 0);
     }     
    }
    else if (cmd == 'u')
    {
     for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
      it != vec_stocks.end(); it++)
     {
      rc = zmq_setsockopt(subscriber, ZMQ_UNSUBSCRIBE, it->c_str(), it->length());
      assert(rc == 0);
     }     
    }
     zmq_msg_close(&msg);
   }
   if (items[1].revents & ZMQ_POLLIN)
   {
    zmq_msg_t msg;
    zmq_msg_init(&msg);
     zmq_recvmsg(subscriber, &msg, 0);
    /*if (memcmp(static_cast<char*>(zmq_msg_data(&msg)), "0600008", 7) == 0)
    {
     glog.info("recv");
    }*/
    
     //zmq_send(task_sender, &msg, zmq_msg_size(&msg), 0);
    zmq_sendmsg(task_sender, &msg, 0);
     zmq_msg_close(&msg);
   }
  }
 }
  void task_handle()
 {
  void* receiver = zmq_socket(context_, ZMQ_PULL);
  int rc = zmq_connect(receiver, "inproc://task_send");
  assert(rc == 0);
   while (true)
  {
   zmq_msg_t msg;
   zmq_msg_init(&msg);
    zmq_recvmsg(receiver, &msg, 0);
 
   char prefix[8] = {0};
   memcpy(prefix, static_cast<char*>(zmq_msg_data(&msg)), 7);    
    if (strncmp(prefix, "0600008", 7) == 0)
   {
    TDF_MARKET_DATA dst;
    memset(&dst, 0, sizeof(dst));
    memcpy(&dst, static_cast<char*>(zmq_msg_data(&msg)) + 7, sizeof(dst));
    glog.info("[%d] recv %s, szCode = %s",
     boost::this_thread::get_id(), prefix, dst.szCode);
   }
    zmq_msg_close(&msg);
  }
 }
  
private:
 void* context_;
};
 int main()
{
 boost::filesystem::path log_path("./log/");
 if (!boost::filesystem::exists(log_path))
 {
  create_directory(log_path);
 }
 std::string local_day = boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
 boost::filesystem::path path_local_day("");
 path_local_day += log_path;
 path_local_day += local_day;
 if (!boost::filesystem::exists(path_local_day))
 {
  create_directory(path_local_day);
 }

  log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
 layout_c->setConversionPattern("%d: %p %c %x: %m%n");
 log4cpp::Appender *console_appender = new log4cpp::OstreamAppender("console", &std::cout);
 console_appender->setLayout(layout_c);
  const std::string str_log(path_local_day.string() + "/wuclient.log");
 log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
 layout_f->setConversionPattern("%d: %p %c %x: %m%n");
 log4cpp::Appender* file_appender = new log4cpp::FileAppender("main", str_log);
 file_appender->setLayout(layout_f);
  glog.addAppender(file_appender);
 glog.addAppender(console_appender);  
 glog.setPriority(log4cpp::Priority::DEBUG);
  client_task ct;
 for (int i = 0; i < 2; i++)
 {
  boost::thread t_handle(boost::bind(&client_task::task_handle, &ct));
 }
  Sleep(2000);
 boost::thread t_poll(boost::bind(&client_task::poll, &ct));
  Sleep(500);
 boost::thread t_subscribe(boost::bind(&client_task::subscribe, &ct));
  t_poll.join();
 t_subscribe.join();
  return 0;
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20170311/ecd3179e/attachment.htm>


More information about the zeromq-dev mailing list