[zeromq-dev] Announcing em-zeromq, integration with ruby's eventmachine reactor

Jonathan Rockway jon at jrock.us
Sun Jan 30 05:45:35 CET 2011

* On Sat, Jan 29 2011, Andrew Cholakian wrote:
> I'd like to announce a new project: em-zeromq, https://github.com/andrewvc/em-zeromq ,
> which integrates ZeroMQ sockets with Ruby's EventMachine reactor, using ZeroMQ 2.1's
> new File Descriptor support. 
> I'm sure there are bugs and errors in it, the code right now is somewhat rough, with a
> minimal set of passing specs. I'd appreciate patches and suggestions for improvements
> both in the code and API.

I have a few comments; I noticed there is a comment in the source code
that says it's not useful to check writability, but this is not strictly
true: consider the case where ZMQ_HWM is 1 and there are no peers
connected -- the socket will not be writable, but will be after a peer
is connected and consumes that message.  An object that represents an
application level queue of messages to send (perhaps being lazily
calculated) needs this event to know to generate and send the next
message.  (No point in doing work if nobody is around to notice.)

Also, I don't know how EventMachine handles this, but you can't create a
watcher until you've tried and failed to send a message without
blocking.  (Sockets and pipes work the same way; you do the read/write
and then create an IO watcher if you didn't successfully read or write
the entire message.  Otherwise, the event loop is not involved.)

I'm also confused as to why this is commented out:

#complete_messages = (@socket.getsockopt(ZMQ::EVENTS) & ZMQ::POLLIN) ==

As you need to do that.

The strategy I take in my Perl binding is to provide two APIs: a
low-level set of functions to probe write/read-ability, and a Handle
object that takes a buffer of read/write requests.

Anyway, the API for the low-level looks like:

   probe( poll => <direction>, socket => <socket> )
   io( poll => <direction>, socket => <socket>, cb => <callback> )

probe takes a direction (r / w) and a socket, and returns true if that
operation would currently succeed, or false if it would block.  If true,
you should do the operation immediately; if false, you should create a
watcher with "io", which will defer the operation.  (This is just a thin
wrapper around ZMQ_EVENTS, just like the you commented out in your

io takes the same args, but also a callback that will be called when the
operation becomes possible.  It works by creating the releavant IO (fd)
watcher on the ZMQ_FD, and when the event loop calls the IO watcher's
callback, we do a "probe" and call the callback if probe returns true.
Otherwise, we continue to wait (via the event loop and io watcher) until
this happens.  (I am not sure how often the IO watcher will indicate
readability but the socket won't allow a successful read.  But this is a
case that happens with normal sockets/pipes under some event loops, so I
handle it for maximum correctness / paranoia.)

The higher level API on top of this is a Handle object that has two main
entry points, "push_read" and "push_write".  Each function pushes a
callback onto a queue.  When there is stuff in the queue, the Handle
object checks "probe" and calls the callback with the read message (or
calls the callback to generate a message to write) if probe returns
true.  If not, processing of the queue is deferred (via "io") until
write/readability exists again.  Then the process repeats until the
queue is drained.

The code is here:
  probe/io: https://github.com/jrockway/anyevent-zeromq/blob/master/lib/AnyEvent/ZeroMQ.pm
  handle: https://github.com/jrockway/anyevent-zeromq/blob/master/lib/AnyEvent/ZeroMQ/Handle.pm

This approach has been quite successful for me, both within my
applications and for communicating with Mongrel2.

Jonathan Rockway

print just => another => perl => hacker => if $,=$"

More information about the zeromq-dev mailing list