[zeromq-dev] Improving message patterns

Paul Colomiets tailhook at yandex.ua
Wed Apr 13 05:26:51 CEST 2011


Hi, 

ZeroMQ has a great set of messaging patterns. I can hardly imagine a task that can't be decomposed into one or a few of the request/reply, publish/subscribe, or push/pull messaging models. But the patterns have plenty of problems. I'll try to summarize them.

Request/reply:
1. The REP socket works only for samples and is useless in any real-world application. It lacks:
  1) publishing presence
  2) heartbeating
  3) the ability to discard a request
2. The REQ socket is also useless for most applications, because:
  1) it is unable to retry a request
3. XREQ lacks support for routing a request to a specific destination
4. XREP is meant as a non-blocking replier socket, but is incidentally used for everything the other sockets can't do
5. Inefficient load balancing for some applications

Push/Pull:
1. No routing to specific destination
2. Very weak reliability guarantees
3. Inefficient load balancing for some applications

Pub/Sub:
1. Weak reliability guarantees
2. Lack of a way to know when message stream was broken
3. Late joiners problem
I can't help mentioning that pub/sub has perfect routing to specific destinations. The recently added choice between subscriber-side and publisher-side filtering makes it even better (is that right? is it the user's choice?)

If anyone has something to add to the summary, be my guest.

So let's start with the simplest things. Nothing can be done about the reliability of publish/subscribe, and that's probably OK. Broken-stream notification can be done via proper transport shutdown of connections, which has already been discussed a bit. I don't want to repeat myself; I'll just say that shutdown is implemented in every connection-oriented network protocol, including a few with implicit connections (e.g. SCTP and RxRPC). Interestingly, SCTP has no half-closed state like the one I proposed, but it does have notification of undelivered messages. All in all, I think this should be discussed more.

The reliability guarantees of push/pull can't be implemented fully either. If we wanted acks signalling that a message has been processed by the application, we would just use the request/reply pattern, since it is designed specifically for this kind of task.

Both the push/pull and req/rep patterns need to be able to deliver messages to a specific destination. A destination that is a group of addresses should also be supported. I'm not sure it's acceptable if that last requirement needs a device to work, because a device can become a bottleneck, and adding a second device in parallel defeats the purpose for which the device was attached in the first place. Also, the use of an unrelated socket type (XREP in the current implementation) for this should be discouraged or even removed entirely. Identities were introduced for this very task, and time has proven that identities just don't work.

We'll come back to destination routing shortly. Now I'm going to introduce the notion of out-of-band (OOB) messages. This kind of message is meaningless to applications; it's used for heartbeats, acks, and presence. I'm also going to introduce two types of devices: transparent (a.k.a. dumb) ones and smart (a.k.a. complex) ones. The former is a device that just aggregates connections and forwards OOB messages as is (well, appending address data). The latter is some kind of smart application; examples are a load balancer or a device implementing message persistence (BTW, both examples could be used with both req/rep and push/pull).
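A minimal sketch of the OOB/data split and of a transparent device, assuming a hypothetical message representation (`Message`, `kind`, `route`) and hypothetical helpers (`DumbDevice`, `deliver_to_app`) that are not part of any existing ZeroMQ API. Data messages go to the application; OOB messages (heartbeats, acks, presence) are forwarded by the dumb device as is, except that it appends its own address.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical message model for illustration only; not a libzmq structure.
@dataclass
class Message:
    kind: str                 # "oob" (heartbeat/ack/presence) or "data"
    payload: bytes
    route: List[str] = field(default_factory=list)  # addresses appended by devices

class DumbDevice:
    """Transparent device: aggregates connections, never interprets payloads."""

    def __init__(self, address: str, peers: List[str]):
        self.address = address
        self.peers = peers

    def forward_oob(self, msg: Message, came_from: str):
        """Forward an OOB message to every other peer, appending our address."""
        assert msg.kind == "oob"
        out = Message(msg.kind, msg.payload, msg.route + [self.address])
        # Pub/sub-style fan-out: every connection except the one it arrived on.
        return [(peer, out) for peer in self.peers if peer != came_from]

def deliver_to_app(msg: Message):
    """Ordinary sockets hide OOB traffic; only data reaches the application."""
    return msg.payload if msg.kind == "data" else None
```

The device never looks inside the payload, which is what keeps it "dumb" and lets it be a pure drop-in.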

Smart devices are treated by ZeroMQ as endpoints. It doesn't matter if a smart device is just a load balancer on the request-processing path. It's like a reverse proxy for HTTP (by the way, HTTP is classic request/reply and a good place to borrow ideas from): the client usually thinks it is talking to the origin, and there is no way for it to know where the real origin is. Dumb devices are just like network routers: no matter how many of them are on the path, you know the destination anyway. The path can even change while a request is in flight if some route is reconfigured by BGP. (BTW, thanks Martin for the idea of using network infrastructure examples to illustrate the point.)

To further illustrate the need for two types of devices, I'll show a piece of infrastructure. Suppose we have node A, which is a client. We have a router B, which is implemented with the current XREP/XREQ sockets. B is connected to node C, which is also an XREP/XREQ device and has replier C1. B is also connected to node D, an XREP/XREQ device with nodes D1, D2, D3, D4 connected to it. Here is a picture: http://graf.gafol.net/eAOiRGUTK. You probably already know that node C1 will get 4 times more requests than each Dn, because B load-balances between C and D, and neither C nor D forwards any info about its connected nodes. Making C and D transparent nodes would solve the problem. Today it's partially solved by HWM, but that's not a great solution.
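The 4:1 ratio follows from B splitting requests evenly between its two peers: C1 receives 1/2 of the traffic, while each Dn receives 1/2 × 1/4 = 1/8. The toy simulation below (plain round-robin, not real ZeroMQ code) compares that with a transparent setup in which B sees all five repliers directly.

```python
from collections import Counter
from itertools import cycle

def opaque(n_requests):
    """B round-robins over devices C and D; each device round-robins locally."""
    downstream = {"C": cycle(["C1"]), "D": cycle(["D1", "D2", "D3", "D4"])}
    counts = Counter()
    b = cycle(["C", "D"])
    for _ in range(n_requests):
        device = next(b)
        counts[next(downstream[device])] += 1
    return counts

def transparent(n_requests):
    """C and D forward presence, so B round-robins over all five repliers."""
    counts = Counter()
    b = cycle(["C1", "D1", "D2", "D3", "D4"])
    for _ in range(n_requests):
        counts[next(b)] += 1
    return counts

print(opaque(800))       # C1 gets 400, each Dn gets 100
print(transparent(800))  # every replier gets 160
```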

Now I'm going to leave dumb devices alone. They must be just drop-in devices, with no features ever added to them. It's OK even if they need ZeroMQ internals to operate; they would just be included in the library. If they don't need internals, they can be included in libzmq, libzapi, or wherever.

Smart devices are a very different picture. They need socket types designed specifically for them. Compared to request/reply and push/pull sockets, these (1) must be non-blocking and (2) must give the end user access to OOB data. Since they are for experts, and you can do lots of cool stuff with them, I'm going to propose a single socket type; let's call it MUX for the sake of this letter.

But first things first. We are going to advertise all the endpoints to each other, no matter how many dumb devices are between them. The exact semantics are to be decided. For the current discussion, let's imagine that every socket sends its (random) identity in an OOB message to connected nodes. Each dumb device adds a route address and forwards the message further through all of its connections, just like in pub/sub. After a reconnect, the address will be sent with the next heartbeat. Before connecting, the user can set the socket option ZMQ_ADVERT, an opaque string that will be sent to other endpoints along with advertisements and heartbeats. It can be a service name, shard name, password, service capacity, or a combination of them. Each connection has a random identity; each endpoint has a single unique name. When presence info reaches another endpoint, that endpoint remembers the traceroute and the endpoint id. It keeps every traceroute for each endpoint, along with the last heartbeat info. For MUX sockets, the OOB message is delivered to the user with the endpoint id and advertisement (but not the traceroute, to simplify bookkeeping in the application).
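A sketch of the per-endpoint bookkeeping described above, assuming presence arrives as (endpoint id, advert, traceroute) tuples. `PresenceTable`, `on_presence`, and `expire` are hypothetical names, and `ZMQ_ADVERT` is only the option proposed here, not an existing libzmq option. The table keeps every traceroute per endpoint with its last heartbeat time, while the event handed to a MUX user carries only the endpoint id and advert.

```python
import time
from typing import Dict, Optional, Tuple

Route = Tuple[str, ...]  # connection identities appended by dumb devices

class PresenceTable:
    def __init__(self):
        # endpoint id -> {traceroute -> last heartbeat timestamp}
        self.routes: Dict[str, Dict[Route, float]] = {}
        self.adverts: Dict[str, bytes] = {}

    def on_presence(self, endpoint_id: str, advert: bytes, route: Route,
                    now: Optional[float] = None):
        """Record a presence/heartbeat; return the event a MUX user would see."""
        now = time.monotonic() if now is None else now
        self.routes.setdefault(endpoint_id, {})[route] = now
        self.adverts[endpoint_id] = advert
        # The traceroute stays internal to simplify application bookkeeping.
        return (endpoint_id, advert)

    def expire(self, max_age: float, now: float):
        """Hypothetical timeout rule: drop routes with no recent heartbeat."""
        for eid in list(self.routes):
            fresh = {r: t for r, t in self.routes[eid].items() if now - t <= max_age}
            if fresh:
                self.routes[eid] = fresh
            else:
                del self.routes[eid]
                self.adverts.pop(eid, None)
```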

Then the user can send a message to an endpoint. ZeroMQ selects one of the recorded routes for the endpoint (the exact algorithm can be improved in the future without changing the API), replaces the endpoint id with the traceroute, and forwards the request. The recipient records the traceroute just like a heartbeat and delivers the endpoint id and data to the application. When the application wants to reply, it returns the endpoint id and data, and it's up to ZeroMQ to pick some route back (even if some network infrastructure has changed in the meantime).
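A toy end-to-end sketch of presence flooding and routed sending, under simplifying assumptions: device names stand in for the per-connection identities, and the topology (`LINKS`, `DEVICES`) and helper names (`flood_presence`, `send`) are hypothetical. Presence from X records the devices it crossed; sending reverses that trace, and the request records its own trace so X can reply. Which recorded trace to use is left to the caller, as the route-selection policy is left open above.

```python
from typing import Dict, List, Tuple

Trace = Tuple[str, ...]

# Hypothetical toy topology: endpoints S and X, dumb devices B, C, D.
LINKS = {"S": ["B"], "B": ["S", "C", "D"], "C": ["B", "X"],
         "D": ["B", "X"], "X": ["C", "D"]}
DEVICES = {"B", "C", "D"}

def flood_presence(origin: str) -> Dict[str, List[Trace]]:
    """Spread origin's presence; every dumb device appends itself to the trace."""
    routes: Dict[str, List[Trace]] = {}
    frontier = [(origin, ())]
    while frontier:
        node, trace = frontier.pop()
        for peer in LINKS[node]:
            if peer == origin or peer in trace:
                continue                               # never loop back
            if peer in DEVICES:
                frontier.append((peer, trace + (peer,)))
            else:
                routes.setdefault(peer, []).append(trace)  # endpoint records it
    return routes

def send(route: Trace, dest: str) -> Tuple[List[str], Trace]:
    """Walk a recorded trace backwards and return the path plus the reply trace."""
    path = list(reversed(route)) + [dest]   # nearest device first
    back = tuple(path[:-1])                 # devices append themselves in order
    return path, back                       # dest stores `back` to reply later
```

For example, `flood_presence("X")["S"]` holds two traces, one via C and one via D, so S has a redundant route even though it only ever addresses X.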

REQ and PUSH sockets pick the endpoint to send to using round-robin (think of it as two redundant routers: it doesn't matter which one is picked). Of course, REQ could have request repeating implemented. It can blacklist a timed-out endpoint (in case it has crashed) until a heartbeat arrives from it again. Acks for PULL should be end-to-end. The REP socket records the endpoint address (and any other address data, if the request was forwarded from farther away) and sends it along with the reply. PULL is like REP but doesn't send anything back (it ignores the sender endpoint, or uses it for acks).
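A sketch of the REQ-side selection logic under stated assumptions: `ReqSelector` and `request_with_retry` are hypothetical names, and `try_send` is a caller-supplied function assumed to return `None` on timeout. It round-robins over known endpoints, blacklists one on timeout, and restores it when a heartbeat arrives.

```python
from collections import deque
from typing import Iterable, Optional, Set

class ReqSelector:
    """Round-robin over known endpoints, skipping blacklisted ones."""

    def __init__(self, endpoints: Iterable[str]):
        self.ring = deque(endpoints)
        self.blacklist: Set[str] = set()

    def next_endpoint(self) -> Optional[str]:
        for _ in range(len(self.ring)):
            ep = self.ring[0]
            self.ring.rotate(-1)              # advance round-robin position
            if ep not in self.blacklist:
                return ep
        return None                           # every endpoint is blacklisted

    def on_timeout(self, ep: str):
        """Request timed out: assume the endpoint may have crashed."""
        self.blacklist.add(ep)

    def on_heartbeat(self, ep: str):
        """A fresh heartbeat brings the endpoint back into rotation."""
        self.blacklist.discard(ep)
        if ep not in self.ring:
            self.ring.append(ep)              # newly discovered endpoint

def request_with_retry(sel: ReqSelector, try_send, attempts: int = 3):
    """Resend to the next endpoint on timeout; give up after `attempts` tries."""
    for _ in range(attempts):
        ep = sel.next_endpoint()
        if ep is None:
            break
        reply = try_send(ep)
        if reply is not None:
            return ep, reply
        sel.on_timeout(ep)
    return None
```

Repeating is only safe for idempotent requests, which is one reason it is "maybe" rather than mandatory.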

This way load balancing is easy to implement (you just put endpoint ids into an LRU queue or use any other load-balancing algorithm), and it works with the REP socket. You can use an LRU queue per service if you need to, or you can use ADVERT to authorize connections. If you connect two routers for inter-cluster load balancing, you can advertise the cluster's capacity so your custom router can send more simultaneous requests to the other router than to an individual worker.
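A sketch of a per-service LRU balancer for a smart device, assuming the advert has already been parsed into a service name and a capacity (the parsing and the names `LruBalancer`, `on_presence`, `dispatch`, `on_reply` are hypothetical). Each endpoint contributes one free slot per unit of advertised capacity, so a cluster router advertising capacity 3 can hold three requests at once while a plain worker holds one.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional

class LruBalancer:
    """Least-recently-used queue of free slots, one queue per service."""

    def __init__(self):
        self.free: Dict[str, Deque[str]] = defaultdict(deque)
        self.service_of: Dict[str, str] = {}

    def on_presence(self, endpoint_id: str, service: str, capacity: int = 1):
        """First presence adds `capacity` slots; repeated heartbeats are ignored."""
        if endpoint_id in self.service_of:
            return
        self.service_of[endpoint_id] = service
        self.free[service].extend([endpoint_id] * capacity)

    def dispatch(self, service: str) -> Optional[str]:
        """Hand the request to the slot that has been idle the longest."""
        q = self.free[service]
        return q.popleft() if q else None

    def on_reply(self, endpoint_id: str):
        """A finished request frees one slot at the back of the queue."""
        self.free[self.service_of[endpoint_id]].append(endpoint_id)
```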

All this adds some network traffic for spreading heartbeats, but I don't expect it to be big enough to care about, and it's tunable anyway. The number of dumb devices also increases network overhead, but with the current implementation that is already the case for req/rep. For push/pull, if you don't need expert features (like better load balancing), you can use a simple device, like the one we use today, with normal push/pull sockets, probably with little additional overhead.

With the described semantics, the current XREQ and XREP can be implemented in a compatibility library on top of the MUX socket. The MUX socket by itself is intended for experts. It's the same for push/pull and req/rep, because their routing needs are quite similar. For better interoperability, presence info should probably contain the socket type, so router writers can drop (or ignore?) incompatible connections. Ah, and PAIR can be implemented in terms of the MUX socket as well, adding the ability to put a dumb device between peers, or even to multiplex a few pairs over a single connection while forwarding (each peer needs to filter advertisements down to its specific counterpart).
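A sketch of how an XREQ-like compatibility layer could sit on top of MUX, assuming a hypothetical MUX interface: incoming events are `(kind, endpoint_id, body)` tuples, and `mux_send(endpoint_id, payload)` stands in for a MUX send call. Neither exists in libzmq, and `XReqCompat` is an illustrative name. Presence bodies are assumed to carry the peer's socket type, so incompatible peers are ignored.

```python
from itertools import count
from typing import Callable, List, Optional, Tuple

Event = Tuple[str, str, bytes]  # (kind, endpoint_id, body); kind: "presence" or "data"

class XReqCompat:
    """XREQ-like behaviour layered on a hypothetical MUX interface."""

    COMPATIBLE = {b"REP", b"XREP"}  # assumed socket-type strings in presence

    def __init__(self, mux_send: Callable[[str, bytes], None]):
        self.mux_send = mux_send
        self.peers: List[str] = []
        self.rr = count()

    def on_event(self, event: Event) -> Optional[bytes]:
        kind, endpoint_id, body = event
        if kind == "presence":
            if body in self.COMPATIBLE and endpoint_id not in self.peers:
                self.peers.append(endpoint_id)
            return None                      # OOB: hidden from XREQ users
        return body                          # data: endpoint id dropped, as in XREQ

    def send(self, payload: bytes) -> str:
        """Round-robin here, because MUX itself never load-balances."""
        if not self.peers:
            raise RuntimeError("no compatible peers known yet")
        endpoint_id = self.peers[next(self.rr) % len(self.peers)]
        self.mux_send(endpoint_id, payload)
        return endpoint_id
```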

Not all issues are covered, but I should probably wrap up :) So, a quick summary:
1. Implement presence and heartbeating on all sockets
2. Improve REQ/REP to a usable state: presence, heartbeating, resetting, and maybe request repeating
3. Implement a single MUX socket type that does not load-balance and gives the application a chance to do a better job
4. Implement dumb devices that do not load-balance in any way
5. Implement OOB messages to give device writers all the info needed to load-balance, allowing them to authorize endpoints and do other cool things

Any opinions?

--
Paul
