[zeromq-dev] Mixing threaded tbb::flow::graph and ZeroMQ

Brett Viren brett.viren at gmail.com
Fri Dec 13 22:18:01 CET 2019


Hi again,

FYI, I'm convinced not to push my luck with the non-thread-safe sockets.
Even with tbb::flow::graph's inherent mutex (concurrency=1) I was able to
make my test segfault or lose messages by adding an N-to-1 socket
connection pattern.

For reference, my updated test is copied below.

Also, I found this ground had been trodden previously:

  https://github.com/zeromq/libzmq/issues/2759
  https://github.com/WallStProg/zmqtests

I guess it's a well worn path :)

Thanks again for the input.

-Brett.


  $ ./test_tbbzmq 10 1 pushpull

#include <tbb/task_scheduler_init.h>
#include <tbb/flow_graph.h>
#include <czmq.h>

// Number of integers each generator (TGen) emits before it stops.
#define NLOOPS 10000
// Number of generator/sender pairs; the single receiver (Z2T) stops after
// it has received NGENS * NLOOPS messages.
#define NGENS 3

// Source node body: each call hands out the next integer, starting at 0,
// and signals exhaustion once NLOOPS values have been produced.
struct TGen {
    int count{0};  // number of times operator() has been called
    int id;        // label used in the log lines

    TGen(int id) : id(id) {}

    ~TGen() {
        zsys_debug("gen%d: %d destructing", id, count);
    }

    // Writes the next value into `num`.  Returns true while the value
    // should be emitted and false on the call after the last value.
    bool operator()(int& num) {
        num = count++;
        if (count > NLOOPS) {
            zsys_debug("gen%d: %d exiting", id, num);
            return false;
        }
        zsys_debug("gen%d: %d", id, num);
        return true;
    }
};

// Function node body converting tbb data to a zmq message: every integer
// received is sent on `sock` and then passed through unchanged.
struct T2Z {
    zsock_t* sock;  // not owned here; main() destroys it
    int id;         // label used in the log lines
    int count{0};   // number of send attempts, successful or not

    T2Z(zsock_t* s, int id) : sock(s), id(id) {
        assert(sock);
    }

    ~T2Z() {
        zsys_debug("t2z%d: %d destructing", id, count);
    }

    // Sends `num` as a single int frame.  A failed send is logged but the
    // value is still returned so the graph keeps flowing.
    int operator()(const int& num) {
        assert(sock);

        const int status = zsock_send(sock, "i", num);
        ++count;
        if (status >= 0) {
            zsys_debug("t2z%d: %d", id, num);
        }
        else {
            zsys_error("t2z%d: %d %s", id, errno, strerror(errno));
        }
        return num;
    }
};

// Source node body converting zmq messages to tbb data: each call receives
// one integer from `sock`.
struct Z2T {
    zsock_t* sock;  // not owned here; main() destroys it
    int id;         // label used in the log lines
    int count{0};   // number of messages successfully received

    Z2T(zsock_t* s, int id) : sock(s), id(id) {
        assert(sock);
    }

    ~Z2T() {
        zsys_debug("z2t%d: %d destructing", id, count);
    }

    // Receives the next integer into `num`.  Returns false on a receive
    // error or once NGENS * NLOOPS messages have arrived, true otherwise.
    bool operator()(int& num) {
        assert(sock);
        if (zsock_recv(sock, "i", &num) < 0) {
            zsys_error("z2t%d: %d %s", id, errno, strerror(errno));
            return false;
        }
        ++count;
        const bool finished = (count >= NGENS * NLOOPS);
        if (finished) {
            zsys_debug("z2t%d: %d [%d] exiting", id, num, count);
            return false;
        }
        zsys_debug("z2t%d: %d [%d]", id, num, count);
        return true;
    }
};

// Terminal function node body: logs every value it sees together with a
// running index, then passes the value through.
struct Sink {
    int count{0};  // number of values seen so far

    int operator()(const int& num) {
        const int index = count++;
        zsys_debug("snk: %d at %d", num, index);
        return num;
    }
};

int main(int argc, char* argv[])
{
    // NGENS*[gen -tbb-> t2z] =zmq=> z2t -tbb-> sink


    int nthreads = 1;
    if (argc > 1) {
        nthreads = atoi(argv[1]);
    }
    int nconcurs = 1;
    if (argc > 2) {
        nconcurs = atoi(argv[1]);
    }
    const char* what = "pubsub";
    if (argc > 3) {
        what = argv[3];
    }

    zsys_init();


    const char* address = "tcp://127.0.0.1:5678";


    int sender_stype = ZMQ_PUB;
    int recver_stype = ZMQ_SUB;

    if (streq(what, "pubsub")) {
        sender_stype = ZMQ_PUB;
        recver_stype = ZMQ_SUB;
    }    
    else if (streq(what, "pushpull")) {
        sender_stype = ZMQ_PUSH;
        recver_stype = ZMQ_PULL;
    }    
    else {
        abort();
    }

    std::vector<T2Z> t2zs;
    for (int ind=0; ind<NGENS; ++ind) {
        zsock_t* sock = zsock_new(sender_stype);
        assert(sock);
        int rc = zsock_connect(sock, "%s", address);
        assert (rc >= 0);
        t2zs.emplace_back(sock, ind);
    }

    zsock_t* recv_sock = zsock_new(recver_stype);
    assert(recv_sock);
    if (recver_stype == ZMQ_SUB) {
        zsock_set_subscribe(recv_sock, "");
    }
    int port = zsock_bind(recv_sock, "%s", address);
    assert (port == 5678);
    Z2T z2t(recv_sock, 0);


    // give time for everybody to bind/connect
    zclock_sleep(1000);

    std::vector<TGen> gens;
    for (int ind=0; ind<NGENS; ++ind) {
        gens.emplace_back(ind);
    }
    Sink sink;

    tbb::task_scheduler_init m_sched(nthreads);

    tbb::flow::graph graph;
    std::vector< tbb::flow::source_node<int> > gen_nodes;
    std::vector< tbb::flow::function_node<int,int> > t2z_nodes;
    for (int ind=0; ind<NGENS; ++ind) {
        gen_nodes.emplace_back(graph, gens[ind], false);
        t2z_nodes.emplace_back(graph, nconcurs, t2zs[ind]);
    }
    tbb::flow::source_node<int> z2t_node(graph, z2t, false);    
    tbb::flow::function_node<int,int> sink_node(graph, nconcurs, sink);

    for (int ind=0; ind<NGENS; ++ind) {
        tbb::flow::make_edge(gen_nodes[ind], t2z_nodes[ind]);
    }
    tbb::flow::make_edge(z2t_node, sink_node);

    z2t_node.activate();
    for (auto& gn : gen_nodes) {
        gn.activate();
    }
    graph.wait_for_all();    

    for (auto& t2z : t2zs) {
        zsock_destroy(&t2z.sock);
    }
    zsock_destroy(&z2t.sock);

    return 0;

}

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 832 bytes
Desc: not available
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20191213/74888aa8/attachment.sig>


More information about the zeromq-dev mailing list