[zeromq-dev] FW: ZeroMQ Error when client starts first, server second.
Christian Kamm
kamm at incasoftware.de
Sat Jul 27 18:16:06 CEST 2013
On 07/27/2013 11:22 AM, Riskybiz wrote:
> It is possible that a different thread could be trying to add to the
> vector while the response is being built but I had hoped that using the
> concurrent vector would eliminate the problem?
No. Your code thinks it's in the last iteration and sends without
SNDMORE - but then the vector grows and it tries to add another frame
afterwards and fails.
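To illustrate the failure mode, here is a toy model of the REP send state. It is a sketch only, not libzmq: ToyRep, racyReply and growAt are made-up names for this example. Once a frame goes out without the "more" flag the reply is complete, and any further send is refused until the next request arrives. That refusal is the point where libzmq reports "Operation cannot be accomplished in current state".

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Toy model of the REP send/receive state machine (not libzmq itself).
class ToyRep {
public:
    // A request arrived: the socket may now send a reply.
    void recv() { canSend_ = true; }

    // Send one frame; 'more' plays the role of ZMQ_SNDMORE.
    // Returns false when the socket is not in a sendable state.
    bool send(const std::string& frame, bool more) {
        if (!canSend_) return false;
        sent_.push_back(frame);
        if (!more) canSend_ = false;  // final frame closes the reply
        return true;
    }

    std::size_t sentCount() const { return sent_.size(); }

private:
    bool canSend_ = false;
    std::vector<std::string> sent_;
};

// Replays the racy pattern: the container size is re-read on every pass.
// growAt is a hypothetical knob: after sending that index, one element is
// appended to simulate a push_back() from another thread.
bool racyReply(ToyRep& rep, std::vector<std::string>& data, std::size_t growAt) {
    for (std::size_t i = 0; i < data.size(); ++i) {
        const bool last = (i + 1 == data.size());
        if (!rep.send(data[i], !last)) return false;  // refused after the final frame
        if (i == growAt) data.push_back("late");      // simulated concurrent growth
    }
    return true;
}
```

With three elements and growth right after the third send, the loop runs a fourth time and that send is refused, even though the client already holds a complete three-part message.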
I don't know the concurrency library you are using, but maybe reading
the end() iterator just once would help.
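A minimal sketch of reading the end just once, assuming the container only ever grows via push_back() and that indexing existing elements stays valid while it grows. Frame, buildReply and onFrame are hypothetical names, and std::vector merely stands in for the concurrent vector here (the hook runs on the same thread, so nothing in this sketch is actually concurrent).

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// One outgoing frame: payload plus whether ZMQ_SNDMORE would be set.
struct Frame {
    std::string payload;
    bool more;
};

// Build the reply from elements [start, snapshot), where snapshot is the
// size read exactly once before the loop. onFrame is a hypothetical hook
// that lets a caller simulate appends while the reply is being built.
std::vector<Frame> buildReply(std::vector<std::string>& data, std::size_t start,
                              const std::function<void()>& onFrame = nullptr) {
    std::vector<Frame> frames;
    const std::size_t snapshot = data.size();  // read once, never again
    for (std::size_t i = start; i < snapshot; ++i) {
        const bool last = (i + 1 == snapshot);
        frames.push_back({std::to_string(i) + " " + data[i], !last});
        if (onFrame) onFrame();  // the container may grow here
    }
    return frames;
}
```

Each frame's more flag would then decide between ZMQ_SNDMORE and 0 in the zmq_msg_send() call. Elements appended after the snapshot are simply left for the client's next request.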
Christian
> The server loop is
> running on a different thread to the rest of the application. The ‘for’
> loop building the message uses an iterator to detect the ‘end’ of the
> DDV vector so that just as much data as is presently available is
> sent. The vector only grows with push_back(), it’s never erased or
> changed; the only operations on the vector are push_back() and iterator
> dereferencing to fetch the underlying string. Also the program
> functionality is all normal when server starts first, so am not sure
> that the problem lies in composing the message.
>
>
>
> I forgot to mention in my post that the REP socket becomes unresponsive
> to further requests after the ‘Operation cannot be accomplished in
> current state’ error. It can be seen how the client receives all of the
> message up to part 4462. The client then requests element 4463 from the
> vector but never gets a response.
>
>
>
> Riskybiz.
>
>
>
>
>
> _Further sample output:_
>
> [5776] SD_Client: Message: 4458 4422 41481.266667 41481.291667 98.84000
> 98.61900
>
> [5776] SD_Client: Message: 4459 4401 41481.260417 41481.291667 98.84000
> 98.61900
>
> [5776] SD_Client: Message: 4460 4484 41481.286111 41481.292361 98.84000
> 98.71100
>
> [5776] SD_Client: Message: 4461 4487 41481.286806 41481.293056 98.84000
> 98.70200
>
> [5776] SD_Client: Message: 4462 4466 41481.280556 41481.293056 98.84000
> 98.70200
>
> [5776] SD_Client: Sending Data Request For Element No:4463
>
> [5776] SD_Client: Waiting For Reply....
>
> [5776] SD_Client: Waiting For Reply....
>
> [5776] SD_Client: Waiting For Reply....
>
> [5776] SD_Client: Waiting For Reply....
>
>
>
>
>
> *From:*Christian Kamm [mailto:kamm at incasoftware.de]
> *Sent:* 27 July 2013 06:31
> *To:* Riskybiz
> *Cc:* zeromq-dev at lists.zeromq.org
> *Subject:* Re: [zeromq-dev] ZeroMQ Error when client starts first,
> server second.
>
>
>
>> [5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806
> 41481.293056 98.84000 98.70200
>> [5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556
> 41481.293056 98.84000 98.70200
>> [5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot
> be accomplished in current state
>> [5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400
>
> That looks to me like you are trying to send a message part after the
> final part has been sent. That'd be consistent with the client actually
> getting the finished message and the error message you're getting.
>
> I didn't see any synchronization in your code. Maybe another thread
> grows the DDV vector while the response is being built?
>
> Christian
>
> Riskybiz <riskybizLive at live.com> wrote:
>
> I have a REQ & REP server and client configured so that the client sends
> a request for data beginning from a certain vector index. The server
> then sends messages (multipart if necessary) where each message part
> represents the contents of one vector element. On the first request it
> is common for a multipart message of several thousand parts to be sent
> by REP. Subsequent requests will result in either a ‘nothing further’
> message, a single message or a multipart message of just a few parts.
> The bulk of the data is sent on the first request.
>
>
>
> I am experiencing ZMQ behaviour I cannot explain. Please note this is
> my first ZMQ project!
>
>
>
> When the server is launched first with the client launched later; then
> all is well. The large multipart message goes through and is received
> properly, subsequent smaller transmissions can then be seen as expected.
> I am careful that only one request is sent from a client at a time, the
> client will wait for a reply so that the lock step REQ – REP routine is
> maintained.
>
>
>
> HOWEVER if the CLIENT is launched FIRST and the SERVER a few (maybe 10)
> seconds LATER then an error becomes apparent.
>
>
>
> The first request from REQ results in a large multipart message being
> sent from the server REP. The sample output below shows the ‘FinalSend’
> from the server when part 4462 of a large multipart message is sent.
> Then the error ‘Operation cannot be accomplished in current state’ is
> shown. This error causes an inelegant exit by my application.
>
>
>
> After the server error the client can be seen processing the large
> multipart message as it is received beginning with part 0, 1, 2, 3 etc….
>
>
>
> Now; I am not sure how to correct this problem. Looking at the patterns
> of behaviour for REQ & REP sockets does not give a direct answer other
> than a suspicion that a high-water-mark issue could be present? Or
> perhaps there is some socket option that needs resetting or has got stuck?
>
>
>
> Any tips or help much appreciated. It’s really important to my
> application that the server & client can launch, bind & connect in any
> order.
>
>
>
> With thanks,
>
>
>
> Riskybiz.
>
>
>
> _Sample Output:_
>
>
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4453 4457 41481.277778
> 41481.290278 98.84000 98.75000
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4454 4479 41481.284722
> 41481.290972 98.84000 98.76900
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4455 4482 41481.285417
> 41481.291667 98.84000 98.74400
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4456 4462 41481.279167
> 41481.291667 98.84000 98.74400
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4457 4442 41481.272917
> 41481.291667 98.84000 98.66400
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4458 4422 41481.266667
> 41481.291667 98.84000 98.61900
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4459 4401 41481.260417
> 41481.291667 98.84000 98.61900
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4460 4484 41481.286111
> 41481.292361 98.84000 98.71100
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806
> 41481.293056 98.84000 98.70200
>
> [5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556
> 41481.293056 98.84000 98.70200
>
> [5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot be
> accomplished in current state
>
> [5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400 100.23900
>
> [5776] SD_Client: Message: 1 4 41479.920139 41479.927083 100.29400 100.21900
>
> [5776] SD_Client: Message: 2 6 41479.920833 41479.927778 100.29400 100.18000
>
> [5776] SD_Client: Message: 3 9 41479.921528 41479.928472 100.29400 100.18000
>
> [5776] SD_Client: Message: 4 10 41479.922222 41479.929167 100.29400
> 100.18000
>
> [5776] SD_Client: Message: 5 13 41479.922917 41479.929861 100.29400
> 100.18000
>
> [5776] SD_Client: Message: 6 15 41479.923611 41479.930556 100.29400
> 100.18000
>
> [5776] SD_Client: Message: 7 18 41479.924306 41479.931250 100.28700
> 100.18000
>
> [5776] SD_Client: Message: 8 22 41479.925694 41479.931944 100.28700
> 100.18000
>
> [5776] SD_Client: Message: 9 2 41479.919444 41479.931944 100.29400 100.18000
>
> [5776] SD_Client: Message: 10 23 41479.926389 41479.932639 100.24900
> 100.18000
>
>
>
>
>
> _Server Code:_
>
>
>
> int DataServer()
>
> {
>
> //Prepare the context
>
> void *context = zmq_ctx_new();
>
> if(context == NULL)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Creating Context: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
>
>
>
>
> //Set
>
> bool runLoop(true);
>
>
>
> //Prepare the socket
>
> void *server = zmq_socket(context, ZMQ_REP);
>
> if(server == NULL)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Creating REP socket: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
>
>
> // Configure socket to not wait at close time
>
> int linger = 0;
>
> int rc = zmq_setsockopt(server, ZMQ_LINGER, &linger, sizeof (linger));
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Setting Socket Options Failed: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;
>
> }
>
>
>
> rc = zmq_bind(server, "tcp://*:5555");
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Bind To REP Failed: "
> + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;
>
> }
>
>
>
>
>
> OutputDebugStringA("TestDataAccess: ZMQServer: Data Server Ready.... ");
>
> //Listen for request
>
> while(runLoop)
>
> {
>
> OutputDebugStringA("TestDataAccess: ZMQServer: Waiting For A Request....");
>
> //Wait for a request
>
> zmq_msg_t request;
>
> int rc = zmq_msg_init(&request);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> }
>
>
>
> rc = zmq_msg_recv(&request, server, 0);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Receiving Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
>
>
> //Release Message
>
> rc = zmq_msg_close(&request);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
> return 1;//error
>
> }
>
> OutputDebugStringA("TestDataAccess: ZMQServer: Processing Request....");
>
>
>
> //Cast the request data to a string
>
> std::string reqStr(static_cast<char*>(zmq_msg_data(&request)),
> zmq_msg_size(&request));
>
>
>
> unsigned int startFromElement;
>
>
>
> if(reqStr == "kill")
>
> {
>
> //Send reply
>
> std::string outMsg = "OK killing";
>
> zmq_msg_t messageOut;
>
>
>
> int rc = zmq_msg_init_size(&messageOut, outMsg.size());
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> }
>
>
>
> memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
>
>
>
> rc = zmq_msg_send(&messageOut, server, 0);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Sending Shutdown Acknowledgement: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> //Release Message before returning
>
> rc = zmq_msg_close(&messageOut);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
> return 1;//error
>
> }
>
>
>
> //Release Message
>
> rc = zmq_msg_close(&request);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
> OutputDebugStringA("TestDataAccess: ZMQServer: Shutting Down Server....");
>
> runLoop = false;
>
> break;
>
> }
>
> else
>
> {
>
> //Convert the request string to an integer
>
> if ( ! (std::istringstream(reqStr) >> startFromElement) )
> startFromElement = 0;//checks that the string actually contains numeric characters; returns 0 if not;
>
> }
>
>
>
>
>
> //Prepare the reply
>
> if (startFromElement >= DDV->size() || DDV->size() == 0)
>
> {
>
> //Client already has all the data
>
> std::string outMsg = reqStr + " Nothing Further";
>
> zmq_msg_t messageOut;
>
>
>
> int rc = zmq_msg_init_size(&messageOut, outMsg.size());
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> }
>
>
>
> memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
>
> rc = zmq_msg_send(&messageOut, server, 0);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Sending 'Nothing Further': " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
> else if(rc >= 0)
>
> {
>
> std::string outStr = "TestDataAccess: ZMQServer: Message: " + reqStr + " Nothing Further";
>
> const char* msgOut = outStr.c_str();
>
> OutputDebugStringA(msgOut);
>
> }
>
>
>
> //Release Message
>
> rc = zmq_msg_close(&messageOut);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
>
>
> }
>
> else
>
> {
>
>
>
> //There is data to send
>
> for(auto iter = DDV->begin() + startFromElement; iter != DDV->end(); iter++)
>
> {
>
> if(iter + 1 != DDV->end())
>
> {
>
> std::string elemStr = static_cast<std::ostringstream*>(
> &(std::ostringstream() << distance(DDV->begin(),iter)))->str();
>
> std::string outMsg = elemStr + " " + *iter;
>
>
>
> //SendMore
>
> zmq_msg_t messageOut;
>
>
>
> int rc = zmq_msg_init_size(&messageOut, outMsg.size());
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> }
>
>
>
> memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
>
>
>
> rc = zmq_msg_send(&messageOut, server, ZMQ_SNDMORE);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error In SendMore: " +
> errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> //Release Message before returning
>
> rc = zmq_msg_close(&messageOut);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
> return 1;//error
>
> }
>
> else if(rc >= 0)
>
> {
>
> std::string outStr = "TestDataAccess: ZMQServer: SendMore: " + elemStr +
> " " + *iter;
>
> const char* msgOut = outStr.c_str();
>
> OutputDebugStringA(msgOut);
>
> }
>
>
>
> //Release Message
>
> rc = zmq_msg_close(&messageOut);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
>
>
>
>
> }
>
> else if(iter + 1 == DDV->end())
>
> {
>
> std::string elemStr = static_cast<std::ostringstream*>(
> &(std::ostringstream() << distance(DDV->begin(),iter)))->str();
>
> std::string outMsg = elemStr + " " + *iter;
>
> //Final Send
>
> zmq_msg_t messageOut;
>
>
>
> int rc = zmq_msg_init_size(&messageOut, outMsg.size());
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> }
>
>
>
> memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
>
>
>
> rc = zmq_msg_send(&messageOut, server, 0);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error In FinalSend: "
> + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
>
>
> //Release Message
>
> rc = zmq_msg_close(&messageOut);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
> return 1;//error
>
> }
>
> else if(rc >= 0)
>
> {
>
> std::string outStr = "TestDataAccess: ZMQServer: FinalSend: " + elemStr
> + " " + *iter;
>
> const char* msgOut = outStr.c_str();
>
> OutputDebugStringA(msgOut);
>
> }
>
>
>
> //Release Message
>
> rc = zmq_msg_close(&messageOut);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
>
>
> }
>
> }//for
>
> }//else
>
> }//while
>
>
>
>
>
> //Close Server Socket
>
> rc = zmq_close(server);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Closing Server Socket: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
>
>
> //When you exit the program, close your sockets and then call zmq_ctx_destroy(). This destroys the context.
>
> rc = zmq_ctx_destroy(context);
>
> if(rc == -1)
>
> {
>
> std::string errStr = zmq_strerror(zmq_errno());
>
> std::string errConc = "TestDataAccess: ZMQServer: Error Destroying Context: " + errStr;
>
> const char* errOut = errConc.c_str();
>
> OutputDebugStringA(errOut);
>
> return 1;//error
>
> }
>
>
>
>
>
>
>
> OutputDebugStringA("TestDataAccess: ZMQServer: Server Shut Down.");
>
> return 0;
>
> }
>
>
>
>
>
>
>