[zeromq-dev] FW: ZeroMQ Error when client starts first, server second.
Riskybiz
riskybizLive at live.com
Sat Jul 27 11:22:57 CEST 2013
From: Riskybiz [mailto:riskybizLive at live.com]
Sent: 27 July 2013 10:22
To: 'Christian Kamm'
Subject: RE: [zeromq-dev] ZeroMQ Error when client starts first, server second.
Hi Christian, thanks for the reply. The DDV vector is declared like so:
concurrency::concurrent_vector<std::string> *DDV = new concurrency::concurrent_vector<std::string>;//create concurrent vector
It is possible that a different thread could be trying to add to the vector while the response is being built, but I had hoped that using the concurrent vector would eliminate the problem. The server loop runs on a different thread from the rest of the application. The ‘for’ loop building the message uses an iterator to detect the ‘end’ of the DDV vector, so that just as much data as is presently available is sent. The vector only grows with push_back(); it is never erased or changed, and the only operations on it are push_back() and iterator dereferencing to fetch the underlying string. Also, the program functions normally when the server starts first, so I am not sure that the problem lies in composing the message.
I forgot to mention in my post that the REP socket becomes unresponsive to further requests after the ‘Operation cannot be accomplished in current state’ error. In the sample output, the client receives all of the message up to part 4462. The client then requests element 4463 from the vector but never gets a response.
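A note on the loop described above: a concurrent container makes each individual call safe, but a loop that re-reads end() on every pass can still see the vector grow between the "is this the last element?" test and the loop condition. One possible way around that, shown here only as a minimal sketch, is to capture the size once and decide "more" versus "final" from that snapshot. The names Part and build_reply are hypothetical and not part of the posted code; the sketch assumes only that the container offers size() and operator[], and it does not by itself address elements that are still being constructed by another thread.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper type: one outgoing message part.
struct Part {
    std::string body;  // "<index> <element>", as in the posted server output
    bool more;         // true: send with ZMQ_SNDMORE; false: final part
};

// Plan the reply from a size captured once. Elements appended after the
// snapshot are ignored here and would be picked up by the next request.
template <typename Container>
std::vector<Part> build_reply(const Container& data, std::size_t start) {
    const std::size_t end = data.size();  // single snapshot of the size
    std::vector<Part> parts;
    for (std::size_t i = start; i < end; ++i) {
        parts.push_back(Part{std::to_string(i) + " " + data[i], i + 1 < end});
    }
    return parts;
}
```

With the plan fixed up front, exactly one part carries more == false, so no further part can follow the final send regardless of what other threads append in the meantime.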
Riskybiz.
Further sample output:
[5776] SD_Client: Message: 4458 4422 41481.266667 41481.291667 98.84000 98.61900
[5776] SD_Client: Message: 4459 4401 41481.260417 41481.291667 98.84000 98.61900
[5776] SD_Client: Message: 4460 4484 41481.286111 41481.292361 98.84000 98.71100
[5776] SD_Client: Message: 4461 4487 41481.286806 41481.293056 98.84000 98.70200
[5776] SD_Client: Message: 4462 4466 41481.280556 41481.293056 98.84000 98.70200
[5776] SD_Client: Sending Data Request For Element No:4463
[5776] SD_Client: Waiting For Reply....
[5776] SD_Client: Waiting For Reply....
[5776] SD_Client: Waiting For Reply....
[5776] SD_Client: Waiting For Reply....
From: Christian Kamm [mailto:kamm at incasoftware.de]
Sent: 27 July 2013 06:31
To: Riskybiz
Cc: zeromq-dev at lists.zeromq.org
Subject: Re: [zeromq-dev] ZeroMQ Error when client starts first, server second.
> [5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806 41481.293056 98.84000 98.70200
> [5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556 41481.293056 98.84000 98.70200
> [5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot be accomplished in current state
> [5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400
That looks to me like you are trying to send a message part after the final part has been sent. That'd be consistent with the client actually getting the finished message and the error message you're getting.
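To illustrate the point: "Operation cannot be accomplished in current state" is the text ZeroMQ reports for EFSM, which a REP socket returns when a send is attempted while no request is awaiting a reply. The following is a toy model of that alternation, not libzmq code; RepModel and its method names are made up for the illustration.

```cpp
// Toy model of the REP socket's receive/reply alternation.
class RepModel {
public:
    // A complete request has been received: replying is now allowed.
    bool recv_request() {
        if (replying_) return false;  // a reply is still owed
        replying_ = true;
        return true;
    }

    // Send one reply part; `more` mirrors the ZMQ_SNDMORE flag.
    bool send_part(bool more) {
        if (!replying_) return false;  // models the EFSM error
        if (!more) replying_ = false;  // final part ends the reply
        return true;
    }

private:
    bool replying_ = false;
};
```

In this model, a send_part(true) issued after send_part(false) fails until the next recv_request(), which matches the order of events in the quoted log: FinalSend succeeds, then the following SendMore is rejected.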
I didn't see any synchronization in your code. Maybe another thread grows the DDV vector while the response is being built?
Christian
Riskybiz <riskybizLive at live.com> wrote:
I have a REQ & REP server and client configured so that the client sends a request for data beginning from a certain vector index. The server then sends messages (multipart if necessary) where each message part represents the contents of one vector element. On the first request it is common for a multipart message of several thousand parts to be sent by REP. Subsequent requests will result in either a ‘nothing further’ message, a single message or a multipart message of just a few parts. The bulk of the data is sent on the first request.
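As a rough illustration of the request scheme described above, the client could derive its next request from the last part it received. This is only a sketch: the client code is not shown in this thread, so next_request_index is a hypothetical helper, and it assumes each part starts with the element index (as in the sample output) and that the "nothing further" reply echoes the requested index followed by " Nothing Further" (as in the server code).

```cpp
#include <sstream>
#include <string>

// Work out which element to request next from the last received part.
// Returns false when the part does not begin with an unsigned integer.
bool next_request_index(const std::string& last_part, unsigned int& next_index) {
    std::istringstream in(last_part);
    unsigned int index = 0;
    if (!(in >> index)) return false;

    std::string rest;
    std::getline(in, rest);  // remainder of the part after the index

    // "<n> Nothing Further" means element n does not exist yet: ask again for n.
    // Otherwise element `index` was delivered, so ask for the one after it.
    next_index = (rest == " Nothing Further") ? index : index + 1;
    return true;
}
```

For example, after receiving the part beginning "4462 4466 ..." the helper yields 4463, which is the element the client requests in the sample output.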
I am experiencing ZMQ behaviour I cannot explain. Please note this is my first ZMQ project!
When the server is launched first and the client later, all is well. The large multipart message goes through and is received properly, and subsequent smaller transmissions can then be seen as expected. I am careful that only one request is sent from a client at a time; the client waits for a reply so that the lock-step REQ – REP routine is maintained.
HOWEVER if the CLIENT is launched FIRST and the SERVER a few (maybe 10) seconds LATER then an error becomes apparent.
The first request from REQ results in a large multipart message being sent from the server REP. The sample output below shows the ‘FinalSend’ from the server when part 4462 of a large multipart message is sent. Then the error ‘Operation cannot be accomplished in current state’ is shown. This error causes an inelegant exit by my application.
After the server error the client can be seen processing the large multipart message as it is received beginning with part 0, 1, 2, 3 etc….
Now, I am not sure how to correct this problem. Looking at the patterns of behaviour for REQ & REP sockets does not give a direct answer, other than a suspicion that a high-water-mark issue could be present. Or perhaps there is some socket option that needs resetting or has got stuck?
Any tips or help much appreciated. It’s really important to my application that the server & client can launch, bind & connect in any order.
With thanks,
Riskybiz.
Sample Output:
[5100] TestDataAccess: ZMQServer: SendMore: 4453 4457 41481.277778 41481.290278 98.84000 98.75000
[5100] TestDataAccess: ZMQServer: SendMore: 4454 4479 41481.284722 41481.290972 98.84000 98.76900
[5100] TestDataAccess: ZMQServer: SendMore: 4455 4482 41481.285417 41481.291667 98.84000 98.74400
[5100] TestDataAccess: ZMQServer: SendMore: 4456 4462 41481.279167 41481.291667 98.84000 98.74400
[5100] TestDataAccess: ZMQServer: SendMore: 4457 4442 41481.272917 41481.291667 98.84000 98.66400
[5100] TestDataAccess: ZMQServer: SendMore: 4458 4422 41481.266667 41481.291667 98.84000 98.61900
[5100] TestDataAccess: ZMQServer: SendMore: 4459 4401 41481.260417 41481.291667 98.84000 98.61900
[5100] TestDataAccess: ZMQServer: SendMore: 4460 4484 41481.286111 41481.292361 98.84000 98.71100
[5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806 41481.293056 98.84000 98.70200
[5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556 41481.293056 98.84000 98.70200
[5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot be accomplished in current state
[5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400 100.23900
[5776] SD_Client: Message: 1 4 41479.920139 41479.927083 100.29400 100.21900
[5776] SD_Client: Message: 2 6 41479.920833 41479.927778 100.29400 100.18000
[5776] SD_Client: Message: 3 9 41479.921528 41479.928472 100.29400 100.18000
[5776] SD_Client: Message: 4 10 41479.922222 41479.929167 100.29400 100.18000
[5776] SD_Client: Message: 5 13 41479.922917 41479.929861 100.29400 100.18000
[5776] SD_Client: Message: 6 15 41479.923611 41479.930556 100.29400 100.18000
[5776] SD_Client: Message: 7 18 41479.924306 41479.931250 100.28700 100.18000
[5776] SD_Client: Message: 8 22 41479.925694 41479.931944 100.28700 100.18000
[5776] SD_Client: Message: 9 2 41479.919444 41479.931944 100.29400 100.18000
[5776] SD_Client: Message: 10 23 41479.926389 41479.932639 100.24900 100.18000
Server Code:
int DataServer()
{
//Prepare the context
void *context = zmq_ctx_new();
if(context == NULL)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Creating Context: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
//Set
bool runLoop(true);
//Prepare the socket
void *server = zmq_socket(context, ZMQ_REP);
if(server == NULL)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Creating REP socket: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
// Configure socket to not wait at close time
int linger = 0;
int rc = zmq_setsockopt(server, ZMQ_LINGER, &linger, sizeof (linger));
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Setting Socket Options Failed: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;
}
rc = zmq_bind(server, "tcp://*:5555");
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Bind To REP Failed: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;
}
OutputDebugStringA("TestDataAccess: ZMQServer: Data Server Ready.... ");
//Listen for request
while(runLoop)
{
OutputDebugStringA("TestDataAccess: ZMQServer: Waiting For A Request....");
//Wait for a request
zmq_msg_t request;
int rc = zmq_msg_init(&request);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
}
rc = zmq_msg_recv(&request, server, 0);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Receiving Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
//Release Message
rc = zmq_msg_close(&request);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
return 1;//error
}
OutputDebugStringA("TestDataAccess: ZMQServer: Processing Request....");
//Cast the request data to a string
std::string reqStr(static_cast<char*>(zmq_msg_data(&request)), zmq_msg_size(&request));
unsigned int startFromElement;
if(reqStr == "kill")
{
//Send reply
std::string outMsg = "OK killing";
zmq_msg_t messageOut;
int rc = zmq_msg_init_size(&messageOut, outMsg.size());
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
}
memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
rc = zmq_msg_send(&messageOut, server, 0);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Sending Shutdown Acknowledgement: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
//Release Message before returning (previously unreachable after the return)
rc = zmq_msg_close(&messageOut);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
return 1;//error
}
//Release Message
rc = zmq_msg_close(&request);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
OutputDebugStringA("TestDataAccess: ZMQServer: Shutting Down Server....");
runLoop = false;
break;
}
else
{
//Convert the request string to an integer
if ( ! (std::istringstream(reqStr) >> startFromElement) ) startFromElement = 0;//checks that the string actually contains numeric characters; returns 0 if not;
}
//Prepare the reply
if (startFromElement >= DDV->size() || DDV->size() == 0)
{
//Client already has all the data
std::string outMsg = reqStr + " Nothing Further";
zmq_msg_t messageOut;
int rc = zmq_msg_init_size(&messageOut, outMsg.size());
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
}
memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
rc = zmq_msg_send(&messageOut, server, 0);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Sending 'Nothing Further': " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
else if(rc >= 0)
{
std::string outStr = "TestDataAccess: ZMQServer: Message: " + reqStr + " Nothing Further";
const char* msgOut = outStr.c_str();
OutputDebugStringA(msgOut);
}
//Release Message
rc = zmq_msg_close(&messageOut);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
}
else
{
//There is data to send
for(auto iter = DDV->begin() + startFromElement; iter != DDV->end(); iter++)
{
if(iter + 1 != DDV->end())
{
std::string elemStr = static_cast<std::ostringstream*>( &(std::ostringstream() << distance(DDV->begin(),iter)))->str();
std::string outMsg = elemStr + " " + *iter;
//SendMore
zmq_msg_t messageOut;
int rc = zmq_msg_init_size(&messageOut, outMsg.size());
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
}
memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
rc = zmq_msg_send(&messageOut, server, ZMQ_SNDMORE);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error In SendMore: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
//Release Message before returning (previously unreachable after the return)
rc = zmq_msg_close(&messageOut);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
return 1;//error
}
else if(rc >= 0)
{
std::string outStr = "TestDataAccess: ZMQServer: SendMore: " + elemStr + " " + *iter;
const char* msgOut = outStr.c_str();
OutputDebugStringA(msgOut);
}
//Release Message
rc = zmq_msg_close(&messageOut);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
}
else if(iter + 1 == DDV->end())
{
std::string elemStr = static_cast<std::ostringstream*>( &(std::ostringstream() << distance(DDV->begin(),iter)))->str();
std::string outMsg = elemStr + " " + *iter;
//Final Send
zmq_msg_t messageOut;
int rc = zmq_msg_init_size(&messageOut, outMsg.size());
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
}
memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
rc = zmq_msg_send(&messageOut, server, 0);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error In FinalSend: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
//Release Message
rc = zmq_msg_close(&messageOut);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
return 1;//error
}
else if(rc >= 0)
{
std::string outStr = "TestDataAccess: ZMQServer: FinalSend: " + elemStr + " " + *iter;
const char* msgOut = outStr.c_str();
OutputDebugStringA(msgOut);
}
//Release Message
rc = zmq_msg_close(&messageOut);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
}
}//for
}//else
}//while
//Close Server Socket
rc = zmq_close(server);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Closing Server Socket: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
//When you exit the program, close your sockets and then call zmq_ctx_destroy(). This destroys the context.
rc = zmq_ctx_destroy(context);
if(rc == -1)
{
std::string errStr = zmq_strerror(zmq_errno());
std::string errConc = "TestDataAccess: ZMQServer: Error Destroying Context: " + errStr;
const char* errOut = errConc.c_str();
OutputDebugStringA(errOut);
return 1;//error
}
OutputDebugStringA("TestDataAccess: ZMQServer: Server Shut Down.");
return 0;
}