[zeromq-dev] PUB/SUB assert(get_load () == 0) in zmq_ctx_destroy
Martin Townsend
martin.townsend at xsilon.com
Tue Jul 1 16:46:44 CEST 2014
This may be related to
On 01/07/14 15:28, Martin Townsend wrote:
> Hi,
> I think I have got to the bottom of the problem. My main app calls
> zmq_ctx_destroy, which ends up calling the destructor for the io_thread
> associated with the IPC socket. This destructor calls the destructor
> for the poller instance. The problem is that the io_thread instance has
> added the mailbox handle to the poller using add_fd but expects it to be
> removed using
> void zmq::io_thread_t::process_stop ()
> {
>     poller->rm_fd (mailbox_handle);
>     poller->stop ();
> }
> but the context destructor doesn't wait for the poller to stop; it
> just calls
>     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
>         io_threads [i]->stop ();
>     // Wait till I/O threads actually terminate.
>     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
>         delete io_threads [i];
>     }
> If I'm correct in my thinking, the poller is needed to process the stop
> command in zmq::io_thread_t::in_event?
> If so, we have a race between the io_thread receiving and processing the
> stop command and the context deleting that io_thread. So I added code to
> ensure the poller and io_thread are stopped in a coordinated fashion, and
> I no longer get the assert.
> E.g. in the poller loop:
> void zmq::epoll_t::loop ()
> {
>     epoll_event ev_buf [max_io_events];
>     printf("poller started\n");
>     poller_running = true;
>     while (!stopping) {
>         ...
>     }
>     printf("poller stopped\n");
>     poller_running = false;
> }
> In the destructor we wait for the poller worker to stop:
> zmq::epoll_t::~epoll_t ()
> {
>     int i;
>     printf("poller dying\n");
>     // Wait till the worker thread exits.
>     worker.stop ();
>     close (epoll_fd);
>     for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it)
>         delete *it;
>     i = 0;
>     while(poller_running) {
>         sleep(1);
>         if(++i == 10)
>             break;
>     }
>     printf("poller dead\n");
> }
> Then I created a poller stopped function that returns !poller_running,
> which is used by an io_thread_t stopped function so that I can check it in
> the ctx destructor:
> zmq::ctx_t::~ctx_t ()
> {
>     // Check that there are no remaining sockets.
>     zmq_assert (sockets.empty ());
>     printf("Stopping iothreads\n");
>     // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
>     // thread subsequent invocation of destructor would hang-up.
>     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
>         io_threads [i]->stop ();
>     printf("Checking iothreads\n");
>     // Wait till I/O threads actually terminate.
>     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
>         while (!io_threads [i]->stopped ()) {
>             printf("iothread not stopped wait and try again\n");
>             sleep (1);
>         }
>         delete io_threads [i];
>     }
>     ...
> I can create a patch but I would like to get people's thoughts first.
> Regards,
> Martin.
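A minimal sketch of the "stop, wait, then delete" ordering proposed in the message above, written with standard C++ threads rather than libzmq internals. Everything here is hypothetical: `worker_t`, its `load` counter and `shutdown_cleanly` only stand in for the io_thread, the poller's load and the context destructor. The sketch blocks on a join instead of polling a `poller_running` flag with sleep(1), which avoids both the fixed delay and the 10 second give-up.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for an I/O thread that owns a poller.
// "load" mimics the number of handles registered with the poller.
class worker_t
{
public:
    worker_t () : stop_requested (false), load (1), runner (&worker_t::loop, this) {}

    // Ask the worker to stop. Returns immediately, like sending a stop command.
    void stop ()
    {
        {
            std::lock_guard<std::mutex> lock (sync);
            stop_requested = true;
        }
        cond.notify_one ();
    }

    // Block until the worker loop has processed the stop request and exited.
    void join ()
    {
        if (runner.joinable ())
            runner.join ();
    }

    int get_load () const { return load.load (); }

    ~worker_t ()
    {
        // Joining first guarantees the handle was unregistered before the check.
        join ();
        assert (get_load () == 0);
    }

private:
    void loop ()
    {
        std::unique_lock<std::mutex> lock (sync);
        cond.wait (lock, [this] { return stop_requested; });
        // Counterpart of process_stop (): unregister the mailbox handle.
        load = 0;
    }

    std::mutex sync;
    std::condition_variable cond;
    bool stop_requested;
    std::atomic<int> load;
    std::thread runner; // declared last so the members above exist before it starts
};

// Hypothetical owner: request the stop, wait for it, and only then delete.
bool shutdown_cleanly ()
{
    worker_t *w = new worker_t ();
    w->stop ();
    w->join ();
    const bool idle = (w->get_load () == 0);
    delete w;
    return idle;
}
```

Calling `shutdown_cleanly ()` returns true because the join completes only after the worker has cleared its load, so the destructor's assertion cannot fire early.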
> On 01/07/14 11:45, Martin Townsend wrote:
>> Apologies I forgot to say that I'm using v4.0.4 with epoll.
>> On 01/07/14 11:44, Martin Townsend wrote:
>>> Hi,
>>> I'm getting an assert when trying to exit gracefully.
>>> At the bottom of the mail is my pthread code that handles subscribing in
>>> a pub/sub model and reads power values. On SIGINT and SIGTERM it sets
>>> quit to 1 and the thread exits, but I get an assertion failure on the
>>> call to zmq_ctx_destroy (zcontext);
>>> Assertion failed: get_load () == 0 (poller_base.cpp:31)
>>> Thread [2] (Suspended: Signal 'SIGABRT' received. Description: Aborted.)
>>> 15 __GI_raise() raise.c:56 0x482c9ff4
>>> 14 __GI_abort() abort.c:89 0x482ce4d0
>>> 13 zmq::zmq_abort() err.cpp:74 0x4803c558
>>> 12 zmq::poller_base_t::~poller_base_t() poller_base.cpp:31 0x480519d4
>>> 11 zmq::epoll_t::~epoll_t() epoll.cpp:42 0x4803b8f8
>>> 10 zmq::epoll_t::~epoll_t() epoll.cpp:50 0x4803b930
>>> 9 zmq::io_thread_t::~io_thread_t() io_thread.cpp:39 0x4803d2f0
>>> 8 zmq::io_thread_t::~io_thread_t() io_thread.cpp:40 0x4803d3ac
>>> 7 zmq::ctx_t::~ctx_t() ctx.cpp:82 0x48033a9c
>>> 6 zmq::ctx_t::terminate() ctx.cpp:153 0x480367d4
>>> 5 zmq_ctx_term() zmq.cpp:171 0x4806f13c
>>> 4 zmq_ctx_destroy() zmq.cpp:241 0x4806f3e0
>>> 3 zsub_thread() xcoapd.c:470 0x10002b9c
>>> 2 start_thread() pthread_create.c:314 0x48095180
>>> 1 clone() clone.S:65 0x48383e88
>>> Am I doing anything incorrect below? If I put a sleep(4) before
>>> destroying the context I don't get the assert so I'm suspecting a race
>>> condition somewhere. I'm running on a 100MHz Microblaze.
>>> /* ZeroMq Subscriber task */
>>> static void * zsub_thread(void * arg) {
>>>     int rc;
>>>     /* Declared outside the loop so the count survives across iterations */
>>>     int unknown_err_count = 0;
>>>     zcontext = zmq_ctx_new ();
>>>     zmq_ctx_set (zcontext, ZMQ_MAX_SOCKETS, 256);
>>>     int max_sockets = zmq_ctx_get (zcontext, ZMQ_MAX_SOCKETS);
>>>     assert (max_sockets == 256);
>>>     subscriber = zmq_socket (zcontext, ZMQ_SUB);
>>>     printf("Subscribing\n");
>>>     rc = zmq_connect (subscriber, "ipc://xpwrd.ipc");
>>>     assert (rc == 0);
>>>     // Subscribe to messages starting with "Pwr"
>>>     const char *filter = "Pwr";
>>>     rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
>>>     assert (rc == 0);
>>>     while(!quit) {
>>>         char buf[32];
>>>         char *s;
>>>         zmq_pollitem_t items [] = {
>>>             { subscriber, 0, ZMQ_POLLIN, 0 }
>>>         };
>>>         rc = zmq_poll (items, 1, 1000);
>>>         if(rc == 1) {
>>>             /* As we only have one poll item we can safely call recv on this */
>>>             unknown_err_count = 0;
>>>             s = s_recv (subscriber, buf, sizeof(buf));
>>>             if(s)
>>>                 sscanf(s, "Pwr:%04dW T:%dC", &pwr, &temp);
>>>             printf("Pwr:%04dW T:%dC\n", pwr, temp);
>>>         } else if(rc == 0) {
>>>             unknown_err_count = 0;
>>>             printf("Poller Timed out\n");
>>>         } else {
>>>             /* error */
>>>             if(errno == ETERM) {
>>>                 printf("Poller has detected that socket was terminated\n");
>>>                 break;
>>>             } else if(errno == EFAULT) {
>>>                 printf("zmq_poll has invalid parameters!!!!\n");
>>>                 break;
>>>             } else if(errno != EINTR) {
>>>                 /* Ignore EINTR as we will just poll again on the next
>>>                  * iteration of the loop. All other errors are undocumented
>>>                  * in zmq, so print a message, count the failure and exit
>>>                  * the loop if it occurs 3 times */
>>>                 printf("Poller returned unknown error %d: %s\n", errno, strerror(errno));
>>>                 unknown_err_count++;
>>>                 if(unknown_err_count == 3) {
>>>                     break;
>>>                 }
>>>             }
>>>         }
>>>     }
>>>     printf("Subscriber thread closed\n");
>>>     zmq_close (subscriber);
>>>     zmq_ctx_destroy (zcontext);
>>>     return (void *)0;
>>> }
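The loop's rule for unexpected poll errors (ignore EINTR, reset on any successful poll or timeout, give up after three other failures in a row) can be isolated so that the counter clearly lives outside the loop body. This is only a sketch: `error_budget_t` and its members are hypothetical names, not part of libzmq, and ETERM and EFAULT are assumed to be handled by the caller before this helper is consulted, as in the posted code.

```cpp
#include <cerrno>

// Hypothetical helper: counts consecutive unexpected poll failures.
struct error_budget_t
{
    int limit;       // number of consecutive failures tolerated before giving up
    int consecutive; // failures seen since the last successful poll

    explicit error_budget_t (int limit_) : limit (limit_), consecutive (0) {}

    // Call after a successful poll or a timeout.
    void reset () { consecutive = 0; }

    // Call with errno after a failed poll.
    // Returns true when the caller should leave its loop.
    bool record_failure (int err)
    {
        if (err == EINTR)
            return false; // interrupted: just poll again, not counted
        return ++consecutive >= limit;
    }
};
```

In the subscriber loop, one `error_budget_t budget (3);` would be created before `while(!quit)`, `budget.reset ()` called in the `rc == 1` and `rc == 0` branches, and `budget.record_failure (errno)` used to decide whether to break.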
>>> Best Regards,
>>> Martin.
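The quit flag mentioned at the top of this message (set to 1 on SIGINT and SIGTERM) is not shown in the posted code. One common way to set it up is sketched below, assuming the flag is a plain `sig_atomic_t` written from the handler; `on_terminate` and `install_quit_handlers` are hypothetical names.

```cpp
#include <csignal>

// Flag polled by the subscriber loop. sig_atomic_t can be written safely
// from a signal handler.
static volatile std::sig_atomic_t quit = 0;

// The handler does the minimum: record the request and return.
extern "C" void on_terminate (int)
{
    quit = 1;
}

// Install the handler for SIGINT and SIGTERM. Returns true on success.
bool install_quit_handlers ()
{
    return std::signal (SIGINT, on_terminate) != SIG_ERR
        && std::signal (SIGTERM, on_terminate) != SIG_ERR;
}
```

With the 1000 ms zmq_poll timeout used above, the loop notices the flag within about a second of the signal even if no message arrives.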
>>> _______________________________________________
>>> zeromq-dev mailing list
>>> zeromq-dev at lists.zeromq.org
>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev