[zeromq-dev] zmq::context_t fails to exit, stuck in infinite loop waiting dezombify()

Proemial proemial at gmail.com
Sat Dec 11 01:10:32 CET 2010


ZeroMQ 2.1.0, using the C++ binding
WinXP, MSVC 2008

(code follows, summary of problem first)

I'm adapting part of an existing project to see how it performs with
zeromq as its networking layer.  The test uses pub/sub over tcp.

In my initial test case, I queue up a random assortment of messages
and publish them without having connected any subscribers.  This
continues for a short period before I signal the publishing thread to
shut down.  Both zmq::context_t and zmq::socket_t are declared in the
local scope and are destroyed automatically when the scope exits.
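
A minimal sketch of the destruction order this relies on, with no 0MQ
involved.  FakeContext and FakeSocket are hypothetical stand-ins for
zmq::context_t and zmq::socket_t; they only log when their destructors
run.  Locals are destroyed in reverse order of declaration, so the
socket goes first and the context second.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for zmq::context_t: it only records when its
// destructor runs.
struct FakeContext {
    std::vector<std::string>& log;
    explicit FakeContext(std::vector<std::string>& l) : log(l) {}
    ~FakeContext() { log.push_back("context destroyed"); }
};

// Hypothetical stand-in for zmq::socket_t, constructed from a context
// the same way the real socket is.
struct FakeSocket {
    std::vector<std::string>& log;
    FakeSocket(FakeContext& ctx, std::vector<std::string>& l) : log(l) {
        (void)ctx;  // unused in this sketch
    }
    ~FakeSocket() { log.push_back("socket destroyed"); }
};

// Same shape as the publishing thread: both objects are locals of one
// scope, so leaving the scope destroys them in reverse declaration order.
std::vector<std::string> run_scope() {
    std::vector<std::string> log;
    {
        FakeContext ctx(log);
        FakeSocket pub(ctx, log);
        log.push_back("publishing");
    }  // pub is destroyed first, then ctx
    assert(log.size() == 3);
    return log;
}
```

Calling run_scope() returns the three log entries in the order
"publishing", "socket destroyed", "context destroyed".  In the real
code the second and third steps correspond to the socket and context
destructors, and the hang is in the context destructor.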

However, the thread never exits.  Pausing execution, I find it stuck
in socket_base.cpp:
>	libzmq.dll!zmq::socket_base_t::process_commands(bool block_=false, bool throttle_=false)  Line 645	C++
 	libzmq.dll!zmq::socket_base_t::dezombify()  Line 605	C++
 	libzmq.dll!zmq::ctx_t::dezombify()  Line 318 + 0xf bytes	C++
 	libzmq.dll!zmq::ctx_t::terminate()  Line 150	C++
 	libzmq.dll!zmq_term(void * ctx_=0x003d9a40)  Line 255 + 0x8 bytes	C++
 	TradingLink.exe!zmq::context_t::~context_t()  Line 183 + 0xe bytes	C++
 	TradingLink.exe!CZeroMQNode::PUBLISHThread(void * pThreadCTX=0x003d99f8)  Line 299 + 0x30 bytes	C++

The result of line 645 (rc = mailbox.recv(&cmd, false);) is EAGAIN,
causing the following while loop to break and control to pass back up
the chain.  Ultimately, control appears never to exit the loop in
zmq::ctx_t::dezombify().
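
A toy model of the control flow described above, to make it concrete.
It is not libzmq's code: ToySocket, try_recv, process_commands,
drain_zombies and the "finished" flag are all hypothetical names, and
the pass limit exists only so the sketch terminates.  The assumption is
that the outer loop keeps polling every closed-but-not-yet-released
socket without blocking until each reports that it is finished.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <vector>

// Toy model only; none of these names are libzmq's.
struct ToySocket {
    int pending_commands;  // commands waiting in this socket's mailbox
    bool finished;         // true once the socket could be released

    // Non-blocking receive: 0 if a command was taken, EAGAIN if the
    // mailbox is empty.
    int try_recv() {
        if (pending_commands == 0) return EAGAIN;
        --pending_commands;
        return 0;
    }

    // Drain the mailbox.  The inner loop ends on EAGAIN and control
    // returns to the caller; nothing here sets `finished`.
    void process_commands() {
        while (try_recv() == 0) {
            // a real implementation would dispatch the command here
        }
    }
};

// Poll every zombie until all are finished or max_passes is reached.
// Returns the number of passes made over the zombie list.
std::size_t drain_zombies(std::vector<ToySocket>& zombies,
                          std::size_t max_passes) {
    assert(max_passes > 0);
    std::size_t passes = 0;
    while (!zombies.empty() && passes < max_passes) {
        ++passes;
        for (std::size_t i = 0; i < zombies.size();) {
            zombies[i].process_commands();
            if (zombies[i].finished) {
                zombies.erase(zombies.begin() +
                              static_cast<std::ptrdiff_t>(i));
            } else {
                ++i;  // still a zombie, visit it again next pass
            }
        }
    }
    return passes;
}
```

With a socket whose finished flag never becomes true, drain_zombies
uses up every pass it is given, which matches what I see in the
debugger: each poll returns EAGAIN, control goes back up, and the outer
loop simply goes around again.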

What follows is the code snippet.  I've wrapped the problem area in a
try/catch to force scoping.  Execution never reaches the final
TRACE("Test Exit: Success\r\n"); line.

Can someone take a look and tell me what I've messed up?

Thank you!
Martin

CODE----

   try {
      zmq::context_t oCtx(1);
      zmq::socket_t o0MQOrderPUB(oCtx, ZMQ_PUB);
      o0MQOrderPUB.connect("tcp://192.168.1.101:15000");

      int nVal = errno;
      if (nVal != 0)
      {
         /* todo error handling */
         DebugBreak();
      }

      bool bContinue = true;
      while (bContinue)
      {
         bContinue = !poContext->poParent->m_bShutdown; //tbb atomic operation

         if (bContinue)
         {
            CZeroMQNode::TPublishRecord * poRecord=NULL;

            // tbb concurrent queue, anything available?
            while (poContext->poParent->m_listPublishQueue.try_pop(poRecord))
            {
               zmq::message_t poMsg(sizeof(tradelink::TMessageHeader) + poRecord->nSize);

               char * poBuff = (char*)poMsg.data();
               tradelink::TMessageHeader * poHeader = (tradelink::TMessageHeader*)poBuff;

               poHeader->nSentinel = tradelink::TMessageHeader::nMessageSentinelValue;
               poHeader->nSourceNode = poContext->poParent->m_oConfig.nApplicationID;
               poHeader->nPayLoadLength = poRecord->nSize;
               poHeader->eMsgType = poRecord->eMsgType;
               poHeader->nTimeStamp = timeGetTime();

               memcpy(&poBuff[sizeof(tradelink::TMessageHeader)], poRecord->poBuffer, poRecord->nSize);

               if (o0MQOrderPUB.send(poMsg))
               {
                  int rc = errno;
                  DebugBreak();
               }

               delete poRecord;
            }
            ::Sleep(15); // yield
         }
      }

      TRACE("ZMQ Publish Thread exiting.\r\n");
   } catch (CException * ex)
   {
      TRACE("Exception");
   }
   TRACE("Test Exit: Success\r\n");


