[zeromq-dev] ZeroMQ Error when client starts first, server second.

Riskybiz riskybizLive at live.com
Sat Jul 27 00:58:44 CEST 2013


I have a REQ & REP server and client configured so that the client sends a
request for data beginning from a certain vector index.  The server then
sends messages (multipart if necessary) where each message part represents
the contents of one vector element.  On the first request it is common for a
multipart message of several thousand parts to be sent by REP.  Subsequent
requests will result in either a 'nothing further' message, a single message
or a multipart message of just a few parts.  The bulk of the data is sent on
the first request.
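
To make the framing described above concrete, here is a minimal sketch of how a reply could be planned before anything is sent. It does not call ZeroMQ: `Frame` and `planReply` are hypothetical names, the `more` flag stands in for ZMQ_SNDMORE, and the 'nothing further' text is built from the start index rather than from the raw request string.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// One outgoing message part: the payload plus whether more parts follow
// (the role ZMQ_SNDMORE plays in a real send call).
struct Frame {
    std::string payload;
    bool more;
};

// Hypothetical helper: plan the reply for a request starting at `start`.
// It reads `data` once, so the number of parts is fixed before sending.
std::vector<Frame> planReply(const std::vector<std::string>& data, std::size_t start)
{
    std::vector<Frame> frames;
    if (start >= data.size()) {
        // The client already has everything: a single-part reply.
        frames.push_back({std::to_string(start) + " Nothing Further", false});
    } else {
        for (std::size_t i = start; i < data.size(); ++i) {
            const bool last = (i + 1 == data.size());
            frames.push_back({std::to_string(i) + " " + data[i], !last});
        }
    }
    // A REP socket must always answer, so there is at least one part.
    assert(!frames.empty());
    return frames;
}
```

Planning the parts up front means exactly one part, the last, carries `more == false`.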

 

I am experiencing ZMQ behaviour I cannot explain.  Please note this is my
first ZMQ project!

 

When the server is launched first and the client later, all is well.  The
large multipart message goes through and is received properly, and the
subsequent smaller transmissions can then be seen as expected.  I am careful
that only one request is sent from a client at a time; the client waits for
a reply so that the lock-step REQ - REP routine is maintained.

 

HOWEVER if the CLIENT is launched FIRST and the SERVER a few (maybe 10)
seconds LATER then an error becomes apparent.  

 

The first request from REQ results in a large multipart message being sent
from the server REP.  The sample output below shows the 'FinalSend' from the
server when part 4462 of a large multipart message is sent.  Then the error
'Operation cannot be accomplished in current state' is shown.  This error
causes an inelegant exit by my application.
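
For reference, 'Operation cannot be accomplished in current state' is the text ZeroMQ reports for EFSM, which a REP socket returns when a send or receive is attempted out of turn. The toy model below is not libzmq code; `RepModel` is a hypothetical class that only mimics the lock step, to show that a send attempted after the final part (the one without ZMQ_SNDMORE) is rejected until the next request has been received. Whether that is what actually happens in the posted server is an assumption, although the log order (FinalSend, then an error reported by the SendMore branch) would be consistent with it.

```cpp
#include <cassert>

// Toy model of the REP lock step; this is NOT libzmq code.
class RepModel {
public:
    // Receiving is only allowed while waiting for a request.
    bool recv() {
        if (replying_) return false;   // a real REP socket would report EFSM
        replying_ = true;
        return true;
    }
    // Sending is only allowed while a reply is in progress.
    // `more` plays the role of ZMQ_SNDMORE.
    bool send(bool more) {
        if (!replying_) return false;  // a real REP socket would report EFSM
        if (!more) replying_ = false;  // the final part ends the reply
        return true;
    }
private:
    bool replying_ = false;
};

// Replays the order of events seen in the sample output.
inline void replayLog() {
    RepModel rep;
    assert(rep.recv());       // request arrives
    assert(rep.send(true));   // SendMore parts
    assert(rep.send(false));  // FinalSend
    assert(!rep.send(true));  // a further SendMore is rejected
    assert(rep.recv());       // the next request is accepted again
}
```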

 

After the server error, the client can be seen processing the large
multipart message as it is received, beginning with parts 0, 1, 2, 3, etc.
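
Each part in the sample output is a vector index followed by that element's fields. Here is a minimal sketch of how a receiver could split one part, assuming that layout; `ParsedPart` and `parsePart` are hypothetical names and are not taken from the posted code.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>

struct ParsedPart {
    std::size_t index = 0;  // vector index at the front of the part
    std::string rest;       // remaining fields, left unparsed here
};

// Hypothetical helper: split "<index> <fields...>" into its two pieces.
// Returns false if the part does not begin with a number.
bool parsePart(const std::string& part, ParsedPart& out)
{
    std::istringstream in(part);
    if (!(in >> out.index)) return false;
    out.rest.clear();
    std::getline(in >> std::ws, out.rest);  // rest stays empty if nothing follows
    return true;
}

// Usage on one of the client lines from the sample output.
inline void parseExample()
{
    ParsedPart p;
    assert(parsePart("10 23 41479.926389 41479.932639 100.24900 100.18000", p));
    assert(p.index == 10);
    assert(p.rest == "23 41479.926389 41479.932639 100.24900 100.18000");
    assert(!parsePart("not a number", p));
}
```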

 

I am not sure how to correct this problem.  The documented behaviour of
REQ & REP sockets does not give me a direct answer, other than a suspicion
that a high-water-mark issue could be present.  Or perhaps there is some
socket option that needs resetting or has got stuck?

 

Any tips or help would be much appreciated.  It's really important to my
application that the server & client can launch, bind & connect in any order.

 

With thanks,

 

Riskybiz.

 

Sample Output:

 

[5100] TestDataAccess: ZMQServer: SendMore: 4453 4457 41481.277778 41481.290278 98.84000 98.75000

[5100] TestDataAccess: ZMQServer: SendMore: 4454 4479 41481.284722 41481.290972 98.84000 98.76900

[5100] TestDataAccess: ZMQServer: SendMore: 4455 4482 41481.285417 41481.291667 98.84000 98.74400

[5100] TestDataAccess: ZMQServer: SendMore: 4456 4462 41481.279167 41481.291667 98.84000 98.74400

[5100] TestDataAccess: ZMQServer: SendMore: 4457 4442 41481.272917 41481.291667 98.84000 98.66400

[5100] TestDataAccess: ZMQServer: SendMore: 4458 4422 41481.266667 41481.291667 98.84000 98.61900

[5100] TestDataAccess: ZMQServer: SendMore: 4459 4401 41481.260417 41481.291667 98.84000 98.61900

[5100] TestDataAccess: ZMQServer: SendMore: 4460 4484 41481.286111 41481.292361 98.84000 98.71100

[5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806 41481.293056 98.84000 98.70200

[5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556 41481.293056 98.84000 98.70200

[5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot be accomplished in current state

[5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400 100.23900

[5776] SD_Client: Message: 1 4 41479.920139 41479.927083 100.29400 100.21900

[5776] SD_Client: Message: 2 6 41479.920833 41479.927778 100.29400 100.18000

[5776] SD_Client: Message: 3 9 41479.921528 41479.928472 100.29400 100.18000

[5776] SD_Client: Message: 4 10 41479.922222 41479.929167 100.29400 100.18000

[5776] SD_Client: Message: 5 13 41479.922917 41479.929861 100.29400 100.18000

[5776] SD_Client: Message: 6 15 41479.923611 41479.930556 100.29400 100.18000

[5776] SD_Client: Message: 7 18 41479.924306 41479.931250 100.28700 100.18000

[5776] SD_Client: Message: 8 22 41479.925694 41479.931944 100.28700 100.18000

[5776] SD_Client: Message: 9 2 41479.919444 41479.931944 100.29400 100.18000

[5776] SD_Client: Message: 10 23 41479.926389 41479.932639 100.24900 100.18000

 

 

Server Code:

 

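#include <windows.h>   // OutputDebugStringA
#include <zmq.h>
#include <cstring>     // memcpy
#include <iterator>    // distance
#include <sstream>
#include <string>
#include <vector>

// DDV is declared elsewhere in the application (not shown here).

int DataServer()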

{

//Prepare the context

void *context = zmq_ctx_new();

if(context == NULL)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Creating Context: "
+ errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

 

 

//Set

bool runLoop(true);

 

//Prepare the socket

void *server = zmq_socket(context, ZMQ_REP);        

if(server == NULL)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Creating REP socket: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

 

//  Configure socket to not wait at close time

    int linger = 0;

    int rc = zmq_setsockopt(server, ZMQ_LINGER, &linger, sizeof (linger));

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Setting Socket Options Failed: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;

}

    

rc = zmq_bind(server, "tcp://*:5555");

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Bind To REP Failed: " +
errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;

}

 

 

OutputDebugStringA("TestDataAccess: ZMQServer: Data Server Ready.... ");

//Listen for request

while(runLoop)

{

OutputDebugStringA("TestDataAccess: ZMQServer: Waiting For A Request....");

//Wait for a request

zmq_msg_t request;

int rc = zmq_msg_init(&request);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

}

 

rc = zmq_msg_recv(&request, server, 0);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Receiving Message: "
+ errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

 

//Release Message

rc = zmq_msg_close(&request);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

return 1;//error

}

OutputDebugStringA("TestDataAccess: ZMQServer: Processing Request....");

 

//Cast the request data to a string

std::string reqStr(static_cast<char*>(zmq_msg_data(&request)),
zmq_msg_size(&request));

 

unsigned int startFromElement;

 

if(reqStr == "kill")

{

//Send reply

std::string outMsg = "OK killing";

zmq_msg_t messageOut;

 

int rc = zmq_msg_init_size(&messageOut, outMsg.size());

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

}

 

memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());

 

rc = zmq_msg_send(&messageOut, server, 0);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Sending Shutdown Acknowledgement: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

//Release Message before returning (previously unreachable after the return)

rc = zmq_msg_close(&messageOut);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

}

return 1;//error

}

 

//Release Message

rc = zmq_msg_close(&request);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

OutputDebugStringA("TestDataAccess: ZMQServer: Shutting Down Server....");

runLoop = false;

break;

}

else

{

//Convert the request string to an integer                

//Falls back to 0 if the request string does not begin with numeric characters

if ( ! (std::istringstream(reqStr) >> startFromElement) ) startFromElement = 0;

}

 

 

//Prepare the reply

if (startFromElement >= DDV->size()  || DDV->size() == 0)

{

//Client already has all the data

std::string outMsg = reqStr + " Nothing Further";

 zmq_msg_t messageOut;

 

int rc = zmq_msg_init_size(&messageOut, outMsg.size());

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

}

                 

 memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());

 rc = zmq_msg_send(&messageOut, server, 0);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Sending 'Nothing Further': " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

else if(rc >= 0)

{

std::string outStr = "TestDataAccess: ZMQServer: Message: " + reqStr + " Nothing Further";

const char* msgOut = outStr.c_str();

OutputDebugStringA(msgOut);

}

 

//Release Message

rc = zmq_msg_close(&messageOut);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

            return 1;//error

}

 

 }

else

{

 

//There is data to send

for(auto iter = DDV->begin() + startFromElement; iter != DDV->end(); iter++)

{

if(iter + 1 != DDV->end())

{

std::string elemStr = static_cast<std::ostringstream*>(
&(std::ostringstream() <<  distance(DDV->begin(),iter)))->str();

std::string outMsg = elemStr + " " + *iter;

 

//SendMore

zmq_msg_t messageOut;

 

int rc = zmq_msg_init_size(&messageOut, outMsg.size());

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

}                                        

 

memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());

 

rc = zmq_msg_send(&messageOut, server, ZMQ_SNDMORE);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error In SendMore: " +
errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

//Release Message before returning (previously unreachable after the return)

rc = zmq_msg_close(&messageOut);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

}

return 1;//error

}

else if(rc >= 0)

{

std::string outStr = "TestDataAccess: ZMQServer: SendMore: " + elemStr + " "
+ *iter;

const char* msgOut = outStr.c_str();

OutputDebugStringA(msgOut);

}

 

//Release Message

rc = zmq_msg_close(&messageOut);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

 

 

}

else if(iter + 1 == DDV->end())

{

std::string elemStr = static_cast<std::ostringstream*>(
&(std::ostringstream() <<  distance(DDV->begin(),iter)))->str();

std::string outMsg = elemStr + " " + *iter;

//Final Send

zmq_msg_t messageOut;

 

int rc = zmq_msg_init_size(&messageOut, outMsg.size());

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

}

 

memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());

 

rc = zmq_msg_send(&messageOut, server, 0);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error In FinalSend: " +
errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

 

//Release Message

rc = zmq_msg_close(&messageOut);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

return 1;//error

}

else if(rc >= 0)

{

std::string outStr = "TestDataAccess: ZMQServer: FinalSend: " + elemStr + " " + *iter;

const char* msgOut = outStr.c_str();

OutputDebugStringA(msgOut);

}

 

//Release Message

rc = zmq_msg_close(&messageOut);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

 

}

}//for

}//else

}//while

 

   

//Close Server Socket

rc = zmq_close(server);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Closing Server Socket:  " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

 

//When you exit the program, close your sockets and then call zmq_ctx_destroy(). This destroys the context.

    rc = zmq_ctx_destroy(context);

if(rc == -1)

{

std::string errStr =  zmq_strerror(zmq_errno());

std::string errConc = "TestDataAccess: ZMQServer: Error Destroying Context: " + errStr;

const char* errOut = errConc.c_str();

OutputDebugStringA(errOut);

return 1;//error

}

 

   

   

   OutputDebugStringA("TestDataAccess: ZMQServer: Server Shut Down.");

   return 0;

}

 

 
