[zeromq-dev] Java/Windows issues
Robin Weisberg
robin at scout-trading.com
Tue Aug 18 06:12:48 CEST 2009
On Mon, Aug 17, 2009 at 4:12 AM, Martin Sustrik <sustrik at fastmq.com> wrote:
> Hi Robin,
>
>> I've had quite a bit of experience w/ other middleware products,
>> I'd be happy to have a discussion about it if you are
>> interested. Pieter has contacted me and it seems like I'll be
>> talking to him Tue @ 10, maybe we can piggyback on that. It's a
>> difficult topic to cover in email because it's so large, but here
>> are some thoughts based on not too much thinking but some
>> experience as a user.
>>
>> * Sequencing could be handled at the level of the Exchange i.e.
>> each exchange has different seq numbers/internal msg queues.
>> Queues track these seq numbers and can request recovery from the
>> underlying exchange if a message is missed.
>>
>> This applies to unicast connections, right? (PGM has a recovery
>> mechanism of its own.) If so, then yes, this is on a long-term roadmap.
>>
>>
>> OK, I think I wasn't clear because I'm mapping my thoughts into 0MQ speak
>> poorly. We need to separate out the concept of a message source (exchange)
>> from the "virtual destination" name the source sends to. In 0MQ speak
>> (Queues/Exchanges) those get mixed together.
>>
>> A message source needs to have its own seq numbers and handle recovery
>> requests for its seq numbers. It has a MessageStore to handle those
>> requests and can do so w/ various levels of guarantee/performance based on
>> the store used. A source can use various underlying connection
>> methodologies (maybe even multiple at once, e.g. multiple tcp connections)
>> which may or may not support some level of recovery themselves (pgm/tcp
>> both have their own seq numbers/recovery, although plain udp doesn't).
>>
>> A listener needs to be able to map a virtual destination to all the
>> sources and track the seq numbers individually. The way to do this is for
>> new listeners/sources to announce themselves. Before a source publishes on
>> a virtual destination it sends a message to all the listeners saying
>> ABC-DEF maps to 1234 (where 1234 is a GUID generated by the publisher so
>> multiple sources don't step on each other, or a value looked up from a
>> central broker) and the current seq number is X. A listener could say:
>> hey, I'm interested in all virtual destinations matching the regular
>> expression "A?C-.*", so I'm going to handle any message w/ ID 1234; also I
>> need to request messages 5-X to catch up. When a new listener announces
>> itself, sources need to send the same message so listeners can react
>> appropriately.
>>
>
> PGM already specifies part of this functionality. See GSI/TSI in RFC3208
> for handling of source IDs.
So reading the RFC it seems PGM should support the multiple
publishers/multiple listeners scenario. What was the issue Robert had here:
http://lists.zeromq.org/pipermail/zeromq-dev/2009-July/001060.html? Is it
easily fixable?
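To make the announcement scheme I described above concrete, here is a
minimal sketch in Java. All the names are made up for illustration; none of
this is existing 0MQ or PGM API, and requestRecovery() is just a stand-in
for whatever the transport/MessageStore provides:

import java.util.HashMap;
import java.util.Map;

// One announcement per (virtual destination, source): "ABC-DEF maps to
// GUID 1234 and my current seq number is X".
class SourceAnnouncement {
    String virtualDest;  // e.g. "ABC-DEF"
    long sourceGuid;     // e.g. 1234, generated by the publisher
    long currentSeqNum;  // e.g. X
}

class AnnouncementHandler {
    // Last seq number received from each source GUID.
    private Map<Long, Long> lastSeqBySource = new HashMap<Long, Long>();

    void onAnnouncement(SourceAnnouncement ann) {
        // Only react to virtual destinations matching our subscription.
        if (!ann.virtualDest.matches("A?C-.*"))
            return;
        Long lastSeen = lastSeqBySource.get(ann.sourceGuid);
        if (lastSeen == null)
            lastSeen = 0L; // new source, nothing received from it yet
        if (lastSeen < ann.currentSeqNum)
            requestRecovery(ann.sourceGuid, lastSeen + 1, ann.currentSeqNum);
        lastSeqBySource.put(ann.sourceGuid, ann.currentSeqNum);
    }

    private void requestRecovery(long sourceGuid, long from, long to) {
        // Ask the source (or its MessageStore) to resend messages [from, to].
    }
}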
>
>
> A thought: Subscriptions (A?C-.*) should not be done on a per-source
> basis, rather on a per-multicast-group basis.
Just to make sure we are on the same page: I'm not suggesting that a listener
client uses the API to subscribe to each source. The listener client
subscribes to a virtual destination (topic). The per-source stuff is
calculated inside the API in order to handle recovery from multiple
publishers and wildcard subscriptions.
> If there are two distinct types of messages that require different
> subscriptions, they should be mapped to two distinct multicast groups. The
> point is that that way you can manage each message flow separately at the
> networking level. Think of setting different QoS in your switch/router for
> real-time market data vs. a delayed stock quote feed.
>
I think what you are saying here is to forget about the concept of a virtual
destination (topic): if you are connected to the multicast group you are
going to get everything, and listeners will filter out messages they don't
want based on content.
I think that's perfectly fine for a lower-level API designed for high speed,
and it's a good starting point. What you are building is a reliable
messaging API that supports multicast and TCP across lots of programming
languages/OSes. Underneath you are proxying to PGM and TCP, which implement
the reliability/ordering for you.
To implement more advanced features like guaranteed messaging, topic-based
subscriptions, wildcard subscriptions, or knowing when a listener goes down
in a multicast env, you need more advanced protocols either on top of
PGM/TCP or instead of PGM/TCP.
> As for different transport mechanisms (PGM vs. TCP) the plan with 0MQ/2.0
> is to allow combining the transports in an arbitrary way. Say PGM for LAN
> delivery and TCP for WAN. See the diagrams at
>
> http://www.zeromq.org/docs:zsock
>
>> * Multiple exchanges could use the same underlying connection (e.g.
>> tcp connection or multicast address). The exchangeID/queueID are
>> looked at to ensure messages are not received by the application
>> when appropriate.
>>
>>
>> The problem here is that with multiple senders to a single exchange
>> there would be multiple intertwined message streams received by a
>> single consumer. That would make semantics of things like error
>> notifications or message ordering quite vague. An alternative would be
>> to tag each message by the ID of the producer that has sent it.
>> That would mean that each receiver would have to manage a context
>> for each sender. Not really what you would call easy-to-use
>> messaging. There must be a more elegant solution for the problem.
>> Thoughts?
>>
>> I hope my above comment clarified some of this. The API should handle most
>> of this behind the scenes. The user binds a local queue to a connection(s)
>> and subscribes to a virtual destination, possibly w/ wildcards, and magic
>> happens. When the underlying connection mechanism drops (e.g. tcp
>> disconnect, missing heartbeats when using udp), the API puts a message on
>> the queue that includes the virtual destination name (in the case of
>> wildcards it could do multiple notifications, one for each matched
>> destination, or just one for all of them).
>>
>
> Yes, sure. The question is: Is there a way to completely hide the source ID
> from the consumer application? There are two concerns IMO:
>
> * Requiring the client application to handle source IDs means it has to
> handle multiple (per-source) states, async notifications about
> connections/disconnections etc. In short, it means the client application
> has to be rather complex. It would be good to shift the complexity into the
> 0MQ layer.
I wasn't suggesting the client app should use the source ID. Clients
send/receive to/from a virtual destination. Multiple sources can be sending
to the destination; the listening client doesn't need to know about them.
The purpose of the source ID is so the API can implement recovery, which
should be handled under the hood. It is useful to let the app
optionally know about sources, e.g. when getting alerts about problems like:
"hey, your tcp connection to source X just disappeared" or "a multicast
recovery w/ source X is happening". This is primarily for providing useful
info for handling production problems, or so the app can choose to stop
processing until there is manual intervention. I don't expect apps to
actually use the source ID for anything aside from printing it out.
>
> * Correlating individual messages with source-specific context in the
> client application may become the performance bottleneck of the system
> (with current market data rates, nanoseconds matter). Moving the
> functionality to 0MQ would allow us to solve the problem once instead of
> forcing each user to implement/tune it anew.
Pretty sure we got out of sync further up :), this isn't what I was
suggesting. Clients don't need to know about sources.
PGM already needs to correlate a message's TSI/seq num with what it expects
the seq number to be for that TSI in order to determine whether it needs to
do recovery. What I'm suggesting is similar, although the seq nums/TSI could
be configured to be persistent.
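As a rough illustration of that per-source bookkeeping (a sketch only;
RecoveryTracker and its method are made-up names, not PGM or 0MQ API):

import java.util.HashMap;
import java.util.Map;

// Tracks, per source (TSI), the next seq number we expect. This is the
// kind of state that could be persisted so recovery survives restarts.
class RecoveryTracker {
    private Map<String, Long> expectedSeqByTsi = new HashMap<String, Long>();

    // Returns true if the message arrived in order; on false, the caller
    // should request messages [expected, seqNum - 1] from that source.
    boolean onMessage(String tsi, long seqNum) {
        Long expected = expectedSeqByTsi.get(tsi);
        if (expected == null)
            expected = seqNum; // first message seen from this source
        expectedSeqByTsi.put(tsi, seqNum + 1);
        return seqNum == expected;
    }
}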
One thing I suggested that I'm re-thinking was having the "TSI" be the
source plus the virtual destination. It allows flexibility to manage
recovery differently for each virtual destination (e.g. some persistent,
some not). It does this at the cost of not guaranteeing order across
different destinations. I'm not tied to that feature, as it's pretty simple
to use the API to create 2 different sources w/ different recovery setups
using the same multicast address. It is how some other middleware
implementations work. Now that I understand a bit better how PGM works, and
I see how this feature would require writing a lot of duplicate
functionality on top of PGM (inefficient) or replacing PGM (a chunk of
work), I don't think it's worth the trouble.
>
>> Maybe zsock is the abstraction for the connection mechanism, but a more
>> sophisticated API could be used on top of it for handling of virtual
>> destinations/wildcards/subscriptions/more flexible recovery?
>>
>
> Go on. Suggestions?
In Java. Note the Listeners/Handlers could just be types of messages
received when you do sock.receive() instead of callbacks (there's a sketch
of that style right after the example). You get the idea though.
ZSock sock = new ZSock("pgm://eth0;239.151.1.1:100");
sock.addZSockListener(new ZSockListener() {
    public void disconnected() {}
    public void connected() {}
});

// The first param is the persistent name, which remains the same across
// restarts so listeners can identify the sender even if it is moved to a
// different host.
ZRecoverableSock recovSock = new ZRecoverableSock("Instance1", sock);

// FileMessageStore implements the methods
//   "Iterator<ZMessage> getMessages(int start, int end)",
//   "void storeMessages(int startSeqNum, Iterator<ZMessage> msgs)"
//   and "void removeMessages(int start, int end)".
recovSock.setMessageStore(new FileMessageStore("file.dat"));

ZSubscriptionSocket subSock = new ZSubscriptionSocket(recovSock);
subSock.addMsgHandler(new ZSubscriptionSocketMsgHandler() {
    // Could include the Source as a param as well.
    public void onMessage(ZMessage msg, int destID) {
        System.out.println("Received msg: " + msg + " on destination " + destID);
    }
});
subSock.subscribe("MD-[A-M].*");
subSock.startListeners();

int destID = subSock.createDestination("MD-MSFT");
subSock.send(destID, new ZMessage("Hello"));

// For those who are interested, not required for operation:
subSock.addSubscriptionSocketListener(new ZSubscriptionSocketListener() {
    public void onNewSource(Source src) {}
    public void onSourceDisconnect(Source src) {}
    public void onNewListener(Listener l) {}
    public void onListenerDisconnect(Listener l) {}
});
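And the receive()-style alternative mentioned above might look like this
(ZEvent, ZDataMessage and ZSourceDisconnect are made-up names for the sake
of the sketch):

while (true) {
    ZEvent ev = subSock.receive(); // blocks until a message or event arrives
    if (ev instanceof ZDataMessage) {
        ZDataMessage dm = (ZDataMessage) ev;
        System.out.println("Received msg: " + dm.getMessage()
            + " on destination " + dm.getDestID());
    } else if (ev instanceof ZSourceDisconnect) {
        System.out.println("Lost source: " + ((ZSourceDisconnect) ev).getSource());
    }
}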
You could also have a ZReqReplySock that wraps any of the above sockets and
has a "ZMessage reply = sock.request(ZMessage msg)" method.
A ZThreadPool could pull messages off a socket(s) and distribute them across
multiple threads.
A ZMessageSwitch copies messages from one socket to another.
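E.g., a ZMessageSwitch could be little more than a pump loop (again a
sketch; the blocking receive()/send() signatures are my assumptions, not an
existing API):

class ZMessageSwitch implements Runnable {
    private ZSock in;
    private ZSock out;

    ZMessageSwitch(ZSock in, ZSock out) {
        this.in = in;
        this.out = out;
    }

    public void run() {
        // Copy every message arriving on one socket to the other, e.g. to
        // bridge a multicast group onto a compressed/encrypted TCP link.
        while (!Thread.currentThread().isInterrupted()) {
            ZMessage msg = in.receive(); // blocking receive (assumed API)
            out.send(msg);
        }
    }
}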
The disadvantage to nesting the different functionalities is there is some
duplication. TCP and PGM have their own less persistent session IDs/seq
numbers while the ZRecoverableSock must have its own. That makes your
message payloads slightly bigger. It doesn't really matter for file-based
stores because perf is so slow anyway, but a large in-memory store would
suffer. Of course a ZRecoverableSock could run over a UDP/IP socket and do
pretty well (maybe it needs windowing).
>
>
>> * Some middleware products allow you to use wildcards as a
>> convenient way to bind multiple queues to multiple exchanges in
>> one shot. Underneath each exchange has its own seq
>> numbers/internal msg queues and ordering is only guaranteed at
>> the exchange level.
>>
>>
>> What about newly created exchanges? Would you expect a new exchange to
>> send messages to the queue that has bound to "*" in the past? That
>> would get pretty tricky without a centralised message broker.
>> Think I covered this above... but yes, it adds a layer of complexity :)
>>
>>
>>
>> * Exchanges/Queues should be able to be created w/o using a
>> server (zsock?). Basically the server allows you to look up the
>> appropriate connection for a given queue, if you don't want to
>> store it in a config file, but use of the zmq_server should be
>> optional.
>>
>>
>> Yes, zsock. However, note that most of your points above implicitly
>> assume there's a central authority managing bindings between
>> individual endpoints.
>>
>> Hopefully I addressed this above.
>>
>>
>>
>> * When a connection disconnects all queues/exchanges get a
>> message, much like the gap message although w/ some more details.
>> Same story if there are seq gaps (e.g. messages for recovery
>> started/recovery failed/recovery success).
>>
>>
>> See above.
>>
>> * Some more advanced stuff:
>> o Exchanges could be configured to have a MessageStore; the
>> default could just keep X messages and/or X seconds of
>> messages in memory.
>>
>>
>> Limit for messages in memory (and on disk) is already implemented.
>> TTL is a possible feature to add in the future.
>>
>> Making MessageStore an API people could implement would be nice. Might
>> get you some contributions :)
>> I've been wondering: does setting the watermarks to 0 essentially give
>> you a disk-based queue? Would it recover messages on restart?
>>
>>
>>
>> o More advanced stores could write messages to a file (where
>> it's configurable whether the OS sync calls are used or not)
>> or a db etc.
>>
>>
>> Done.
>>
>> o MessageStores should be able to be wrapped in an
>> AsynchMessageStore that writes the message to the Store in a
>> background thread.
>>
>>
>> Ack.
>>
>> I'm not sure who would want that, but it's easy to write :)
>>
>>
>>
>> o Also you can configure whether to persist all messages or
>> only messages that have not been explicitly acknowledged by
>> all queues. This requires exchanges to become aware of which
>> queues are bound to them and for the queues to acknowledge
>> receipt (these can be batched, i.e. every X messages and/or
>> X seconds the queue says I'm at seq number Y). This is
>> different from the model of the exchange always keeping X
>> seconds' worth of messages for recovery regardless of who is
>> listening.
>>
>>
>> Yup. However, it doesn't have to be done on a per-message basis. If
>> message X wasn't acknowledged, then message X+1 hasn't been
>> acknowledged either, etc. Thus, what you hold on disk is actually a
>> message window. A positive side effect is that disk operations are
>> efficient - there's not much head movement back and forth.
>>
>> Agreed
>>
>>
>>
>> o At some point you will want a message broker/router that can
>> be configured for routing over WANs using
>> compression/encryption and mapping multicast to tcp and
>> back. Should be as simple as reading a file that says bind
>> queue X at exchange Y and republish it on exchange Z (which
>> is encrypted/compressed). Another broker is configured to
>> bind queue A to exchange Z and republish on exchange B
>> (could be non-compressed multicast at a diff site).
>>
>>
>> TCP/PGM mapping will be easily doable with zsock (it was actually
>> one of the use cases behind the design).
>>
>> Compression is not a big deal. If there's popular demand for it, it
>> can be implemented pretty quickly.
>>
>>
>>
>> o Embed a web server in the library which you can turn on so
>> you can check out stats and debug problems.
>>
>>
>> That would be nice. Anyway, it's out of scope for the time being.
>> One day we may get that far :)
>>
>>
>> I know you guys are focused on the high speed stuff which I
>> think is great. There really isn't another open source solution
>> that supports multicast/multiple platforms/multiple language
>> bindings.
>>
>>
>> Right. That's the idea.
>>
>>
>> I also think it's important to optimize for the non-persistent
>> cases. Of course as a user I prefer to have one API/product for
>> all the stuff I do, so even if the persistent stuff isn't the
>> best of breed, I'm more likely to use the product because it's a
>> one-stop shop.
>>
>>
>> It's a matter of resources. Right now we are focusing on transient
>> data distribution as there are no competing FOSS products in the
>> area. Once that's done (or if somebody opts to sponsor the
>> development) we can move to persistency/fire-and-forget features.
>>
>> Understood
>>
>>
>> HTH
>> Martin
>>
>>
>>
> Martin
>
>