[zeromq-dev] N00b question re HWM

David Cheney dcheney at peaxy.net
Tue Jun 25 02:35:47 CEST 2013


I have s simple "Sender" S.java and "Receiver" R.java using jzmq.  I start
the receiver, then the sender.  I stop the receiver after it has received
about 350 messages; the sender reports queuing 1000 messages - so far so
good.  I restart and quickly stop the receiver.  It reports receiving all
but the first few queued messages, then the sender blocks once it has
queued its second batch of 1000.

My attempts to set the HWM seem to have no effect.  The sender stops on
units of 1000 regardless of what I set the HWM to on either end.  Clearly
there is something I'm missing.  Thanks in advance!

Platform is kvm under Centos 6.4 / Linux 2.6.32 64 bit, with Java at 1.7.0.


*--- Sender ---*

package com.peaxy.zmqtest;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;

public class S
{
    static       String targetIp  = null;
    static       int    targetPort = 5558;

    public static void main (String[] args) throws InterruptedException
    {
        if (args.length > 0) {
            targetIp = args[0];
        } else {
            System.err.println("usage: S <ip.address>");
            System.exit(1);
        }

        Context context = ZMQ.context(1);
        ZMQ.Socket sender = context.socket(ZMQ.PUSH);
        System.out.println("sending events to " + targetIp + ":" +
targetPort);
        sender.connect("tcp://" + S.targetIp + ":" + S.targetPort);
        sender.setSndHWM(2000);
        // sender.setSendTimeOut(50);   // -1 is infinite

        int callerSequenceNum = 1;
        int unsent = 0;
        while (callerSequenceNum <= 1000000) {
            String m = "message " + callerSequenceNum;
            boolean sent = sender.send(m.getBytes(), 0);
            if (sent) {
                if (unsent > 0) {
                    System.err.println("could not send message " +
callerSequenceNum);
                    unsent = 0;
                }
                System.out.print(callerSequenceNum + " ");
                if ((callerSequenceNum % 25) == 0) System.out.println("\n");
                Thread.sleep(10);
            } else {
                unsent++;
                System.err.println("failed to send " + callerSequenceNum +
", missed " + unsent + ", sleeping 10 secs");
                Thread.sleep(10000);
            }
            callerSequenceNum++;
        }
        sender.close();
        context.term();
    }
}

*--- Receiver ---*

package com.peaxy.zmqtest;

import org.zeromq.ZMQ;

public class R {
    private static final int CPU_CORES = 8;

    public static void main (String[] args) throws InterruptedException
    {
        ZMQ.Context context = ZMQ.context(CPU_CORES-1);
        ZMQ.Socket receiver = context.socket(ZMQ.PULL);
        receiver.bind("tcp://*:5558");
        receiver.setRcvHWM(2000);
        // receiver.setReceiveTimeOut(500);
        long lastMessage = 0;

        while (lastMessage <= 1000000) {
            byte[] bytes = receiver.recv(0); // ZMQ.DONTWAIT);
            if (bytes == null || bytes.length == 0) {
                System.out.println("timed out after " +
receiver.getReceiveTimeOut());
                Thread.sleep(10);
                continue;
            }
            // received a message..
            String message = new String(bytes);
            int msgNumber =
Integer.parseInt(message.substring(message.indexOf(' ')+1));
            if (msgNumber != lastMessage + 1) {
                System.err.println("***** missed " +
(msgNumber-lastMessage) + " messages");
            } else {
                System.out.println(message);
            }
            lastMessage = msgNumber;
        }
        receiver.close();
        context.term();
    }
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20130624/972fd0ce/attachment.htm>


More information about the zeromq-dev mailing list