[zeromq-dev] Assertion failure with socket monitor when clients disconnect

Doron Somech somdoron at gmail.com
Tue Oct 11 13:22:53 CEST 2016


The sync mutex is actually used for the thread-safe sockets, so I don't
think it is a good idea to reuse it for this.

Actually, we already have thread-safe sockets, so we could use radio-dish.
Maybe we add a socket option to use radio-dish instead of PAIR: by default
the monitor would use PAIR, and if the new option (ZMQ_MONITOR_USE_RADIO?)
is enabled, the socket would create a RADIO socket instead.
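
Roughly, the application side might then look like this (a hypothetical
sketch only: ZMQ_MONITOR_USE_RADIO does not exist today, the "monitor"
group name is made up, and RADIO/DISH are still draft API):

    void *sock = zmq_socket (ctx, ZMQ_STREAM);
    int use_radio = 1;
    zmq_setsockopt (sock, ZMQ_MONITOR_USE_RADIO, &use_radio,
                    sizeof use_radio);
    zmq_socket_monitor (sock, "inproc://monitor", ZMQ_EVENT_ALL);

    //  A DISH socket is thread safe, so events could be read from any
    //  thread, without the PAIR socket's sharing restrictions.
    void *monitor = zmq_socket (ctx, ZMQ_DISH);
    zmq_connect (monitor, "inproc://monitor");
    zmq_join (monitor, "monitor");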



On Tue, Oct 11, 2016 at 2:03 PM, Auer, Jens <jens.auer at cgi.com> wrote:
> Hi,
>
>
>
> I've patched socket_base_t to protect the monitor socket with a mutex. There
> was an unused mutex "sync" already available in socket_base_t, so I've used
> this :-). If you want, I can open a pull request.
>
>
>
> Cheers,
>
>   Jens
>
>
>
> --
>
> Dr. Jens Auer | CGI | Software Engineer
>
> CGI Deutschland Ltd. & Co. KG
> Rheinstraße 95 | 64295 Darmstadt | Germany
>
> T: +49 6151 36860 154
>
> jens.auer at cgi.com
>
>
>
>
> From: zeromq-dev [mailto:zeromq-dev-bounces at lists.zeromq.org] On Behalf Of
> Auer, Jens
> Sent: 11 October 2016 09:31
> To: ZeroMQ development list
> Subject: Re: [zeromq-dev] Assertion failure with socket monitor when clients
> disconnect
>
>
>
> Hi Doron,
>
> I'm not a big fan of monitoring and prefer other solutions. I actually
> think of deprecating it: the implementation is buggy and actually
> violates the rule not to share a socket between threads, which is
> probably what is causing your issue.
>
> That's quite a shock given that it is part of the stable API. I think in
> this case it should not only be deprecated but removed from the API completely.
> On the other hand, something to monitor the connections is probably needed.
> Our clients are very concerned with TCP connections and would always insist
> on logging capabilities of TCP connection state changes. It would also be
> nice to get some statistics from the connections, e.g. number of received
> bytes or messages.
>
> Anyway, you have two options. I think you can manage without the
> monitoring; I can try to help you find other solutions. Another option
> is to not listen to all events; maybe this will avoid the sharing
> violation.
>
> I would like to replace the monitor with something else, but I am not sure
> how to do this given our requirements. Currently, we use the monitor to
>
> - Log TCP connection events for connect, disconnect, accept, listen
>   and reconnection retry events
>
> - Limit the number of reconnection attempts (a sketch of this follows
>   below)
>
> Unfortunately this includes disconnect events which are exactly the events
> causing the crashes right now. The other events are probably rare enough
> that there is no problem.
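>
> For the second point we do roughly the following today (a simplified
> sketch: max_retries is a made-up limit, socket stands for the monitored
> socket, and get_monitor_event is the helper from the test program below):
>
>     int retries = 0;
>     const int max_retries = 10;
>     for (;;) {
>         int event = get_monitor_event (monitor, nullptr, nullptr);
>         if (event == -1)
>             break;                        //  interrupted
>         if (event == ZMQ_EVENT_CONNECTED)
>             retries = 0;                  //  reset the counter on success
>         else if (event == ZMQ_EVENT_CONNECT_RETRIED
>                  && ++retries >= max_retries) {
>             zmq_close (socket);           //  give up reconnecting
>             break;
>         }
>     }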
>
> Is there another way to implement this without using the socket monitor? We
> use Router, Dealer, Sub, XPub and Stream sockets. I have full control over
> the protocol between the Router/Dealer, but I cannot change the protocol
> between Pub/Sub and external clients over Stream sockets, so I cannot add
> control messages here.
>
> I think an easy fix for my issues would be to add a mutex to protect the
> monitor socket in socket_base_t. I guess this was not done because it would
> block the thread and probably impact performance, but at least it will work
> correctly and not crash. It should be good enough for our use-case.
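>
> The patch is essentially the following (a sketch against
> src/socket_base.cpp; it assumes the existing zmq::mutex_t member
> "sync" and the scoped_lock_t helper from mutex.hpp):
>
>     void zmq::socket_base_t::monitor_event (int event_, int value_,
>         const std::string &addr_)
>     {
>         //  Serialize access to the PAIR monitor socket, which is
>         //  otherwise written to from the I/O threads while the
>         //  application thread may be terminating it.
>         scoped_lock_t lock (sync);
>         if (monitor_socket) {
>             //  ... build and send the two event frames as before ...
>         }
>     }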
>
> An idea for a non-blocking solution would be an independent monitor
> class, analogous to the existing listener and accepter classes, which
> owns an inproc SUB socket and the PAIR socket. Each ZeroMQ socket would
> then create a monitor when it is created, and each session object would
> have a PUB socket to broadcast events to the monitor. The monitor then
> forwards events received from individual clients/sessions on different
> I/O threads to the PAIR socket where the application code can connect.
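>
> In outline, the fan-in could look like this (an application-level
> sketch of the idea only; the thread function and endpoint names are
> made up):
>
>     //  Each session owns a PUB connected to "inproc://events" and
>     //  publishes its events there. This thread collects them on a SUB
>     //  and forwards them to the PAIR endpoint that the application
>     //  connects to, so no socket is ever shared between threads.
>     void monitor_thread (void *ctx)
>     {
>         void *sub = zmq_socket (ctx, ZMQ_SUB);
>         zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
>         zmq_bind (sub, "inproc://events");
>
>         void *pair = zmq_socket (ctx, ZMQ_PAIR);
>         zmq_bind (pair, "inproc://monitor");
>
>         zmq_proxy (sub, pair, NULL);  //  runs until the context terminates
>     }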
>
> Best wishes,
>
>   Jens
>
>
>
>
>
> On Oct 10, 2016 17:45, "Auer, Jens" <jens.auer at cgi.com> wrote:
>
> Hi,
>
> I have an issue with the socket monitor on a ZMQ_STREAM socket. We have
> noticed that our software sometimes crashes with an internal ZeroMQ
> assertion when non-ZeroMQ clients disconnect.
> I can reproduce this with a small test program that
> - starts a context with 6 threads
> - creates a ZMQ_SUB socket with a monitor
> - creates a ZMQ_PAIR socket and connects it to the monitor for the SUB socket
> - creates a ZMQ_STREAM socket and binds it
> - creates a socket monitor with a ZMQ_PAIR socket to get events
> - runs a loop zmq_poll-ing on the sockets
>
> When I start 500 instances of this program and connect 6 clients to each of
> them (I use netcat >/dev/null), and then kill the clients, I randomly get
> crashes with ZeroMQ assertion failures.
> Most of the time the failure is in the code processing the socket monitor
> event which I copied from the man page:
> Program terminated with signal 6, Aborted.
> #0  0x00007fb5644435f7 in raise () from /lib64/libc.so.6
> (gdb) bt
> #0  0x00007fb5644435f7 in raise () from /lib64/libc.so.6
> #1  0x00007fb564444ce8 in abort () from /lib64/libc.so.6
> #2  0x00007fb56443c566 in __assert_fail_base () from /lib64/libc.so.6
> #3  0x00007fb56443c612 in __assert_fail () from /lib64/libc.so.6
> #4  0x00000000004b16f9 in get_monitor_event (monitor=0x2512b90, value=0x0,
> address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:47
> #5  0x00000000004b423e in main (argc=3, argv=0x7fffe10a2668) at
> /home/auerj/MDAF/src/test/pepSim.cpp:268
>
> Sometimes, I get an internal zmq assertion:
> Program terminated with signal 6, Aborted.
> #0  0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
> (gdb) bt
> #0  0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
> #1  0x00007f3e75f19ce8 in abort () from /lib64/libc.so.6
> #2  0x00007f3e76cff769 in zmq::zmq_abort
> (errmsg_=errmsg_@entry=0x7f3e76d328ed "check ()") at src/err.cpp:83
> #3  0x00007f3e76d06232 in zmq::msg_t::size (this=this@entry=0x7ffd200e9a50)
> at src/msg.cpp:254
> #4  0x00007f3e76d2e035 in zmq_msg_size (msg_=msg_@entry=0x7ffd200e9a50) at
> src/zmq.cpp:627
> #5  0x00007f3e76d2e415 in s_recvmsg (s_=<optimized out>,
> msg_=0x7ffd200e9a50, flags_=<optimized out>) at src/zmq.cpp:459
> #6  0x00000000004b1654 in get_monitor_event (monitor=0x1d3fb90, value=0x0,
> address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
> #7  0x00000000004b423e in main (argc=3, argv=0x7ffd200e9d68) at
> /home/auerj/MDAF/src/test/pepSim.cpp:268
>
> I've added assertions to the internal function calls in ZeroMQ to better
> trace the error. It turns out that the message is already corrupted when
> it is read from the internal pipe:
> #0  0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
> (gdb) bt
> #0  0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
> #1  0x00007fd3bc929ce8 in abort () from /lib64/libc.so.6
> #2  0x00007fd3bd70f769 in zmq::zmq_abort
> (errmsg_=errmsg_@entry=0x7fd3bd742d08 "queue.front().check()") at
> src/err.cpp:83
> #3  0x00007fd3bd71eb1f in zmq::ypipe_t<zmq::msg_t, 256>::read
> (this=0x1f978f0, value_=0x7ffd760e8540) at src/ypipe.hpp:346
> #4  0x00007fd3bd71e259 in zmq::pipe_t::read (this=0x1f9f9a0,
> msg_=msg_@entry=0x7ffd760e8540) at src/pipe.cpp:162
> #5  0x00007fd3bd71c63e in zmq::pair_t::xrecv (this=0x1f96b90,
> msg_=0x7ffd760e8540) at src/pair.cpp:114
> #6  0x00007fd3bd728b5b in zmq::socket_base_t::recv (this=0x1f96b90,
> msg_=msg_@entry=0x7ffd760e8540, flags_=0) at src/socket_base.cpp:910
> #7  0x00007fd3bd73e809 in s_recvmsg (s_=<optimized out>,
> msg_=0x7ffd760e8540, flags_=<optimized out>) at src/zmq.cpp:456
> #8  0x00000000004b1654 in get_monitor_event (monitor=0x1f96b90, value=0x0,
> address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
> #9  0x00000000004b423e in main (argc=3, argv=0x7ffd760e8858) at
> /home/auerj/MDAF/src/test/pepSim.cpp:268
>
> The assertion is triggered by additional checks in the ypipe_t class,
> which I specialized for zmq::msg_t to add them. The code is:
> 311         inline bool check_read ()
> 312         {
> 313             //  Was the value prefetched already? If so, return.
> 314             if (&queue.front () != r && r)
> 315                  return true;
> 316
> 317             //  There's no prefetched value, so let us prefetch more values.
> 318             //  Prefetching is to simply retrieve the
> 319             //  pointer from c in atomic fashion. If there are no
> 320             //  items to prefetch, set c to NULL (using compare-and-swap).
> 321             r = c.cas (&queue.front (), NULL);
> 322
> 323             //  If there are no elements prefetched, exit.
> 324             //  During pipe's lifetime r should never be NULL, however,
> 325             //  it can happen during pipe shutdown when items
> 326             //  are being deallocated.
> 327             if (&queue.front () == r || !r)
> 328                 return false;
> 329
> 330             zmq_assert( queue.front().check() );
> 331
> 332             //  There was at least one value prefetched.
> 333             return true;
> 334         }
> 335
> 336         //  Reads an item from the pipe. Returns false if there is no value
> 337         //  available.
> 338         inline bool read (T *value_)
> 339         {
> 340             //  Try to prefetch a value.
> 341             if (!check_read ())
> 342                 return false;
> 343
> 344             //  There was at least one value prefetched.
> 345             //  Return it to the caller.
> 346             zmq_assert( queue.front().check() );
> 347             *value_ = queue.front ();
> 348             zmq_assert( value_->check() );
> 349             queue.pop ();
> 350             zmq_assert( value_->check() );
> 351
> 352             return true;
> 353         }
>
>
> I sometimes get another assertion when sending the monitor event, but
> here I am not sure whether it is caused by my modification adding the
> assertion. The stack trace is:
> #0  0x00007feaecc625f7 in raise () from /lib64/libc.so.6
> #1  0x00007feaecc63ce8 in abort () from /lib64/libc.so.6
> #2  0x00007feaeda49769 in zmq::zmq_abort
> (errmsg_=errmsg_@entry=0x7feaeda7ccf3 "queue.back().check()") at
> src/err.cpp:83
> #3  0x00007feaeda58856 in zmq::ypipe_t<zmq::msg_t, 256>::write
> (this=0xaa2670, value_=..., incomplete_=<optimized out>) at
> src/ypipe.hpp:259
> #4  0x00007feaeda57645 in zmq::pipe_t::write (this=0xaac1c0,
> msg_=msg_@entry=0x7feae7ce10f0) at src/pipe.cpp:215
> #5  0x00007feaeda56490 in zmq::pair_t::xsend (this=0xaa1ed0,
> msg_=0x7feae7ce10f0) at src/pair.cpp:90
> #6  0x00007feaeda62971 in zmq::socket_base_t::send
> (this=this@entry=0xaa1ed0, msg_=msg_@entry=0x7feae7ce10f0,
> flags_=flags_@entry=2) at src/socket_base.cpp:843
> #7  0x00007feaeda7846c in s_sendmsg (s_=0xaa1ed0, msg_=0x7feae7ce10f0,
> flags_=2) at src/zmq.cpp:346
> #8  0x00007feaeda62d93 in zmq::socket_base_t::monitor_event (this=0xaa16a0,
> event_=event_@entry=512, value_=25, addr_="tcp://192.168.120.1:9494") at
> src/socket_base.cpp:1357
> #9  0x00007feaeda62f1d in zmq::socket_base_t::event_disconnected
> (this=<optimized out>, addr_="tcp://192.168.120.1:9494", fd_=<optimized
> out>) at src/socket_base.cpp:1344
> #10 0x00007feaeda6b45a in zmq::stream_engine_t::error
> (this=this@entry=0x7feae00022c0,
> reason=reason@entry=zmq::stream_engine_t::connection_error) at
> src/stream_engine.cpp:936
> #11 0x00007feaeda6c5db in zmq::stream_engine_t::in_event
> (this=0x7feae00022c0) at src/stream_engine.cpp:300
> #12 0x00007feaeda493ee in zmq::epoll_t::loop (this=0xaa0830) at
> src/epoll.cpp:176
> #13 0x00007feaeda73150 in thread_routine (arg_=0xaa08b0) at
> src/thread.cpp:96
> #14 0x00007feaed816dc5 in start_thread () from /lib64/libpthread.so.0
> #15 0x00007feaecd23ced in clone () from /lib64/libc.so.6
>
> and the code in question is
> 253         inline void write (const T &value_, bool incomplete_)
> 254         {
> 255             zmq_assert(value_.check() );
> 256
> 257             //  Place the value to the queue, add new terminator element.
> 258             queue.back () = value_;
> 259             queue.push ();
> 260             zmq_assert( queue.back().check() );
> 261
> 262             //  Move the "flush up to here" pointer.
> 263             if (!incomplete_)
> 264                 f = &queue.back ();
> 265         }
>
> I can also reproduce the crashes with a simpler test program that only
> has the ZMQ_STREAM socket with its monitor and does not open a ZMQ_SUB
> socket. I could not reproduce it with fewer threads, i.e. with the
> default of 2 I/O threads.
>
> The code for my test program is
> #include <cassert>
> #include <cstdint>
> #include <cstdlib>
> #include <cstring>
> #include <iostream>
> #include <string>
> #include <vector>
>
> #include <zmq.hpp>
>
> static int
> get_monitor_event(void *monitor,
>                   int *value = nullptr,
>                   char **address = nullptr)
> {
>     //  First frame in message contains event number and value
>     zmq_msg_t msg;
>     zmq_msg_init (&msg);
>     if (zmq_msg_recv (&msg, monitor, 0) == -1)
>         return -1;              //  Interrupted, presumably
>     assert (zmq_msg_more (&msg));
>
>     uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
>     uint16_t event = *(uint16_t *) (data);
>     if (value)
>         *value = *(uint32_t *) (data + 2);
>     zmq_msg_close (&msg);   //  release the first frame before re-initialising
>
>     //  Second frame in message contains event address
>     zmq_msg_init (&msg);
>     if (zmq_msg_recv (&msg, monitor, 0) == -1)
>         return -1;              //  Interrupted, presumably
>     assert (!zmq_msg_more (&msg));
>
>     if (address) {
>         data = (uint8_t *) zmq_msg_data (&msg);
>         size_t size = zmq_msg_size (&msg);
>         *address = (char *) malloc (size + 1);
>         memcpy (*address, data, size);
>         (*address)[size] = 0;   //  "*address [size]" would parse as *(address[size])
>     }
>     zmq_msg_close (&msg);
>     return event;
> }
>
> int main(int argc, char* argv[])
> {
>     std::string const address = argv[1];
>     std::string const port = argv[2];
>     std::string const stream_monitor_address("inproc://stream_monitor");
>     std::string const sub_monitor_address("inproc://sub_monitor");
>
>     zmq::context_t ctx(6);
>     zmq::socket_t stream(ctx, ZMQ_STREAM);
>     zmq::socket_t sub(ctx, ZMQ_SUB);
>
>     zmq_socket_monitor (static_cast<void*>(stream),
>                         stream_monitor_address.c_str(), ZMQ_EVENT_ALL);
>     zmq_socket_monitor (static_cast<void*>(sub),
>                         sub_monitor_address.c_str(), ZMQ_EVENT_ALL);
>
>     zmq::socket_t stream_monitor(ctx, ZMQ_PAIR);
>     zmq::socket_t sub_monitor(ctx, ZMQ_PAIR);
>
>     stream_monitor.connect( stream_monitor_address.c_str() );
>     sub_monitor.connect( sub_monitor_address.c_str() );
>
>     stream.bind("tcp://" + address + ":" + port);
>     //sub.connect("tcp://127.0.0.1:6000");
>
>     std::vector<zmq::message_t> clientIds;
>
>     while(1)
>     {
>         zmq::pollitem_t items[4] = {
>                 {static_cast<void*>(stream_monitor), -1, ZMQ_POLLIN, 0},
>                 { static_cast<void*>(stream), -1, ZMQ_POLLIN, 0},
>                 {static_cast<void*>(sub_monitor), -1, ZMQ_POLLIN, 0},
>                 { static_cast<void*>(sub), -1, ZMQ_POLLIN, 0}
>         };
>
>         auto rc = zmq::poll( items, 4 );
>
>         if (rc != -1)
>         {
>             if (0 != (items[1].revents & ZMQ_POLLIN) )
>             {
>                 zmq::message_t id;
>                 zmq::message_t m;
>
>                 stream.recv(&id);
>                 stream.recv(&m);
>             }
>             if (0 != (items[3].revents & ZMQ_POLLIN) )
>             {
>             }
>             if (0 != (items[0].revents & ZMQ_POLLIN))
>             {
>                 std::cout << "STREAM MONITOR "
>                           << get_monitor_event (static_cast<void*>(stream_monitor),
>                                                 nullptr, nullptr)
>                           << std::endl;
>             }
>             if (0 != (items[2].revents & ZMQ_POLLIN) )
>             {
>                 std::cout << "SUB MONITOR "
>                           << get_monitor_event (static_cast<void*>(sub_monitor),
>                                                 nullptr, nullptr)
>                           << std::endl;
>             }
>         }
>     }
>
>     return 0;
> }
>
> It uses the C++ interface from
> https://github.com/zeromq/cppzmq/blob/master/zmq.hpp.
>
> Best wishes,
>   Jens Auer
>
> --
>
> Jens Auer | CGI | Software Engineer
>
> CGI (Germany) GmbH & Co. KG
>
> Rheinstraße 95 | 64295 Darmstadt | Germany
> T: +49 6151 36860 154
>
> jens.auer at cgi.com
>
>


