[zeromq-dev] jzmq recvZeroCopy not returning anything to me and cause error while exit
Alex Suo
alex.suo at gmail.com
Tue Sep 17 15:05:45 CEST 2013
Hi there,
Thank you in advance for your time. I am building a high-performance system
based on ZeroMQ and jzmq in Java. While trying to use a direct buffer, I
ran into a problem, which I have reduced to the following test code
on Windows.
My unit test environment is:
Windows 7 64bit
ZeroMQ 3.2.3
jzmq master from GIT, checked out in July 2013
JDK 1.7.0_25
The test code is as follows. As you can see in the subscriber thread,
if I use recv(ZMQ.DONTWAIT) it works perfectly; but if I use the direct
buffer, nothing is received while running and I get the following error
on exit:
Exception in thread "Thread-0" org.zeromq.ZMQException: Resource temporarily unavailable(0xb)
    at org.zeromq.ZMQ$Socket.recvZeroCopy(Native Method)
    at org.as.algo.messaging.bus.zmq.ZMQReadynessTest$1.run(ZMQReadynessTest.java:48)
If I use a non-direct ByteBuffer instead, I don't get the exception on exit,
but I still cannot receive anything.
Please kindly assist me. Many thanks.
Alex
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
import org.zeromq.ZMQ;

/**
 * Test if ZMQ is ready on this box.
 *
 * @author Alex Suo
 *
 */
public class ZMQReadynessTest {

    private ZMQ.Context context;

    @Before
    public void setUp() {
        context = ZMQ.context(1);
    }

    @Test
    public void testSimpleMessage() {
        String topic = "tcp://127.0.0.1:31216";
        final AtomicInteger counter = new AtomicInteger();

        // create a simple subscriber
        final ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
        subscribeSocket.connect(topic);
        subscribeSocket.subscribe("TestTopic".getBytes());

        Thread subThread = new Thread() {
            @Override
            public void run() {
                while (true) {
                    String value = null;

                    // This would result in trouble
                    {
                        ByteBuffer buffer = ByteBuffer.allocateDirect(100);
                        if (subscribeSocket.recvZeroCopy(buffer,
                                buffer.remaining(), ZMQ.DONTWAIT) > 0) {
                            buffer.flip();
                            value = buffer.asCharBuffer().toString();
                            System.out.println(buffer.asCharBuffer().toString());
                        }
                    }

                    // This works perfectly
                    /*
                    {
                        byte[] bytes = subscribeSocket.recv(ZMQ.DONTWAIT);
                        if (bytes == null || bytes.length == 0) {
                            continue;
                        }
                        value = new String(bytes);
                    }
                    */

                    if (value != null && value.length() > 0) {
                        counter.incrementAndGet();
                        System.out.println(value);
                        break;
                    }
                }
            }
        };
        subThread.start();

        // create a simple publisher - wait 3 sec to make sure its ready
        ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
        publishSocket.bind("tcp://*:31216");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            fail();
        }

        // publish a sample message
        try {
            publishSocket.send("TestTopic".getBytes(), ZMQ.SNDMORE);
            publishSocket.send("This is test string".getBytes(), 0);
            subThread.join(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
            fail();
        }

        assertTrue(counter.get() > 0);
        System.out.println(counter.get());
    }
}
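
A side note on the decoding step in the direct-buffer branch, separate from the receive problem itself: asCharBuffer() views every two bytes of the buffer as one 16-bit char, so it will not reproduce text that was sent with String.getBytes(). The sketch below is a minimal, jzmq-free illustration using only java.nio. It assumes the payload is UTF-8 (or plain ASCII) text; the class and method names are hypothetical, and it does not explain why recvZeroCopy returns nothing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DirectBufferDecodeSketch {

    // Decodes the readable bytes (position..limit) as UTF-8 text.
    // duplicate() is used so the caller's position and limit stay untouched.
    public static String decodeUtf8(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    // Mirrors the test code: views the bytes as 16-bit chars, two bytes per char.
    public static String decodeAsCharBuffer(ByteBuffer buffer) {
        return buffer.duplicate().asCharBuffer().toString();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(100);
        // Stand-in for a received message (assumption: UTF-8 encoded text).
        buffer.put("This is test string".getBytes(StandardCharsets.UTF_8));
        buffer.flip();

        System.out.println("UTF-8 decode:  " + decodeUtf8(buffer));
        System.out.println("asCharBuffer length: " + decodeAsCharBuffer(buffer).length());
    }
}
```

With a 19-byte payload, the char view holds only 9 chars, none of which match the original text, so a charset-based decode is probably what the test intends once data actually arrives in the buffer.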