[zeromq-dev] Publisher side filtering... (draft)

Gerard Toonstra gtoonstra at gmail.com
Tue Nov 30 11:51:48 CET 2010


I've been working on this again yesterday, and attached is a draft approach
that works but still has some serious issues. It should be regarded as a
draft, intended to receive comments on the approach taken. The following
issues remain to be addressed:

   1. I've now used the "fair-queue" object to maintain a list of pipe
   readers in pub.cpp. The pub socket should periodically check whether
   anything new was written by subscribers, which would indicate topic
   changes. At the moment, it checks fq_t whenever any message is sent by the
   pub socket. The "if (has_in())" should probably become a "while
   (fq.has_in())" loop, so that all pending subscription changes are
   processed first (see the first sketch after this list). fq_t only visits
   the currently 'active' pipes, so this should not be too expensive...?
   2. The current implementation relies on the reader_t object from fq_t to
   find the identity, then uses that identity to find the writer and update
   its subscription list accordingly. There should be a better way to do
   this.
   3. The pub socket now stores pipes three times, so there are three lists.
   One keeps track of readers, so that subscription requests can be handled.
   This requires correlating the reader list with a writer list, so that the
   actual subscription list can be found and updated. I think a single
   structure that associates the reader and writer, together with the
   subscription list and whatever else is needed for multipart functionality,
   is a better way of doing this (see the second sketch after this list). It
   means the class is essentially reimplemented. There's also a need for a
   separate list to keep track of all readers that have data to be read.
   4. For subscriptions to be sent upstream, the sub socket must connect
   first. Subscriptions made prior to connecting are not propagated yet (see
   the last sketch after this list). This should probably change.
   5. Should this really replace the current pub/sub implementations, or
   should it become a different socket type altogether? I can imagine that
   not everybody needs this, and there is always a small performance penalty
   to be paid for the list updates, the subscription updating, etc. (some
   people may prefer to do the filtering client side to spread the load).
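
To make point 1 concrete, this is roughly the loop I have in mind at the top
of pub_t::xsend (a sketch against the draft rather than the actual patch;
apply_subscription () is just a stand-in for the identity lookup described in
point 2):

    //  Drain all pending (un)subscription messages from the fair-queue
    //  before publishing, instead of handling at most one per send.
    zmq_msg_t sub_msg;
    zmq_msg_init (&sub_msg);
    while (fq.has_in ()) {
        if (fq.recv (&sub_msg, ZMQ_NOBLOCK) != 0)
            break;
        //  First byte of the body: 1 = subscribe, 0 = unsubscribe; the rest
        //  is the topic prefix. apply_subscription () updates the per-writer
        //  subscription list found via the reader's identity.
        apply_subscription (&sub_msg);
    }
    zmq_msg_close (&sub_msg);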

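For point 3, this is the kind of per-peer structure I mean, instead of the
three parallel pipe lists (again only a sketch; the names are made up and the
subscription list is shown as a flat set of prefixes rather than anything
smarter):

    #include <set>
    #include <string>
    #include <vector>

    namespace zmq { class reader_t; class writer_t; }

    //  One entry per connected subscriber, replacing the three parallel
    //  lists in the current draft.
    struct peer_t
    {
        zmq::reader_t *reader;                 //  subscription messages from the peer
        zmq::writer_t *writer;                 //  published messages to the peer
        std::set <std::string> subscriptions;  //  topic prefixes the peer asked for
        bool more;                             //  true while inside a multipart message
    };

    //  All connected peers; a separate list would still be needed to track
    //  the readers that currently have data to be read.
    std::vector <peer_t> peers;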

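Point 4 refers to the usual client pattern of subscribing before connecting;
with the draft, something like this (endpoint made up) ends up with no
subscription ever reaching the publisher:

    #include <zmq.h>

    int main ()
    {
        void *ctx = zmq_init (1);
        void *sub = zmq_socket (ctx, ZMQ_SUB);

        //  Subscription set before the connect...
        zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "weather.", 8);

        //  ...so the draft never forwards it upstream; only subscriptions
        //  made after this call would be propagated.
        zmq_connect (sub, "tcp://127.0.0.1:5556");

        zmq_close (sub);
        zmq_term (ctx);
        return 0;
    }
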
Multipart seems to work with this approach, but the handling looks a bit
kludgy. Any tips on better ways of doing it are welcome.

Other comments are welcome too.


Rgds,

Gerard




On Sat, Oct 30, 2010 at 5:41 PM, Martin Sustrik <sustrik at 250bpm.com> wrote:

>  Gerard,
>
>
>> 1. Forwarding of subscription from the application thread (socket) to the
>> I/O thread (session).
>>
>
> this could essentially be the same setsockopt call?
>
>
> Yes. There's no need for any API change. Single setsockopt  call should
> trigger the subscription forwarding all the way upstream to the ultimate
> publisher.
>
>
>
>
>> 2. Forwarding of subscription from the I/O thread to the peer application.
>> In other words, the subscription has to be passed on the wire upstream.
>>
>
> If this is over the same connection that data is received from through the
> pub socket, then all protocols must support
> reads and writes happening at the same time without any mechanism for
> synchronization (request/reply, etc). Could this become a problem?
> The benefit of doing this is that there's no more need to find a connection
> through an identity and no further need for an additional socket to be
> created.
>
> Yes. It should be the same connection. The messages are passed in one
> direction, the other direction is idle, just asking to be used to forward
> subscriptions.
>
>
>
>  3. Forwarding a subscription through a forwarder device.
>>
>
> It says in the manual that forwarding devices should only use PUB/SUB
> sockets. There is only one in socket (not a list of sockets) used in the API
> call.
> This implies that the forwarding device is getting all messages from a
> single publisher.
>
> The forwarding device has a number of connections to clients towards which
> it is publishing messages. The PUB socket is the component that does the
> filtering.
> This means that the forwarder should have a means to query the PUB socket
> for subscriptions that are currently active, union them together and then
> send the union of that upstream to the single publisher it is connected to.
>
> If there is any one client on the forwarder that has not sent a
> subscription upstream, then the result of such a union is *, or no filter at
> all.
>
> I would say we need a XPUB and XSUB sockets that would allow user to
> send/recv subscriptions as messages rather than setting them/tearing them
> down via socket options.
>
> Standard PUB and SUB sockets would then be simple wrappers on top of XPUB
> and XSUB.
>
>
>
>  While accomplishing these three steps there are several problems to
>> solve:
>>
>> 1. The filtering algorithm has to be able to maintain several sets of
>> filters (one per downstream connection). Jon Dyte has done some work on
>> this. He may share the code.
>>
>
> If Jon is reading this, is there any way in which we can collaborate?
>
>
>>
>> 2. Unsubscriptions have to be propagated up the tree in the same way as
>> subscriptions.
>>
>
> ZMQ_UNSUBSCRIBE / setsockopt.
>
>
>>
>> 3. When connection between two nodes breaks, related unsubscriptions
>> should be generated automatically.
>>
>
> I found this discussion on the list related to this whole thing:
>
> http://www.mail-archive.com/zeromq-dev@lists.zeromq.org/msg03420.html
>
> Does it actually matter if an unsubscribe doesn't take place directly
> upstream?   I think it matters for the
> client process, but the client has been subscribed and there has been a
> flow of traffic before. So if you want
> immediate reduction of traffic flow based on an UNSUBSCRIBE, I'd look into
> generating the unsubscribes based on
> disconnects etc...
>
> Yes. That's what I meant.
>
>
> One potential problem here is that state at either side of the connection
> may start to diverge over longer periods of time
> due to whatever bug, inconsistency or network failure.
>
> Sorry, I don't get it.
>
>
>
> Another approach here is to periodically send the full subscription list
> over the wire and allow, for a short time period,
> the waste in network traffic.
>
> No point. Subscribe on connect, unsubscribe on disconnect should work well.
>
>
> The benefit of doing this is that correct state is better guaranteed and
> there is no need to hunt down potential unsubscribes,
> because the list of currently known active subscriptions is being
> transmitted at some point in time.
>
>
>
>> 4. When connection between two nodes is (re)established, subscriber should
>> forward all the subscriptions upstream.
>>
>> Martin
>>
>
> Individual calls to ZMQ_SUBSCRIBE, or should this be optimized using e.g. a
> delimiter or an array (ZMQ_SUBSCRIBE_LIST)?
>
> It should be done _inside_ 0MQ, so no API would be involved.
>
> Martin
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: diff.patch
Type: text/x-patch
Size: 11170 bytes
Desc: not available
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20101130/eeaf9f61/attachment.bin>

