[zeromq-dev] Can't bind same ZMQ_UP/DOWNSTREAM multiple times.
Jon Dyte
jon at totient.co.uk
Fri Aug 20 00:10:22 CEST 2010
Oliver Smith wrote:
> Jon Dyte said the following on 8/19/2010 3:11 PM:
>
>> I'm reading this thread late on, but isn't this what the streamer is
>> for, assuming it is a pipeline, not req-rep?
>>
>> so if you have 3 threads producing data and and 6 threads doing
>> subsequent work (that was the jpg attached earlier in the thread)
>>
>> don't you just need to put a streamer inbetween ?
>>
>> so the 3 producers connect to the streamer and write to it, and the 6
>> workers connect and read from it
>>
>> so 10 threads in this example?
>>
>>
>
> Does the streamer handle inproc:// ?
>
>
does this do what you want
example output
worker thread id publisher thread id,
task no
3059047280 : processing work from : 3014986608.1
3031772016 : processing work from : 3006593904.2
3050654576 : processing work from : 3006593904.1
3075832688 : processing work from : 3023379312.1
3040164720 : processing work from : 3023379312.2
3067439984 : processing work from : 3014986608.2
3075832688 : processing work from : 3023379312.3
3059047280 : processing work from : 3014986608.3
3040164720 : processing work from : 3023379312.4
3050654576 : processing work from : 3006593904.3
3067439984 : processing work from : 3014986608.4
3031772016 : processing work from : 3006593904.4
3075832688 : processing work from : 3023379312.5
#include <zmq.hpp>
#include <stdlib.h>
#include <iostream>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
void *worker_routine (void *arg)
{
unsigned int r;
zmq::context_t *ctx = (zmq::context_t*) arg;
zmq::socket_t sw (*ctx, ZMQ_UPSTREAM);
sw.connect ("inproc://outbound");
while (true)
{
zmq::message_t msg;
sw.recv(&msg);
printf("%u : processing work from : %s\n", (unsigned
int)pthread_self(), (const char*) msg.data());
::fflush(stdout);
::sleep( ((unsigned int)::rand_r(&r)) % 5 );
}
}
void *publisher_routine (void *arg)
{
zmq::context_t *ctx = (zmq::context_t*) arg;
zmq::socket_t sw (*ctx, ZMQ_DOWNSTREAM);
sw.connect ("inproc://inbound");
int tick =0;
while (true)
{
zmq::message_t msg(64);
sprintf((char*)msg.data(),"%u.%d",(unsigned
int)pthread_self(),++tick);
sw.send(msg);
if (tick % 10 == 0)
::sleep(1);
}
}
int main ()
{
::setbuf(stdout, NULL);
zmq::context_t ctx (0);
zmq::socket_t down (ctx, ZMQ_DOWNSTREAM);
zmq::socket_t up (ctx, ZMQ_UPSTREAM);
down.bind("inproc://outbound");
up.bind("inproc://inbound");
::sleep(1); //
int i;
for (i = 0 ; i < 6 ; ++i)
{
pthread_t worker;
int rc = pthread_create (&worker, NULL, worker_routine, (void*)
&ctx);
assert (rc == 0);
}
::sleep(1); // be sure the worker started
for (i = 0 ; i < 3 ; ++i)
{
pthread_t p;
int rc = pthread_create (&p, NULL, publisher_routine, (void*) &ctx);
assert (rc == 0);
}
zmq::device(ZMQ_STREAMER,up,down);
}
More information about the zeromq-dev
mailing list