[zeromq-dev] SUB socket: parts of multipart message getting reversed
Sergey Malov
smalov at mdialog.com
Fri Feb 17 17:41:18 CET 2012
Hi,
I am working on a project in which two applications exchange protocol
buffer messages over ZeroMQ sockets. Both applications are written in
Scala (2.9.1). They use the Java binding to manipulate the ZeroMQ sockets
and the protocol buffers Java binding to serialize messages. The underlying
ZeroMQ library is 2.1.11, and the Java binding for ZeroMQ was compiled from
the latest source.
The flow of messages is as follows.
Application A pushes a message to application B (push/pull socket pair). B
de-serializes the message, processes it, then serializes the result and
publishes it for A to pick up (publish/subscribe socket pair). The A->B
message is multipart: the first part is the key for the B->A subscription,
and the second part is the actual message body. The B->A message is
multipart too, with the same key as its first part.
Most (if not all) of the B->A messages arrive (i.e. polling on the
appropriate socket succeeds). Occasionally, however, the two socket.recv(0)
calls on the A side return the parts in the wrong order: the first call
picks up the *body* of the previously sent A->B message, and the second
call picks up the *key* for the current message.
This obviously causes major problems when processing the response in A. I
should also note that it happens only when the rate of A->B messages is
sufficiently high (around 500 messages per second).
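The expectation that breaks here (every response is exactly two frames, key
first) can be checked explicitly instead of being assumed by two
back-to-back recv calls. Below is a minimal sketch of such a check. It is
not the code from either application: FrameSource, Multipart and QueueSource
are hypothetical names, and FrameSource only stands in for the two socket
operations involved (receive one frame, ask whether more frames follow) so
that the sketch runs without ZeroMQ.

```scala
// Hypothetical stand-in for the two socket operations used when receiving:
// "receive one frame" and "does the current message have more frames?".
// This trait is an assumption for illustration, not part of any ZeroMQ binding.
trait FrameSource {
  def recv(): Array[Byte]
  def hasReceiveMore: Boolean
}

object Multipart {
  // Read all frames of the current message, stopping at the message boundary
  // instead of assuming a fixed number of recv calls.
  def recvAll(src: FrameSource): Vector[Array[Byte]] = {
    var frames = Vector(src.recv())
    while (src.hasReceiveMore) frames = frames :+ src.recv()
    frames
  }

  // Accept only a two-frame message whose first frame equals the expected key;
  // anything else is reported as None rather than silently mis-read.
  def keyedBody(frames: Seq[Array[Byte]], key: Array[Byte]): Option[Array[Byte]] =
    frames match {
      case Seq(k, body) if java.util.Arrays.equals(k, key) => Some(body)
      case _ => None
    }
}

// In-memory fake (also hypothetical) that replays pre-built messages,
// used only to exercise the helpers above.
class QueueSource(messages: List[List[String]]) extends FrameSource {
  private var pending: List[String] = Nil
  private var rest: List[List[String]] = messages

  def recv(): Array[Byte] = {
    if (pending.isEmpty) { pending = rest.head; rest = rest.tail }
    val frame = pending.head
    pending = pending.tail
    frame.getBytes("UTF-8")
  }

  def hasReceiveMore: Boolean = pending.nonEmpty
}
```

With the real socket, recv() would correspond to subSocket.recv(0) and
hasReceiveMore to the binding's hasReceiveMore() call. A None result from
keyedBody would then show whether a response really arrived with its frames
swapped, or whether the two recv calls were reading across two different
messages.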
It is quite possible that I did something wrong when designing the message
flow in the first place, so below are the relevant (I think) parts of A and B.
Any advice would be very welcome.
Sergey Malov
Application A:
--------------------
pushSocket.setLinger(...);
subSocket.setLinger(...); subSocket.setReceiveTimeOut(...)
poller.setTimeout(...);
pushSocket.connect(pushEndPoint);
subSocket.connect(subEndPoint);
val status = poller.register(subSocket) ; // then check status
val address = new Random().nextInt().toString.getBytes;
while (true)
{
  pushSocket.send(address, ZMQ.SNDMORE);
  val pushMsg = ..... // create protocol buffer message
  pushSocket.send(pushMsg.toByteArray, 0);
  subSocket.subscribe(address);
  poller.poll;
  if (poller.pollin(0))
  {
    // this should be the subscription address, but occasionally it is the message body!
    val rspAddress = subSocket.recv(0);
    // this should be the message body, but occasionally it is the address
    val rspBytes = subSocket.recv(0);
  }
  // sleep a pre-defined time to get a steady message rate
  Thread.sleep(SLEEP_TIME);
}
}
Application B:
-------------------
poller.setTimeout(....);
pullSocket.bind(pullEndPoint);
val status1 = poller.register(pullSocket); // then check status
pubSocket.bind(pubEndPoint);
val status2 = poller.register(pubSocket); // then check status
while (true)
{
  poller.poll;
  if (poller.pollin(0)) {
    val address = pullSocket.recv(0);
    val inMessage = pullSocket.recv(0);
    // process incoming message
    val result = ... // receive result
    pubSocket.send(address, ZMQ.SNDMORE);
    pubSocket.send(result.toByteArray, 0);
  };
};