[zeromq-dev] subscribe a lot of topic , make the subscriber get data latency?
LN
58315149 at qq.com
Sat Mar 11 13:07:10 CET 2017
hi all !
I have written a program that imitates the pub/sub distribution of securities market quotation data, and I found that when I subscribe to a lot of topics, the subscriber receives the data with noticeable latency. When I subscribe to 50 topics, some tick data arrives more than 100 ms after the server sent it, and I don't know why. Below is my code; I use the "black box" pattern.
The problem is as follows (Windows Server 2008, 8 cores, sizeof(TDF_MARKET_DATA) = 300 bytes):
A: When I subscribe to one topic from the console (s 0600008,), the latency from server to client is 0 or 1 milliseconds, which is OK (measured by comparing the client log with the server log for 0600008).
B: When I subscribe to 100 topics from the console (like s 0600002, 0600004, 0600006...), the latency is sometimes 1 ms and sometimes 10 ~ 60 milliseconds.
C: When I subscribe to 300 topics from the console, the latency is sometimes 1 ~ 20 seconds!!!
I also found that the memory usage of the client increases quickly,
and I don't know what is wrong with my program.
Looking forward to your reply!! Thanks a lot!
-------------------------Server(modify from wuserver.cpp)---------------------------
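//
// Market-data tick publisher in C++ (modified from the weather update server, wuserver.cpp)
// Binds PUB socket to tcp://*:5556
// Publishes ticks for randomly chosen stock codes, each prefixed with a 7-character topic
//
// Original weather update server by Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
//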
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <zmq.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/random.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/tokenizer.hpp>
#include <boost/filesystem.hpp>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <log4cpp/Category.hh>
#include <log4cpp/Appender.hh>
#include <log4cpp/Priority.hh>
#include <log4cpp/FileAppender.hh>
#include <log4cpp/PatternLayout.hh>
#include <log4cpp/OstreamAppender.hh>
#include "../TDFPub/src/TDF/TDFAPIStruct.h"
#if (defined (WIN32))
//#include <zhelpers.hpp>
#endif
// Scales the result of random() by num / (RAND_MAX + 1.0) and truncates it to int.
// NOTE(review): not referenced anywhere in the code shown here; presumably left over
// from the weather-server example this program was derived from.
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
// Root log4cpp category; every log statement in this program goes through it.
log4cpp::Category& glog = log4cpp::Category::getRoot();
int main ()
{
boost::filesystem::path log_path("./log/");
if (!boost::filesystem::exists(log_path))
{
create_directory(log_path);
}
std::string local_day = boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
boost::filesystem::path path_local_day("");
path_local_day += log_path;
path_local_day += local_day;
if (!boost::filesystem::exists(path_local_day))
{
create_directory(path_local_day);
}
log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
layout_c->setConversionPattern("%d: %p %c %x: %m%n");
log4cpp::Appender *console_appender = new log4cpp::OstreamAppender("console", &std::cout);
console_appender->setLayout(layout_c);
const std::string str_log(path_local_day.string() + "/wuserver.log");
log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
layout_f->setConversionPattern("%d: %p %c %x: %m%n");
log4cpp::Appender* file_appender = new log4cpp::FileAppender("main", str_log);
file_appender->setLayout(layout_f);
glog.addAppender(file_appender);
glog.addAppender(console_appender);
glog.setPriority(log4cpp::Priority::DEBUG);
// Prepare our context and publisher
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
int hwm = 0;
publisher.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
publisher.bind("tcp://*:5556");
boost::uniform_int<> distribution(1, 3000);
boost::mt19937 engine;
boost::variate_generator<boost::mt19937, boost::uniform_int<> > myrandom(engine, distribution);
while (1)
{
int code;
char prefix[8] = {0};
TDF_MARKET_DATA tick;
zmq::message_t message(7 + sizeof(TDF_MARKET_DATA));
code = myrandom();
code % 2 == 0 ? sprintf(prefix, "0600%03d", code) : sprintf(prefix, "1000%03d", code);
//sprintf(data.szCode, "%07d", prefix);
//memcpy(tick.szCode, prefix, 7);
strcpy(tick.szCode, prefix);
memcpy((char*)message.data(), prefix, 7);
memcpy((char*)message.data() + 7, &tick, sizeof(TDF_MARKET_DATA));
publisher.send(message);
//if (strncmp(prefix, "1001271", 7) == 0)
if (strncmp(prefix, "0600008", 7) == 0)
{
glog.info("send %s", prefix);
}
boost::this_thread::sleep(boost::posix_time::microseconds(1000));// 1ms
//Sleep(1);
}
return 0;
}
-------------------------Client(Black box pattern)---------------------------
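//
// Market-data tick client in C++ (modified from the weather update client)
// Connects SUB socket to tcp://localhost:5556
// Subscribes to the stock codes typed on the console and hands received ticks to worker threads
//
// Original weather update client by Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
//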
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <sstream>
#include <boost/thread.hpp>
#include <boost/function.hpp>
#include "zhelpers.hpp"
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "zhelpers.hpp"
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/tokenizer.hpp>
#include <boost/filesystem.hpp>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <log4cpp/Category.hh>
#include <log4cpp/Appender.hh>
#include <log4cpp/Priority.hh>
#include <log4cpp/FileAppender.hh>
#include <log4cpp/PatternLayout.hh>
#include <log4cpp/OstreamAppender.hh>
#include "../TDFPub/src/TDF/TDFAPIStruct.h"
// Root log4cpp category; every log statement in this program goes through it.
log4cpp::Category& glog = log4cpp::Category::getRoot();
class client_task
{
public:
client_task()
: context_(NULL)
{
context_ = zmq_ctx_new();
}
void subscribe()
{
void* sender = zmq_socket(context_, ZMQ_PAIR);
int rc = zmq_connect(sender, "inproc://input");
assert(rc == 0);
std::string msg;
while (getline(std::cin, msg) != "")
{
int size = zmq_send(sender, msg.c_str(), msg.length(), 0);
assert(size != 0);
}
}
void poll()
{
void* receiver = zmq_socket(context_, ZMQ_PAIR);
int rc = zmq_bind(receiver, "inproc://input");
assert(rc == 0);
void* subscriber = zmq_socket(context_, ZMQ_SUB);
int hwm = 0;
rc = zmq_setsockopt(subscriber, ZMQ_RCVHWM, &hwm, sizeof(hwm));
assert(rc == 0);
rc = zmq_setsockopt(subscriber, ZMQ_SNDHWM, &hwm, sizeof(hwm));
assert(rc == 0);
rc = zmq_connect(subscriber, "tcp://localhost:5556");
void* task_sender = zmq_socket(context_, ZMQ_PUSH);
rc = zmq_bind(task_sender, "inproc://task_send");
zmq_pollitem_t items [] = {
{receiver, 0, ZMQ_POLLIN, 0},
{subscriber, 0, ZMQ_POLLIN, 0}
};
while (true)
{
zmq_poll(&items[0], 2, -1);
if (items[0].revents & ZMQ_POLLIN)
{
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_recvmsg(receiver, &msg, 0);
char cmd = (static_cast<char*>(zmq_msg_data(&msg)))[0];
std::string list(static_cast<char*>(zmq_msg_data(&msg)) + 2, zmq_msg_size(&msg) - 2);
std::vector<std::string> vec_stocks;
const char* sep = ",";
const char* begin = list.c_str();
char* p = NULL;
p = strtok((char*)begin, sep);
if (p)
{
vec_stocks.push_back(p);
}
else
{
continue;
}
while ((p = strtok(NULL, sep)) != NULL)
{
vec_stocks.push_back(p);
}
if (vec_stocks.size() == 0)
{
continue;
}
if (cmd == 's')
{
for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
it != vec_stocks.end(); it++)
{
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, it->c_str(), it->length());
assert(rc == 0);
}
}
else if (cmd == 'u')
{
for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
it != vec_stocks.end(); it++)
{
rc = zmq_setsockopt(subscriber, ZMQ_UNSUBSCRIBE, it->c_str(), it->length());
assert(rc == 0);
}
}
zmq_msg_close(&msg);
}
if (items[1].revents & ZMQ_POLLIN)
{
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_recvmsg(subscriber, &msg, 0);
/*if (memcmp(static_cast<char*>(zmq_msg_data(&msg)), "0600008", 7) == 0)
{
glog.info("recv");
}*/
//zmq_send(task_sender, &msg, zmq_msg_size(&msg), 0);
zmq_sendmsg(task_sender, &msg, 0);
zmq_msg_close(&msg);
}
}
}
void task_handle()
{
void* receiver = zmq_socket(context_, ZMQ_PULL);
int rc = zmq_connect(receiver, "inproc://task_send");
assert(rc == 0);
while (true)
{
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_recvmsg(receiver, &msg, 0);
char prefix[8] = {0};
memcpy(prefix, static_cast<char*>(zmq_msg_data(&msg)), 7);
if (strncmp(prefix, "0600008", 7) == 0)
{
TDF_MARKET_DATA dst;
memset(&dst, 0, sizeof(dst));
memcpy(&dst, static_cast<char*>(zmq_msg_data(&msg)) + 7, sizeof(dst));
glog.info("[%d] recv %s, szCode = %s",
boost::this_thread::get_id(), prefix, dst.szCode);
}
zmq_msg_close(&msg);
}
}
private:
void* context_;
};
int main()
{
boost::filesystem::path log_path("./log/");
if (!boost::filesystem::exists(log_path))
{
create_directory(log_path);
}
std::string local_day = boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
boost::filesystem::path path_local_day("");
path_local_day += log_path;
path_local_day += local_day;
if (!boost::filesystem::exists(path_local_day))
{
create_directory(path_local_day);
}
log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
layout_c->setConversionPattern("%d: %p %c %x: %m%n");
log4cpp::Appender *console_appender = new log4cpp::OstreamAppender("console", &std::cout);
console_appender->setLayout(layout_c);
const std::string str_log(path_local_day.string() + "/wuclient.log");
log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
layout_f->setConversionPattern("%d: %p %c %x: %m%n");
log4cpp::Appender* file_appender = new log4cpp::FileAppender("main", str_log);
file_appender->setLayout(layout_f);
glog.addAppender(file_appender);
glog.addAppender(console_appender);
glog.setPriority(log4cpp::Priority::DEBUG);
client_task ct;
for (int i = 0; i < 2; i++)
{
boost::thread t_handle(boost::bind(&client_task::task_handle, &ct));
}
Sleep(2000);
boost::thread t_poll(boost::bind(&client_task::poll, &ct));
Sleep(500);
boost::thread t_subscribe(boost::bind(&client_task::subscribe, &ct));
t_poll.join();
t_subscribe.join();
return 0;
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20170311/ecd3179e/attachment.htm>
More information about the zeromq-dev
mailing list