[zeromq-dev] Assertion failure with socket monitor when clients disconnect
Auer, Jens
jens.auer at cgi.com
Tue Oct 11 13:03:52 CEST 2016
Hi,
I’ve patched socket_base_t to protect the monitor socket with a mutex. There was already an unused mutex, “sync”, available in socket_base_t, so I’ve used that :-). If you want, I can open a pull request.
Cheers,
Jens
--
Dr. Jens Auer | CGI | Software Engineer
CGI Deutschland Ltd. & Co. KG
Rheinstraße 95 | 64295 Darmstadt | Germany
T: +49 6151 36860 154
jens.auer at cgi.com
Our mandatory disclosures pursuant to § 35a GmbHG / §§ 161, 125a HGB can be found at de.cgi.com/pflichtangaben.
CONFIDENTIALITY NOTICE: Proprietary/Confidential information belonging to CGI Group Inc. and its affiliates may be contained in this message. If you are not a recipient indicated or intended in this message (or responsible for delivery of this message to such person), or you think for any reason that this message may have been addressed to you in error, you may not use or copy or deliver this message to anyone else. In such case, you should destroy this message and are asked to notify the sender by reply e-mail.
From: zeromq-dev [mailto:zeromq-dev-bounces at lists.zeromq.org] On Behalf Of Auer, Jens
Sent: 11 October 2016 09:31
To: ZeroMQ development list
Subject: Re: [zeromq-dev] Assertion failure with socket monitor when clients disconnect
Hi Doron,
I'm not a big fan of monitoring and prefer other solutions. I am actually thinking of deprecating it: the implementation is buggy and violates the "don't share a socket between threads" rule, which is probably what is causing your issue.
That’s quite a shock given that it is part of the stable API. I think in this case it should not only be deprecated but removed from the API completely. On the other hand, something to monitor the connections is probably needed. Our clients are very concerned with TCP connections and would always insist on logging capabilities for TCP connection state changes. It would also be nice to get some statistics from the connections, e.g. the number of received bytes or messages.
Anyway, you have two options. I think you can manage without the monitoring, and I can try to help you find other solutions. The other option is to not listen to all events; maybe this will avoid the sharing violation.
I would like to replace the monitor with something else, but I am not sure how to do this given our requirements. Currently, we use the monitor to
- Log TCP connection events for connect, disconnect, accept, listen and reconnection retry events
- Limit the number of reconnection attempts
Unfortunately this includes disconnect events which are exactly the events causing the crashes right now. The other events are probably rare enough that there is no problem.
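For illustration, limiting reconnection attempts from monitor events could look roughly like the sketch below. It is not our production code: retry_limiter and on_event are hypothetical names, and the two event numbers are copied from zmq.h so that the sketch builds without libzmq. What the application does once the limit is hit (for example disconnecting the endpoint) is left open.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Event numbers as defined in zmq.h (ZMQ_EVENT_CONNECTED and
// ZMQ_EVENT_CONNECT_RETRIED), repeated here to keep the sketch standalone.
constexpr std::uint16_t EVENT_CONNECTED = 0x0001;
constexpr std::uint16_t EVENT_CONNECT_RETRIED = 0x0004;

// Hypothetical helper: counts consecutive connect retries per endpoint.
class retry_limiter
{
public:
    explicit retry_limiter (unsigned max_retries) : max_retries_ (max_retries) {}

    // Feed one monitor event. Returns true once the endpoint has been
    // retried more than max_retries times in a row, i.e. when the
    // application should give up on it.
    bool on_event (std::uint16_t event, const std::string &endpoint)
    {
        assert (!endpoint.empty ());
        if (event == EVENT_CONNECTED) {
            retries_.erase (endpoint); // a successful connect resets the count
            return false;
        }
        if (event == EVENT_CONNECT_RETRIED)
            return ++retries_[endpoint] > max_retries_;
        return false; // all other events are only logged, not counted
    }

private:
    unsigned max_retries_;
    std::map<std::string, unsigned> retries_;
};
```

The event number and endpoint would come from the two frames of each monitor message; with a limit of 2, the third consecutive retry event for the same endpoint makes on_event return true.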
Is there another way to implement this without using the socket monitor? We use Router, Dealer, Sub, XPub and Stream sockets. I have full control over the protocol between the Router/Dealer, but I cannot change the protocol between Pub/Sub and external clients over Stream sockets, so I cannot add control messages here.
I think an easy fix for my issues would be to add a mutex to protect the monitor socket in socket_base_t. I guess this was not done because it would block the thread and probably impact performance, but at least it will work correctly and not crash. It should be good enough for our use-case.
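A minimal sketch of the mutex idea, kept independent of the libzmq internals: monitor_sink is a stand-in for the monitor PAIR socket, and emit plays the role of socket_base_t::monitor_event. All names are illustrative and this is not the actual patch. The point is only that the lock covers both frames of an event, so events coming from different I/O threads can neither interleave nor race on the underlying queue.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Stand-in for the monitor socket: a plain vector of frames that must
// not be touched by several threads at the same time.
class monitor_sink
{
public:
    // Emit one event as two frames (event number, then address).
    // The lock is held for both frames.
    void emit (int event, const std::string &address)
    {
        std::lock_guard<std::mutex> guard (sync_);
        frames_.push_back (std::to_string (event));
        frames_.push_back (address);
    }

    // Snapshot of everything emitted so far.
    std::vector<std::string> frames ()
    {
        std::lock_guard<std::mutex> guard (sync_);
        return frames_;
    }

private:
    std::mutex sync_;
    std::vector<std::string> frames_;
};

// Hypothetical driver: n_threads "I/O threads" each report n_events
// disconnect events (event number 512) for their own address.
inline std::vector<std::string> run_emitters (unsigned n_threads, unsigned n_events)
{
    assert (n_threads > 0);
    monitor_sink sink;
    std::vector<std::thread> threads;
    for (unsigned t = 0; t < n_threads; ++t)
        threads.emplace_back ([&sink, t, n_events] {
            for (unsigned i = 0; i < n_events; ++i)
                sink.emit (512, "tcp://client-" + std::to_string (t));
        });
    for (auto &th : threads)
        th.join ();
    return sink.frames ();
}
```

Calling run_emitters (6, 1000) should yield 12000 frames in which every even position holds an event number and every odd position an address. The cost is the one mentioned above: an I/O thread reporting an event blocks while another one holds the lock.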
An idea for a non-blocking solution would be an independent monitor class, analogous to the existing listener and accepter classes, which owns an inproc SUB socket and the PAIR socket. Each ZeroMQ socket would create such a monitor when it is created, and each session object would have a PUB socket to broadcast events to the monitor. The monitor would then forward the events received from individual clients/sessions on different I/O threads to the PAIR socket, to which the application code can connect.
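The shape of this design can be sketched without libzmq. In the sketch below the inproc PUB/SUB sockets of the proposal are replaced by plain single-producer/single-consumer ring buffers, which is a simplification and not what libzmq does. Every session writes only to its own queue, and the monitor is the only reader, so no session ever blocks on another one. spsc_queue, event_monitor and monitor_event are hypothetical names.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct monitor_event
{
    int event;
    std::string address;
};

// Lock-free ring buffer for exactly one writer (a session) and one
// reader (the monitor). It holds at most N - 1 events.
template <std::size_t N> class spsc_queue
{
public:
    // Returns false instead of blocking when the queue is full.
    bool push (monitor_event ev)
    {
        const std::size_t head = head_.load (std::memory_order_relaxed);
        const std::size_t next = (head + 1) % N;
        if (next == tail_.load (std::memory_order_acquire))
            return false;
        slots_[head] = std::move (ev);
        head_.store (next, std::memory_order_release);
        return true;
    }

    // Returns false when there is nothing to read.
    bool pop (monitor_event &out)
    {
        const std::size_t tail = tail_.load (std::memory_order_relaxed);
        if (tail == head_.load (std::memory_order_acquire))
            return false;
        out = std::move (slots_[tail]);
        tail_.store ((tail + 1) % N, std::memory_order_release);
        return true;
    }

private:
    std::array<monitor_event, N> slots_;
    std::atomic<std::size_t> head_{0};
    std::atomic<std::size_t> tail_{0};
};

// The monitor owns one queue per session and forwards everything to a
// single application-facing stream (the PAIR socket in the proposal).
class event_monitor
{
public:
    typedef spsc_queue<64> queue_t;

    // Called during setup, before the session starts producing events.
    queue_t &add_session ()
    {
        queues_.push_back (std::unique_ptr<queue_t> (new queue_t));
        return *queues_.back ();
    }

    // Drains all session queues into out; returns the number forwarded.
    std::size_t forward (std::vector<monitor_event> &out)
    {
        std::size_t n = 0;
        monitor_event ev;
        for (auto &q : queues_)
            while (q->pop (ev)) {
                out.push_back (ev);
                ++n;
            }
        return n;
    }

private:
    std::vector<std::unique_ptr<queue_t>> queues_;
};
```

A session would keep the reference returned by add_session and call push from its I/O thread, while the monitor calls forward from its own thread. A full queue drops the event rather than stalling the I/O thread, which is a design choice of this sketch and would need discussion for a real implementation.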
Best wishes,
Jens
On Oct 10, 2016 17:45, "Auer, Jens" <jens.auer at cgi.com> wrote:
Hi,
I have an issue with the socket monitor on a ZMQ_STREAM socket. We have noticed that our software sometimes crashes with an internal ZeroMQ assertion when non-ZeroMQ clients disconnect.
I can reproduce this with a small test program that
- starts a context with 6 I/O threads
- creates a ZMQ_SUB socket with a monitor
- creates a ZMQ_PAIR socket and connects it to the monitor of the SUB socket
- creates a ZMQ_STREAM socket and binds it
- creates a socket monitor for the STREAM socket, with a ZMQ_PAIR socket to receive its events
- runs a loop that polls the sockets with zmq_poll
When I start 500 instances of this program and connect 6 clients to each of them (I use netcat >/dev/null), and then kill the clients, I randomly get crashes with ZeroMQ assertion failures.
Most of the time the failure is in the code processing the socket monitor event which I copied from the man page:
Program terminated with signal 6, Aborted.
#0 0x00007fb5644435f7 in raise () from /lib64/libc.so.6
Missing separate debuginfos, use: debuginfo-install apr-1.4.8-3.el7.x86_64 apr-util-1.5.2-6.el7.x86_64 cyrus-sasl-lib-2.1.26-20.el7_2.x86_64 expat-2.1.0-8.el7.x86_64 glibc-2.17-106.el7_2.6.x86_64 libdb-5.3.21-19.el7.x86_64 libgcc-4.8.5-4.el7.x86_64 libstdc++-4.8.5-4.el7.x86_64 libuuid-2.23.2-26.el7_2.2.x86_64 log4cxx-0.10.0-16.el7.x86_64 nspr-4.11.0-1.el7_2.x86_64 nss-3.21.0-9.el7_2.x86_64 nss-softokn-freebl-3.16.2.3-14.2.el7_2.x86_64 nss-util-3.21.0-2.2.el7_2.x86_64 openldap-2.4.40-9.el7_2.x86_64 openssl-libs-1.0.1e-51.el7_2.5.x86_64 zlib-1.2.7-15.el7.x86_64
(gdb) bt
#0 0x00007fb5644435f7 in raise () from /lib64/libc.so.6
#1 0x00007fb564444ce8 in abort () from /lib64/libc.so.6
#2 0x00007fb56443c566 in __assert_fail_base () from /lib64/libc.so.6
#3 0x00007fb56443c612 in __assert_fail () from /lib64/libc.so.6
#4 0x00000000004b16f9 in get_monitor_event (monitor=0x2512b90, value=0x0, address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:47
#5 0x00000000004b423e in main (argc=3, argv=0x7fffe10a2668) at /home/auerj/MDAF/src/test/pepSim.cpp:268
Sometimes, I get an internal zmq assertion:
Program terminated with signal 6, Aborted.
#0 0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
(gdb) bt
#0 0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
#1 0x00007f3e75f19ce8 in abort () from /lib64/libc.so.6
#2 0x00007f3e76cff769 in zmq::zmq_abort (errmsg_=errmsg_@entry=0x7f3e76d328ed "check ()") at src/err.cpp:83
#3 0x00007f3e76d06232 in zmq::msg_t::size (this=this@entry=0x7ffd200e9a50) at src/msg.cpp:254
#4 0x00007f3e76d2e035 in zmq_msg_size (msg_=msg_@entry=0x7ffd200e9a50) at src/zmq.cpp:627
#5 0x00007f3e76d2e415 in s_recvmsg (s_=<optimized out>, msg_=0x7ffd200e9a50, flags_=<optimized out>) at src/zmq.cpp:459
#6 0x00000000004b1654 in get_monitor_event (monitor=0x1d3fb90, value=0x0, address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
#7 0x00000000004b423e in main (argc=3, argv=0x7ffd200e9d68) at /home/auerj/MDAF/src/test/pepSim.cpp:268
I've added assertions to the internal function calls in ZeroMQ to better trace the error. It turns out that the message is already corrupted when
it is read from the internal pipe:
#0 0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
(gdb) bt
#0 0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
#1 0x00007fd3bc929ce8 in abort () from /lib64/libc.so.6
#2 0x00007fd3bd70f769 in zmq::zmq_abort (errmsg_=errmsg_@entry=0x7fd3bd742d08 "queue.front().check()") at src/err.cpp:83
#3 0x00007fd3bd71eb1f in zmq::ypipe_t<zmq::msg_t, 256>::read (this=0x1f978f0, value_=0x7ffd760e8540) at src/ypipe.hpp:346
#4 0x00007fd3bd71e259 in zmq::pipe_t::read (this=0x1f9f9a0, msg_=msg_@entry=0x7ffd760e8540) at src/pipe.cpp:162
#5 0x00007fd3bd71c63e in zmq::pair_t::xrecv (this=0x1f96b90, msg_=0x7ffd760e8540) at src/pair.cpp:114
#6 0x00007fd3bd728b5b in zmq::socket_base_t::recv (this=0x1f96b90, msg_=msg_@entry=0x7ffd760e8540, flags_=0) at src/socket_base.cpp:910
#7 0x00007fd3bd73e809 in s_recvmsg (s_=<optimized out>, msg_=0x7ffd760e8540, flags_=<optimized out>) at src/zmq.cpp:456
#8 0x00000000004b1654 in get_monitor_event (monitor=0x1f96b90, value=0x0, address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
#9 0x00000000004b423e in main (argc=3, argv=0x7ffd760e8858) at /home/auerj/MDAF/src/test/pepSim.cpp:268
The assertion is triggered by additional checks in the ypipe_t class which I specialized for zmq::msg_t to add the checks. The code is:
311 inline bool check_read ()
312 {
313 // Was the value prefetched already? If so, return.
314 if (&queue.front () != r && r)
315 return true;
316
317 // There's no prefetched value, so let us prefetch more values.
318 // Prefetching is to simply retrieve the
319 // pointer from c in atomic fashion. If there are no
320 // items to prefetch, set c to NULL (using compare-and-swap).
321 r = c.cas (&queue.front (), NULL);
322
323 // If there are no elements prefetched, exit.
324 // During pipe's lifetime r should never be NULL, however,
325 // it can happen during pipe shutdown when items
326 // are being deallocated.
327 if (&queue.front () == r || !r)
328 return false;
329
330 zmq_assert( queue.front().check() );
331
332 // There was at least one value prefetched.
333 return true;
334 }
335
336 // Reads an item from the pipe. Returns false if there is no value.
337 // available.
338 inline bool read (T *value_)
339 {
340 // Try to prefetch a value.
341 if (!check_read ())
342 return false;
343
344 // There was at least one value prefetched.
345 // Return it to the caller.
346 zmq_assert( queue.front().check() );
347 *value_ = queue.front ();
348 zmq_assert( value_->check() );
349 queue.pop ();
350 zmq_assert( value_->check() );
351
352 return true;
353 }
I sometimes get another assertion failure when the monitor event is sent, but here I am not sure whether it is caused by my modification
that adds the assertion. The stack trace is:
#0 0x00007feaecc625f7 in raise () from /lib64/libc.so.6
#1 0x00007feaecc63ce8 in abort () from /lib64/libc.so.6
#2 0x00007feaeda49769 in zmq::zmq_abort (errmsg_=errmsg_@entry=0x7feaeda7ccf3 "queue.back().check()") at src/err.cpp:83
#3 0x00007feaeda58856 in zmq::ypipe_t<zmq::msg_t, 256>::write (this=0xaa2670, value_=..., incomplete_=<optimized out>) at src/ypipe.hpp:259
#4 0x00007feaeda57645 in zmq::pipe_t::write (this=0xaac1c0, msg_=msg_@entry=0x7feae7ce10f0) at src/pipe.cpp:215
#5 0x00007feaeda56490 in zmq::pair_t::xsend (this=0xaa1ed0, msg_=0x7feae7ce10f0) at src/pair.cpp:90
#6 0x00007feaeda62971 in zmq::socket_base_t::send (this=this@entry=0xaa1ed0, msg_=msg_@entry=0x7feae7ce10f0, flags_=flags_@entry=2) at src/socket_base.cpp:843
#7 0x00007feaeda7846c in s_sendmsg (s_=0xaa1ed0, msg_=0x7feae7ce10f0, flags_=2) at src/zmq.cpp:346
#8 0x00007feaeda62d93 in zmq::socket_base_t::monitor_event (this=0xaa16a0, event_=event_@entry=512, value_=25, addr_="tcp://192.168.120.1:9494") at src/socket_base.cpp:1357
#9 0x00007feaeda62f1d in zmq::socket_base_t::event_disconnected (this=<optimized out>, addr_="tcp://192.168.120.1:9494", fd_=<optimized out>) at src/socket_base.cpp:1344
#10 0x00007feaeda6b45a in zmq::stream_engine_t::error (this=this@entry=0x7feae00022c0, reason=reason@entry=zmq::stream_engine_t::connection_error) at src/stream_engine.cpp:936
#11 0x00007feaeda6c5db in zmq::stream_engine_t::in_event (this=0x7feae00022c0) at src/stream_engine.cpp:300
#12 0x00007feaeda493ee in zmq::epoll_t::loop (this=0xaa0830) at src/epoll.cpp:176
#13 0x00007feaeda73150 in thread_routine (arg_=0xaa08b0) at src/thread.cpp:96
#14 0x00007feaed816dc5 in start_thread () from /lib64/libpthread.so.0
#15 0x00007feaecd23ced in clone () from /lib64/libc.so.6
and the code in question is
253 inline void write (const T &value_, bool incomplete_)
254 {
255 zmq_assert(value_.check() );
256
257 // Place the value to the queue, add new terminator element.
258 queue.back () = value_;
259 queue.push ();
260 zmq_assert( queue.back().check() );
261
262 // Move the "flush up to here" poiter.
263 if (!incomplete_)
264 f = &queue.back ();
265 }
I can also reproduce the crashes with a simpler test program that has only the ZMQ_STREAM socket with its monitor and
does not open a ZMQ_SUB socket. I could not reproduce it with fewer threads, i.e. the default of 2 io threads.
The code for my test program is
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
#include <zmq.hpp>

// Reads one two-frame monitor event (copied from the man page).
// Returns the event number, or -1 if receiving failed.
static int
get_monitor_event(void *monitor,
                  int *value = nullptr,
                  char **address = nullptr)
{
    // First frame in message contains event number and value
    zmq_msg_t msg;
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1; // Interrupted, presumably
    assert (zmq_msg_more (&msg));
    uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
    uint16_t event = *(uint16_t *) (data);
    if (value)
        *value = *(uint32_t *) (data + 2);
    zmq_msg_close (&msg);

    // Second frame in message contains event address
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1; // Interrupted, presumably
    assert (!zmq_msg_more (&msg));
    if (address) {
        data = (uint8_t *) zmq_msg_data (&msg);
        size_t size = zmq_msg_size (&msg);
        *address = (char *) malloc (size + 1);
        memcpy (*address, data, size);
        (*address) [size] = 0; // terminate the string, not the char** itself
    }
    zmq_msg_close (&msg);
    return event;
}

int main(int argc, char* argv[])
{
    if (argc < 3) {
        std::cerr << "usage: " << argv[0] << " <address> <port>" << std::endl;
        return 1;
    }
    std::string const address = argv[1];
    std::string const port = argv[2];
    std::string const stream_monitor_address("inproc://stream_monitor");
    std::string const sub_monitor_address("inproc://sub_monitor");

    zmq::context_t ctx(6); // 6 I/O threads
    zmq::socket_t stream(ctx, ZMQ_STREAM);
    zmq::socket_t sub(ctx, ZMQ_SUB);
    zmq_socket_monitor( static_cast<void*>(stream), stream_monitor_address.c_str(), ZMQ_EVENT_ALL);
    zmq_socket_monitor( static_cast<void*>(sub), sub_monitor_address.c_str(), ZMQ_EVENT_ALL);
    zmq::socket_t stream_monitor(ctx, ZMQ_PAIR);
    zmq::socket_t sub_monitor(ctx, ZMQ_PAIR);
    stream_monitor.connect( stream_monitor_address.c_str() );
    sub_monitor.connect( sub_monitor_address.c_str() );
    stream.bind("tcp://" + address + ":" + port);
    //sub.connect("tcp://127.0.0.1:6000");

    std::vector<zmq::message_t> clientIds;
    while(1)
    {
        zmq::pollitem_t items[4] = {
            { static_cast<void*>(stream_monitor), -1, ZMQ_POLLIN, 0},
            { static_cast<void*>(stream), -1, ZMQ_POLLIN, 0},
            { static_cast<void*>(sub_monitor), -1, ZMQ_POLLIN, 0},
            { static_cast<void*>(sub), -1, ZMQ_POLLIN, 0},
        };
        auto rc = zmq::poll( items, 4 );
        if (rc != -1)
        {
            if (0 != (items[1].revents & ZMQ_POLLIN) )
            {
                // STREAM data: identity frame followed by payload frame
                zmq::message_t id;
                zmq::message_t m;
                stream.recv(&id);
                stream.recv(&m);
            }
            if (0 != (items[3].revents & ZMQ_POLLIN) )
            {
                // SUB data is ignored in this test
            }
            if (0 != (items[0].revents & ZMQ_POLLIN))
            {
                std::cout << "STREAM MONITOR " << get_monitor_event( static_cast<void*>(stream_monitor), nullptr, nullptr) << std::endl;
            }
            if (0 != (items[2].revents & ZMQ_POLLIN) )
            {
                std::cout << "SUB MONITOR " << get_monitor_event( static_cast<void*>(sub_monitor), nullptr, nullptr) << std::endl;
            }
        }
    }
    return 0;
}
It uses the C++ interface from https://github.com/zeromq/cppzmq/blob/master/zmq.hpp.
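As a side note on get_monitor_event: the first frame carries a 16-bit event number followed by a 32-bit value, and the man page code reads both through pointer casts and asserts on the frame layout. A slightly more defensive decoder could look like the sketch below. monitor_header and decode_monitor_header are hypothetical names, not part of libzmq, and the sketch assumes the 6-byte layout in host byte order that the code above also relies on.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

struct monitor_header
{
    std::uint16_t event; // event number, e.g. 512 for a disconnect
    std::uint32_t value; // event value, e.g. the file descriptor
};

// Decodes the first frame of a monitor message. Returns false for a
// frame that is too short instead of reading past its end.
inline bool decode_monitor_header (const void *data, std::size_t size, monitor_header &out)
{
    if (data == nullptr || size < sizeof (std::uint16_t) + sizeof (std::uint32_t))
        return false;
    const unsigned char *bytes = static_cast<const unsigned char *> (data);
    // memcpy avoids the unaligned 32-bit read at offset 2.
    std::memcpy (&out.event, bytes, sizeof out.event);
    std::memcpy (&out.value, bytes + sizeof out.event, sizeof out.value);
    return true;
}
```

In get_monitor_event it would be called with zmq_msg_data (&msg) and zmq_msg_size (&msg) for the first frame. This would not prevent the crashes described above, since there the message is already corrupted inside the pipe, but it turns a malformed frame into an error return rather than an out-of-bounds read.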
Best wishes,
Jens Auer