[zeromq-dev] SUB socket: parts of multipart message getting reversed

Sergey Malov smalov at mdialog.com
Fri Feb 17 17:41:18 CET 2012

I am working on a project where two applications talk to each other by
exchanging protocol buffer messages over ZeroMQ sockets. The applications
are written in Scala (2.9.1); the Java binding is used to manipulate the
ZeroMQ sockets, and messages are serialized using the protocol buffers Java
binding as well. The underlying ZeroMQ library is 2.1.11, and the Java
binding (for ZeroMQ) is compiled from the latest source.
The flow of messages is as follows.
Application A pushes a message to application B (push/pull socket pair); B
de-serializes the message, processes it, then serializes the result and
publishes it for A to pick up (publish/subscribe socket pair). The A->B
message is multipart: the first part is a key for the B->A message
subscription (the B->A message is, obviously, multipart too), and the
second part is the actual message body.
Most (if not all) of the B->A messages arrive (i.e. polling on the
appropriate socket succeeds); however, occasionally I see the following for
the two socket.recv(0) calls on the A side: the first call picks up the
*body* of the previously sent A->B message, and the second call picks up
the *key* for the current message.
Obviously this causes major problems when processing the response in A. I
should also note that it happens only when the rate of A->B messages is
sufficiently high (around 500 messages per second).
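
One way to make the misalignment described above visible is to read a whole
multipart message by following the "more parts" flag, instead of assuming
exactly two recv calls, and then to check the frame count. Below is a
minimal sketch under stated assumptions: FrameSource, QueueSource,
recvMessage and keyAndBody are hypothetical names that do not appear in the
original code, and the trait only stands in for the SUB socket (with the
Java binding, recv(0) and, as far as I know, hasReceiveMore() would play
these roles).

```scala
// Hypothetical stand-in for a ZeroMQ socket: recv() returns one frame,
// hasMore reports whether the frame just read is followed by another part.
trait FrameSource {
  def recv(): Array[Byte]
  def hasMore: Boolean
}

// In-memory source for illustration: each entry is (frame, moreFollows).
class QueueSource(frames: Seq[(String, Boolean)]) extends FrameSource {
  private val it = frames.iterator
  private var more = false
  def recv(): Array[Byte] = {
    val (data, m) = it.next()
    more = m
    data.getBytes("UTF-8")
  }
  def hasMore: Boolean = more
}

object Multipart {
  // Read frames until the "more" flag clears, i.e. one complete message.
  def recvMessage(src: FrameSource): Vector[Array[Byte]] = {
    var parts = Vector(src.recv())
    while (src.hasMore) parts = parts :+ src.recv()
    parts
  }

  // Expect exactly (key, body); anything else signals a misaligned read.
  def keyAndBody(parts: Vector[Array[Byte]]): Option[(String, Array[Byte])] =
    if (parts.length == 2) Some((new String(parts(0), "UTF-8"), parts(1)))
    else None
}
```

Reading up to the message boundary keeps the next read aligned on the first
part of the next message, and a None from keyAndBody flags a message that
did not have the expected two parts.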

It is quite possible that I did something wrong when designing the message
flow in the first place, so below are the relevant (I think) parts of A and B.
Any advice would be very welcome.

Sergey Malov

Application A:

subSocket.setLinger(...); subSocket.setReceiveTimeOut(...)

val status = poller.register(subSocket) // then check status
val address = new Random().nextInt().toString.getBytes

pushSocket.send(address, ZMQ.SNDMORE)
val pushMsg = ..... // create protocol buffer message

if (poller.pollin(0)) {
  // this should be the subscription address,
  // but occasionally it is the message body!
  val rspAddress = subSocket.recv(0)
  // this should be the message body,
  // but occasionally it is the address
  val rspBytes = subSocket.recv(0)
}
// sleep pre-defined time to get a steady message

Application B:

poller.setTimeout(....)
val status1 = poller.register(pullSocket) // then check
val status2 = poller.register(pubSocket)  // then check

while (true) {
  if (poller.pollin(0)) {
    // first part: subscription key, second part: message body
    val address = pullSocket.recv(0)
    val inMessage = pullSocket.recv(0)
    // process incoming message
    val result = ... // receive result
    // publish the reply under the same key
    pubSocket.send(address, ZMQ.SNDMORE)
    pubSocket.send(result.toByteArray, 0)
  }
}
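
The reply framing used in B (key first with the "more" flag set, result
last without it) can be sketched independently of ZeroMQ. This is only an
illustration under stated assumptions: FrameSink, RecordingSink,
sendMessage and reply are hypothetical names that are not part of the
original code, and the trait stands in for the PUB socket, where the
boolean would correspond to passing ZMQ.SNDMORE or 0 as the send flag.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the PUB socket.
trait FrameSink {
  def send(frame: Array[Byte], more: Boolean): Unit
}

// Records what was sent so the framing can be inspected.
class RecordingSink extends FrameSink {
  val sent = ArrayBuffer.empty[(String, Boolean)]
  def send(frame: Array[Byte], more: Boolean): Unit = {
    sent += ((new String(frame, "UTF-8"), more))
  }
}

object MultipartSend {
  // Send every part with the "more" flag set, except the last one.
  def sendMessage(sink: FrameSink, parts: Seq[Array[Byte]]): Unit = {
    require(parts.nonEmpty, "a message needs at least one part")
    parts.init.foreach(p => sink.send(p, true))
    sink.send(parts.last, false)
  }

  // B's reply as described: the key received from A, then the result.
  def reply(sink: FrameSink, key: Array[Byte], result: Array[Byte]): Unit =
    sendMessage(sink, Seq(key, result))
}
```

Keeping the flag logic in one helper means a message can never be left
open by a forgotten final part, whatever the number of parts.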