[zeromq-dev] Mixing threaded tbb::flow::graph and ZeroMQ
Brett Viren
brett.viren at gmail.com
Thu Dec 12 23:15:02 CET 2019
Hi,
I want to use ZeroMQ sockets from nodes in a multi-threaded
tbb::flow::graph and I worry if doing so will violate the socket rules
on threading.
The test (copied below) creates and destroys any given socket (zsock_t)
in the main thread. The tbb::flow::graph is also constructed there.
When that graph is run, any given socket will be used from whatever
thread TBB happens to apply to the given node invocation (in my
understanding).
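To check that understanding, one option is to record which threads actually enter a node body. The following is a minimal sketch using only the standard library. `ThreadLog` and `distinct_threads` are hypothetical names, not part of TBB or CZMQ, and plain `std::thread` workers stand in for TBB's pool.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical helper: remembers the id of every thread that calls note().
class ThreadLog {
public:
    void note() {
        std::lock_guard<std::mutex> lock(mutex_);
        ids_.insert(std::this_thread::get_id());
    }
    std::size_t distinct() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return ids_.size();
    }
private:
    mutable std::mutex mutex_;
    std::set<std::thread::id> ids_;
};

// Stand-in for a graph run: nthreads workers each call note() `calls` times.
std::size_t distinct_threads(int nthreads, int calls) {
    assert(nthreads > 0);
    ThreadLog log;
    std::vector<std::thread> workers;
    for (int t = 0; t < nthreads; ++t) {
        workers.emplace_back([&log, calls] {
            for (int i = 0; i < calls; ++i) log.note();
        });
    }
    // All workers are started before any is joined, so their ids are distinct.
    for (auto& w : workers) w.join();
    return log.distinct();
}
```

In the test below, a body such as T2Z could hold a pointer to a shared `ThreadLog` and call `note()` at the top of `operator()`. A count above one after the run would confirm that the socket really is being touched from several threads.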
I can tell TBB to not execute particular nodes on multiple threads
simultaneously so the socket object itself need not contend with
concurrent access (from the point of view of the app, at least). This
seems to work.
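An alternative to relying on the TBB concurrency limit would be to guard the body with a mutex, which gives both mutual exclusion and a memory barrier between successive users of the socket. This is only a sketch under my own assumptions: `Serialized`, `AddTo` and `count_with_threads` are hypothetical names, and the mutex is held through a `std::shared_ptr` so that copies of the wrapper share one lock.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical wrapper: lets at most one thread at a time run the wrapped body.
// Copies share the same mutex, so a copied wrapper is still serialized.
template <typename Body>
class Serialized {
public:
    explicit Serialized(Body body)
        : body_(std::move(body)), mutex_(std::make_shared<std::mutex>()) {}

    template <typename Arg>
    auto operator()(Arg&& arg)
        -> decltype(std::declval<Body&>()(std::forward<Arg>(arg))) {
        std::lock_guard<std::mutex> lock(*mutex_);
        return body_(std::forward<Arg>(arg));
    }
private:
    Body body_;
    std::shared_ptr<std::mutex> mutex_;
};

// Stand-in for a body that touches a shared, non-thread-safe resource.
struct AddTo {
    long* total;
    long operator()(const int& n) { *total += n; return *total; }
};

// Several threads, each with its own copy of the wrapper, update one counter.
long count_with_threads(int nthreads, int per_thread) {
    assert(nthreads > 0 && per_thread >= 0);
    long total = 0;
    Serialized<AddTo> original(AddTo{&total});
    std::vector<std::thread> workers;
    for (int t = 0; t < nthreads; ++t) {
        Serialized<AddTo> copy = original;
        workers.emplace_back([copy, per_thread]() mutable {
            for (int i = 0; i < per_thread; ++i) copy(1);
        });
    }
    for (auto& w : workers) w.join();
    return total;
}
```

The unguarded `long` plays the role of the socket here. Whether a wrapper like this is needed on top of a TBB concurrency limit of 1 is exactly the open question.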
Pushing my luck, even when I allow multiple concurrent invocations the test
doesn't show any obvious problems. I don't know whether TBB actually took
advantage of this concurrency allowance.
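One way to find out whether invocations ever overlapped would be to count how many threads are inside the body at the same moment. A minimal sketch follows, standard library only. `ConcurrencyProbe`, `probe_serial` and `probe_overlap` are hypothetical names, and the worker threads are a stand-in for TBB.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: tracks the largest number of threads seen inside
// a guarded region at the same time.
class ConcurrencyProbe {
public:
    // RAII guard: construct on entry to the region, destroyed on exit.
    class Scope {
    public:
        explicit Scope(ConcurrencyProbe& probe) : probe_(probe) {
            int now = ++probe_.in_flight_;
            int seen = probe_.max_seen_.load();
            // Raise the recorded maximum if this entry exceeds it.
            while (now > seen &&
                   !probe_.max_seen_.compare_exchange_weak(seen, now)) {}
        }
        ~Scope() { --probe_.in_flight_; }
        Scope(const Scope&) = delete;
        Scope& operator=(const Scope&) = delete;
    private:
        ConcurrencyProbe& probe_;
    };
    int max_seen() const { return max_seen_.load(); }
private:
    std::atomic<int> in_flight_{0};
    std::atomic<int> max_seen_{0};
};

// Back-to-back entries from one thread never overlap.
int probe_serial(int ncalls) {
    ConcurrencyProbe probe;
    for (int i = 0; i < ncalls; ++i) {
        ConcurrencyProbe::Scope scope(probe);
    }
    return probe.max_seen();
}

// Every worker stays inside the region until all have entered it.
int probe_overlap(int nthreads) {
    assert(nthreads > 0);
    ConcurrencyProbe probe;
    std::atomic<int> arrived{0};
    std::vector<std::thread> workers;
    for (int t = 0; t < nthreads; ++t) {
        workers.emplace_back([&probe, &arrived, nthreads] {
            ConcurrencyProbe::Scope scope(probe);
            ++arrived;
            while (arrived.load() < nthreads) std::this_thread::yield();
        });
    }
    for (auto& w : workers) w.join();
    return probe.max_seen();
}
```

In the test below, a `Scope` could be created as the first statement of `T2Z::operator()`, with the body holding a pointer to one shared probe. If `max_seen()` is still 1 after a run with the limit raised, the concurrent case was never actually exercised.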
I'm hoping someone can say whether running with or without the concurrency
limit is actually safe. Or, if the test is only working by accident, how might
I push it further to make it exhibit some problems so I can explore the
"safety boundary"?
Thanks for any guidance!
-Brett.
Compile the test like:
$ g++ -o test_tbbzmq test_tbbzmq.cxx $(pkg-config libczmq --cflags --libs) -ltbb
E.g., run with 1 thread and PUB/SUB sockets:
$ ./test_tbbzmq 1 pubsub
E.g., run with 5 threads and PUSH/PULL sockets:
$ ./test_tbbzmq 5 pushpull
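#include <tbb/task_scheduler_init.h>
#include <tbb/flow_graph.h>
#include <czmq.h>
#include <cassert>   // assert
#include <cerrno>    // errno
#include <cstdlib>   // atoi, abort
#include <cstring>   // strerror
#include <string>    // std::string
#define NLOOPS 10000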
// source node body: emits 1..NLOOPS, then stops
struct TGen {
    int count{0};
    bool operator()(int& num) {
        ++count;
        num = count;
        zsys_debug("gen: %d", num);
        return count <= NLOOPS;
    }
};
// sink/function node body
struct T2Z {
    zsock_t* sock;
    std::string name;
    T2Z(zsock_t* s, std::string n) : sock(s), name(n) {
        assert(sock);
    }
    int operator()(const int& num) {
        assert(sock);
        int rc = zsock_send(sock, "i", num);
        if (rc < 0) {
            zsys_error("t2z %s: %d %s", name.c_str(), errno, strerror(errno));
            return num;
        }
        zsys_debug("t2z: %d", num);
        return num;
    }
};
// source node body
struct Z2T {
    zsock_t* sock;
    std::string name;
    Z2T(zsock_t* s, std::string n) : sock(s), name(n) {
        assert(sock);
    }
    bool operator()(int& num) {
        assert(sock);
        int rc = zsock_recv(sock, "i", &num);
        if (rc < 0) {
            zsys_error("z2t %s: %d %s", name.c_str(), errno, strerror(errno));
            return false;
        }
        zsys_debug("z2t: %d", num);
        return num < NLOOPS; // exit on num == NLOOPS
    }
};
struct Sink {
    int operator()(const int& num) {
        zsys_debug("snk: %d", num);
        return num;
    }
};
int main(int argc, char* argv[])
{
    int nthreads = 1;
    if (argc > 1) {
        nthreads = atoi(argv[1]);
    }
    const char* what = "pubsub";
    if (argc > 2) {
        what = argv[2];
    }
    zsys_init();
    const char* address = "tcp://127.0.0.1:5678";
    // "pub" is the sending end and "sub" the receiving end in both modes
    zsock_t* pub = NULL;
    zsock_t* sub = NULL;
    if (streq(what, "pubsub")) {
        pub = zsock_new_pub(address);
        sub = zsock_new_sub(address, "");
    }
    else if (streq(what, "pushpull")) {
        pub = zsock_new_push(address);
        sub = zsock_new_pull(address);
    }
    else {
        abort();
    }
    assert(pub);
    T2Z t2z(pub, "sender");
    assert(sub);
    Z2T z2t(sub, "recver");
    TGen gen;
    Sink sink;
    // give the sockets time to connect before sending
    zclock_sleep(1000);
    tbb::task_scheduler_init m_sched(nthreads);
    tbb::flow::graph graph;
    tbb::flow::source_node<int> gen_node(graph, gen, false);
    tbb::flow::source_node<int> z2t_node(graph, z2t, false);
    // change 1 to, e.g., 10 to allow concurrent invocation
    tbb::flow::function_node<int,int> t2z_node(graph, 1, t2z);
    tbb::flow::function_node<int,int> sink_node(graph, 1, sink);
    // gen -tbb-> t2z =zmq=> z2t -tbb-> sink
    tbb::flow::make_edge(gen_node, t2z_node);
    tbb::flow::make_edge(z2t_node, sink_node);
    z2t_node.activate();
    gen_node.activate();
    graph.wait_for_all();
    zsock_destroy(&sub);
    zsock_destroy(&pub);
    return 0;
}