[zeromq-dev] Inproc endpoint and zmq_term.
Victor Vlasenko
vlasenko at sysgears.com
Mon Jan 24 09:27:32 CET 2011
Hi All,
The blocking behavior of zmq_term() is very good for correctly flushing all
outstanding messages on termination over network connections. However, it
does not seem so good for inproc connections.

For an inproc connection with a blocking socket.recv(), zmq_term() may raise
ETERM on the blocked recv() before the pending messages have been flushed to
the socket. The issue is the order of operations: currently there is no
guarantee about whether queued messages are flushed before or after the
blocking call is interrupted with ETERM. If ZeroMQ instead first flushed all
pending messages to all sockets and only then raised ETERM on blocking
socket.recv() operations, no messages would be lost on inproc connections.
Given that inproc sockets have a shared buffer, the correct behavior would
be to not raise ETERM on recv() operations until this shared buffer is
empty. (Until then, a possible application-level workaround is sketched
after the example code below.)
Code that illustrates the issue is provided below; the problematic place is
marked with !!!...!!!
Victor
import org.zeromq.ZMQ;

public class Balancer {

    private volatile int totalHandled = 0;

    public class WorkerThread extends Thread {

        private ZMQ.Context ctx;
        private int handled = 0;
        private int threadNo = 0;

        public WorkerThread(int threadNo, ZMQ.Context ctx) {
            super("Worker-" + threadNo);
            this.threadNo = threadNo;
            this.ctx = ctx;
        }

        public void run() {
            try {
                // Create PULL socket
                ZMQ.Socket socket = ctx.socket(ZMQ.PULL);
                // Set high water mark to 2, so that when this peer
                // has 2 messages in its buffer, ZeroMQ skips to the
                // next worker
                socket.setHWM(2);
                // Connect to in-process endpoint
                socket.connect("inproc://workers");
                while (true) {
                    byte[] msg;
                    try {
                        // Get work piece
                        msg = socket.recv(0);
                    } catch (Exception e) {
                        // The ZeroMQ wrapper throws an exception
                        // when the context is terminated.
                        // !!!We will lose messages, because we might
                        // get ETERM earlier than we get all the messages!!!
                        socket.close();
                        break;
                    }
                    handled++;
                    totalHandled++;
                    System.out.println(getName()
                            + " handled work piece " + msg[0]);
                    int sleepTime = (threadNo % 2 == 0) ? 100 : 200;
                    // Handle work by sleeping for some time
                    Thread.sleep(sleepTime);
                }
                System.out.println(getName()
                        + " handled count " + handled);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void run() {
        try {
            // Create ZeroMQ context
            ZMQ.Context ctx = ZMQ.context(1);
            // Create PUSH socket
            ZMQ.Socket socket = ctx.socket(ZMQ.PUSH);
            // Bind socket to in-process endpoint
            socket.bind("inproc://workers");
            // Create worker thread pool
            Thread threads[] = new Thread[10];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new WorkerThread(i, ctx);
                threads[i].start();
            }
            // "Send" the work to workers
            for (int i = 0; i < 100; i++) {
                System.out.println("Sending work piece " + i);
                byte[] msg = new byte[1];
                msg[0] = (byte) i;
                socket.send(msg, 0);
            }
            socket.close();
            // Terminate ZeroMQ context
            ctx.term();
            System.out.println("Total handled " + totalHandled);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Balancer balancer = new Balancer();
        balancer.run();
    }
}