[zeromq-dev] PUB/SUB assert(get_load () == 0) in zmq_ctx_destroy
Pieter Hintjens
ph at imatix.com
Tue Jul 1 18:17:24 CEST 2014
Could be related, yes. Since you can reproduce it, why not make the
patch and send it to master?
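
For anyone following the thread, the ordering under discussion can be
sketched outside libzmq. This is a minimal sketch under stated
assumptions, not libzmq code: toy_poller_t, its members, and
run_shutdown_demo are hypothetical names, and a condition variable
stands in for the mailbox. It only illustrates that stop () posts a
request and returns, so the owner has to join the worker thread (rather
than sleep and poll a flag) before the destructor checks that the load
is back to zero.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical toy poller; none of these names come from libzmq.
class toy_poller_t {
public:
    toy_poller_t () : load_ (0), stop_requested_ (false) {}

    ~toy_poller_t ()
    {
        // Join before checking the load: only once the worker has
        // exited is it certain the stop request has been processed.
        join ();
        assert (load_.load () == 0);
    }

    void add_fd () { ++load_; }
    void rm_fd () { --load_; }
    int load () const { return load_.load (); }

    void start () { worker_ = std::thread (&toy_poller_t::loop, this); }

    // Asynchronous: only posts the request and returns immediately.
    void stop ()
    {
        {
            std::lock_guard<std::mutex> lock (mutex_);
            stop_requested_ = true;
        }
        cond_.notify_one ();
    }

    // Blocks until the worker thread has finished.
    void join ()
    {
        if (worker_.joinable ())
            worker_.join ();
    }

private:
    void loop ()
    {
        std::unique_lock<std::mutex> lock (mutex_);
        cond_.wait (lock, [this] { return stop_requested_; });
        // Stands in for the stop handler: deregister the handle.
        rm_fd ();
    }

    std::atomic<int> load_;
    bool stop_requested_;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::thread worker_;
};

// Registers one handle, requests a stop, waits for the worker, and
// returns the remaining load.
int run_shutdown_demo ()
{
    toy_poller_t poller;
    poller.add_fd ();
    poller.start ();
    poller.stop ();
    poller.join ();
    return poller.load ();
}
```

Calling run_shutdown_demo () from main exercises the whole sequence. If
the join () in the destructor is removed and the object is destroyed
straight after stop (), the load check can race with the worker, which
is the kind of failure described in the quoted messages.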
On Tue, Jul 1, 2014 at 4:46 PM, Martin Townsend
<martin.townsend at xsilon.com> wrote:
> This may be related to
> https://github.com/zeromq/libzmq/issues/795
>
> Regards,
> Martin.
>
>
> On 01/07/14 15:28, Martin Townsend wrote:
>> Hi,
>>
>> I think I have got to the bottom of the problem. My main app is calling
>> zmq_ctx_destroy which ends up calling the destructor for the io_thread
>> associated with the IPC socket. This destructor calls the destructor
>> for the poller instance. The problem is that the io_thread instance has
>> added the mailbox handle to the poller using add_fd but expects it to be
>> removed using
>> void zmq::io_thread_t::process_stop ()
>> {
>>     poller->rm_fd (mailbox_handle);
>>     poller->stop ();
>> }
>>
>> but the context destructor doesn't wait for the poller to stop; it
>> just calls
>>
>> for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
>>     io_threads [i]->stop ();
>>
>> // Wait till I/O threads actually terminate.
>> for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
>>     delete io_threads [i];
>> }
>>
>> If I'm correct in my thinking, the poller is needed to process the stop
>> command in zmq::io_thread_t::in_event?
>>
>> If so, we have a race condition: the io_thread must receive and process
>> the stop command before we delete it. So I added code to ensure the
>> poller and io_thread are stopped in a coordinated fashion, and I no
>> longer get the assert.
>>
>> E.g. in the poller loop:
>>
>> void zmq::epoll_t::loop ()
>> {
>>     epoll_event ev_buf [max_io_events];
>>
>>     printf("poller started\n");
>>     poller_running = true;
>>     while (!stopping) {
>>         ...
>>     }
>>     printf("poller stopped\n");
>>     poller_running = false;
>> }
>>
>> In the destructor we wait for the poller worker to stop:
>> zmq::epoll_t::~epoll_t ()
>> {
>>     int i;
>>
>>     printf("poller dying\n");
>>
>>     // Wait till the worker thread exits.
>>     worker.stop ();
>>
>>     close (epoll_fd);
>>     for (retired_t::iterator it = retired.begin (); it != retired.end ();
>>          ++it)
>>         delete *it;
>>
>>     i = 0;
>>     while(poller_running) {
>>         sleep(1);
>>         if(++i == 10)
>>             break;
>>     }
>>     printf("poller dead\n");
>> }
>>
>>
>> Then I created a poller "stopped" function that returns !poller_running,
>> which is used by the io_thread_t stopped () function, so I can check
>> this in the ctx destructor:
>>
>> zmq::ctx_t::~ctx_t ()
>> {
>>     // Check that there are no remaining sockets.
>>     zmq_assert (sockets.empty ());
>>
>>     printf("Stopping iothreads\n");
>>     // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
>>     // thread subsequent invocation of destructor would hang-up.
>>     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
>>         io_threads [i]->stop ();
>>
>>     printf("Checking iothreads\n");
>>     // Wait till I/O threads actually terminate.
>>     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
>>         while (!io_threads [i]->stopped ()) {
>>             printf("iothread not stopped wait and try again\n");
>>             sleep (1);
>>         }
>>         delete io_threads [i];
>>     }
>>
>>     ...
>>
>>
>> I can create a patch but I would like to get people's thoughts first.
>>
>> Regards,
>> Martin.
>>
>>
>>
>> On 01/07/14 11:45, Martin Townsend wrote:
>>> Apologies, I forgot to say that I'm using v4.0.4 with epoll.
>>>
>>> On 01/07/14 11:44, Martin Townsend wrote:
>>>> Hi,
>>>>
>>>> I'm getting an assert when trying to exit gracefully.
>>>>
>>>> At the bottom of the mail is my pthread code that handles subscribing in
>>>> a pub/sub model and reads power values. On SIGINT and SIGTERM it sets
>>>> quit to 1 and the thread exits, but the process aborts on the call to
>>>> zmq_ctx_destroy (zcontext):
>>>>
>>>> Assertion failed: get_load () == 0 (poller_base.cpp:31)
>>>> Thread [2] (Suspended: Signal 'SIGABRT' received. Description: Aborted.)
>>>> 15 __GI_raise() raise.c:56 0x482c9ff4
>>>> 14 __GI_abort() abort.c:89 0x482ce4d0
>>>> 13 zmq::zmq_abort() err.cpp:74 0x4803c558
>>>> 12 zmq::poller_base_t::~poller_base_t() poller_base.cpp:31 0x480519d4
>>>> 11 zmq::epoll_t::~epoll_t() epoll.cpp:42 0x4803b8f8
>>>> 10 zmq::epoll_t::~epoll_t() epoll.cpp:50 0x4803b930
>>>> 9 zmq::io_thread_t::~io_thread_t() io_thread.cpp:39 0x4803d2f0
>>>> 8 zmq::io_thread_t::~io_thread_t() io_thread.cpp:40 0x4803d3ac
>>>> 7 zmq::ctx_t::~ctx_t() ctx.cpp:82 0x48033a9c
>>>> 6 zmq::ctx_t::terminate() ctx.cpp:153 0x480367d4
>>>> 5 zmq_ctx_term() zmq.cpp:171 0x4806f13c
>>>> 4 zmq_ctx_destroy() zmq.cpp:241 0x4806f3e0
>>>> 3 zsub_thread() xcoapd.c:470 0x10002b9c
>>>> 2 start_thread() pthread_create.c:314 0x48095180
>>>> 1 clone() clone.S:65 0x48383e88
>>>>
>>>> Am I doing anything incorrect below? If I put a sleep(4) before
>>>> destroying the context I don't get the assert so I'm suspecting a race
>>>> condition somewhere. I'm running on a 100MHz Microblaze.
>>>>
>>>> /* ZeroMq Subscriber task */
>>>> static void * zsub_thread(void * arg) {
>>>>     int rc;
>>>>
>>>>     zcontext = zmq_ctx_new ();
>>>>     zmq_ctx_set (zcontext, ZMQ_MAX_SOCKETS, 256);
>>>>     int max_sockets = zmq_ctx_get (zcontext, ZMQ_MAX_SOCKETS);
>>>>     assert (max_sockets == 256);
>>>>
>>>>     subscriber = zmq_socket (zcontext, ZMQ_SUB);
>>>>     printf("Subscribing\n");
>>>>     rc = zmq_connect (subscriber, "ipc://xpwrd.ipc");
>>>>     assert (rc == 0);
>>>>     // Subscribe to zipcode, default is NYC, 10001
>>>>     char *filter = "Pwr";
>>>>     rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter,
>>>>                          strlen (filter));
>>>>
>>>>
>>>>     while(!quit) {
>>>>         char buf[32];
>>>>         char *s;
>>>>         int unknown_err_count;
>>>>         zmq_pollitem_t items [] = {
>>>>             { subscriber, 0, ZMQ_POLLIN, 0 }
>>>>         };
>>>>
>>>>         rc = zmq_poll (items, 1, 1000);
>>>>         if(rc == 1) {
>>>>             /* As we only have one poll item we can safely call recv
>>>>              * on this */
>>>>             unknown_err_count = 0;
>>>>             s = s_recv (subscriber, buf, sizeof(buf));
>>>>             if(s)
>>>>                 sscanf(s, "Pwr:%04dW T:%dC", &pwr, &temp);
>>>>
>>>>             printf("Pwr:%04dW T:%dC\n", pwr, temp);
>>>>         } else if(rc == 0) {
>>>>             unknown_err_count = 0;
>>>>             printf("Poller Timed out\n");
>>>>         } else {
>>>>             /* error */
>>>>             if(errno == ETERM) {
>>>>                 printf("Poller has detected that socket was terminated\n");
>>>>                 break;
>>>>             } else if(errno == EFAULT) {
>>>>                 printf("zmq_poll has invalid parameters!!!!\n");
>>>>                 break;
>>>>             } else if(errno != EINTR) {
>>>>                 /* Ignore EINTR as we will just poll again on next
>>>>                  * iteration of loop, all other errors are
>>>>                  * undocumented in zmq so print a message and set a
>>>>                  * flag and if it occurs 3 times exit loop */
>>>>                 printf("Poller returned unknown error %d", errno);
>>>>                 printf("%s\n", strerror(errno));
>>>>                 unknown_err_count++;
>>>>                 if(unknown_err_count == 3) {
>>>>                     break;
>>>>                 }
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>>     printf("Subscriber thread closed\n");
>>>>     zmq_close (subscriber);
>>>>     zmq_ctx_destroy (zcontext);
>>>>
>>>>     return (void *)0;
>>>> }
>>>>
>>>>
>>>> Best Regards,
>>>> Martin.
>>>>
>>>> _______________________________________________
>>>> zeromq-dev mailing list
>>>> zeromq-dev at lists.zeromq.org
>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev