[zeromq-dev] Can't bind same ZMQ_UP/DOWNSTREAM multiple times.

Jon Dyte jon at totient.co.uk
Fri Aug 20 00:10:22 CEST 2010


Oliver Smith wrote:
> Jon Dyte said the following on 8/19/2010 3:11 PM:
>   
>> I'm reading this thread late on, but isn't this what the streamer is
>> for, assuming it is a pipeline, not req-rep?
>>
>> so if you have 3 threads producing data and and 6 threads doing
>> subsequent work (that was the jpg attached earlier in the thread)
>>
>> don't you just need to put a streamer inbetween ?
>>
>> so the 3 producers connect to the streamer and write to it, and the 6
>> workers connect and read from it
>>
>> so 10 threads in this example?
>>    
>>     
>
> Does the streamer handle inproc:// ?
>
>   
Does this do what you want?
Example output:

worker thread id                                   publisher thread id.task no
3059047280  : processing work from : 3014986608.1
3031772016  : processing work from : 3006593904.2
3050654576  : processing work from : 3006593904.1
3075832688  : processing work from : 3023379312.1
3040164720  : processing work from : 3023379312.2
3067439984  : processing work from : 3014986608.2
3075832688  : processing work from : 3023379312.3
3059047280  : processing work from : 3014986608.3
3040164720  : processing work from : 3023379312.4
3050654576  : processing work from : 3006593904.3
3067439984  : processing work from : 3014986608.4
3031772016  : processing work from : 3006593904.4
3075832688  : processing work from : 3023379312.5





#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <iostream>

#include <zmq.hpp>

void *worker_routine (void *arg)
{
    unsigned int r;
    zmq::context_t *ctx = (zmq::context_t*) arg;
    zmq::socket_t sw (*ctx, ZMQ_UPSTREAM);
    sw.connect ("inproc://outbound");
    while (true)
    {
        zmq::message_t msg;
        sw.recv(&msg);
        printf("%u  : processing work from : %s\n", (unsigned 
int)pthread_self(), (const char*) msg.data());
        ::fflush(stdout);
        ::sleep( ((unsigned int)::rand_r(&r)) % 5 );
    }
  
}

void *publisher_routine (void *arg)
{
    zmq::context_t *ctx = (zmq::context_t*) arg;
    zmq::socket_t sw (*ctx, ZMQ_DOWNSTREAM);
    sw.connect ("inproc://inbound");

    int tick =0;
    while (true)
    {
        zmq::message_t msg(64);
        sprintf((char*)msg.data(),"%u.%d",(unsigned 
int)pthread_self(),++tick);
        sw.send(msg);
        if (tick % 10 == 0)
            ::sleep(1);
    }
  
}
 
int main ()
{

    ::setbuf(stdout, NULL);

    zmq::context_t ctx (0);

    zmq::socket_t down (ctx, ZMQ_DOWNSTREAM);
    zmq::socket_t up (ctx, ZMQ_UPSTREAM);
  
    down.bind("inproc://outbound");
    up.bind("inproc://inbound");
      
    ::sleep(1); //

    int i;    
    for (i = 0 ; i < 6 ; ++i)
    {
        pthread_t worker;
        int rc = pthread_create (&worker, NULL, worker_routine, (void*) 
&ctx);
        assert (rc == 0);
    }
     
    ::sleep(1); // be sure the worker started

    for (i = 0 ; i < 3 ; ++i)
    {
        pthread_t p;
        int rc = pthread_create (&p, NULL, publisher_routine, (void*) &ctx);
        assert (rc == 0);
    }  

    zmq::device(ZMQ_STREAMER,up,down);

   
}




More information about the zeromq-dev mailing list