[zeromq-dev] ZMQ DEALER/ROUTER socket recv and send hang
huang jun
hjwsm1989 at gmail.com
Thu Jul 21 15:15:17 CEST 2016
I use DEALER/ROUTER sockets for message communication.
The MsgrTcpServer class handles incoming messages:
int MsgrTcpServer::start()
{
    // Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    dout(10) << " bind_addr " << bind_addr
             << " bind_port " << bind_port << dendl;
    string tcp_addr = get_tcp_addr();
    try {
        clients.bind (tcp_addr.c_str());
        dout(10) << "tcp bind to " << tcp_addr << " succeed" << dendl;
    } catch (zmq::error_t &e) {
        dout(1) << "tcp bind to " << tcp_addr << " failed, ret=" <<
            e.num() << dendl;
        return e.num();
    }
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");
    // Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr < thread_num; thread_nbr++) {
        workerlist.push_back(new MsgrTcpServerWorker((void *)&context, msgr));
    }
    list<MsgrTcpServerWorker *>::iterator i = workerlist.begin();
    for ( ;i != workerlist.end(); i++)
    {
        (*i)->start();
    }
    // Connect work threads to client threads via a queue
    zmq::proxy (clients, workers, NULL);
    return 0;
}
MsgrTcpServerWorker is the worker thread that processes the requests:
void *MsgrTcpServerWorker::entry ()
{
    zmq::context_t *context = (zmq::context_t *) arg;
    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");
    while (true) {
        // Wait for next request from client
        zmq::message_t request;
        dout(0) << "going to recv msg from client" << dendl;
        socket.recv (&request);
        dout(0) << "recved msg from client" << dendl;
        //print_memusage();
        Handler *handler = Handler::get_handler(&socket,
            string((char*)request.data()));
        try
        {
            handler->handle_message(string((char*)request.data()), msgr);
        }
        catch(exception &e)
        {
            //dout(10)<<"catch error:"<<e.what()<<dendl;
            ostringstream oss;
            BackTrace bt(1);
            bt.print(oss);
            dout(0) << oss.str() << dendl;
        }
        //dout(10) << "goto release handler" << dendl;
        if (handler)
            delete handler;
        handler = NULL;
        //dout(10) << "after release handler" << dendl;
        //print_memusage();
    }
    return ((void *)NULL);
}
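A side note on the worker above, separate from the hang itself:
string((char*)request.data()) reads up to the first NUL byte, but a
ZeroMQ message body carries its length separately and is not guaranteed
to be NUL-terminated. Below is a minimal sketch of a length-aware
conversion. payload_to_string is a hypothetical helper name, and the raw
pointer and size stand in for request.data() and request.size().

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: copy exactly `size` bytes of a message payload into a
// std::string. In the worker, `data` and `size` would come from
// request.data() and request.size().
std::string payload_to_string(const void *data, std::size_t size)
{
    assert(data != nullptr || size == 0);
    if (size == 0)
        return std::string();
    // The two-argument constructor copies `size` bytes and does not look
    // for a terminating NUL, so embedded NUL bytes are preserved as well.
    return std::string(static_cast<const char *>(data), size);
}
```

The resulting string could then be passed to both Handler::get_handler
and handle_message, so the payload is converted only once per request.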
The problem: the process stays alive but stops receiving messages.
Here is the main process's log:
[2016-07-21 15:53:47.642555] 10 reply message done
[2016-07-21 15:53:47.642662] 0 going to recv msg from client
It never prints "recved msg from client", so I think it is stuck in the recv call.
Here is what I have checked so far:
1. We used a test program to send a message to this process. Here is its log:
Connecting to tcp://0.0.0.0:5556
do send request...
So we can connect to the listening socket, but cannot send data to it.
2. The file descriptors used by the process:
cmon 1451579 root 23u 0000 0,9 0 5154 anon_inode
cmon 1451579 root 24u IPv4 2687562 0t0 TCP *:freeciv (LISTEN)
cmon 1451579 root 25u 0000 0,9 0 5154 anon_inode
cmon 1451579 root 26u 0000 0,9 0 5154 anon_inode
cmon 1451579 root 27u 0000 0,9 0 5154 anon_inode
cmon 1451579 root 28u IPv4 2687563 0t0 TCP 192.168.3.103:freeciv->192.168.3.102:35568 (ESTABLISHED)
cmon 1451579 root 29u IPv4 2687564 0t0 TCP 192.168.3.103:freeciv->192.168.3.102:35575 (ESTABLISHED)
cmon 1451579 root 30u IPv4 2688180 0t0 TCP 192.168.3.103:freeciv->192.168.3.102:42789 (ESTABLISHED)
3. We also traced the process with strace. The log is:
1451582 write(4, "[2016-07-21 15:53:47.642662] 0 g"..., 61 <unfinished ...>
1451579 poll([{fd=25, events=POLLIN}], 1, 0 <unfinished ...>
1451582 <... write resumed> ) = 61
1451579 <... poll resumed> ) = 1 ([{fd=25, revents=POLLIN}])
1451582 poll([{fd=27, events=POLLIN}], 1, -1 <unfinished ...>
1451579 read(25, "\1\0\0\0\0\0\0\0", 8) = 8
1451579 poll([{fd=25, events=POLLIN}], 1, 0) = 0 (Timeout)
1451579 poll([{fd=23, events=POLLIN}], 1, 0) = 0 (Timeout)
1451579 write(21, "\1\0\0\0\0\0\0\0", 8) = 8
1451579 poll([{fd=23, events=POLLIN}, {fd=25, events=POLLIN}], 2, 0) = 0 (Timeout)
1451579 poll([{fd=23, events=POLLIN}], 1, 0) = 0 (Timeout)
1451579 poll([{fd=25, events=POLLIN}], 1, 0) = 0 (Timeout)
1451579 poll([{fd=23, events=POLLIN}, {fd=25, events=POLLIN}], 2, -1
Does this mean the poll in zmq_proxy failed?
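For reading the trace above: the 8-byte read and write of
"\1\0\0\0\0\0\0\0" look like eventfd counter traffic, which would match
the anon_inode descriptors in the lsof output, assuming this libzmq
build uses eventfd for its internal signalling on Linux. The sketch
below is not libzmq code. It only reproduces the same syscall pattern
with a plain eventfd so that the poll/read lines are easier to
interpret. signal_roundtrip is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>

// Linux-only sketch: post one wakeup on an eventfd, wait for it with poll(),
// then drain it. Returns the counter value that was read, or 0 on failure.
std::uint64_t signal_roundtrip()
{
    int fd = eventfd(0, 0);
    if (fd < 0)
        return 0;

    // Sender side: an 8-byte write of 1, like write(21, "\1\0\0\0\0\0\0\0", 8).
    std::uint64_t one = 1;
    if (write(fd, &one, sizeof one) != static_cast<ssize_t>(sizeof one)) {
        close(fd);
        return 0;
    }

    // Receiver side: poll reports POLLIN while the counter is non-zero.
    struct pollfd pfd = {fd, POLLIN, 0};
    int ready = poll(&pfd, 1, 0);
    assert(ready == 1 && (pfd.revents & POLLIN));

    // The 8-byte read returns the counter and resets it to zero.
    std::uint64_t value = 0;
    if (read(fd, &value, sizeof value) != static_cast<ssize_t>(sizeof value))
        value = 0;

    // With the counter drained, a zero-timeout poll now times out.
    ready = poll(&pfd, 1, 0);
    assert(ready == 0);

    close(fd);
    return value;
}
```

Read this way, the last poll(..., 2, -1) line with no result only shows
the proxy thread blocked, waiting for one of its two descriptors to be
signalled. On its own it does not show that poll failed.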
4. The socket state:
[root@lab207 src]# netstat -n|grep 5556
tcp 0 0 192.168.3.103:5556 192.168.3.102:35568 ESTABLISHED
tcp 10 0 127.0.0.1:5556 127.0.0.1:39115 ESTABLISHED
tcp 0 0 127.0.0.1:39115 127.0.0.1:5556 ESTABLISHED
tcp 0 0 192.168.3.103:5556 192.168.3.102:42789 ESTABLISHED
tcp 0 0 192.168.3.103:5556 192.168.3.102:35575 ESTABLISHED
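In netstat -n output for TCP, the second and third columns are Recv-Q
and Send-Q: bytes the kernel has received that the application has not
read yet, and bytes sent that the peer has not acknowledged yet. The
127.0.0.1:5556 entry above has Recv-Q 10, which would mean ten bytes
from the local test client are sitting in the kernel unread. Below is a
small sketch for flagging such entries. NetstatRow, parse_netstat_row
and has_unread_data are hypothetical names, and the parser assumes each
connection is on a single line.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical record for one TCP row of `netstat -n` output.
struct NetstatRow {
    std::string proto;
    unsigned long recv_q = 0;
    unsigned long send_q = 0;
    std::string local;
    std::string remote;
    std::string state;
};

// Parse "proto recv-q send-q local remote state". Returns false when the
// line does not have that shape (for example the column header line).
bool parse_netstat_row(const std::string &line, NetstatRow &row)
{
    std::istringstream in(line);
    return static_cast<bool>(in >> row.proto >> row.recv_q >> row.send_q
                                >> row.local >> row.remote >> row.state);
}

// True when the application has left received bytes unread in the kernel.
bool has_unread_data(const NetstatRow &row)
{
    return row.recv_q > 0;
}
```

Running each netstat line through parse_netstat_row and printing the
entries where has_unread_data is true would single out the one local
connection here, while the three connections from 192.168.3.102 show
empty queues.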
5. The stack of the process
[root@lab207 rpms]# pstack 1451579
Thread 4 (Thread 0x7f91fb597700 (LWP 1451580)):
#0 0x0000003b33ce8f33 in epoll_wait () from /lib64/libc.so.6
#1 0x00007f91fba2fdfd in zmq::epoll_t::loop() () from /usr/local/lib/libzmq.so.3
#2 0x00007f91fba4855b in thread_routine () from /usr/local/lib/libzmq.so.3
#3 0x0000003b34407a51 in start_thread () from /lib64/libpthread.so.0
#4 0x0000003b33ce893d in clone () from /lib64/libc.so.6
Thread 3 (Thread 0x7f91fa565700 (LWP 1451582)):
#0 0x0000003b33cdf113 in poll () from /lib64/libc.so.6
#1 0x00007f91fba3e1a1 in zmq::signaler_t::wait(int) () from /usr/local/lib/libzmq.so.3
#2 0x00007f91fba3398a in zmq::mailbox_t::recv(zmq::command_t*, int) () from /usr/local/lib/libzmq.so.3
#3 0x00007f91fba3efa4 in zmq::socket_base_t::process_commands(int, bool) () from /usr/local/lib/libzmq.so.3
#4 0x00007f91fba3f1ef in zmq::socket_base_t::recv(zmq::msg_t*, int) () from /usr/local/lib/libzmq.so.3
#5 0x00007f91fba4f029 in s_recvmsg(zmq::socket_base_t*, zmq_msg_t*, int) () from /usr/local/lib/libzmq.so.3
#6 0x00000000005b8507 in zmq::socket_t::recv(zmq::message_t*, int) ()
#7 0x00000000005b5f9c in MsgrTcpServerWorker::entry() ()
#8 0x00000000004a85c6 in Thread::entry_wrapper() ()
#9 0x00000000004a855c in Thread::_entry_func(void*) ()
#10 0x0000003b34407a51 in start_thread () from /lib64/libpthread.so.0
#11 0x0000003b33ce893d in clone () from /lib64/libc.so.6
Thread 2 (Thread 0x7f91f9d64700 (LWP 1451583)):
#0 0x0000003b33cdf113 in poll () from /lib64/libc.so.6
#1 0x00007f91fba3e1a1 in zmq::signaler_t::wait(int) () from /usr/local/lib/libzmq.so.3
#2 0x00007f91fba3398a in zmq::mailbox_t::recv(zmq::command_t*, int) () from /usr/local/lib/libzmq.so.3
#3 0x00007f91fba3efa4 in zmq::socket_base_t::process_commands(int, bool) () from /usr/local/lib/libzmq.so.3
#4 0x00007f91fba3f1ef in zmq::socket_base_t::recv(zmq::msg_t*, int) () from /usr/local/lib/libzmq.so.3
#5 0x00007f91fba4f029 in s_recvmsg(zmq::socket_base_t*, zmq_msg_t*, int) () from /usr/local/lib/libzmq.so.3
#6 0x00000000005b8507 in zmq::socket_t::recv(zmq::message_t*, int) ()
#7 0x00000000005b5f9c in MsgrTcpServerWorker::entry() ()
#8 0x00000000004a85c6 in Thread::entry_wrapper() ()
#9 0x00000000004a855c in Thread::_entry_func(void*) ()
#10 0x0000003b34407a51 in start_thread () from /lib64/libpthread.so.0
#11 0x0000003b33ce893d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f91fb5997e0 (LWP 1451579)):
#0 0x0000003b33cdf113 in poll () from /lib64/libc.so.6
#1 0x00007f91fba4fba8 in zmq_poll () from /usr/local/lib/libzmq.so.3
#2 0x00007f91fba3b101 in zmq::proxy(zmq::socket_base_t*, zmq::socket_base_t*, zmq::socket_base_t*) () from /usr/local/lib/libzmq.so.3
#3 0x00000000005b8183 in zmq::proxy(void*, void*, void*) ()
#4 0x00000000005b6a30 in MsgrTcpServer::start() ()
#5 0x00000000005b726c in Msgr::start() ()
#6 0x00000000004870ba in main ()
6. The kernel stack from the proc filesystem:
[root@lab207 src]# cat /proc/1451579/stack
[<ffffffff8118e409>] poll_schedule_timeout+0x49/0x70
[<ffffffff8118e9a0>] do_poll+0x250/0x2b0
[<ffffffff8118f3df>] do_sys_poll+0x1bf/0x240
[<ffffffff8118f59c>] do_restart_poll+0x3c/0x60
[<ffffffff81066fcf>] sys_restart_syscall+0x1f/0x30
[<ffffffff8155f1d9>] system_call_fastpath+0x16/0x1b
[<ffffffffffffffff>] 0xffffffffffffffff
I have no idea why this happens. What should I do to diagnose this problem?
Any tips or advice are welcome, as I cannot think of another way to
locate the problem.
I also found that after I killed the process, 'netstat -n|grep 5556' still
shows the sockets in ESTABLISHED state. Is that normal?
--
Thank you!
HuangJun