[zeromq-dev] Java/Windows issues
Martin Sustrik
sustrik at fastmq.com
Tue Aug 18 10:13:41 CEST 2009
Hi Robin,
> So reading the RFC it seems PGM should support the multiple
> publishers/multiple listeners scenario. What was the issue Robert had here?
> http://lists.zeromq.org/pipermail/zeromq-dev/2009-July/001060.html? Is
> it easily fixable?
The problem is that the current implementation allows only a single
producer within a multicast group/port. The goal is to have well-defined
ordering of the messages and consistent connect/disconnect/recovery
semantics. Think of a video feed. There's one source and multiple
destinations. Occasionally a full frame is sent, but the frames are
mostly just deltas from the previous frame. When a disconnection
happens, the consumer application must be notified, so that after
reconnection it can drop deltas until it gets the first full frame. This
won't work with multiple sources.
You'll get the same scenario with FIX/FAST messages (which are in fact
deltas).
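Roughly, the consumer-side logic looks like this (a minimal sketch only;
ZMessage, isFullFrame() and the other names are illustrative, not the
actual API):

class DeltaFeedConsumer {

    private boolean waitingForFullFrame = false;

    // Called when the library reports that the feed was interrupted
    // (disconnection, failed recovery, etc.).
    void onFeedInterrupted() {
        waitingForFullFrame = true;
    }

    void onMessage(ZMessage msg) {
        // After an interruption, deltas refer to a state we no longer
        // have, so they are dropped until the next full frame arrives.
        if (waitingForFullFrame && !msg.isFullFrame()) {
            return;
        }
        waitingForFullFrame = false;
        applyToLocalState(msg);   // full frame or a consistent delta
    }

    private void applyToLocalState(ZMessage msg) { /* update local copy */ }
}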
What you need is a somewhat different scenario:
1. Multiple sources
2. Interpretation of messages is not dependent on their ordering
It's doable and the use case is a valid one, so the only question is
whether to implement it in 0MQ/1.0 or rather postpone it to 0MQ/2.0.
> A thought: Subscriptions (A?C-.*) should not be done on a per-source
> basis, but rather on a per-multicast-group basis.
>
> Just to make sure we are on the same page, I'm not suggesting that a
> listener client uses the API to subscribe to each source. The listener
> client subscribes to a virtual destination (topic). The per-source stuff
> is calculated inside the API in order to handle recovery from multiple
> publishers and wildcard subscriptions.
Ok. Good. We are on the same page. I just thought you were suggesting
using per-source subscriptions.
> If there are two distinct types of messages that require different
> subscriptions, they should be mapped to two distinct multicast
> groups. The point is that that way you can manage each message
> flow separately at the networking level. Think of setting different QoS
> in your switch/router for real-time market data vs. a delayed stock
> quote feed.
>
>
> I think what you are saying here is to forget about the concept of a
> virtual destination (topic), and that if you are connected to the multicast
> group you are going to get everything. Listeners will filter out
> messages they don't want based on content.
That's the way the multicast group works. Consumers are going to get
everything. Filtering on the topic is done on the consuming side.
This is in contrast with TCP-based message distribution, where filtering
(a.k.a. routing) can be done on the producer side.
Both are going to be supported. There's some technical documentation of
the underlying design here:
http://www.zeromq.org/whitepapers:routing
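For illustration, consumer-side filtering in the multicast case could
look like this (a sketch in the style of your Java examples; ZSock,
ZMessage and getTopic() are made-up names):

// Everything published to the multicast group arrives here; messages
// whose topic doesn't match the local subscription are simply dropped.
java.util.regex.Pattern subscription =
    java.util.regex.Pattern.compile("MD-[A-M].*");

while (true) {
    ZMessage msg = sock.receive();
    if (subscription.matcher(msg.getTopic()).matches()) {
        process(msg);
    }
    // otherwise discard on the consuming side
}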
> I think that's perfectly fine for a lower-level API designed for high
> speed and it's a perfectly fine starting point. What you are building is
> a reliable messaging API that supports multicast and TCP across lots of
> programming languages/OSes. Underneath you are proxying to PGM and TCP
> which implement the reliability/ordering for you.
>
> To implement more advanced features like guaranteed messaging, topic-based
> subscriptions, wildcard subscriptions, knowing when a listener goes down
> in a multicast environment etc., you need more advanced protocols, either
> on top of PGM/TCP or instead of PGM/TCP.
Yes.
> * Requiring client application to handle source IDs means it has to
> handle multiple (per-source) states, async notifications about
> connections/disconnections etc. In short, it means the client
> application has to be rather complex. It would be good to shift the
> complexity into 0MQ layer.
>
>
> I wasn't suggesting the client app should use the source id. Clients
> send/receive to/from a virtual destination. Multiple sources can be
> sending to the destination; the listening client doesn't need to know
> about them. The purpose of the source id is so the API can implement
> recovery, which should be handled under the hood. It is useful to
> let the app optionally know about sources, e.g. when getting alerts about
> problems like: "hey your tcp connection to source X just disappeared" or
> "a multicast recovery w/ source X is happening". This is primarily for
> providing useful info for handling production problems, or so the app can
> choose to stop processing until there is manual intervention. I don't
> expect apps to actually use the source id for anything aside from
> printing it out.
Ok. That makes sense. As for TCP-based dialogues, there is going to be an
"application identity" that can be set by the user. When an application
disconnects, 0MQ will know exactly which application it was and can
provide its identity (an arbitrary string) to the user.
With PGM the identity is a GSI/TSI, which is a binary BLOB. It can be
passed to the user on disconnection, however, it's less informative than
the "application identity" above.
> Maybe zsock is the abstraction for the connection mechanism, but
> a more sophisticated API could be used on top of it for handling
> virtual destinations/wildcards/subscriptions/more flexible
> recovery?
>
>
> Go on. Suggestions?
>
> In Java. Note the Listeners/Handlers could just be types of messages
> received when you do sock.receive() instead of callbacks. You get the
> idea though.
I prefer message types. Callbacks would make people use mutexes to
synchronise the callbacks with other code, which would severely damage
the performance of the system.
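With message types, connect/disconnect notifications are just items
pulled from the same receive loop, so the application stays
single-threaded. Roughly (illustrative names again):

// All notifications arrive through the one receive loop -- no callbacks,
// no mutexes. ZConnected/ZDisconnected/ZMessage are illustrative names.
while (true) {
    Object item = sock.receive();
    if (item instanceof ZConnected) {
        // a peer attached
    } else if (item instanceof ZDisconnected) {
        // a peer went away
    } else if (item instanceof ZMessage) {
        handle((ZMessage) item);
    }
}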
>
> ZSock sock = new ZSock("pgm://eth0;239.151.1.1:100");
> sock.addZSockListener( new ZSockListener() {
> void disconnected() {}
> void connected() {}
> });
>
> ZRecoverableSock recovSock = new ZRecoverableSock("Instance1",
> sock); // first param is the persistent name, which remains the same across
> restarts so listeners can identify the sender even if it is moved to a
> different host
Yes. But it would work only for unicast-based transports, as discussed above.
> //FileMessageStore implements the methods "Iterator<ZMessage>
> getMessages(int start, int end)"
> // and "void storeMessages(int startSeqNum, Iterator<ZMessage> msgs)"
> // and "void removeMessages(int start, int end)"
> recovSock.setMessageStore(new FileMessageStore("file.dat"));
>
> ZSubscriptionSocket subSock = new ZSubscriptionSocket (recovSock);
> subSock.addMsgHandler(new ZSubscriptionSocketMsgHandler() {
> void onMessage(ZMessage msg, int destID) {//could include Source as a param
> System.out.println("Received msg: " + msg + " on destination " +
> destID);
> }
> });
>
> subSock.subscribe("MD-[A-M].*");
> subSock.startListeners();
>
> int destID = subSock.createDestination("MD-MSFT");
> subSock.send(destID, new ZMessage("Hello"));
>
> //For those who are interested, not required for operation
> subSock.addSubscriptionSocketListener(new ZSubscriptionSocketListener() {
> void onNewSource(Source src) {}
> void onSourceDisconnect(Source src) { }
> void onNewListener(Listener l) {}
> void onListenerDisconnect(Listener l) {}
> });
These can be added, but they have to be used just as informational events
rather than for strict consistency checking. That's in line with what
you are saying.
>
> You could also have a ZReqReplySock that wraps any of the above sockets.
Will be done in 0MQ/2.0.
> And has a ZMessage reply = sock.request(ZMessage msg);
> A ZThreadPool could pull messages off a socket (or sockets) and distribute
> them across multiple threads.
> A ZMessageSwitch copies messages from one socket to another
The idea is to disguise in-process messaging as simply an additional
transport mechanism in 0MQ/2.0:
s.connect ("zmq.tcp://192.168.0.1:5555");
s.connect ("zmq.pgm://224.0.0.1:5555");
s.connect ("inproc://abc");
> The disadvantage to nesting the different functionalities is there is
> some duplication. TCP and PGM have their own less persistent session
> ids/seq numbers while the ZRecoverableSock must have its own. Makes your
> message payloads slightly bigger. Doesn't really matter for file-based
> stores because perf is so slow, but a large in-memory store would suffer.
> Of course a ZRecoverableSock could run over a UDP/IP socket and do pretty
> well (maybe it needs windowing)
Additional sequence numbers on top of TCP/PGM are unavoidable for
guaranteed delivery (the kind that survives a component restart). As you
say, UDP is an option, however, it would mean reimplementing all the
nice TCP functionality (think of congestion control mechanisms), so I
believe a few additional bytes per message are worth it.
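To give an idea of the overhead, the recoverable layer needs roughly one
end-to-end sequence number per message, e.g. (a sketch only; this framing
is hypothetical, not the 0MQ wire format):

import java.nio.ByteBuffer;

// Hypothetical framing for a recoverable layer on top of TCP/PGM: eight
// bytes of end-to-end sequence number prepended to each payload, assigned
// by the sender and persisted so delivery can resume after a restart.
final class RecoverableFrame {

    static byte[] wrap(long seqNum, byte[] payload) {
        return ByteBuffer.allocate(8 + payload.length)
                         .putLong(seqNum)
                         .put(payload)
                         .array();
    }

    static long seqNum(byte[] frame) {
        return ByteBuffer.wrap(frame, 0, 8).getLong();
    }

    static byte[] payload(byte[] frame) {
        byte[] p = new byte[frame.length - 8];
        System.arraycopy(frame, 8, p, 0, p.length);
        return p;
    }
}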
Martin