[zeromq-dev] zmq multithread bug
Guo, Yanchao
Yanchao.Guo at sac.com
Wed Jan 27 08:40:19 CET 2010
Hi All, the sample code below publishes data into socket from two thread: One thread publishes "dummy", the other one publishes "from server: 1". But the data is not properly received on the other side, this is the print out:
received: from server: 6
received: dummyserver: 6
received: from server: 7
received: dummyserver: 7
received: from server: 8
received: dummyserver: 8
received: from server: 9
received: dummyserver: 9
received: from server: 10
received: dummyserver: 10
Note the "dummyserver: ", it looks like the buffer was overlapped.
Does zeromq support multithreading in general?
#include <sstream>
#include <tbb/queuing_rw_mutex.h>
tbb::queuing_rw_mutex m_Mutex;
#define DATA_PORT "tcp://172.16.18.21:5556"
#define CMD_PORT "tcp://172.16.18.21:5557"
using namespace zmq;
using namespace std;
class QbReceiver {
public:
QbReceiver(const char* port) {
m_Context = new context_t(1,1);
m_Socket = new socket_t(*m_Context, ZMQ_SUB);
m_Socket->connect(port);
m_Socket->setsockopt(ZMQ_SUBSCRIBE, "",0);
}
void listen();
public:
zmq::context_t* m_Context;
zmq::socket_t* m_Socket;
};
/*
struct ThreadReceive{
ThreadReceive(QbReceiver* p_Receiver) : m_MessageReceiver(p_Receiver){}
void operator ()(){
while (true){
message_t msg;
m_MessageReceiver->m_Socket->recv(&msg);
std::cout << "received: " << (char*)msg.data() << std::endl;
}
}
QbReceiver* m_MessageReceiver;
};
void QbReceiver::listen(){
ThreadReceive receiver(this);
tbb::tbb_thread t(receiver);
}
*/
void QbReceiver::listen(){
while (true){
message_t msg;
m_Socket->recv(&msg);
std::cout << "received: " << (char*)msg.data() << std::endl;
}
}
class QbPublisher {
public:
QbPublisher(const char* port) {
m_Context = new context_t(2,1);
m_Socket = new socket_t(*m_Context, ZMQ_PUB);
m_Socket->bind(port);
}
void send(std::string& p_String){
tbb::queuing_rw_mutex::scoped_lock myLock(m_Mutex, true);
message_t msg;
msg.rebuild(p_String.length());
memcpy(msg.data(), p_String.c_str(), p_String.length());
m_Socket->send(msg);
}
public:
zmq::context_t* m_Context;
zmq::socket_t* m_Socket;
};
struct ThreadSend{
ThreadSend(QbPublisher* p_Publisher) : m_MessagePublisher(p_Publisher){}
void operator ()(){
while (true){
string a("dummy");
m_MessagePublisher->send(a);
std::cout << "sending: " << a << std::endl;
sleep(1);
}
}
QbPublisher* m_MessagePublisher;
};
int main(int argc, char** argv){
if (argc > 1){
std::cout << "client mode" << std::endl;
QbReceiver r(DATA_PORT);
r.listen();
}
else {
std::cout << "server mode" << std::endl;
QbPublisher p(DATA_PORT);
//QbReceiver r(CMD_PORT);
//r.listen();
ThreadSend sender(&p);
tbb::tbb_thread t(sender);
while (true){
stringstream ss;
ss << "from server: ";
static int counter = 0;
ss << counter++;
std::string test(ss.str());
p.send(test);
std::cout << "sending: " << test << std::endl;
sleep(1);
}
}
}
DISCLAIMER: This e-mail message and any attachments are intended solely for the use of the individual or entity to which it is addressed and may contain information that is confidential or legally privileged. If you are not the intended recipient, you are hereby notified that any dissemination, distribution, copying or other use of this message or its attachments is strictly prohibited. If you have received this message in error, please notify the sender immediately and permanently delete this message and any attachments.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20100127/ec03951c/attachment.htm>
More information about the zeromq-dev
mailing list