[zeromq-dev] subscribe a lot of topic , make the subscriber get data latency?
Francesco
francesco.montorsi at gmail.com
Sat Mar 11 17:32:47 CET 2017
Hi,
I think you are setting the HWM to 0, which makes the ZMQ queues
unbounded (limited only by the RAM you have)... this may explain your
issue? I mean: with no limit, the queues simply keep growing whenever the
receiver cannot keep up with the sender, leading to 1) increased memory
usage, 2) increasing latency...
HTH,
Francesco
2017-03-11 13:07 GMT+01:00 LN <58315149 at qq.com>:
> hi all !
>
> I have written a program to imitate the pub/sub of securities market
> quotation data, and I found that when I subscribe to a lot of topics, the
> subscriber receives the data with latency. When I subscribe to 50 topics,
> some tick data arrives more than 100 ms after the server sent it, and I
> don't know why. Below is my code; I use the black box pattern.
>
> The problem is as follows (Windows Server 2008, 8 cores,
> sizeof(TDF_MARKET_DATA) = 300 bytes):
> A: When I subscribe to one topic from the console (s 0600008,), the latency
> from server to client is 0 or 1 milliseconds, which is OK! (Measured by
> comparing the client log with the server log for 0600008.)
>
> B: When I subscribe to 100 topics from the console (like s 0600002, 0600004,
> 0600006...), the latency is sometimes 1 ms and sometimes 10 ~ 60 milliseconds.
>
> C: When I subscribe to 300 topics from the console, the latency is sometimes
> 1 ~ 20 seconds!!!
> I also found that the memory usage of the client increases fast.
>
> and i don't know what's wrong with my program.
>
> Looking forward to your reply!! Thanks a lot!
>
> -------------------------Server(modify from
> wuserver.cpp)---------------------------
>
> //
> // Weather update server in C++
> // Binds PUB socket to tcp://*:5556
> // Publishes random weather updates
> //
> // Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
> //
>
> #include <stdio.h>
> #include <stdlib.h>
> #include <time.h>
> #include <zmq.hpp>
>
> #include <boost/asio.hpp>
> #include <boost/bind.hpp>
> #include <boost/random.hpp>
> #include <boost/thread.hpp>
> #include <boost/lexical_cast.hpp>
> #include <boost/tokenizer.hpp>
> #include <boost/filesystem.hpp>
> #include <boost/date_time/gregorian/gregorian.hpp>
> #include <boost/date_time/posix_time/posix_time.hpp>
>
> #include <log4cpp/Category.hh>
> #include <log4cpp/Appender.hh>
> #include <log4cpp/Priority.hh>
> #include <log4cpp/FileAppender.hh>
> #include <log4cpp/PatternLayout.hh>
> #include <log4cpp/OstreamAppender.hh>
>
> #include "../TDFPub/src/TDF/TDFAPIStruct.h"
>
> #if (defined (WIN32))
> //#include <zhelpers.hpp>
> #endif
>
> #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
>
> log4cpp::Category& glog = log4cpp::Category::getRoot();
>
> int main ()
> {
> boost::filesystem::path log_path("./log/");
> if (!boost::filesystem::exists(log_path))
> {
> create_directory(log_path);
> }
> std::string local_day =
> boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
> boost::filesystem::path path_local_day("");
> path_local_day += log_path;
> path_local_day += local_day;
> if (!boost::filesystem::exists(path_local_day))
> {
> create_directory(path_local_day);
> }
>
> log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
> layout_c->setConversionPattern("%d: %p %c %x: %m%n");
> log4cpp::Appender *console_appender = new
> log4cpp::OstreamAppender("console", &std::cout);
> console_appender->setLayout(layout_c);
>
> const std::string str_log(path_local_day.string() + "/wuserver.log");
> log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
> layout_f->setConversionPattern("%d: %p %c %x: %m%n");
> log4cpp::Appender* file_appender = new log4cpp::FileAppender("main",
> str_log);
> file_appender->setLayout(layout_f);
>
> glog.addAppender(file_appender);
> glog.addAppender(console_appender);
> glog.setPriority(log4cpp::Priority::DEBUG);
>
> // Prepare our context and publisher
> zmq::context_t context (1);
> zmq::socket_t publisher (context, ZMQ_PUB);
> int hwm = 0;
> publisher.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
> publisher.bind("tcp://*:5556");
>
> boost::uniform_int<> distribution(1, 3000);
> boost::mt19937 engine;
> boost::variate_generator<boost::mt19937, boost::uniform_int<> >
> myrandom(engine, distribution);
> while (1)
> {
> int code;
> char prefix[8] = {0};
> TDF_MARKET_DATA tick;
>
> zmq::message_t message(7 + sizeof(TDF_MARKET_DATA));
> code = myrandom();
> code % 2 == 0 ? sprintf(prefix, "0600%03d", code) : sprintf(prefix,
> "1000%03d", code);
> //sprintf(data.szCode, "%07d", prefix);
> //memcpy(tick.szCode, prefix, 7);
> strcpy(tick.szCode, prefix);
> memcpy((char*)message.data(), prefix, 7);
> memcpy((char*)message.data() + 7, &tick, sizeof(TDF_MARKET_DATA));
>
> publisher.send(message);
>
> //if (strncmp(prefix, "1001271", 7) == 0)
> if (strncmp(prefix, "0600008", 7) == 0)
> {
> glog.info("send %s", prefix);
> }
>
> boost::this_thread::sleep(boost::posix_time::microseconds(1000));// 1ms
> //Sleep(1);
> }
> return 0;
> }
>
>
> -------------------------Client(Black box
> pattern)---------------------------
> //
> // Weather update client in C++
> // Connects SUB socket to tcp://localhost:5556
> // Collects weather updates and finds avg temp in zipcode
> //
> // Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
> //
> #include <zmq.hpp>
> #include <string>
> #include <iostream>
> #include <sstream>
> #include <boost/thread.hpp>
> #include <boost/function.hpp>
> #include "zhelpers.hpp"
> #include <stdio.h>
> #include <stdlib.h>
> #include <time.h>
> #include "zhelpers.hpp"
> #include <boost/asio.hpp>
> #include <boost/bind.hpp>
> #include <boost/thread.hpp>
> #include <boost/lexical_cast.hpp>
> #include <boost/tokenizer.hpp>
> #include <boost/filesystem.hpp>
> #include <boost/date_time/gregorian/gregorian.hpp>
> #include <log4cpp/Category.hh>
> #include <log4cpp/Appender.hh>
> #include <log4cpp/Priority.hh>
> #include <log4cpp/FileAppender.hh>
> #include <log4cpp/PatternLayout.hh>
> #include <log4cpp/OstreamAppender.hh>
> #include "../TDFPub/src/TDF/TDFAPIStruct.h"
> log4cpp::Category& glog = log4cpp::Category::getRoot();
>
> class client_task
> {
> public:
> client_task()
> : context_(NULL)
> {
> context_ = zmq_ctx_new();
> }
> void subscribe()
> {
> void* sender = zmq_socket(context_, ZMQ_PAIR);
> int rc = zmq_connect(sender, "inproc://input");
> assert(rc == 0);
> std::string msg;
> while (getline(std::cin, msg) != "")
> {
> int size = zmq_send(sender, msg.c_str(), msg.length(), 0);
> assert(size != 0);
> }
> }
> void poll()
> {
> void* receiver = zmq_socket(context_, ZMQ_PAIR);
> int rc = zmq_bind(receiver, "inproc://input");
> assert(rc == 0);
> void* subscriber = zmq_socket(context_, ZMQ_SUB);
> int hwm = 0;
> rc = zmq_setsockopt(subscriber, ZMQ_RCVHWM, &hwm, sizeof(hwm));
> assert(rc == 0);
> rc = zmq_setsockopt(subscriber, ZMQ_SNDHWM, &hwm, sizeof(hwm));
> assert(rc == 0);
> rc = zmq_connect(subscriber, "tcp://localhost:5556");
> void* task_sender = zmq_socket(context_, ZMQ_PUSH);
> rc = zmq_bind(task_sender, "inproc://task_send");
> zmq_pollitem_t items [] = {
> {receiver, 0, ZMQ_POLLIN, 0},
> {subscriber, 0, ZMQ_POLLIN, 0}
> };
> while (true)
> {
> zmq_poll(&items[0], 2, -1);
> if (items[0].revents & ZMQ_POLLIN)
> {
> zmq_msg_t msg;
> zmq_msg_init(&msg);
> zmq_recvmsg(receiver, &msg, 0);
> char cmd = (static_cast<char*>(zmq_msg_data(&msg)))[0];
> std::string list(static_cast<char*>(zmq_msg_data(&msg)) + 2,
> zmq_msg_size(&msg) - 2);
> std::vector<std::string> vec_stocks;
> const char* sep = ",";
> const char* begin = list.c_str();
> char* p = NULL;
> p = strtok((char*)begin, sep);
> if (p)
> {
> vec_stocks.push_back(p);
> }
> else
> {
> continue;
> }
> while ((p = strtok(NULL, sep)) != NULL)
> {
> vec_stocks.push_back(p);
> }
> if (vec_stocks.size() == 0)
> {
> continue;
> }
> if (cmd == 's')
> {
> for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
> it != vec_stocks.end(); it++)
> {
> rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, it->c_str(),
> it->length());
> assert(rc == 0);
> }
> }
> else if (cmd == 'u')
> {
> for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
> it != vec_stocks.end(); it++)
> {
> rc = zmq_setsockopt(subscriber, ZMQ_UNSUBSCRIBE, it->c_str(),
> it->length());
> assert(rc == 0);
> }
> }
> zmq_msg_close(&msg);
> }
> if (items[1].revents & ZMQ_POLLIN)
> {
> zmq_msg_t msg;
> zmq_msg_init(&msg);
> zmq_recvmsg(subscriber, &msg, 0);
> /*if (memcmp(static_cast<char*>(zmq_msg_data(&msg)), "0600008", 7) == 0)
> {
> glog.info("recv");
> }*/
>
> //zmq_send(task_sender, &msg, zmq_msg_size(&msg), 0);
> zmq_sendmsg(task_sender, &msg, 0);
> zmq_msg_close(&msg);
> }
> }
> }
> void task_handle()
> {
> void* receiver = zmq_socket(context_, ZMQ_PULL);
> int rc = zmq_connect(receiver, "inproc://task_send");
> assert(rc == 0);
> while (true)
> {
> zmq_msg_t msg;
> zmq_msg_init(&msg);
> zmq_recvmsg(receiver, &msg, 0);
>
> char prefix[8] = {0};
> memcpy(prefix, static_cast<char*>(zmq_msg_data(&msg)), 7);
> if (strncmp(prefix, "0600008", 7) == 0)
> {
> TDF_MARKET_DATA dst;
> memset(&dst, 0, sizeof(dst));
> memcpy(&dst, static_cast<char*>(zmq_msg_data(&msg)) + 7, sizeof(dst));
> glog.info("[%d] recv %s, szCode = %s",
> boost::this_thread::get_id(), prefix, dst.szCode);
> }
> zmq_msg_close(&msg);
> }
> }
>
> private:
> void* context_;
> };
> int main()
> {
> boost::filesystem::path log_path("./log/");
> if (!boost::filesystem::exists(log_path))
> {
> create_directory(log_path);
> }
> std::string local_day =
> boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
> boost::filesystem::path path_local_day("");
> path_local_day += log_path;
> path_local_day += local_day;
> if (!boost::filesystem::exists(path_local_day))
> {
> create_directory(path_local_day);
> }
> log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
> layout_c->setConversionPattern("%d: %p %c %x: %m%n");
> log4cpp::Appender *console_appender = new
> log4cpp::OstreamAppender("console", &std::cout);
> console_appender->setLayout(layout_c);
> const std::string str_log(path_local_day.string() + "/wuclient.log");
> log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
> layout_f->setConversionPattern("%d: %p %c %x: %m%n");
> log4cpp::Appender* file_appender = new log4cpp::FileAppender("main",
> str_log);
> file_appender->setLayout(layout_f);
> glog.addAppender(file_appender);
> glog.addAppender(console_appender);
> glog.setPriority(log4cpp::Priority::DEBUG);
> client_task ct;
> for (int i = 0; i < 2; i++)
> {
> boost::thread t_handle(boost::bind(&client_task::task_handle, &ct));
> }
> Sleep(2000);
> boost::thread t_poll(boost::bind(&client_task::poll, &ct));
> Sleep(500);
> boost::thread t_subscribe(boost::bind(&client_task::subscribe, &ct));
> t_poll.join();
> t_subscribe.join();
> return 0;
> }
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
More information about the zeromq-dev
mailing list