[zeromq-dev] Java/Windows issues

Robin Weisberg robin at scout-trading.com
Tue Aug 18 15:45:07 CEST 2009

On Tue, Aug 18, 2009 at 4:13 AM, Martin Sustrik <sustrik at fastmq.com> wrote:

> Hi Robin,
>  So reading the RFC it seems PGM should support the multiple
>> publishers/multiple listeners scenario. What was the issue Robert had here?
>> http://lists.zeromq.org/pipermail/zeromq-dev/2009-July/001060.html? Is it
>> easily fixable?
> The problem is that current implementation allows only for a single
> producer within a multicast group/port. The goal is to have well-defined
> ordering of the messages and consistent connect/disconnect/recovery
> semantics. Think of a video feed. There's one source, multiple destinations.
> Occasionally there's a full frame sent, however, the frames are mostly just
> deltas from the previous frame. When disconnection happens, the consumer
> application must be notified about the fact, so that after reconnection
> happens, it can drop deltas until it gets the first full frame. This won't work
> with multiple sources.
> You'll get the same scenario with FIX/FAST messages (which are deltas in
> fact).
> What you need is a bit different scenario:
> 1. Multiple sources
> 2. Interpretation of messages is not dependent on their ordering
> It's doable and the use case is a valid one, so the only question is whether to
> implement it in 0MQ/1.0 or rather postpone it for 0MQ/2.0.
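
To make the single-source delta scenario concrete, here is a minimal sketch of the consumer-side logic: after a disconnect, deltas are dropped until the next full frame. The `Frame` and `DeltaConsumer` names are hypothetical and are not part of any 0MQ API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical frame type: either a full snapshot or a delta against the previous frame.
final class Frame {
    final boolean full;
    final String payload;

    Frame(boolean full, String payload) {
        this.full = full;
        this.payload = payload;
    }
}

// Consumer-side state machine for a single source: deltas are only meaningful
// while we are in sync, i.e. after a full frame and before any disconnect.
final class DeltaConsumer {
    private boolean inSync = false; // nothing can be applied before the first full frame
    private final List<String> applied = new ArrayList<>();

    // Called when the transport notifies the application that the source was lost.
    void onDisconnect() {
        inSync = false;
    }

    // Returns true if the frame was applied, false if it was dropped.
    boolean onFrame(Frame f) {
        if (f.full) {
            inSync = true; // a full frame re-establishes the baseline
        }
        if (!inSync) {
            return false;  // delta without a baseline: drop it
        }
        applied.add(f.payload);
        return true;
    }

    List<String> applied() {
        return applied;
    }
}
```

With several sources in the same group a single `inSync` flag is no longer enough, which is the difficulty described above.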

Sounds like you need to replace PGM (e.g. w/ UDP), right? That's a pretty big
job; I'd just implement it in 0MQ/2.0.

Not a bad thing really, as it allows you to eventually implement a lot of
fancy stuff: your own session ids/seq numbers for recovery, which can then be
used further up the stack (e.g. for persistent solutions); checks for
duplicate publishers of the same session; awareness of other
publishers/listeners, knowing when they arrive/disappear and what seq
numbers they are up to; throttling listeners who request recovery too much; etc.

>     A thought: Subscriptions (A?C-.*) should not be done on per-source
>>    basis, rather on per multicast group basis.
>> Just to make sure we are on the same page I'm not suggesting that a
>> listener client uses the api to subscribe to each source. The listener
>> client subscribes to a virtual destination(topic). The per source stuff is
>> calculated inside the API in order to handle recovery from multiple
>> publishers and wildcard subscriptions.
> Ok. Good. We are on the same page. I just thought you suggest using
> per-source subscriptions.
>     If there are two distinct types of messages that require different
>>    subscriptions, they should be mapped to two distinct multicast
>>    groups. The point is that that way you can manage each message
>>    flow separately on networking level. Think of setting different QoS
>>    in your switch/router for real-time market data vs. delayed stock
>>    quote feed.
>> I think what you are saying here is to forget about the concept of a
>> virtual destination (topic) and if you are connected to the multicast group
>> you are going to get everything. Listeners will filter out messages they
>> don't want based on content.
> That's the way the multicast group works. Consumers are going to get
> everything. Filtering on the topic is done on the consuming side.

Understood, but some middleware implementations have the concept of topics
built in and filter before queueing the message to the app. They may also
have out-of-process software message switches that receive multicast and
filter by topic before delivering to your process over TCP. That is sometimes
useful if you have UIs that you don't want directly connected to a high-volume
multicast group. I think this can just be implemented as a wrapper around a
lower-level layer like the one below. This way the overhead isn't there when
you don't need it.
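
As a rough illustration of such a wrapper, the sketch below filters by topic before anything is queued for the application. `TopicFilter` and its methods are made-up names for illustration, and topics are assumed to be plain strings matched against a regular expression.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.regex.Pattern;

// Hypothetical wrapper that sits on top of a lower-level receive path and
// only queues messages whose topic matches the subscription.
final class TopicFilter {
    private final Pattern subscription;
    private final Queue<String> appQueue = new ArrayDeque<>();

    TopicFilter(String topicRegex) {
        this.subscription = Pattern.compile(topicRegex);
    }

    // Called for every message arriving from the multicast group.
    // Returns true if the message was queued for the application.
    boolean onRawMessage(String topic, String body) {
        if (!subscription.matcher(topic).matches()) {
            return false; // filtered out before it ever reaches the app queue
        }
        appQueue.add(topic + ": " + body);
        return true;
    }

    // Returns the next queued message, or null if none is pending.
    String poll() {
        return appQueue.poll();
    }
}
```

An application that does not need topics would talk to the lower layer directly and pay nothing for the matching.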

> This is in contrast with TCP based message distribution where filtering
> (a.k.a. routing) can be done on the producer side.
> Both are going to be supported. There's some technical documentation of the
> underlying design here:
> http://www.zeromq.org/whitepapers:routing

>  I think that's perfectly fine for a lower-level api designed for high speed,
>> and it's a perfectly fine starting point. What you are building is a reliable
>> messaging api that supports multicast and tcp across lots of programming
>> languages/OSes. Underneath you are proxying to PGM and TCP, which implement
>> the reliability/ordering for you.
>> To implement more advanced features like guaranteed messaging, topic-based
>> subscriptions, wildcard subscriptions, knowing when a listener goes down in
>> a multicast env, etc., you need more advanced protocols either on top of
>> PGM/TCP, or instead of PGM/TCP.
> Yes.
>     * Requiring client application to handle source IDs means it has to
>>    handle multiple (per-source) states, async notifications about
>>    connections/disconnections etc. In short, it means the client
>>    application has to be rather complex. It would be good to shift the
>>    complexity into 0MQ layer.
>> I wasn't suggesting the client app should use the source id. Clients
>> send/receive to/from a virtual destination. Multiple sources can be sending
>> to the destination, the listening client doesn't need to know about them.
>> The purpose of the source id is so the API can implement recovery which
>> should be handled underneath the hood. It is useful to let the app
>> optionally know about sources e.g. when getting alerts about problems like:
>> "hey your tcp connection to source X just disappeared" or "a multicast
>> recovery w/ source X is happening". This is primarily for providing useful
>> info for handling production problems or so the app can choose to stop
>> processing until there is manual intervention. I don't expect apps to
>> actually use the source id for anything aside from printing it out.
> Ok. That makes sense. As for TCP-based dialogues, there is going to be
> "application identity" that can be set by the user. When application
> disconnects, 0MQ will know exactly which application that was and can
> provide its identity (arbitrary string) to the user.
> With PGM the identity is a GSI/TSI which is a binary BLOB, which can be
> passed to the user on disconnection, however, it's less informative than the
> "application identity" above.

Yes, the application identity is something it would be nice to have control
over, as mentioned above, so you can use it for recovery and not have multiple
layers of seq numbers.

>         Maybe zsock is the abstraction for the connection mechanism, but
>>        a more sophisticated api could be used on top of it for handling
>>        of virtual destinations/wildcards/subscriptions/more flexible
>>        recovery?
>>    Go on. Suggestions?
>> In Java. Note the Listeners/Handlers could just be types of messages
>> received when you do sock.receive() instead of callbacks. You get the idea
>> though.
> I prefer message types. The callbacks would make people use mutexes to
> synchronise the callbacks with other code which would severely damage
> performance of the system.

Works for me
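
A minimal sketch of the message-type approach: connection events arrive through the same receive call as data, so a single thread handles everything and no locking is needed in application code. `SockEvent`, `EventSock` and `EventLoop` are hypothetical names, and `receive()` is non-blocking here only to keep the sketch short.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical event type: notifications are just another kind of message.
final class SockEvent {
    enum Type { CONNECTED, DISCONNECTED, DATA }

    final Type type;
    final String body; // only meaningful for DATA

    SockEvent(Type type, String body) {
        this.type = type;
        this.body = body;
    }
}

// Stand-in for a socket: the I/O side delivers events, the app side receives them.
final class EventSock {
    private final Queue<SockEvent> inbox = new ConcurrentLinkedQueue<>();

    void deliver(SockEvent e) {
        inbox.add(e);
    }

    // Returns the next event, or null if nothing is pending.
    SockEvent receive() {
        return inbox.poll();
    }
}

final class EventLoop {
    // Drains the socket on the caller's thread and returns a log of what was seen.
    static String drain(EventSock sock) {
        StringBuilder log = new StringBuilder();
        SockEvent e;
        while ((e = sock.receive()) != null) {
            switch (e.type) {
                case CONNECTED:
                    log.append("up;");
                    break;
                case DISCONNECTED:
                    log.append("down;");
                    break;
                case DATA:
                    log.append("msg=").append(e.body).append(';');
                    break;
            }
        }
        return log.toString();
    }
}
```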

>> ZSock sock = new ZSock("pgm://eth0; <
>> sock.addZSockListener( new ZSockListener() {
>>  void disconnected() {}
>>  void connected() {}
>> });
>> // First param is the persistent name, which remains the same across
>> // restarts so listeners can identify the sender even if it is moved to a
>> // different host.
>> ZRecoverableSock recovSock = new ZRecoverableSock("Instance1", sock);
> Yes. But would work only for unicast based transports as discussed above.

Why? The ZRecoverableSock will write every message to a file and can
replay the messages when any listener requests them. Any listener can
request a resend. You may want to add features so listeners request replays
over TCP, or at least receive the replays over unicast UDP, so you don't bog
down other listeners. You could also add options to make it aware of
subscribers, so that it tracks which messages have been received by which
subscriber and removes the messages that no longer need to be stored.
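
Here is a rough in-memory sketch of that store-and-replay idea with per-subscriber pruning. `ReplayStore` and its methods are illustrative names only. It assumes sequence numbers start at 1 and that every subscriber is registered before it acknowledges anything.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical message store: keeps messages by sequence number, replays ranges
// on request, and drops messages once every known subscriber has acked them.
final class ReplayStore {
    private final TreeMap<Integer, String> messages = new TreeMap<>();
    private final Map<String, Integer> ackedUpTo = new HashMap<>();
    private int nextSeq = 1;

    // Stores a message and returns the sequence number assigned to it.
    int store(String msg) {
        messages.put(nextSeq, msg);
        return nextSeq++;
    }

    // A new subscriber has acknowledged nothing yet.
    void register(String subscriber) {
        ackedUpTo.putIfAbsent(subscriber, 0);
    }

    // Returns the stored messages with start <= seq <= end (requires start <= end).
    List<String> replay(int start, int end) {
        return new ArrayList<>(messages.subMap(start, true, end, true).values());
    }

    // Records that the subscriber has everything up to seq, then prunes
    // whatever all subscribers have acknowledged.
    void ack(String subscriber, int seq) {
        ackedUpTo.merge(subscriber, seq, Integer::max);
        int safe = Collections.min(ackedUpTo.values());
        messages.headMap(safe, true).clear();
    }

    int size() {
        return messages.size();
    }
}
```

A file-backed store would follow the same shape. Replays could also be sent to the requesting listener over a separate unicast channel so other listeners are not bogged down.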

>  //FileMessageStore implements the methods "Iterator<ZMessage>
>> getMessages(int start, int end)"
>> // and "void storeMessages(int startSeqNum, Iterator<ZMessage> msgs)"
>> // and "void removeMessages(int start, int end)"
>> recovSock.setMessageStore(new FileMessageStore("file.dat"));
>> ZSubscriptionSocket subSock = new ZSubscriptionSocket (recovSock);
>> subSock.addMsgHandler(new ZSubscriptionSocketMsgHandler() {
>>  // could include Source as a param
>>  void onMessage(ZMessage msg, int destID) {
>>   System.out.println("Received msg: " + msg + " on destination " + destID);
>>  }
>> });
>> subSock.subscribe("MD-[A-M].*");
>> subSock.startListeners();
>> int destID = subSock.createDestination("MD-MSFT");
>> subSock.send(destID, new ZMessage("Hello"));
>> //For those who are interested, not required for operation
>> subSock.addSubscriptionSocketListener(new ZSubscriptionSocketListener() {
>>  void onNewSource(Source src) {}
>>  void onSourceDisconnect(Source src) { }
>>  void onNewListener(Listener l) {}
>>  void onListenerDisconnect(Listener l) {}
>> });
> These can be added, but have to be used just as informational events rather
> than for strict consistency checking. That's in line with what you are
> saying.
>> You could also have a ZReqReplySock that wraps any of the above sockets.
> Will be done in 0MQ/2.0

OK, I'm curious to see how that's implemented in multicast. Does everyone see
the responses and have to filter them?

>  And has a ZMessage reply = sock.request(ZMessage msg);
>> A ZThreadPool could pull messages off a socket (or sockets) and distribute
>> them across multiple threads.
>> A ZMessageSwitch copies messages from one socket to another.
> The idea is to disguise in-process messaging as simply an additional
> transport mechanism in 0MQ/2.0:
> s.connect ("zmq.tcp://");
> s.connect ("zmq.pgm://");
> s.connect ("inproc://abc");
>  The disadvantage of nesting the different functionalities is that there is
>> some duplication. TCP and PGM have their own less persistent session ids/seq
>> numbers while the ZRecoverableSock must have its own. That makes your message
>> payloads slightly bigger. It doesn't really matter for file-based stores
>> because perf is so slow anyway, but a large in-memory store would suffer. Of
>> course a ZRecoverableSock could run over a UDP/IP socket and do pretty well
>> (maybe it needs windowing).
> Additional sequence numbers on top of TCP/PGM are unavoidable for
> guaranteed delivery (the one that survives component restart). As you say,
> UDP is an option, however, it would mean reimplementing all the nice TCP
> functionality (think of congestion control mechanisms), so I believe a few
> additional bytes per message are worth it.

Congestion control is nice for point-to-point. It doesn't really apply much to
multicast, of course; one guy going down doesn't mean anybody else gets
throttled.

> Martin