[zeromq-dev] zmq crash when using PGM
纪明
jiming at yafco.com
Mon Aug 13 08:52:39 CEST 2018
Hi all:
We are using ZMQ to do some multicast work. The service keep
crashing, and we found it has something to do with pgm_receiver.
Specifically, there is a function called
zmq::pgm_receiver_t::restart_input(), when it receives some data, it
calls decoder to decode the message. On line 132, it checks if the
message size is greater than zero. If yes, it will call process_input()
function to decode the message. However, when insize is greater than
zero, inpos could point to null. When this happens, zmq crashes when
calling memcpy to copy something to the memory that inpos points to.
This actually looks like a threading issue to us.
We really appreciate if anyone familiar with this zmq could point
out a solution to this. We are using zmq in a real time environment,
occassional message drop is more acceptable than crashing the service.
We tried to change the source code a little bit, from "if (insize > 0)"
to "if (insize > 0 && inpos)". It caused other problem.
Thanks a lot in advance.
Ming
The relevant zmq code looks like this:
void zmq::pgm_receiver_t::restart_input ()
{
zmq_assert (session != NULL);
zmq_assert (active_tsi != NULL);
const peers_t::iterator it = peers.find (*active_tsi);
zmq_assert (it != peers.end ());
zmq_assert (it->second.joined);
// Push the pending message into the session.
int rc = session->push_msg (it->second.decoder->msg ());
errno_assert (rc == 0);
if (insize > 0) {
rc = process_input (it->second.decoder);
if (rc == -1) {
// HWM reached; we will try later.
if (errno == EAGAIN) {
session->flush ();
return;
}
// Data error. Delete message decoder, mark the
// peer as not joined and drop remaining data.
it->second.joined = false;
LIBZMQ_DELETE(it->second.decoder);
insize = 0;
}
}
// Resume polling.
set_pollin (pipe_handle);
set_pollin (socket_handle);
active_tsi = NULL;
in_event ();
}
int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
{
zmq_assert (session != NULL);
while (insize > 0) {
size_t n = 0;
int rc = decoder->decode (inpos, insize, n);
if (rc == -1)
return -1;
inpos += n;
insize -= n;
if (rc == 0)
break;
rc = session->push_msg (decoder->msg ());
if (rc == -1) {
errno_assert (errno == EAGAIN);
return -1;
}
}
return 0;
}
More information about the zeromq-dev
mailing list