[zeromq-dev] ZMQ DEALER/ROUTER socket recv and send hang

huang jun hjwsm1989 at gmail.com
Thu Jul 21 15:15:17 CEST 2016


I use DEALER/ROUTER sockets for message communication, and a
MsgrTcpServer class handles the messages:

int MsgrTcpServer::start()
{
    // Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);

    dout(10) << " bind_addr " << bind_addr
             << " bind_port " << bind_port << dendl;
    string tcp_addr = get_tcp_addr();
    try {
        clients.bind (tcp_addr.c_str());
        dout(10) << "tcp bind to " << tcp_addr << " succeed" << dendl;
    } catch (zmq::error_t &e) {
        dout(1) << "tcp bind to " << tcp_addr << " failed, ret="
                << e.num() << dendl;
        return e.num();
    }
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    // Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr < thread_num; thread_nbr++) {
       workerlist.push_back(new MsgrTcpServerWorker((void *)&context, msgr));
    }
    list<MsgrTcpServerWorker *>::iterator i = workerlist.begin();
    for ( ;i != workerlist.end(); i++)
    {
        (*i)->start();
    }

    // Connect work threads to client threads via a queue
    zmq::proxy (clients, workers, NULL);
    return 0;
}

MsgrTcpServerWorker is the worker thread that processes the requests:

void *MsgrTcpServerWorker::entry ()
{
  zmq::context_t *context = (zmq::context_t *) arg;
  zmq::socket_t socket (*context, ZMQ_REP);
  socket.connect ("inproc://workers");

  while (true) {
    //  Wait for next request from client
    zmq::message_t request;
    dout(0) << "going to recv msg from client" << dendl;
    socket.recv (&request);
    dout(0) << "recved msg from client" << dendl;
    //print_memusage();
    Handler *handler = Handler::get_handler(&socket,
                                            string((char*)request.data()));
    try
    {
      handler->handle_message(string((char*)request.data()), msgr);
    }
    catch(exception &e)
    {
      //dout(10)<<"catch error:"<<e.what()<<dendl;
      ostringstream oss;
      BackTrace bt(1);
      bt.print(oss);
      dout(0) << oss.str() << dendl;
    }
    //dout(10) << "goto release handler" << dendl;
    if (handler)
      delete handler;
    handler = NULL;
    //dout(10) << "after release handler" << dendl;
    //print_memusage();
  }
  return ((void *)NULL);
}
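
A side note on the worker above, which may or may not be related to the
hang: string((char*)request.data()) relies on the payload containing a
NUL byte, but a ZeroMQ message body is just a byte buffer with a length
and is not NUL-terminated unless the sender includes the terminator. A
minimal sketch of a length-aware conversion follows; payload_to_string is
a hypothetical helper name, not something in our code or in libzmq.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper (assumption, not part of the original program):
// copy exactly `size` bytes of a message payload into a std::string,
// instead of scanning for a NUL terminator that may not be there.
std::string payload_to_string(const void *data, std::size_t size)
{
    // A null pointer is only acceptable for an empty payload.
    assert(data != nullptr || size == 0);
    if (size == 0)
        return std::string();
    return std::string(static_cast<const char *>(data), size);
}
```

In the worker it would be called as
payload_to_string(request.data(), request.size()), once, with the result
reused for both get_handler() and handle_message().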

The problem is that the program stops receiving messages, although the
process is still alive. Here is the process's log:
[2016-07-21 15:53:47.642555] 10 reply message done
[2016-07-21 15:53:47.642662] 0 going to recv msg from client
It never prints "recved msg from client", so I think it is stuck in the
recv function.

What I have checked so far:
1. We used a test program to send a message to this process. Here is its log:
Connecting to tcp://0.0.0.0:5556
do send request...

So we can connect to the listening socket, but cannot send data to it.

2. The file descriptors used by the process:
cmon    1451579 root   23u  0000     0,9        0     5154 anon_inode
cmon    1451579 root   24u  IPv4 2687562      0t0      TCP *:freeciv (LISTEN)
cmon    1451579 root   25u  0000     0,9        0     5154 anon_inode
cmon    1451579 root   26u  0000     0,9        0     5154 anon_inode
cmon    1451579 root   27u  0000     0,9        0     5154 anon_inode
cmon    1451579 root   28u  IPv4 2687563      0t0      TCP 192.168.3.103:freeciv->192.168.3.102:35568 (ESTABLISHED)
cmon    1451579 root   29u  IPv4 2687564      0t0      TCP 192.168.3.103:freeciv->192.168.3.102:35575 (ESTABLISHED)
cmon    1451579 root   30u  IPv4 2688180      0t0      TCP 192.168.3.103:freeciv->192.168.3.102:42789 (ESTABLISHED)

3. We also used strace to trace the process. The log is:
1451582 write(4, "[2016-07-21 15:53:47.642662] 0 g"..., 61 <unfinished ...>
1451579 poll([{fd=25, events=POLLIN}], 1, 0 <unfinished ...>
1451582 <... write resumed> )           = 61
1451579 <... poll resumed> )            = 1 ([{fd=25, revents=POLLIN}])
1451582 poll([{fd=27, events=POLLIN}], 1, -1 <unfinished ...>
1451579 read(25, "\1\0\0\0\0\0\0\0", 8) = 8
1451579 poll([{fd=25, events=POLLIN}], 1, 0) = 0 (Timeout)
1451579 poll([{fd=23, events=POLLIN}], 1, 0) = 0 (Timeout)
1451579 write(21, "\1\0\0\0\0\0\0\0", 8) = 8
1451579 poll([{fd=23, events=POLLIN}, {fd=25, events=POLLIN}], 2, 0) = 0 (Timeout)
1451579 poll([{fd=23, events=POLLIN}], 1, 0) = 0 (Timeout)
1451579 poll([{fd=25, events=POLLIN}], 1, 0) = 0 (Timeout)
1451579 poll([{fd=23, events=POLLIN}, {fd=25, events=POLLIN}], 2, -1
Does this mean that the poll in zmq_proxy failed?
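
For anyone reading the trace: the 8-byte reads and writes of
"\1\0\0\0\0\0\0\0" look like eventfd counter traffic, which I assume is
how libzmq threads wake each other up on Linux (the anon_inode fds in
item 2 would fit that). The sketch below is not libzmq code; it is a
small Linux-only reproduction of the same syscall pattern, and the
function name signal_and_drain is made up for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>

// Linux-only sketch: signal an eventfd once, check it with poll(), then
// drain it. Returns the counter value read back, or 0 on any failure.
std::uint64_t signal_and_drain()
{
    int fd = eventfd(0, 0);
    if (fd < 0)
        return 0;

    // In strace this appears as: write(fd, "\1\0\0\0\0\0\0\0", 8) = 8
    std::uint64_t one = 1;
    if (write(fd, &one, sizeof one) != static_cast<ssize_t>(sizeof one)) {
        close(fd);
        return 0;
    }

    // Timeout 0 is a non-blocking check, like the poll(..., 1, 0) calls
    // in the trace; a timeout of -1 would block until a signal arrives.
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int ready = poll(&pfd, 1, 0);

    std::uint64_t value = 0;
    if (ready == 1 && (pfd.revents & POLLIN)) {
        // In strace: read(fd, "\1\0\0\0\0\0\0\0", 8) = 8; the read
        // resets the counter, so a following poll would time out.
        if (read(fd, &value, sizeof value) != static_cast<ssize_t>(sizeof value))
            value = 0;
        else
            assert(value >= 1);  // eventfd only becomes readable when nonzero
    }

    close(fd);
    return value;
}
```

Read this way, the last line of the trace is the proxy thread blocking
in poll with timeout -1 on fds 23 and 25, waiting for a wake-up.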

4. The socket state:
[root@lab207 src]# netstat -n|grep 5556
tcp        0      0 192.168.3.103:5556          192.168.3.102:35568         ESTABLISHED
tcp       10      0 127.0.0.1:5556              127.0.0.1:39115             ESTABLISHED
tcp        0      0 127.0.0.1:39115             127.0.0.1:5556              ESTABLISHED
tcp        0      0 192.168.3.103:5556          192.168.3.102:42789         ESTABLISHED
tcp        0      0 192.168.3.103:5556          192.168.3.102:35575         ESTABLISHED

5. The stack of the process
[root@lab207 rpms]# pstack 1451579
Thread 4 (Thread 0x7f91fb597700 (LWP 1451580)):
#0  0x0000003b33ce8f33 in epoll_wait () from /lib64/libc.so.6
#1  0x00007f91fba2fdfd in zmq::epoll_t::loop() () from /usr/local/lib/libzmq.so.3
#2  0x00007f91fba4855b in thread_routine () from /usr/local/lib/libzmq.so.3
#3  0x0000003b34407a51 in start_thread () from /lib64/libpthread.so.0
#4  0x0000003b33ce893d in clone () from /lib64/libc.so.6
Thread 3 (Thread 0x7f91fa565700 (LWP 1451582)):
#0  0x0000003b33cdf113 in poll () from /lib64/libc.so.6
#1  0x00007f91fba3e1a1 in zmq::signaler_t::wait(int) () from /usr/local/lib/libzmq.so.3
#2  0x00007f91fba3398a in zmq::mailbox_t::recv(zmq::command_t*, int) () from /usr/local/lib/libzmq.so.3
#3  0x00007f91fba3efa4 in zmq::socket_base_t::process_commands(int, bool) () from /usr/local/lib/libzmq.so.3
#4  0x00007f91fba3f1ef in zmq::socket_base_t::recv(zmq::msg_t*, int) () from /usr/local/lib/libzmq.so.3
#5  0x00007f91fba4f029 in s_recvmsg(zmq::socket_base_t*, zmq_msg_t*, int) () from /usr/local/lib/libzmq.so.3
#6  0x00000000005b8507 in zmq::socket_t::recv(zmq::message_t*, int) ()
#7  0x00000000005b5f9c in MsgrTcpServerWorker::entry() ()
#8  0x00000000004a85c6 in Thread::entry_wrapper() ()
#9  0x00000000004a855c in Thread::_entry_func(void*) ()
#10 0x0000003b34407a51 in start_thread () from /lib64/libpthread.so.0
#11 0x0000003b33ce893d in clone () from /lib64/libc.so.6
Thread 2 (Thread 0x7f91f9d64700 (LWP 1451583)):
#0  0x0000003b33cdf113 in poll () from /lib64/libc.so.6
#1  0x00007f91fba3e1a1 in zmq::signaler_t::wait(int) () from /usr/local/lib/libzmq.so.3
#2  0x00007f91fba3398a in zmq::mailbox_t::recv(zmq::command_t*, int) () from /usr/local/lib/libzmq.so.3
#3  0x00007f91fba3efa4 in zmq::socket_base_t::process_commands(int, bool) () from /usr/local/lib/libzmq.so.3
#4  0x00007f91fba3f1ef in zmq::socket_base_t::recv(zmq::msg_t*, int) () from /usr/local/lib/libzmq.so.3
#5  0x00007f91fba4f029 in s_recvmsg(zmq::socket_base_t*, zmq_msg_t*, int) () from /usr/local/lib/libzmq.so.3
#6  0x00000000005b8507 in zmq::socket_t::recv(zmq::message_t*, int) ()
#7  0x00000000005b5f9c in MsgrTcpServerWorker::entry() ()
#8  0x00000000004a85c6 in Thread::entry_wrapper() ()
#9  0x00000000004a855c in Thread::_entry_func(void*) ()
#10 0x0000003b34407a51 in start_thread () from /lib64/libpthread.so.0
#11 0x0000003b33ce893d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f91fb5997e0 (LWP 1451579)):
#0  0x0000003b33cdf113 in poll () from /lib64/libc.so.6
#1  0x00007f91fba4fba8 in zmq_poll () from /usr/local/lib/libzmq.so.3
#2  0x00007f91fba3b101 in zmq::proxy(zmq::socket_base_t*, zmq::socket_base_t*, zmq::socket_base_t*) () from /usr/local/lib/libzmq.so.3
#3  0x00000000005b8183 in zmq::proxy(void*, void*, void*) ()
#4  0x00000000005b6a30 in MsgrTcpServer::start() ()
#5  0x00000000005b726c in Msgr::start() ()
#6  0x00000000004870ba in main ()

6. The kernel stack of the main thread, from the proc filesystem:
[root@lab207 src]# cat /proc/1451579/stack
[<ffffffff8118e409>] poll_schedule_timeout+0x49/0x70
[<ffffffff8118e9a0>] do_poll+0x250/0x2b0
[<ffffffff8118f3df>] do_sys_poll+0x1bf/0x240
[<ffffffff8118f59c>] do_restart_poll+0x3c/0x60
[<ffffffff81066fcf>] sys_restart_syscall+0x1f/0x30
[<ffffffff8155f1d9>] system_call_fastpath+0x16/0x1b
[<ffffffffffffffff>] 0xffffffffffffffff

I have no idea why this happens. What should I do to diagnose this problem?
Any tips or advice are welcome, as I cannot think of another way to
locate the problem.

I also found that after I killed the process, 'netstat -n|grep 5556' still
shows the socket in the ESTABLISHED state. Is that normal?


-- 
Thank you!
HuangJun


