[zeromq-dev] Devices, and pipeline again

MinRK benjaminrk at gmail.com
Thu Jul 29 20:53:07 CEST 2010


I work with Brian on the pyzmq devices.

On Thu, Jul 29, 2010 at 10:44, Brian Granger <ellisonbg at gmail.com> wrote:

> On Thu, Jul 29, 2010 at 8:46 AM, Pieter Hintjens <ph at imatix.com> wrote:
>> Hi All,
>> I'm writing a new generic zmq_device that will replace zmq_queue,
>> zmq_forwarder and zmq_streamer.
>> The code is here: http://dpaste.de/4DcP/
> It looks like you are only working on the command line part.  Is this
> right, so the underlying zmq_device function remains the same?
>> The zmq_device works a lot like the zmq::device method.  There is
>> documentation at the start of the source code, which I'll expand to
>> become a man page when this is stable.
>> Anyone who is already using devices: your feedback would be useful.
>> Especially, do you use multiple in/out sockets (I've renamed that to
>> 'frontend' and 'backend' here)?  And do you connect outwards from your
>> device, using it as a proxy, or do you connect into it, using it as a
>> mini-broker?
> We always create devices by calling the zmq_device function in our code.
>  We have written a set of functions that will run a device in a thread.  The
> reason we do this is that it is almost always easier to pass arguments in
> Python than muck with XML files.  Also, this let's us put multiple devices
> in a a single process.
> We use devices in just about any way you can imagine.  We often pass the
> wrong type of sockets to a device to make it do novel things.  For example,
> if you pass a single XREQ socket to both arguments of a forwarder device you
> get a REQ/REP style heartbeat socket.  We have also started to write our own
> devices that take more than two sockets.  One example is a monitored queue.
>  It works like a regular queue, but there is a third admin socket that you
> can use to query the status of the device and the messages that have moved
> through it.

Another device we wanted was a queue that worked with two XREP sockets. Of
course, that doesn't normally work, since xrep messages arrive prepended by
the requester's identity, so a normal queue will try to send out to the same
identity that the message came from.  I wrote a copy of zmq::device that
simply swaps the identities around at the beginning of each message before

the device is insocket: xrep1, outsocket: xrep2 (and behaves symmetrically)
xreq1 is connected to xrep1 and as identity 'id1', etc.

the sequence of events in this device:
xreq 1 sends [ 'id2', 'msg' ] to xrep 1
xrep 1 recvs [ 'id1', 'id2', 'msg' ]
<swap first two elements>
xrep 2 sends [ 'id2', 'id1', 'msg' ]
xreq 2 recvs [ 'id1' 'msg' ]

And there can be many xreq sockets on either side of the device, keyed by
their identities.

The effect is that all of the XREQ sockets really behave like XREP, in that
their messages are prepended with an ID specifying the destination, yet they
each have only one connection (to the device), and each message they receive
tells them who it came from.

Further, if you pass the same XREP socket as both in and out, you have a
fully symmetric keyed message relay - anyone can send a message to anyone
else (as opposed to just between side A and side B and back). For N xreq
sockets, you still only need to keep track of the address of the device's


>> Second thing, those pipeline sockets again.  Here is the core of the
>> generic device, acting as broker (such short sweet code, thank you
>> 0MQ!):
>>        int device = parse_device (argv [1]);
>>        switch (device) {
>>            case ZMQ_QUEUE:
>>                frontend_type = ZMQ_XREP;   //  Client
>>                backend_type  = ZMQ_XREQ;   //  Service
>>                break;
>>            case ZMQ_FORWARDER:
>>                frontend_type = ZMQ_SUB;    //  Subscriber
>>                backend_type  = ZMQ_PUB;    //  Publisher
>>                break;
>>            case ZMQ_STREAMER:
>>                frontend_type = ZMQ_PULL;   //  Puller
>>                backend_type  = ZMQ_PUSH;   //  Pusher
>>                break;
>>            default:
>>                return (1);             //  Invalid device
>>        }
>>        //  Create sockets, configure them, and start the device
>>        zmq::context_t context (1);
>>        zmq::socket_t frontend (context, frontend_type);
>>        zmq::socket_t backend (context, backend_type);
>>        if (frontend_type == ZMQ_SUB)
>>            frontend.setsockopt (ZMQ_SUBSCRIBE, "", 0);
>>        frontend.bind (argv [2]);
>>        backend.bind (argv [3]);
>>        zmq::device (device, frontend, backend);
>> As you can see I'm using ZMQ_PUSH and ZMQ_PULL in place of
>> ZMQ_DOWNSTREAM and ZMQ_UPSTREAM.  It seems to work.  A node pulls
>> data, works on it, and pushes it on.  So nodes are pullers, or
>> pushers, or both.  It's short, verbish (like the other socket types)
>> and feels clear.
> I think that hardcoding the socket types in the command line program does
> make sense, but not in the zmq_device function API.
>> If there are no outright objections after a day or so I'll start to
>> deprecate ZMQ_DOWNSTREAM/UPSTREAM and replace with ZMQ_PUSH/PULL.
>> We'll keep the old definitions around, of course, this does not break
>> the ABI.
> I think PUSH/PULL are fine.
> Brian
>> -Pieter
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev at lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> --
> Brian E. Granger, Ph.D.
> Assistant Professor of Physics
> Cal Poly State University, San Luis Obispo
> bgranger at calpoly.edu
> ellisonbg at gmail.com
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20100729/5de6a05e/attachment.htm>

More information about the zeromq-dev mailing list