[zeromq-dev] SUB socket: parts of multipart message getting reversed

Joshua Foster jhawk28 at gmail.com
Fri Feb 17 23:13:53 CET 2012


I would check that your reads are not actually wrapping across message boundaries between loop iterations. Try calling socket.hasReceiveMore() before pulling the next part, and then drain the rest of the message parts if what you got doesn't match your expectation.

A typical drain loop could look like this:
// discard any remaining parts of the current message
while (socket.hasReceiveMore()) {
  socket.recv(ZMQ.NOBLOCK)
}
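
As a rough sketch of the same idea, the receive side can read a whole message at once, so that a loop iteration never starts in the middle of a message. FrameSource and FakeSource below are hypothetical stand-ins made up so the snippet runs without a broker; with the Java binding the two methods would be socket.recv(flags) and socket.hasReceiveMore().

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the two socket methods used here; with the
// Java binding these would be socket.recv(flags) and socket.hasReceiveMore().
trait FrameSource {
  def recv(flags: Int): Array[Byte]
  def hasReceiveMore: Boolean
}

object Multipart {
  // Read one frame, then keep reading while the "more" flag is set,
  // so the caller always gets one complete message.
  def recvAll(src: FrameSource): Vector[Array[Byte]] = {
    val parts = ArrayBuffer(src.recv(0))
    while (src.hasReceiveMore) parts += src.recv(0)
    parts.toVector
  }
}

// In-memory fake used only to exercise recvAll: each inner Vector is one message.
class FakeSource(messages: Vector[Vector[String]]) extends FrameSource {
  // Pair every frame with a flag saying whether more frames follow in its message.
  private val frames = messages.flatMap { m =>
    m.zipWithIndex.map { case (f, i) => (f, i < m.size - 1) }
  }.iterator
  private var more = false

  def recv(flags: Int): Array[Byte] = {
    val (frame, hasMore) = frames.next()
    more = hasMore
    frame.getBytes("UTF-8")
  }

  def hasReceiveMore: Boolean = more
}
```

The caller can then check that the result has exactly two parts (key, body) before using it, instead of assuming that two consecutive recv calls belong to the same message.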

You could also add logging to see when this occurs. I have noticed on 3.1 in testing that I sometimes don't get the first frame, which could throw the process off. https://zeromq.jira.com/browse/LIBZMQ-283 has something like this (note: that report is against 3.0, so it may not apply to 2.1.11).
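
For the logging, a minimal sketch of a check that could run on each received message is below. It assumes the reply is expected to be exactly two parts, with the subscription key first. FrameCheck and describeProblem are hypothetical names, not part of the binding.

```scala
import java.util.Arrays

object FrameCheck {
  // Returns Some(description) when the message does not look like [key, body],
  // and None when it does. expectedKey is the key that was sent with the request.
  def describeProblem(expectedKey: Array[Byte], parts: Seq[Array[Byte]]): Option[String] =
    if (parts.size != 2)
      Some("expected 2 parts, got " + parts.size)
    else if (!Arrays.equals(parts.head, expectedKey))
      Some("first part (" + parts.head.length + " bytes) does not match the key")
    else
      None
}
```

It could be used as FrameCheck.describeProblem(address, parts).foreach(p => System.err.println("bad message: " + p)), with whatever logger the application already has in place of System.err.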

Joshua

On Feb 17, 2012, at 11:41 AM, Sergey Malov wrote:

> Hi,
> I am working on a project where two applications talk to each other exchanging protocol buffer messages through ZeroMQ sockets. Applications are written in Scala (2.9.1), Java binding is used to manipulate ZeroMQ sockets, messages are being serialized using protocol buffer's Java binding as well. Underlying ZeroMQ library is 2.1.11, Java binding (for ZeroMQ) compiled from the latest source.
> The flow of messages is the following.
> Application A pushes message to application B (push/pull socket's pair), B de-serializes message, processes it, then serializes result and publishes it for A to pick up (publish/subscribe socket pair). Message A->B is multipart, first part being a key for message B->A subscription (which is obviously, multipart too), second part - being an actual message body .
> Most (if not all) of the B->A messages arrive (i.e. polling for appropriate socket succeeds), however occasionally I see for two socket.recv(0) calls on A side: first call picks up a *body* of the previously sent A->B message, second call picks up a *key* for the current message. 
> Obviously it causes major problems processing response in A. I should also note that it happens only when rate of A->B messages is sufficiently high (like 500 messages per second).
> 
> It is very possible that I did something wrong designing a message flow at the first place, so below are the relevant (I think) parts of A and B.
> Any advice would be very welcome.
> 
> Sergey Malov
> 
> 
> Application A :
> --------------------
>                 pushSocket.setLinger(...);
>                 subSocket.setLinger(...); subSocket.setReceiveTimeOut(...)
> 
> 		poller.setTimeout(...);
> 		pushSocket.connect(pushEndPoint);
> 
> 		subSocket.connect(subEndPoint);
> 		val status = poller.register(subSocket) ; // then check status
> 		val address = new Random().nextInt().toString.getBytes;
> 
> 		while(true)
> 		{
> 			pushSocket.send( address, ZMQ.SNDMORE);
> 			val pushMsg = ..... // create protocol buffer message
> 			pushSocket.send(pushMsg.toByteArray,0);
> 
> 			subSocket.subscribe(address);
> 			poller.poll;
> 			if (poller.pollin(0))
> 			{
> 				val rspAddress = subSocket.recv(0); <- this should be subscription address, but occasionally it is message body !
> 				val rspBytes = subSocket.recv(0);   <- this should be message body, but occasionally it is address 
> 			}
>                         // sleep pre-defined time to get a steady message rate 
> 			Thread.sleep(SLEEP_TIME);
> 		}
> 	}
> 
> Application B:
> -------------------
> 
>                   poller.setTimeout( ....) ); 
>                   pullSocket.bind(pullEndPoint);
>                   val status1 = poller.register(pullSocket); // then check status
>   
>                   pubSocket.bind(pubEndPoint);
>                   val status2 = poller.register(pubSocket) // then check status 
> 		  while (true)
> 		  {
> 			  poller.poll;
> 			  if (poller.pollin(0)) {
> 				  val address = pullSocket.recv(0);
> 				  val inMessage = pullSocket.recv(0);
> 				  // process incoming message 
> 				  val result = ... // receive result
> 				  pubSocket.send(address, ZMQ.SNDMORE);
> 				  pubSocket.send(result.toByteArray, 0);
> 			  };
> 		  };
> 
> 
> 
>  
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev



