[zeromq-dev] FW: ZeroMQ Error when client starts first, server second.

Christian Kamm kamm at incasoftware.de
Sat Jul 27 18:16:06 CEST 2013


On 07/27/2013 11:22 AM, Riskybiz wrote:
> It is possible that a different thread could be trying to add to the
> vector while the response is being built but I had hoped that using the
> concurrent vector would eliminate the problem?  

No. Your code thinks it's in the last iteration and sends without
SNDMORE - but then the vector grows and it tries to add another frame
afterwards and fails.
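
To make that sequence concrete, here is a single-threaded sketch that reproduces it deterministically. It is not the posted server: a plain std::vector stands in for the concurrent vector, indices are used instead of iterators (push_back() may invalidate std::vector iterators), and the hypothetical on_sent hook stands in for the other thread growing the vector. Frame, send_rereading_end and has_frame_after_final are illustrative names only.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// One frame of a multipart reply: the payload and whether SNDMORE was set.
struct Frame {
    std::string payload;
    bool more;
};

// Mirrors the shape of the posted loop: the end of the container is re-read
// on every iteration. `on_sent` is a hypothetical hook that runs after each
// frame is "sent"; it stands in for another thread calling push_back().
std::vector<Frame> send_rereading_end(
    std::vector<std::string>& data, std::size_t start,
    const std::function<void(std::size_t)>& on_sent)
{
    assert(start <= data.size());
    std::vector<Frame> sent;
    for (std::size_t i = start; i != data.size(); ++i) {
        const bool last = (i + 1 == data.size());  // checked against the current size
        sent.push_back({data[i], !last});
        on_sent(i);  // the container may grow here
    }
    return sent;
}

// True if some frame follows a frame that was sent without SNDMORE.
bool has_frame_after_final(const std::vector<Frame>& frames)
{
    for (std::size_t i = 0; i + 1 < frames.size(); ++i) {
        if (!frames[i].more) return true;
    }
    return false;
}
```

If the hook appends one element right after the frame that was judged to be the last, the loop runs once more and produces a frame after the final one, which matches the FinalSend followed by a failing SendMore in the posted log.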

I don't know the concurrency library you are using, but maybe reading
the end() iterator just once would help.
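
A minimal sketch of that idea, assuming the container only ever grows (as described for DDV) and that size() and operator[] may be called while another thread calls push_back(); that second assumption needs checking against the concurrency library in use. The element count is read once before the loop, and every SNDMORE decision is made against that snapshot. send_snapshot and the send callback are hypothetical names; in the posted server the callback would wrap zmq_msg_send(), passing ZMQ_SNDMORE when more is true and 0 otherwise.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Sends elements [start, count), where count is read exactly once.
// Elements appended after the snapshot are left for the next request.
template <typename Container>
std::size_t send_snapshot(
    const Container& data, std::size_t start,
    const std::function<void(const std::string&, bool)>& send)
{
    const std::size_t count = data.size();  // single read of the end position
    assert(start <= count);
    for (std::size_t i = start; i < count; ++i) {
        const bool more = (i + 1 < count);  // compared with the snapshot only
        // Same "index payload" layout as the posted code builds per part.
        send(std::to_string(i) + " " + data[i], more);
    }
    return count;  // first index that was not sent in this reply
}
```

Because the loop bound and the last-frame test use the same fixed count, exactly one frame is sent without SNDMORE no matter how much the container grows in the meantime. The caller still has to handle start == count separately, as the posted code does with its 'Nothing Further' reply.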

Christian

> The server loop is
> running on a different thread to the rest of the application.  The ‘for’
> loop building the message uses an iterator to detect the ‘end’ of the
> DDV vector so that just as much data as is presently available is
> sent.  The vector only grows with push_back(), it’s never erased or
> changed; the only operations on the vector are push_back() and iterator
> dereferencing to fetch the underlying string.  Also the program
> functionality is all normal when server starts first, so am not sure
> that the problem lies in composing the message.
> 
>  
> 
> I forgot to mention in my post that the REP socket becomes unresponsive
> to further requests after the ‘Operation cannot be accomplished in
> current state’ error.  It can be seen how the client receives all of the
> message up to part 4462.  The client then requests element 4463 from the
> vector but never gets a response.
> 
>  
> 
> Riskybiz.
> 
>  
> 
>  
> 
> _Further sample output:_
> 
> [5776] SD_Client: Message: 4458 4422 41481.266667 41481.291667 98.84000
> 98.61900
> 
> [5776] SD_Client: Message: 4459 4401 41481.260417 41481.291667 98.84000
> 98.61900
> 
> [5776] SD_Client: Message: 4460 4484 41481.286111 41481.292361 98.84000
> 98.71100
> 
> [5776] SD_Client: Message: 4461 4487 41481.286806 41481.293056 98.84000
> 98.70200
> 
> [5776] SD_Client: Message: 4462 4466 41481.280556 41481.293056 98.84000
> 98.70200
> 
> [5776] SD_Client: Sending Data Request For Element No:4463
> 
> [5776] SD_Client: Waiting For Reply....
> 
> [5776] SD_Client: Waiting For Reply....
> 
> [5776] SD_Client: Waiting For Reply....
> 
> [5776] SD_Client: Waiting For Reply....
> 
>  
> 
>  
> 
> *From:*Christian Kamm [mailto:kamm at incasoftware.de]
> *Sent:* 27 July 2013 06:31
> *To:* Riskybiz
> *Cc:* zeromq-dev at lists.zeromq.org
> *Subject:* Re: [zeromq-dev] ZeroMQ Error when client starts first,
> server second.
> 
>  
> 
>> [5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806 41481.293056 98.84000 98.70200
>> [5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556 41481.293056 98.84000 98.70200
>> [5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot be accomplished in current state
>> [5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400
> 
> That looks to me like you are trying to send a message part after the
> final part has been sent. That'd be consistent with the client actually
> getting the finished message and the error message you're getting.
> 
> I didn't see any synchronization in your code. Maybe another thread
> grows the DDV vector while the response is being built?
> 
> Christian
> 
> Riskybiz <riskybizLive at live.com> wrote:
> 
> I have a REQ & REP server and client configured so that the client sends
> a request for data beginning from a certain vector index.  The server
> then sends messages (multipart if necessary) where each message part
> represents the contents of one vector element.  On the first request it
> is common for a multipart message of several thousand parts to be sent
> by REP.  Subsequent requests will result in either a ‘nothing further’
> message, a single message or a multipart message of just a few parts. 
> The bulk of the data is sent on the first request.
> 
>  
> 
> I am experiencing ZMQ behaviour I cannot explain.  Please note this is
> my first ZMQ project!
> 
>  
> 
> When the server is launched first with the client launched later; then
> all is well.  The large multipart message goes through and is received
> properly, subsequent smaller transmissions can then be seen as expected.
> I am careful that only one request is sent from a client at a time, the
> client will wait for a reply so that the lock step REQ – REP routine is
> maintained.
> 
>  
> 
> HOWEVER if the CLIENT is launched FIRST and the SERVER a few (maybe 10)
> seconds LATER then an error becomes apparent. 
> 
>  
> 
> The first request from REQ results in a large multipart message being
> sent from the server REP.  The sample output below shows the ‘FinalSend’
> from the server when part 4462 of a large multipart message is sent. 
> Then the error ‘Operation cannot be accomplished in current state’ is
> shown.  This error causes an inelegant exit by my application.
> 
>  
> 
> After the server error the client can be seen processing the large
> multipart message as it is received beginning with part 0, 1, 2, 3 etc….
> 
>  
> 
> Now; I am not sure how to correct this problem.  Looking at the patterns
> of behaviour for REQ & REP sockets does not give a direct answer other
> than a suspicion that a high-water-mark issue could be present?  Or
> perhaps there is some socket option that needs resetting or has got stuck?
> 
>  
> 
> Any tips or help much appreciated.  It’s really important to my
> application that the server & client can launch, bind & connect in any
> order.
> 
>  
> 
> With thanks,
> 
>  
> 
> Riskybiz.
> 
>  
> 
> _Sample Output:_
> 
>  
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4453 4457 41481.277778
> 41481.290278 98.84000 98.75000
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4454 4479 41481.284722
> 41481.290972 98.84000 98.76900
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4455 4482 41481.285417
> 41481.291667 98.84000 98.74400
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4456 4462 41481.279167
> 41481.291667 98.84000 98.74400
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4457 4442 41481.272917
> 41481.291667 98.84000 98.66400
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4458 4422 41481.266667
> 41481.291667 98.84000 98.61900
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4459 4401 41481.260417
> 41481.291667 98.84000 98.61900
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4460 4484 41481.286111
> 41481.292361 98.84000 98.71100
> 
> [5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806
> 41481.293056 98.84000 98.70200
> 
> [5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556
> 41481.293056 98.84000 98.70200
> 
> [5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot be
> accomplished in current state
> 
> [5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400 100.23900
> 
> [5776] SD_Client: Message: 1 4 41479.920139 41479.927083 100.29400 100.21900
> 
> [5776] SD_Client: Message: 2 6 41479.920833 41479.927778 100.29400 100.18000
> 
> [5776] SD_Client: Message: 3 9 41479.921528 41479.928472 100.29400 100.18000
> 
> [5776] SD_Client: Message: 4 10 41479.922222 41479.929167 100.29400
> 100.18000
> 
> [5776] SD_Client: Message: 5 13 41479.922917 41479.929861 100.29400
> 100.18000
> 
> [5776] SD_Client: Message: 6 15 41479.923611 41479.930556 100.29400
> 100.18000
> 
> [5776] SD_Client: Message: 7 18 41479.924306 41479.931250 100.28700
> 100.18000
> 
> [5776] SD_Client: Message: 8 22 41479.925694 41479.931944 100.28700
> 100.18000
> 
> [5776] SD_Client: Message: 9 2 41479.919444 41479.931944 100.29400 100.18000
> 
> [5776] SD_Client: Message: 10 23 41479.926389 41479.932639 100.24900
> 100.18000
> 
>  
> 
>  
> 
> _Server Code:_
> 
>  
> 
> int DataServer()
> 
> {
> 
> //Prepare the context
> 
> void *context = zmq_ctx_new();
> 
> if(context == NULL)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Creating Context: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
>  
> 
>  
> 
> //Set
> 
> bool runLoop(true);
> 
>  
> 
> //Prepare the socket
> 
> void *server = zmq_socket(context, ZMQ_REP);        
> 
> if(server == NULL)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Creating REP socket: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
>  
> 
> //  Configure socket to not wait at close time
> 
>     int linger = 0;
> 
>     int rc = zmq_setsockopt(server, ZMQ_LINGER, &linger, sizeof (linger));
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Setting Socket Options Failed: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;
> 
> }
> 
>    
> 
> rc = zmq_bind(server, "tcp://*:5555");
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Bind To REP Failed: "
> + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;
> 
> }
> 
>  
> 
>  
> 
> OutputDebugStringA("TestDataAccess: ZMQServer: Data Server Ready.... ");
> 
> //Listen for request
> 
> while(runLoop)
> 
> {
> 
> OutputDebugStringA("TestDataAccess: ZMQServer: Waiting For A Request....");
> 
> //Wait for a request
> 
> zmq_msg_t request;
> 
> int rc = zmq_msg_init(&request);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> }
> 
>  
> 
> rc = zmq_msg_recv(&request, server, 0);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Receiving Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
>  
> 
> //Release Message
> 
> rc = zmq_msg_close(&request);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
> return 1;//error
> 
> }
> 
> OutputDebugStringA("TestDataAccess: ZMQServer: Processing Request....");
> 
>  
> 
> //Cast the request data to a string
> 
> std::string reqStr(static_cast<char*>(zmq_msg_data(&request)),
> zmq_msg_size(&request));
> 
>  
> 
> unsigned int startFromElement;
> 
>  
> 
> if(reqStr == "kill")
> 
> {
> 
> //Send reply
> 
> std::string outMsg = "OK killing";
> 
> zmq_msg_t messageOut;
> 
>  
> 
> int rc = zmq_msg_init_size(&messageOut, outMsg.size());
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> }
> 
>  
> 
> memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
> 
>  
> 
> rc = zmq_msg_send(&messageOut, server, 0);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Sending Shutdown Acknowledgement: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> //Release Message
> 
> rc = zmq_msg_close(&messageOut);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
> return 1;//error
> 
> }
> 
>  
> 
> //Release Message
> 
> rc = zmq_msg_close(&request);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
> OutputDebugStringA("TestDataAccess: ZMQServer: Shutting Down Server....");
> 
> runLoop = false;
> 
> break;
> 
> }
> 
> else
> 
> {
> 
> //Convert the request string to an integer                
> 
> //Checks that the string actually contains numeric characters; falls back to 0 if not
> 
> if ( ! (std::istringstream(reqStr) >> startFromElement) )
> startFromElement = 0;
> 
> }
> 
>  
> 
>  
> 
> //Prepare the reply
> 
> if (startFromElement >= DDV->size()  || DDV->size() == 0)
> 
> {
> 
> //Client already has all the data
> 
> std::string outMsg = reqStr + " Nothing Further";
> 
>  zmq_msg_t messageOut;
> 
>  
> 
> int rc = zmq_msg_init_size(&messageOut, outMsg.size());
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> }
> 
>                 
> 
>  memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
> 
>  rc = zmq_msg_send(&messageOut, server, 0);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Sending 'Nothing Further': " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
> else if(rc >= 0)
> 
> {
> 
> std::string outStr = "TestDataAccess: ZMQServer: Message: " + reqStr + " Nothing Further";
> 
> const char* msgOut = outStr.c_str();
> 
> OutputDebugStringA(msgOut);
> 
> }
> 
>  
> 
> //Release Message
> 
> rc = zmq_msg_close(&messageOut);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
>             return 1;//error
> 
> }
> 
>  
> 
>  }
> 
> else
> 
> {
> 
>  
> 
> //There is data to send
> 
> for(auto iter = DDV->begin() + startFromElement; iter != DDV->end(); iter++)
> 
> {
> 
> if(iter + 1 != DDV->end())
> 
> {
> 
> std::string elemStr = static_cast<std::ostringstream*>(
> &(std::ostringstream() <<  distance(DDV->begin(),iter)))->str();
> 
> std::string outMsg = elemStr + " " + *iter;
> 
>  
> 
> //SendMore
> 
> zmq_msg_t messageOut;
> 
>  
> 
> int rc = zmq_msg_init_size(&messageOut, outMsg.size());
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> }                                        
> 
>  
> 
> memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
> 
>  
> 
> rc = zmq_msg_send(&messageOut, server, ZMQ_SNDMORE);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error In SendMore: " +
> errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> //Release Message
> 
> rc = zmq_msg_close(&messageOut);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
> return 1;//error
> 
> }
> 
> else if(rc >= 0)
> 
> {
> 
> std::string outStr = "TestDataAccess: ZMQServer: SendMore: " + elemStr +
> " " + *iter;
> 
> const char* msgOut = outStr.c_str();
> 
> OutputDebugStringA(msgOut);
> 
> }
> 
>  
> 
> //Release Message
> 
> rc = zmq_msg_close(&messageOut);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
>  
> 
>  
> 
> }
> 
> else if(iter + 1 == DDV->end())
> 
> {
> 
> std::string elemStr = static_cast<std::ostringstream*>(
> &(std::ostringstream() <<  distance(DDV->begin(),iter)))->str();
> 
> std::string outMsg = elemStr + " " + *iter;
> 
> //Final Send
> 
> zmq_msg_t messageOut;
> 
>  
> 
> int rc = zmq_msg_init_size(&messageOut, outMsg.size());
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> }
> 
>  
> 
> memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
> 
>  
> 
> rc = zmq_msg_send(&messageOut, server, 0);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error In FinalSend: "
> + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
>  
> 
> //Release Message
> 
> rc = zmq_msg_close(&messageOut);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
> return 1;//error
> 
> }
> 
> else if(rc >= 0)
> 
> {
> 
> std::string outStr = "TestDataAccess: ZMQServer: FinalSend: " + elemStr
> + " " + *iter;
> 
> const char* msgOut = outStr.c_str();
> 
> OutputDebugStringA(msgOut);
> 
> }
> 
>  
> 
> //Release Message
> 
> rc = zmq_msg_close(&messageOut);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
>  
> 
> }
> 
> }//for
> 
> }//else
> 
> }//while
> 
>  
> 
>   
> 
> //Close Server Socket
> 
> rc = zmq_close(server);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Closing Server Socket:  " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
>  
> 
> //When you exit the program, close your sockets and then call zmq_ctx_destroy(). This destroys the context.
> 
>     rc = zmq_ctx_destroy(context);
> 
> if(rc == -1)
> 
> {
> 
> std::string errStr =  zmq_strerror(zmq_errno());
> 
> std::string errConc = "TestDataAccess: ZMQServer: Error Destroying Context:  " + errStr;
> 
> const char* errOut = errConc.c_str();
> 
> OutputDebugStringA(errOut);
> 
> return 1;//error
> 
> }
> 
>  
> 
>   
> 
>    
> 
>    OutputDebugStringA("TestDataAccess: ZMQServer: Server Shut Down.");
> 
>    return 0;
> 
> }
> 
>  
> 
>  
> 
> 
> 