[zeromq-dev] Inter thread communication for scalability

Goswin von Brederlow goswin-v-b at web.de
Wed Jan 15 15:36:12 CET 2014

On Tue, Jan 14, 2014 at 01:16:32PM -0500, Lindley French wrote:
> In this case your "buffers" are really just messages, aren't they? A thread
> grabs one (receives a message), processes it, and writes the result into
> another buffer (sends a message).
> The hard part is that ZeroMQ sockets don't like to be touched by multiple
> threads, which complicates the many-to-many pattern you have going here.
> I'm no expert, but I would suggest....
> Each "pool", A and B, becomes a single thread with two ZMQ inproc sockets,
> one push and one pull. These are both bound to well-known endpoints. All
> the thread does is continually shove messages from the pull socket to the
> push socket.
> Each thread in "Thread set 1" has a push inproc socket connected to pool
> A's pull socket.
> Each thread in "Thread set 2" has a pull inproc socket connected to pool
> A's push socket and a push inproc socket connected to pool B's pull socket.
> For each message it receives, it just processes it and spits it out the
> other socket.
> The thread in "Thread set 3" has a pull inproc socket connected to pool B's
> push socket. It just continually receives messages and outputs them.
> This may seem complicated because concepts that were distinct before
> (buffer pools and worker threads) are now the same thing: they're both just
> threads with sockets. The critical difference is that the "buffer pools"
> bind to well-known endpoints, so you can only have a few of them, while the
> worker threads connect to those well-known endpoints, so you can have as
> many as you like.
> Will this perform as well as your current code? I don't know. Profile it
> and find out.
> On Tue, Jan 14, 2014 at 12:23 PM, Kenneth Adam Miller <
> kennethadammiller at gmail.com> wrote:
> > So, I have two pools of shared buffers; pool A, which is a set of buffers
> > of uncompressed data, and pool B, for compressed data. I have three sets of
> > threads.
> >
> > Thread set 1 pulls from pool A, and fills buffers it receives from pool A
> > up with uncompressed data.
> >
> > Thread set 2 is given a pool from A that has recently been filled. It
> > pulls a buffer from pool B, compresses from A into B, and then returns the
> > buffer it was given, cleared, back to pool A.
> >
> > Thread set 3 is a single thread, that is continually handed compressed
> > data from thread set 2, which it outputs. When data is finished output, it
> > returns the buffer to pool B, cleared.
> >
> > Can anybody describe a scheme to me that will allow thread sets 1 & 2 to
> > scale?
> >
> > Also, suppose for pools A and B, I'm using shared queues that are just C++
> > stl lists. When I pop from the front, I use a lock for removal to make sure
> > that removal is deterministic. When I enqueue, I use a separate lock to
> > ensure that the internals of the STL list are respected (don't want two
> > threads receiving iterators to the same beginning node, that would probably
> > corrupt the container or cause data loss, or both). Is this the appropriate
> > way to go about it? Thread sets 1 & 2 will likely have more than one
> > thread, but there's no guarantee that thread sets 1 & 2 will have equal
> > threads.
> >
> > I was reading the ZeroMQ manual, and I read the part about multi-threading
> > and message passing, and I was wondering what approaches should be taken
> > with message passing when data is inherently shared between threads.

All those threads get confusing. Let's draw a picture:

   |                                                                    |
   v                                                                    |
 Pool A          Thread Set 1         Router X                          |
PULL-PUSH --==> PULL-read-PUSH ==--> PULL-PUSH --==> PULL Thread Set 2 PUSH
                                                         \            /
                                      Pool B             /            \
                                     PULL-PUSH --==> PULL              PUSH
                                      ^                                 ||
                                      |                                 ||
                                      +----------- PUSH-write-PULL <--==++
                                                      Thread 3

At the start "Pool A" and "Pool B" allocate a number of buffers and
push them (as messages, each containing a pointer to a buffer). Just
make sure the high water mark is high enough for all buffers to fit.

After that all threads recv() from their PULL sockets and send() to
their PUSH sockets.

Note 1: --== denotes a 1-to-N connection and ==-- an N-to-1 connection.

Note 2: I inserted a router thread, "Router X", to avoid an N-to-M
connection between thread set 1 and thread set 2. "Router X" gives
both thread sets a single fixed address to connect to. Not sure if
that is required.

Was that what you envisioned?


PS: Do you need buffer pools? Why not just allocate buffers as needed
and free them when done? Is that to limit the total number of buffers
or is your malloc()/free() the limiting factor?
