[zeromq-dev] Mixing threaded tbb::flow::graph and ZeroMQ
Bill Torpey
wallstprog at gmail.com
Fri Dec 13 15:33:03 CET 2019
HI Brett:
Here are a few thoughts that may help:
- I don’t know CZMQ at all, so comments reflect “pure” 0mq.
- In practice, it’s possible to share PUB sockets using mutexes if you’re careful.
- It is not possible to share SUB sockets, period. It may appear to work, but at some point you will crash. (This has to do with lack of synchronization between user code and code running in background threads).
- There are thread-safe sockets available if you build 0mq in draft mode (see below). These have certain restrictions (e.g., no multi-part messages), and not every native socket type has a thread-safe equivalent.
Good luck.
Bill
From http://api.zeromq.org/master:zmq-socket <http://api.zeromq.org/master:zmq-socket>
> ØMQ has both thread safe socket type and not thread safe socket types. Applications MUST NOT use a not thread safe socket from multiple threads except after migrating a socket from one thread to another with a "full fence" memory barrier.
>
> Following are the thread safe sockets: * ZMQ_CLIENT * ZMQ_SERVER * ZMQ_DISH * ZMQ_RADIO * ZMQ_SCATTER * ZMQ_GATHER
>
> On Dec 12, 2019, at 5:15 PM, Brett Viren <brett.viren at gmail.com> wrote:
>
> Hi,
>
> I want to use ZeroMQ sockets from nodes in a multi-threaded
> tbb::flow::graph and I worry if doing so will violate the socket rules
> on threading.
>
> The test (copied below) creates and destroys any given socket (zsock_t)
> in the main thread. The tbb::flow::graph is also constructed there.
> When that graph is run, any given socket will be used from whatever
> thread TBB happens to apply to the given node invocation (in my
> understanding).
>
> I can tell TBB to not execute particular nodes on multiple threads
> simultaneously so the socket object itself need not contend with
> concurrent access (from the point of view of the app, at least). This
> seems to work.
>
> Pushing my luck, even allowing multiple concurrent invocation the test
> doesn't show any obvious problems. I don't know if TBB took advantage
> of this concurrency allowance
>
>
> I'm hoping someone can say either w/ or w/out the concurrency limit is
> actually safe. Or, if the test is only accidentally working, how might
> I push it further to make it exhibit some problems so I can explore the
> "safety boundary"?
>
>
> Thanks for any guidance!
> -Brett.
>
>
> Compile the test like:
>
> $ g++ -o test_tbbzmq test_tbbzmq.cxx $(pkg-config libczmq --cflags --libs) -ltbb
>
> Eg, run with 1 thread, PUB/SUB sockets
>
> $ ./test_tbbzmq 1 pubsub
>
> Eg, run with 5 threads, PUSH/PULL sockets
>
> $ ./test_tbbzmq 5 pushpull
>
>
> #include <tbb/task_scheduler_init.h>
> #include <tbb/flow_graph.h>
> #include <czmq.h>
>
> #define NLOOPS 10000
>
> struct TGen {
> int count{0};
> bool operator()(int& num) {
> ++count;
> num = count;
> zsys_debug("gen: %d", num);
> return count <= NLOOPS;
> }
> };
>
> // sink/function node body
> struct T2Z {
> zsock_t* sock;
> std::string name;
> T2Z(zsock_t* s, std::string n) : sock(s), name(n) {
> assert(sock);
> }
> int operator()(const int& num) {
> assert(sock);
>
> int rc = zsock_send(sock, "i", num);
> if (rc < 0) {
> zsys_error("t2z %s: %d %s", name.c_str(), errno, strerror(errno));
> return num;
> }
> zsys_debug("t2z: %d", num);
> return num;
> }
> };
>
> // source node body
> struct Z2T {
> zsock_t* sock;
> std::string name;
> Z2T(zsock_t* s, std::string n) : sock(s), name(n) {
> assert(sock);
> }
> bool operator()(int& num) {
> assert(sock);
> int rc = zsock_recv(sock, "i", &num);
> if (rc < 0) {
> zsys_error("z2t %s: %d %s", name.c_str(), errno, strerror(errno));
> return false;
> }
> zsys_debug("z2t: %d", num);
> return num < NLOOPS; // exit on num == NLOOPS
> }
> };
>
> struct Sink {
> int operator()(const int& num) {
> zsys_debug("snk: %d", num);
> return num;
> }
> };
>
> int main(int argc, char* argv[])
> {
> int nthreads = 1;
> if (argc > 1) {
> nthreads = atoi(argv[1]);
> }
> const char* what = "pubsub";
> if (argc > 2) {
> what = argv[2];
> }
>
> zsys_init();
>
> const char* address = "tcp://127.0.0.1:5678";
> zsock_t* pub = NULL;
> zsock_t* sub = NULL;
>
> if (streq(what, "pubsub")) {
> pub = zsock_new_pub(address);
> sub = zsock_new_sub(address, "");
> }
> else if (streq(what, "pushpull")) {
> pub = zsock_new_push(address);
> sub = zsock_new_pull(address);
> }
> else {
> abort();
> }
> assert(pub);
> T2Z t2z(pub, "sender");
>
> assert(sub);
> Z2T z2t(sub, "recver");
>
> TGen gen;
> Sink sink;
>
> zclock_sleep(1000);
>
> tbb::task_scheduler_init m_sched(nthreads);
>
> tbb::flow::graph graph;
> tbb::flow::source_node<int> gen_node(graph, gen, false);
> tbb::flow::source_node<int> z2t_node(graph, z2t, false);
> // change 1 to, eg 10, to allow concurrent invokation
> tbb::flow::function_node<int,int> t2z_node(graph, 1, t2z);
> tbb::flow::function_node<int,int> sink_node(graph, 1, sink);
>
> // gen -tbb-> t2z =zmq=> z2t -tbb-> sink
>
> tbb::flow::make_edge(gen_node, t2z_node);
> tbb::flow::make_edge(z2t_node, sink_node);
>
> z2t_node.activate();
> gen_node.activate();
> graph.wait_for_all();
>
> zsock_destroy(&sub);
> zsock_destroy(&pub);
>
> return 0;
>
> }
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20191213/3c687e43/attachment.htm>
More information about the zeromq-dev
mailing list