[zeromq-dev] Re: zeromq-dev Digest, Vol 12, Issue 9
LN
58315149 at qq.com
Sun Mar 12 15:13:43 CET 2017
Hi all,
I don't think setting the HWM to 0 causes the latency: when I set it to a large value instead, the problem still exists.
The problem is that if I subscribe to a few topics, the client works fine. If I subscribe to a lot of topics, the client receives data with latency and uses more CPU and memory.
Is there a problem with my client code? Could someone help me look at it? Or is this as efficient as ZeroMQ can get? I don't believe so.
------------------ Original Message ------------------
From: "zeromq-dev-request" <zeromq-dev-request at lists.zeromq.org>
Sent: Sunday, March 12, 2017, 7:00 PM
To: "zeromq-dev" <zeromq-dev at lists.zeromq.org>
Subject: zeromq-dev Digest, Vol 12, Issue 9
Send zeromq-dev mailing list submissions to
zeromq-dev at lists.zeromq.org
To subscribe or unsubscribe via the World Wide Web, visit
https://lists.zeromq.org/mailman/listinfo/zeromq-dev
or, via email, send a message with subject or body 'help' to
zeromq-dev-request at lists.zeromq.org
You can reach the person managing the list at
zeromq-dev-owner at lists.zeromq.org
When replying, please edit your Subject line so it is more specific
than "Re: Contents of zeromq-dev digest..."
Today's Topics:
1. subscribe a lot of topic , make the subscriber get data
latency? (LN)
2. Re: subscribe a lot of topic , make the subscriber get data
latency? (Francesco)
----------------------------------------------------------------------
Message: 1
Date: Sat, 11 Mar 2017 20:07:10 +0800
From: "LN" <58315149 at qq.com>
To: "zeromq-dev" <zeromq-dev at lists.zeromq.org>
Subject: [zeromq-dev] subscribe a lot of topic , make the subscriber
get data latency?
Message-ID: <tencent_684177AD152F9B302A4F0107 at qq.com>
Content-Type: text/plain; charset="utf-8"
Hi all!
I have written a program that imitates pub/sub distribution of securities market quotation data, and I found that when I subscribe to a lot of topics, the subscriber receives data with latency. When subscribing to 50 topics, some tick data arrives more than 100 ms after the server sent it, and I don't know why. Below is my code; I use the black box pattern.
The problem is as follows (Windows Server 2008, 8 cores, sizeof(TDF_MARKET_DATA) = 300 bytes):
A: When I subscribe to one topic from the console (s 0600008,), the latency from server to client is 0 or 1 milliseconds, which is OK (measured by comparing the client and server logs for 0600008).
B: When I subscribe to 100 topics from the console (like s 0600002, 0600004, 0600006...), the latency is sometimes 1 ms and sometimes 10 ~ 60 milliseconds.
C: When I subscribe to 300 topics from the console, the latency is sometimes 1 ~ 20 seconds!
I also found that the client's memory usage increases quickly.
I don't know what's wrong with my program.
Looking forward to your reply! Thanks a lot!
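One way to measure the latency described above without comparing two log files is to carry the send time inside the message. The following is a minimal sketch and not part of the original program: the names stamp_now_us, pack_tick and latency_us are hypothetical, the 8-byte timestamp field is an assumed layout, and it assumes a C++11 compiler and that publisher and subscriber share a clock (same machine or synchronized hosts).

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Microseconds since the system clock's epoch.
inline std::int64_t stamp_now_us()
{
    using namespace std::chrono;
    return duration_cast<microseconds>(system_clock::now().time_since_epoch()).count();
}

// Assumed layout for this sketch: 7-byte topic, 8-byte send time, then the payload.
inline std::vector<char> pack_tick(const std::string& topic, std::int64_t sent_us,
                                   const void* payload, std::size_t payload_size)
{
    assert(topic.size() == 7);
    std::vector<char> buf(7 + sizeof(sent_us) + payload_size);
    std::memcpy(&buf[0], topic.data(), 7);
    std::memcpy(&buf[7], &sent_us, sizeof(sent_us));
    if (payload_size > 0)
        std::memcpy(&buf[7 + sizeof(sent_us)], payload, payload_size);
    return buf;
}

// Receiver side: difference between the given "now" and the embedded send time.
inline std::int64_t latency_us(const std::vector<char>& buf, std::int64_t now_us)
{
    assert(buf.size() >= 7 + sizeof(std::int64_t));
    std::int64_t sent_us = 0;
    std::memcpy(&sent_us, &buf[7], sizeof(sent_us));
    return now_us - sent_us;
}
```

The publisher would call pack_tick(prefix, stamp_now_us(), &tick, sizeof(tick)) and copy the result into the message; the worker would call latency_us(received, stamp_now_us()) and log the value per tick.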
-------------------------Server (modified from wuserver.cpp)---------------------------
//
// Weather update server in C++
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
// Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
//
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <zmq.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/random.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/tokenizer.hpp>
#include <boost/filesystem.hpp>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <log4cpp/Category.hh>
#include <log4cpp/Appender.hh>
#include <log4cpp/Priority.hh>
#include <log4cpp/FileAppender.hh>
#include <log4cpp/PatternLayout.hh>
#include <log4cpp/OstreamAppender.hh>
#include "../TDFPub/src/TDF/TDFAPIStruct.h"
#if (defined (WIN32))
//#include <zhelpers.hpp>
#endif
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
log4cpp::Category& glog = log4cpp::Category::getRoot();
int main ()
{
boost::filesystem::path log_path("./log/");
if (!boost::filesystem::exists(log_path))
{
create_directory(log_path);
}
std::string local_day = boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
boost::filesystem::path path_local_day("");
path_local_day += log_path;
path_local_day += local_day;
if (!boost::filesystem::exists(path_local_day))
{
create_directory(path_local_day);
}
log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
layout_c->setConversionPattern("%d: %p %c %x: %m%n");
log4cpp::Appender *console_appender = new log4cpp::OstreamAppender("console", &std::cout);
console_appender->setLayout(layout_c);
const std::string str_log(path_local_day.string() + "/wuserver.log");
log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
layout_f->setConversionPattern("%d: %p %c %x: %m%n");
log4cpp::Appender* file_appender = new log4cpp::FileAppender("main", str_log);
file_appender->setLayout(layout_f);
glog.addAppender(file_appender);
glog.addAppender(console_appender);
glog.setPriority(log4cpp::Priority::DEBUG);
// Prepare our context and publisher
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
int hwm = 0;
publisher.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
publisher.bind("tcp://*:5556");
boost::uniform_int<> distribution(1, 3000);
boost::mt19937 engine;
boost::variate_generator<boost::mt19937, boost::uniform_int<> > myrandom(engine, distribution);
while (1)
{
int code;
char prefix[16] = {0}; // sprintf below writes 8 characters + NUL when code >= 1000, which overflowed char[8]
TDF_MARKET_DATA tick;
zmq::message_t message(7 + sizeof(TDF_MARKET_DATA));
code = myrandom();
code % 2 == 0 ? sprintf(prefix, "0600%03d", code) : sprintf(prefix, "1000%03d", code);
//sprintf(data.szCode, "%07d", prefix);
//memcpy(tick.szCode, prefix, 7);
strcpy(tick.szCode, prefix);
memcpy((char*)message.data(), prefix, 7);
memcpy((char*)message.data() + 7, &tick, sizeof(TDF_MARKET_DATA));
publisher.send(message);
//if (strncmp(prefix, "1001271", 7) == 0)
if (strncmp(prefix, "0600008", 7) == 0)
{
glog.info("send %s", prefix);
}
boost::this_thread::sleep(boost::posix_time::microseconds(1000));// 1ms
//Sleep(1);
}
return 0;
}
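The server formats the topic with "0600%03d" or "1000%03d" while drawing codes from 1 to 3000, so a code of 1000 or more produces eight characters, yet only the first seven bytes are copied into the message. A minimal sketch of what that means for the subscriber's prefix filter follows; the helpers make_topic and matches are hypothetical names, and std::snprintf assumes a C++11 compiler.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Build the 7-byte topic the way the server loop does, but with a bounded write.
inline std::string make_topic(int code)
{
    char buf[16] = {0};
    if (code % 2 == 0)
        std::snprintf(buf, sizeof(buf), "0600%03d", code);
    else
        std::snprintf(buf, sizeof(buf), "1000%03d", code);
    return std::string(buf).substr(0, 7); // only 7 bytes go into the message
}

// A SUB socket filter is a prefix match on the start of the message body.
inline bool matches(const std::string& subscription, const std::string& topic)
{
    return topic.compare(0, subscription.size(), subscription) == 0;
}
```

With this formatting, make_topic(100) and make_topic(1000) both yield "0600100", so a single subscription can match more codes than intended.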
-------------------------Client(Black box pattern)---------------------------
//
// Weather update client in C++
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
// Olivier Chamoux <olivier.chamoux at fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <sstream>
#include <boost/thread.hpp>
#include <boost/function.hpp>
#include "zhelpers.hpp"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <vector>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/tokenizer.hpp>
#include <boost/filesystem.hpp>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <log4cpp/Category.hh>
#include <log4cpp/Appender.hh>
#include <log4cpp/Priority.hh>
#include <log4cpp/FileAppender.hh>
#include <log4cpp/PatternLayout.hh>
#include <log4cpp/OstreamAppender.hh>
#include "../TDFPub/src/TDF/TDFAPIStruct.h"
log4cpp::Category& glog = log4cpp::Category::getRoot();
class client_task
{
public:
client_task()
: context_(NULL)
{
context_ = zmq_ctx_new();
}
void subscribe()
{
void* sender = zmq_socket(context_, ZMQ_PAIR);
int rc = zmq_connect(sender, "inproc://input");
assert(rc == 0);
std::string msg;
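while (std::getline(std::cin, msg)) // loop until end of input
{
if (msg.empty())
{
continue; // nothing to send
}
int size = zmq_send(sender, msg.c_str(), msg.length(), 0);
assert(size != -1); // zmq_send returns -1 on error
}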
}
void poll()
{
void* receiver = zmq_socket(context_, ZMQ_PAIR);
int rc = zmq_bind(receiver, "inproc://input");
assert(rc == 0);
void* subscriber = zmq_socket(context_, ZMQ_SUB);
int hwm = 0;
rc = zmq_setsockopt(subscriber, ZMQ_RCVHWM, &hwm, sizeof(hwm));
assert(rc == 0);
rc = zmq_setsockopt(subscriber, ZMQ_SNDHWM, &hwm, sizeof(hwm));
assert(rc == 0);
rc = zmq_connect(subscriber, "tcp://localhost:5556");
void* task_sender = zmq_socket(context_, ZMQ_PUSH);
rc = zmq_bind(task_sender, "inproc://task_send");
zmq_pollitem_t items [] = {
{receiver, 0, ZMQ_POLLIN, 0},
{subscriber, 0, ZMQ_POLLIN, 0}
};
while (true)
{
zmq_poll(&items[0], 2, -1);
if (items[0].revents & ZMQ_POLLIN)
{
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_recvmsg(receiver, &msg, 0);
if (zmq_msg_size(&msg) < 3)
{
zmq_msg_close(&msg); // too short to hold "<cmd> <code>"
continue;
}
char cmd = (static_cast<char*>(zmq_msg_data(&msg)))[0];
std::string list(static_cast<char*>(zmq_msg_data(&msg)) + 2, zmq_msg_size(&msg) - 2);
std::vector<std::string> vec_stocks;
const char* sep = ",";
// strtok writes into its input, so tokenize a writable, NUL-terminated copy
std::vector<char> buf(list.begin(), list.end());
buf.push_back('\0');
for (char* p = strtok(&buf[0], sep); p != NULL; p = strtok(NULL, sep))
{
vec_stocks.push_back(p);
}
if (vec_stocks.empty())
{
zmq_msg_close(&msg); // release the message before skipping it
continue;
}
if (cmd == 's')
{
for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
it != vec_stocks.end(); it++)
{
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, it->c_str(), it->length());
assert(rc == 0);
}
}
else if (cmd == 'u')
{
for (std::vector<std::string>::const_iterator it = vec_stocks.begin();
it != vec_stocks.end(); it++)
{
rc = zmq_setsockopt(subscriber, ZMQ_UNSUBSCRIBE, it->c_str(), it->length());
assert(rc == 0);
}
}
zmq_msg_close(&msg);
}
if (items[1].revents & ZMQ_POLLIN)
{
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_recvmsg(subscriber, &msg, 0);
/*if (memcmp(static_cast<char*>(zmq_msg_data(&msg)), "0600008", 7) == 0)
{
glog.info("recv");
}*/
//zmq_send(task_sender, &msg, zmq_msg_size(&msg), 0);
zmq_sendmsg(task_sender, &msg, 0);
zmq_msg_close(&msg);
}
}
}
void task_handle()
{
void* receiver = zmq_socket(context_, ZMQ_PULL);
int rc = zmq_connect(receiver, "inproc://task_send");
assert(rc == 0);
while (true)
{
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_recvmsg(receiver, &msg, 0);
char prefix[8] = {0};
memcpy(prefix, static_cast<char*>(zmq_msg_data(&msg)), 7);
if (strncmp(prefix, "0600008", 7) == 0)
{
TDF_MARKET_DATA dst;
memset(&dst, 0, sizeof(dst));
memcpy(&dst, static_cast<char*>(zmq_msg_data(&msg)) + 7, sizeof(dst));
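// A thread id object cannot be passed through printf-style varargs; format it as a string first
glog.info("[%s] recv %s, szCode = %s",
boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str(), prefix, dst.szCode);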
}
zmq_msg_close(&msg);
}
}
private:
void* context_;
};
int main()
{
boost::filesystem::path log_path("./log/");
if (!boost::filesystem::exists(log_path))
{
create_directory(log_path);
}
std::string local_day = boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
boost::filesystem::path path_local_day("");
path_local_day += log_path;
path_local_day += local_day;
if (!boost::filesystem::exists(path_local_day))
{
create_directory(path_local_day);
}
log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout();
layout_c->setConversionPattern("%d: %p %c %x: %m%n");
log4cpp::Appender *console_appender = new log4cpp::OstreamAppender("console", &std::cout);
console_appender->setLayout(layout_c);
const std::string str_log(path_local_day.string() + "/wuclient.log");
log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout();
layout_f->setConversionPattern("%d: %p %c %x: %m%n");
log4cpp::Appender* file_appender = new log4cpp::FileAppender("main", str_log);
file_appender->setLayout(layout_f);
glog.addAppender(file_appender);
glog.addAppender(console_appender);
glog.setPriority(log4cpp::Priority::DEBUG);
client_task ct;
for (int i = 0; i < 2; i++)
{
boost::thread t_handle(boost::bind(&client_task::task_handle, &ct));
}
Sleep(2000);
boost::thread t_poll(boost::bind(&client_task::poll, &ct));
Sleep(500);
boost::thread t_subscribe(boost::bind(&client_task::subscribe, &ct));
t_poll.join();
t_subscribe.join();
return 0;
}
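The console command handling in poll() can also be written without strtok. A minimal sketch, assuming commands look like "s 0600002,0600004" as in the examples above; parse_command and Command are hypothetical names, and trimming the spaces around each code is an addition that the original code does not perform.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

struct Command
{
    char action;                    // 's' = subscribe, 'u' = unsubscribe, 0 = invalid
    std::vector<std::string> codes; // topic prefixes to (un)subscribe
};

inline Command parse_command(const std::string& line)
{
    Command cmd;
    cmd.action = 0;
    if (line.size() < 3 || line[1] != ' ')
        return cmd; // need "<action> <code>[,<code>...]"
    std::istringstream in(line.substr(2));
    std::string item;
    while (std::getline(in, item, ','))
    {
        const std::string::size_type first = item.find_first_not_of(" \t\r");
        if (first == std::string::npos)
            continue; // skip empty entries such as a trailing comma
        const std::string::size_type last = item.find_last_not_of(" \t\r");
        cmd.codes.push_back(item.substr(first, last - first + 1));
    }
    if (!cmd.codes.empty())
        cmd.action = line[0];
    return cmd;
}
```

Because a subscription is compared byte for byte against the start of each message, an untrimmed entry such as " 0600004" would never match a topic that starts with "0600004".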
------------------------------
Message: 2
Date: Sat, 11 Mar 2017 17:32:47 +0100
From: Francesco <francesco.montorsi at gmail.com>
To: ZeroMQ development list <zeromq-dev at lists.zeromq.org>
Subject: Re: [zeromq-dev] subscribe a lot of topic , make the
subscriber get data latency?
Message-ID:
<CAKW0qihuq0R7mfALnZBRuBkD6u9e5HNAsD952Hs3C+QtsOrkeg at mail.gmail.com>
Content-Type: text/plain; charset=UTF-8
Hi,
I think you are setting the HWM to 0, making ZMQ queues of infinite
size (limited by the RAM you have)... this may explain your issue? I
mean: the queues get full and they start growing leading to 1)
increased memory usage, 2) increasing latency...
HTH,
Francesco
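The effect described in this reply can be illustrated with a toy model, which is a sketch and not ZeroMQ's internal implementation: a queue with no limit keeps every message the consumer cannot keep up with, so its depth, and with it the waiting time of the newest message, grows for as long as arrivals outpace processing. The function name and the rates are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

struct QueueStats
{
    std::size_t depth;   // messages still queued at the end
    std::size_t dropped; // messages rejected because the limit was reached
};

// Simulate `ticks` time steps. Each step `in_per_tick` messages arrive and up to
// `out_per_tick` are consumed. limit == 0 means "no limit", mirroring HWM = 0.
inline QueueStats simulate_queue(std::size_t ticks, std::size_t in_per_tick,
                                 std::size_t out_per_tick, std::size_t limit)
{
    QueueStats s = {0, 0};
    for (std::size_t t = 0; t < ticks; ++t)
    {
        for (std::size_t i = 0; i < in_per_tick; ++i)
        {
            if (limit != 0 && s.depth >= limit)
                ++s.dropped; // bounded queue: the new message is discarded
            else
                ++s.depth;   // unbounded or not yet full: the message is queued
        }
        s.depth -= std::min(s.depth, out_per_tick);
    }
    return s;
}
```

With three messages arriving and two consumed per step, the unlimited queue holds 1000 messages after 1000 steps, while a limit of 10 keeps the depth bounded at the cost of dropping messages, which is what a PUB socket does when a subscriber's pipe reaches its HWM.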
------------------------------
End of zeromq-dev Digest, Vol 12, Issue 9
*****************************************