[zeromq-dev] Usage case description and questions

Brian Granger ellisonbg at gmail.com
Thu Feb 11 20:44:06 CET 2010


Hi,

I have been asking some questions about various aspects of zeromq and
have gotten some great answers.  Some of the usage cases we are
looking at are a little unusual - or perhaps we are just still trying
to figure out how to map our abstractions onto zeromq.  I thought it
would be useful to describe our usage case so I can ask some more
concrete questions.

As part of the IPython project (http://ipython.scipy.org) we are
building a distributed architecture that enables a python script to be
parallelized.  This architecture allows tasks in the Python script
(let's say, the script is running on your laptop at Starbucks...) to
be sent to a remote cluster or supercomputer, where a set of worker
processes will perform the work.  If it helps, you can picture a task
as a Python function along with a set of arguments.
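
For instance, a toy task might be no more than this (the submit call
in the comment is purely illustrative, not our real API):

    def task(x, y):
        return x ** y

    # Conceptually the script ships the function and its arguments to a
    # worker, along the lines of:
    #   result = client.submit(task, 2, 10)   # evaluates remotely -> 1024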

The distribution of tasks to the worker processes is handled by an
additional process, which we call the controller.  The controller is
basically a message hub and contains a set of queues and scheduling
logic that controls what tasks are performed by which workers.
The number of workers managed by a single controller can be large;
512-1024 is not impossible.  And we need latency to be as low as
possible as it determines the minimum task granularity that you can
expect to get a parallel speedup for.  The controller almost always
runs on the same local subnet as the workers.

We are looking at implementing our controller and messaging using
zeromq.  This is attractive for a number of reasons: performance,
simple wire protocols, support for Python, C/C++, Java and .NET, and
did I mention performance!

Most of my questions are about how we can implement our scheduling
algorithms in the controller.  We support a wide range of scheduling
approaches:

* Dynamic load balancing.  In this scenario, tasks are sent to workers
that are "least" busy.  In English, this is "run this task ASAP
anywhere you can."

* Worker-specific scheduling.  By this, I mean we need the ability to
track worker processes by id/name/number and send particular tasks to
that particular worker.  In English, this is "run this task only on
this worker, even if it takes longer."

Regardless of how the task is scheduled, the workers return the
results of the task to the main Python script by sending messages back
through the controller.  Additionally, if a worker dies while working
on a task, or the task fails for any reason, the main Python script
needs to receive a message indicating that.  If a task fails in a
manner that leaves the worker in a functioning state, the worker can
simply send a message indicating that the task failed.  However, if
the task fails because the worker dies (someone unplugs the worker),
the worker is not able to send back a message indicating failure.
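
In case it helps, the message shapes I have in mind look roughly like
this (pure sketch; the field names are our own convention and we would
use a real serializer in practice):

    import json

    ok   = json.dumps({"task_id": 42, "status": "ok", "result": 1024})
    fail = json.dumps({"task_id": 42, "status": "failed",
                       "error": "ZeroDivisionError"})
    # The "worker died" case is the hard one: no message arrives at all,
    # so the controller has to detect the death and synthesize the notice.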

The way we are currently thinking of mapping this onto the zeromq
messaging model is by using topics to indicate which worker should
perform the task.  When a worker connects to the controller, it could
send a presence message.  That message would declare a topic that the
worker would subscribe to.  With this scheme we might have the
following set of topics:

cluster.worker1
cluster.worker2
cluster.worker3
...
cluster.worker_any  # this is for load balancing
cluster.worker_all   # multicast to all workers.

Some of these topics might simply be different socket types rather
than topics.  But the overall idea is to use zeromq topics, queues
and socket types to handle the scheduling.
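
To make that concrete, here is a minimal pyzmq sketch of the topic
idea (the port, the topic strings and the space delimiter are all just
placeholders of mine):

    import time
    import zmq

    ctx = zmq.Context()

    # Controller side: one PUB socket; the topic prefix selects the worker.
    pub = ctx.socket(zmq.PUB)
    pub.bind("tcp://127.0.0.1:5555")

    # Worker side: subscribe to our own topic plus the broadcast topic.
    # Note that subscriptions are prefix matches, so "cluster.worker1"
    # would also match "cluster.worker10"; a delimiter after the id
    # would avoid that.
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5555")
    sub.setsockopt(zmq.SUBSCRIBE, b"cluster.worker1")
    sub.setsockopt(zmq.SUBSCRIBE, b"cluster.worker_all")

    time.sleep(0.2)  # crude: let the subscriptions propagate before sending

    pub.send(b"cluster.worker1 <task for worker 1 only>")
    pub.send(b"cluster.worker_all <task for every worker>")

    print(sub.recv())
    print(sub.recv())

One thing that jumped out while sketching this: PUB/SUB delivers a
copy to every matching subscriber, so cluster.worker_any cannot load
balance as a topic; that path presumably needs a queue-style socket
pair rather than a subscription.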

Now, the parts I am not clear on:

* We need a way of handling workers coming and going.  In particular,
we need a way of detecting if a worker dies.  In our current RPC-based
approach, we detect this using TCP error messages.  Once the TCP
connection is lost, the worker is assumed dead and we can reschedule
the task and notify the main script.

* We need a way of re-scheduling tasks that fail to different workers.
The idea that messages get queued in zeromq and can't be unqueued or
re-queued in a different way seems to be problematic for us.  The only
way I can see around this is for us to maintain our own
application-level queues that feed the zeromq queues (a sketch of what
I mean follows).  Not ideal.
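
Here is a minimal sketch of such an application-level queue
(everything here is our own bookkeeping; the only zeromq part is the
socket the tasks are finally written to):

    from collections import deque

    class TrackedQueue:
        """Feed zeromq one task at a time, remembering what is in flight."""

        def __init__(self, socket):
            self.socket = socket        # a zeromq socket tasks are sent on
            self.pending = deque()      # tasks not yet handed to zeromq
            self.in_flight = {}         # task_id -> task, awaiting a result

        def submit(self, task_id, task_bytes):
            self.pending.append((task_id, task_bytes))

        def dispatch_one(self):
            if self.pending:
                task_id, task_bytes = self.pending.popleft()
                self.in_flight[task_id] = task_bytes
                self.socket.send(task_bytes)

        def worker_died(self, task_id):
            # Put a task whose worker vanished back at the front of the line.
            self.pending.appendleft((task_id, self.in_flight.pop(task_id)))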

After thinking more about all of this, I agree that "disconnect
notification" is probably not the best way of handling these things.
Some other options:

* It would be fantastic if there were a "HEARTBEAT" type zeromq socket
that has a single method "isalive" that either endpoint can call and
get back true or false.  I know it is easy to write this logic into
our application, but because of the subtleties of Python's Global
Interpreter Lock, this has to be written at the C level.  Might as
well build it into zeromq itself.  (A sketch of the application-level
version follows after this list.)

* We might be able to handle some of the zeromq queue management if
there were an API for working with the queues attached to each socket.
Actions like: how many outbound messages are in the queue, get a list
of the topics of each message in the queue, remove a message, clear
all outbound messages older than T, etc.  This way, if we detect
(using a heartbeat) that a worker has died, we can at least handle the
zeromq queues in our application.  Right now the zeromq queues are
black boxes, which is tough.  (A mock-up of this wishlist API is also
below.)
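
For the heartbeat, the application-level version I have in mind looks
roughly like this (a minimal pyzmq sketch; the endpoint, port and
ping/pong payloads are all just our convention):

    import zmq

    def isalive(ctx, endpoint, timeout_ms=1000):
        # One-shot ping of a worker's REP socket.
        req = ctx.socket(zmq.REQ)
        req.setsockopt(zmq.LINGER, 0)  # don't hang on close if the peer is gone
        req.connect(endpoint)
        req.send(b"ping")
        alive = req.poll(timeout_ms, zmq.POLLIN) != 0
        if alive:
            req.recv()                 # consume the pong
        req.close()
        return alive

    # The matching worker-side loop would just be:
    #   rep = ctx.socket(zmq.REP)
    #   rep.bind("tcp://*:6000")
    #   while True:
    #       rep.recv(); rep.send(b"pong")

And the queue-introspection API I am wishing for would look something
like this (pure wishlist; none of these methods exist in zeromq today):

    # Hypothetical only; nothing like this is in the current API:
    #
    #   socket.outbound_count()            # messages queued but not yet sent
    #   socket.outbound_topics()           # topic of each queued message
    #   socket.remove_outbound(msg_id)     # unqueue a single message
    #   socket.purge_outbound(older_than)  # drop messages older than T seconds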

Alright, that is a start.  Any thoughts, ideas or feedback would be
greatly appreciated.  As an aside, it looks like if we used zeromq, we
could improve our latency by a factor of 10-100.  That would simply be
amazing and would enable us to scale our system up to very large
clusters and supercomputers.

Cheers,

Brian



-- 
Brian E. Granger, Ph.D.
Assistant Professor of Physics
Cal Poly State University, San Luis Obispo
bgranger at calpoly.edu
ellisonbg at gmail.com


