[zeromq-dev] Can't bind same ZMQ_UP/DOWNSTREAM multiple times.

Oliver Smith oliver at kfs.org
Mon Aug 16 18:33:38 CEST 2010


  On 8/16/2010 3:22 AM, Pieter Hintjens wrote:
>> s1.bind("tcp://127.0.0.1:12345")
>> s2.bind("tcp://127.0.0.1:12345")  <<- Gives address already in use.
>>
>> Src ==N==>  W1 ==>N+==>  W2 ==>  Dst
> After reading your explanation it's still unclear to me why you are
> trying to bind to the same endpoint twice, and whether you expect that
> to succeed or not (it won't).
As it happens, my use case here is transitioning 10 years of Visual 
Source Safe history to Subversion using SourceGear's "SourceOffSite" 
command line tool.

The first stage is to extract the directory structure. The second stage 
is to extract the file lists within the directory structure. The third 
stage is to extract the revision history. Finally, I aggregate the 
revision history by datetime/user to create repository commits suitable 
for Subversion.

There is a fairly costly overhead to every invocation of the command 
line tool, which can be excellently amortized by running it in parallel.

I need N workers at stage 2 to be able to forward the list of files they 
find to M workers at stage 3. That is, each worker at stage 2 plays the 
downstream role to each of the workers at stage 3.

Stage 2 and stage 3 are running on different machines for performance 
reasons (stage 2 can run anywhere, whereas stage 3 benefits from being 
on the same machine as the SourceOffSite server).

Run serially, this task went for over 54 hours before crashing due to the 
inevitable bit of Visual Source Safe corruption :) First tests seem to 
indicate the parallel run will be up to 10x faster.

def stage1:
     for dir in getDirectories():
         stage2->send(dir)

     while commit = stage4->recv():
         ...

def stage2:
     while dir = stage2->recv():
         for file in getFiles(dir):
             stage3->send(file)

def stage3:
     while file = stage3->recv():
         for commit in getCommits(file):
             stage4->send(commit)

I guess the only way to achieve this is with a concentrator.

def stage2:
     stage3a = connect(zmq.DOWNSTREAM, concport)   # sends to the concentrator
     while dir = stage2->recv():
         for file in getFiles(dir):
             stage3a->send(file)

def stage3a:
     stage3a = bind(zmq.UPSTREAM, concport)        # receives from every stage 2 worker
     stage3b = bind(zmq.DOWNSTREAM, stage3port)    # fans out to the stage 3 workers
     while file = stage3a->recv():
         stage3b->send(file)

def stage3b:
     stage3b = connect(zmq.UPSTREAM, stage3port)
     while file = stage3b->recv():
         for commit in getCommits(file):
             stage4->send(commit)
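
A minimal runnable sketch of the concentrator layout, using only the Python standard library. It is not ZeroMQ code: each queue.Queue stands in for one socket pair, threads stand in for the worker processes, and run_pipeline, the STOP sentinel, and the get_files/get_commits callables are hypothetical names introduced for illustration.

```python
import queue
import threading

STOP = object()  # sentinel marking end of stream (an assumption of this sketch)

def run_pipeline(dirs, get_files, get_commits, n_stage2=3, m_stage3=2):
    """Send work from N stage 2 workers through one concentrator to M stage 3 workers."""
    dir_q = queue.Queue()     # stage 1 -> stage 2
    conc_q = queue.Queue()    # stage 2 -> concentrator (the single bound endpoint)
    file_q = queue.Queue()    # concentrator -> stage 3
    commit_q = queue.Queue()  # stage 3 -> stage 4

    def stage2():
        while (d := dir_q.get()) is not STOP:
            for f in get_files(d):
                conc_q.put(f)
        conc_q.put(STOP)  # tell the concentrator this worker is done

    def concentrator():
        finished = 0
        while finished < n_stage2:
            item = conc_q.get()
            if item is STOP:
                finished += 1
            else:
                file_q.put(item)  # forward unchanged, like stage3a above
        for _ in range(m_stage3):
            file_q.put(STOP)  # one stop marker per stage 3 worker

    def stage3():
        while (f := file_q.get()) is not STOP:
            for c in get_commits(f):
                commit_q.put(c)

    threads = [threading.Thread(target=stage2) for _ in range(n_stage2)]
    threads.append(threading.Thread(target=concentrator))
    threads += [threading.Thread(target=stage3) for _ in range(m_stage3)]
    for t in threads:
        t.start()
    for d in dirs:
        dir_q.put(d)
    for _ in range(n_stage2):
        dir_q.put(STOP)  # one stop marker per stage 2 worker
    for t in threads:
        t.join()

    # All workers have exited, so the queue can be drained safely.
    commits = []
    while not commit_q.empty():
        commits.append(commit_q.get())
    return sorted(commits)
```

The point of the extra hop is that only the concentrator owns a listening endpoint on each side, so N and M can change without any worker needing to know about the others.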


As a networking guy, I can understand why that is the way it is. It 
makes less sense as a parallelism guy :)
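
The networking side of this can be reproduced without ZeroMQ: a TCP endpoint accepts only one listener, so a second bind to the same address and port is refused by the operating system. The sketch below uses plain Python sockets; second_bind_fails is a hypothetical helper name, and it assumes default socket options (no SO_REUSEADDR or SO_REUSEPORT).

```python
import errno
import socket

def second_bind_fails(host="127.0.0.1"):
    """Bind one listener to a free port, then try to bind a second socket to it."""
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        first.bind((host, 0))          # port 0: let the OS pick a free port
        first.listen(1)
        port = first.getsockname()[1]
        try:
            second.bind((host, port))  # same endpoint as `first`
        except OSError as exc:
            # Report whether the refusal was "address already in use".
            return exc.errno == errno.EADDRINUSE
        return False
    finally:
        first.close()
        second.close()

if __name__ == "__main__":
    print(second_bind_fails())
```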

- Oliver



