[zeromq-dev] Inproc endpoint and zmq_term.

Victor Vlasenko vlasenko at sysgears.com
Mon Jan 24 09:27:32 CET 2011


Hi All,

Blocking behavior of zmq_term is very good for correctly flushing all 
the messages on termination for over network connections. However it 
seems not so good for inproc connections.

For inproc connections with blocking socket.recv() zmq_term might 
generate TERM on our blocking recv earlier than messages will be flushed 
to the socket. The issue is with order of operations, currently there is 
no specific order of when messages will be flushed and when blocking 
operation exit with TERM. If however zeromq first tried to flush all the 
messages to all sockets and only after that generated TERM on all 
blocking socket.recv() operations this way messages weren't loosed for 
inprocess connections.

Given that inproc sockets have shared buffer, the correct behavior would 
be to not generate TERM on recv() operations until this shared buffer is 
empty.

The code that illustrates issue is provided below, problematic place is 
marked with !!!...!!!

Victor

import org.zeromq.ZMQ;

public class Balancer {
     private volatile int totalHandled = 0;

     public class WorkerThread extends Thread {
         private ZMQ.Context ctx;
         private int handled = 0;
         private int threadNo = 0;

         public WorkerThread(int threadNo, ZMQ.Context ctx) {
             super("Worker-" + threadNo);
             this.threadNo = threadNo;
             this.ctx = ctx;
         }

         public void run() {
             try {
                 // Create PULL socket
                 ZMQ.Socket socket = ctx.socket(ZMQ.PULL);
                 // Set high water mark to 2,
                 // so that when this peer
                 // had 2 messages in its buffer,
                 // ZeroMQ skipped to next workers
                 socket.setHWM(2);
                 // Connect to in-process endpoint
                 socket.connect("inproc://workers");

                 while (true) {
                     byte[] msg;
                     try {
                         // Get work piece
                         msg = socket.recv(0);
                     } catch (Exception e) {
                         // ZeroMQ wrapper throws exception
                         // when context is terminated
                         // !!!We will loose messages, because we might 
get TERM earlier than we get all the messages!!!
                         socket.close();
                         break;
                     }
                     handled++;
                     totalHandled++;
                     System.out.println(getName()
                               + " handled work piece " + msg[0]);
                     int sleepTime = (threadNo % 2 == 0) ? 100 : 200;
                     // Handle work, by sleeping for some time
                     Thread.sleep(sleepTime);
                 }
                 System.out.println(getName()
                               + " handled count " + handled);
             } catch (Throwable t) {
                 t.printStackTrace();
             }
         }
     }

     public void run() {
         try {
             // Create ZeroMQ context
             ZMQ.Context ctx = ZMQ.context(1);
             // Create PUSH socket
             ZMQ.Socket socket = ctx.socket(ZMQ.PUSH);
             // Bind socket to in-process endpoint
             socket.bind("inproc://workers");

             // Create worker threads pool
             Thread threads[] = new Thread[10];
             for (int i = 0; i < threads.length; i++) {
                 threads[i] = new WorkerThread(i, ctx);
                 threads[i].start();
             }

             // "Send" the work to workers
             for (int i = 0; i < 100; i++) {
                 System.out.println("Sending work piece " + i);
                 byte[] msg = new byte[1];
                 msg[0] = (byte)i;
                 socket.send(msg, 0);
             }
             socket.close();

             // Terminate ZeroMQ context
             ctx.term();

             System.out.println("Total handled " + totalHandled);
         } catch (Throwable t) {
             t.printStackTrace();
         }
     }

     public static void main(String[] args) {
         Balancer balancer = new Balancer();
         balancer.run();
     }






More information about the zeromq-dev mailing list