[zeromq-dev] zmq crash when using PGM

纪明 jiming at yafco.com
Mon Aug 13 08:52:39 CEST 2018


Hi all:

     We are using ZMQ to do some multicast work. The service keeps 
crashing, and we found that it has something to do with pgm_receiver.

     Specifically, there is a function called 
zmq::pgm_receiver_t::restart_input(). When it receives some data, it 
calls the decoder to decode the message. On line 132, it checks whether 
the message size (insize) is greater than zero. If so, it calls 
process_input() to decode the message. However, even when insize is 
greater than zero, inpos can be NULL. When this happens, zmq crashes in 
the memcpy call that accesses the memory inpos points to. This looks 
like a threading issue to us.

     We would really appreciate it if anyone familiar with this part of 
zmq could point out a solution. We are using zmq in a real-time 
environment, where an occasional dropped message is more acceptable 
than a crashing service. We tried changing the source code slightly, 
from "if (insize > 0)" to "if (insize > 0 && inpos)", but that caused 
other problems.

Thanks a lot in advance.
Ming


The relevant zmq code looks like this:

void zmq::pgm_receiver_t::restart_input ()
{
     zmq_assert (session != NULL);
     zmq_assert (active_tsi != NULL);

     const peers_t::iterator it = peers.find (*active_tsi);
     zmq_assert (it != peers.end ());
     zmq_assert (it->second.joined);

     //  Push the pending message into the session.
     int rc = session->push_msg (it->second.decoder->msg ());
     errno_assert (rc == 0);

     if (insize > 0) {
         rc = process_input (it->second.decoder);
         if (rc == -1) {
             //  HWM reached; we will try later.
             if (errno == EAGAIN) {
                 session->flush ();
                 return;
             }
             //  Data error. Delete message decoder, mark the
             //  peer as not joined and drop remaining data.
             it->second.joined = false;
             LIBZMQ_DELETE(it->second.decoder);
             insize = 0;
         }
     }

     //  Resume polling.
     set_pollin (pipe_handle);
     set_pollin (socket_handle);

     active_tsi = NULL;
     in_event ();
}

int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
{
     zmq_assert (session != NULL);

     while (insize > 0) {
         size_t n = 0;
         int rc = decoder->decode (inpos, insize, n);
         if (rc == -1)
             return -1;
         inpos += n;
         insize -= n;
         if (rc == 0)
             break;
         rc = session->push_msg (decoder->msg ());
         if (rc == -1) {
             errno_assert (errno == EAGAIN);
             return -1;
         }
     }
     return 0;
}
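
To make the reported state concrete, here is a minimal standalone sketch
of a guarded check. It is not libzmq code: input_state_t, toy_decoder_t,
drain_result and drain_input are hypothetical names invented for this
illustration, and only inpos and insize mirror the members quoted above.
The sketch assumes that a guard which only skips processing leaves insize
stale, so it also resets insize when inpos is NULL. Whether a stale insize
is what caused the "other problems" mentioned above is an untested
assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

//  Hypothetical stand-in for the receiver's input cursor (not libzmq).
struct input_state_t
{
    const unsigned char *inpos = nullptr; //  next unread byte, may be null
    std::size_t insize = 0;               //  bytes claimed to remain
};

//  Hypothetical decoder stand-in: appends the input to a buffer via memcpy.
struct toy_decoder_t
{
    std::vector<unsigned char> received;

    //  Consumes all of the input and returns the number of bytes consumed.
    std::size_t decode (const unsigned char *data, std::size_t size)
    {
        //  memcpy from a null pointer is undefined, so insist on the
        //  precondition instead of crashing later.
        assert (size == 0 || data != nullptr);
        const std::size_t old_size = received.size ();
        received.resize (old_size + size);
        if (size > 0)
            std::memcpy (received.data () + old_size, data, size);
        return size;
    }
};

enum class drain_result
{
    nothing_pending,
    drained,
    dropped_inconsistent
};

//  Guarded drain: decode pending bytes only when the cursor is usable.
drain_result drain_input (input_state_t &in, toy_decoder_t &decoder)
{
    if (in.insize == 0)
        return drain_result::nothing_pending;

    if (in.inpos == nullptr) {
        //  Inconsistent state (size without data): drop the remainder
        //  and clear insize so that no later check sees a stale value.
        in.insize = 0;
        return drain_result::dropped_inconsistent;
    }

    const std::size_t n = decoder.decode (in.inpos, in.insize);
    in.inpos += n;
    in.insize -= n;
    return drain_result::drained;
}
```

With this shape, a caller can tell a dropped remainder apart from a normal
drain and, for example, log it, which may help narrow down where inpos
and insize get out of step. It trades an occasional lost message for not
crashing, and it does not address the suspected threading issue itself.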






