[zeromq-dev] thread safety using inproc sockets for inter-thread communication

Bill Torpey wallstprog at gmail.com
Wed Sep 27 23:52:46 CEST 2017


OK, now I’m really confused.  

I’ve done some more digging, and the original approach looks like it should work, as per Pieter’s book(s) (“Code Connected” — dated Jan. 6 2013, and "The ZeroMQ Guide - for C Developers” — undated) , in “Signaling between Threads”:

- The example explicitly closes the sending side after the send, so that looks OK as per the docs;
- He also specifically says that PUB/SUB sockets should work too — the only downside to that approach being the need to issue an empty subscription on the sub side.

I’ve tried both of these approaches with the following results:

- PUB/SUB sockets intermittently drop messages;
- PAIR sockets hang in zmq_send (see below).

Now, the one thing I’m doing differently from the example is that instead of doing a zmq_recv I’m doing a zmq_poll on the receiving thread.  

I need to do this because I’m reading from multiple sockets in the dispatch thread — in addition to the control (inproc) socket, I also need to poll a “discovery” socket, which is connected to a zmq_proxy, and a “data” socket, which handles direct TCP connections for high-volume data that I don’t want to send through the proxy.

Now I could set a timeout on the poll, but I can’t really set it short enough, and anyway that’s just going to burn (literally) a CPU core.

Is this a bug or a feature?  

Can anyone suggest a solution?  I can’t really see any other way to do inter-thread communication safely with ZeroMQ.

Any hints, tips, suggestions would be much appreciated!  And, let me know if this would be better discussed in GitHub issues.

Thanks in advance.

Bill Torpey

ZeroMQ built from source (d17581929cceceda02b4eb8abb054f996865c7a6) using gcc 4.8.2 on CentOS 6

Below is the stack trace from the hung process.  The deadlock appears to be between thread 5 (doing a zmq_poll on the three sockets described above), and thread 1 (stuck in zmq_send).

In this example, the two sockets are both ZMQ_PAIR’s, the receiver binds and the sender connects, as per the docs.  As mentioned earlier I’ve tried this with PUB/SUB too — that doesn’t hang, but messages go missing.

Thread 9 (Thread 0x7feeaa6b2700 (LWP 19294)):
#0  0x00000033bfee1623 in select () from /lib64/libc.so.6
No symbol table info available.
#1  0x00007feeaaa318c4 in dispatchEntry (closure=0xc9c060) at common/c_cpp/src/c/timers.c:148
        ele = 0xcf83e0
        timeout = {tv_sec = 0, tv_usec = 536612}
        now = {tv_sec = 1506547105, tv_usec = 127372}
        timeptr = 0x7feeaa6b1880
        heap = 0xc9c060
        wakeUpDes = {fds_bits = {64, 0 <repeats 15 times>}}
        selectReturn = 1
        buff = 119 'w'
#2  0x00000033c0607aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#3  0x00000033bfee8bcd in clone () from /lib64/libc.so.6
No symbol table info available.

Thread 8 (Thread 0x7feea9cb1700 (LWP 19295)):
#0  0x00000033c060da00 in sem_wait () from /lib64/libpthread.so.0
No symbol table info available.
#1  0x00007feeaaa22c3c in zmqBridgeMamaIoImpl_dispatchThread (closure=0xca1850)
    at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/io.c:238
        dispatchResult = 0
#2  0x00000033c0607aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#3  0x00000033bfee8bcd in clone () from /lib64/libc.so.6
No symbol table info available.

Thread 7 (Thread 0x7feea8a0a700 (LWP 19296)):
#0  0x00000033bfee91c3 in epoll_wait () from /lib64/libc.so.6
No symbol table info available.
#1  0x00007feeaa772efc in zmq::epoll_t::loop (this=0xcaeca0) at /home/btorpey/work/libzmq/master/src/epoll.cpp:168
        timeout = 0
        n = -1
        ev_buf = {{events = 1, data = {ptr = 0xcaed80, fd = 13299072, u32 = 13299072, u64 = 13299072}}, {events = 0, data = {
              ptr = 0x0, fd = 0, u32 = 0, u64 = 0}} <repeats 255 times>}
#2  0x00007feeaa7731b0 in zmq::epoll_t::worker_routine (arg_=0xcaeca0) at /home/btorpey/work/libzmq/master/src/epoll.cpp:203
No locals.
#3  0x00007feeaa7be982 in thread_routine (arg_=0xcaed20) at /home/btorpey/work/libzmq/master/src/thread.cpp:106
        signal_set = {__val = {18446744067267100671, 18446744073709551615 <repeats 15 times>}}
        rc = 0
        self = 0xcaed20
#4  0x00000033c0607aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#5  0x00000033bfee8bcd in clone () from /lib64/libc.so.6
No symbol table info available.

Thread 6 (Thread 0x7feea8009700 (LWP 19297)):
#0  0x00000033bfee91c3 in epoll_wait () from /lib64/libc.so.6
No symbol table info available.
#1  0x00007feeaa772efc in zmq::epoll_t::loop (this=0xcaf580) at /home/btorpey/work/libzmq/master/src/epoll.cpp:168
        timeout = 0
        n = -1
        ev_buf = {{events = 4, data = {ptr = 0xcdf100, fd = 13496576, u32 = 13496576, u64 = 13496576}}, {events = 1, data = {
              ptr = 0xcb50b0, fd = 13324464, u32 = 13324464, u64 = 13324464}}, {events = 4, data = {ptr = 0xcf8230, 
              fd = 13599280, u32 = 13599280, u64 = 13599280}}, {events = 4, data = {ptr = 0xd0b550, fd = 13677904, 
              u32 = 13677904, u64 = 13677904}}, {events = 4, data = {ptr = 0xd03200, fd = 13644288, u32 = 13644288, 
              u64 = 13644288}}, {events = 4, data = {ptr = 0xd32630, fd = 13837872, u32 = 13837872, u64 = 13837872}}, {
            events = 0, data = {ptr = 0x0, fd = 0, u32 = 0, u64 = 0}} <repeats 250 times>}
#2  0x00007feeaa7731b0 in zmq::epoll_t::worker_routine (arg_=0xcaf580) at /home/btorpey/work/libzmq/master/src/epoll.cpp:203
No locals.
#3  0x00007feeaa7be982 in thread_routine (arg_=0xcaf600) at /home/btorpey/work/libzmq/master/src/thread.cpp:106
        signal_set = {__val = {18446744067267100671, 18446744073709551615 <repeats 15 times>}}
        rc = 0
        self = 0xcaf600
#4  0x00000033c0607aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#5  0x00000033bfee8bcd in clone () from /lib64/libc.so.6
No symbol table info available.

Thread 5 (Thread 0x7feea7608700 (LWP 19298)):
#0  0x00000033bfedf383 in poll () from /lib64/libc.so.6
No symbol table info available.
#1  0x00007feeaa7ce83e in zmq::socket_poller_t::wait (this=0x7feea76077f0, events_=0xd52660, n_events_=3, timeout_=-1)
    at /home/btorpey/work/libzmq/master/src/socket_poller.cpp:447
        rc = 1
        timeout = -1
        found = 0
        clock = {last_tsc = 833960319375223, last_time = 269616557}
        now = 0
        end = 0
        first_pass = false
#2  0x00007feeaa7cc376 in zmq_poller_wait_all (poller_=0x7feea76077f0, events=0xd52660, n_events=3, timeout_=-1)
    at /home/btorpey/work/libzmq/master/src/zmq.cpp:1371
        rc = 0
#3  0x00007feeaa7ccbda in zmq_poller_poll (items_=0x7feea76078c0, nitems_=3, timeout_=-1)
    at /home/btorpey/work/libzmq/master/src/zmq.cpp:813
        poller = {tag = 3405691582, signaler = 0x0, items = std::vector of length 3, capacity 4 = {{socket = 0xcaf940, fd = 0, 
              user_data = 0x0, events = 1, pollfd_index = -1}, {socket = 0xcb2d40, fd = 0, user_data = 0x0, events = 1, 
              pollfd_index = -1}, {socket = 0xcd57f0, fd = 0, user_data = 0x0, events = 1, pollfd_index = -1}}, 
          need_rebuild = false, use_signaler = false, poll_size = 3, pollfds = 0x1192eb0}
        repeat_items = false
        j_start = 32750
        found_events = 32750
        rc = 0
        events = 0xd52660
#4  0x00007feeaa7cbd09 in zmq_poll (items_=0x7feea76078c0, nitems_=3, timeout_=-1)
    at /home/btorpey/work/libzmq/master/src/zmq.cpp:861
No locals.
#5  0x00007feeaaa284d4 in zmqBridgeMamaTransportImpl_dispatchThread (closure=0xcaa4b0)
    at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/transport.c:1118
        size = -1
        items = {{socket = 0xcaf940, fd = 0, events = 1, revents = 0}, {socket = 0xcb2d40, fd = 0, events = 1, revents = 0}, {
            socket = 0xcd57f0, fd = 0, events = 1, revents = 0}}
        rc = 1
        impl = 0xcaa4b0
        zmsg = {
          _ = "\000\000\000\000\000\000\000\000`&\325\000\000\000\000\000\340d\271G\374\177\000\000\005\235\242\252\356\177\000\000\070\033\322\000\000\000\000\000 \000e\000\000\000\000\000 *\325", '\000' <repeats 12 times>}
#6  0x00000033c0607aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#7  0x00000033bfee8bcd in clone () from /lib64/libc.so.6
No symbol table info available.

Thread 4 (Thread 0x7feea6c07700 (LWP 19299)):
#0  0x00000033c060daf1 in sem_timedwait () from /lib64/libpthread.so.0
No symbol table info available.
#1  0x00007feeb2baf7d8 in wsem_timedwait (sem=0xcf6d80, timeoutval=500) at common/c_cpp/src/c/wSemaphore.c:56
        ts = {tv_sec = 1506547105, tv_nsec = 627401000}
        tv = {tv_sec = 1506547105, tv_usec = 127401}
#2  0x00007feeb2bad0aa in wombatQueue_dispatchInt (queue=0xcf6d80, data=0x0, closure=0x0, isTimed=1 '\001', timout=500)
    at common/c_cpp/src/c/queue.c:273
        impl = 0xcf6d80
        head = 0x0
        cb = 0x0
        closure_ = 0x0
        data_ = 0x0
#3  0x00007feeb2bad2a2 in wombatQueue_timedDispatch (queue=0xcf6d80, data=0x0, closure=0x0, timeout=500)
    at common/c_cpp/src/c/queue.c:335
No locals.
#4  0x00007feeaaa244b8 in zmqBridgeMamaQueue_dispatch (queue=0xcf6ce0) at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/queue.c:253
        status = WOMBAT_QUEUE_OK
        impl = 0xcf6ce0
#5  0x00007feeb2b8aa72 in mamaQueue_dispatch (queue=0xcf6be0) at mama/c_cpp/src/c/queue.c:825
        impl = 0xcf6be0
        status = MAMA_STATUS_OK
#6  0x00007feeb2b8b421 in dispatchThreadProc (closure=0xcf66e0) at mama/c_cpp/src/c/queue.c:1264
        impl = 0xcf66e0
#7  0x00000033c0607aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#8  0x00000033bfee8bcd in clone () from /lib64/libc.so.6
No symbol table info available.

Thread 3 (Thread 0x7feea6206700 (LWP 19300)):
#0  0x00000033c060daf1 in sem_timedwait () from /lib64/libpthread.so.0
No symbol table info available.
#1  0x00007feeb2baf7d8 in wsem_timedwait (sem=0xc9aa10, timeoutval=500) at common/c_cpp/src/c/wSemaphore.c:56
        ts = {tv_sec = 1506547105, tv_nsec = 873049000}
        tv = {tv_sec = 1506547105, tv_usec = 373049}
#2  0x00007feeb2bad0aa in wombatQueue_dispatchInt (queue=0xc9aa10, data=0x0, closure=0x0, isTimed=1 '\001', timout=500)
    at common/c_cpp/src/c/queue.c:273
        impl = 0xc9aa10
        head = 0x0
        cb = 0x0
        closure_ = 0x0
        data_ = 0x0
#3  0x00007feeb2bad2a2 in wombatQueue_timedDispatch (queue=0xc9aa10, data=0x0, closure=0x0, timeout=500)
    at common/c_cpp/src/c/queue.c:335
No locals.
#4  0x00007feeaaa244b8 in zmqBridgeMamaQueue_dispatch (queue=0xc9a970) at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/queue.c:253
        status = WOMBAT_QUEUE_TIMEOUT
        impl = 0xc9a970
#5  0x00007feeb2b8aa72 in mamaQueue_dispatch (queue=0xc9a870) at mama/c_cpp/src/c/queue.c:825
        impl = 0xc9a870
        status = MAMA_STATUS_OK
#6  0x00007feeaaa2238e in zmqBridge_start (defaultEventQueue=0xc9a870) at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/bridge.c:162
No locals.
#7  0x00007feeb2b6e714 in mama_start (bridgeImpl=0xc9a490) at mama/c_cpp/src/c/mama.c:1666
        impl = 0xc9a490
        rval = MAMA_STATUS_OK
        prevRefCnt = 0
#8  0x00007feeb3fbb4b3 in Transact::MamaDispatchThread::run (this=0xcf8430) at MamaConnectionImpl.cpp:416
No locals.
#9  0x00007feeb4252b63 in MXThread_launcher (args=0xcf8430) at MXThread.cpp:109
        obj = 0xcf8430
#10 0x00000033c0607aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#11 0x00000033bfee8bcd in clone () from /lib64/libc.so.6
No symbol table info available.

Thread 2 (Thread 0x7feea5805700 (LWP 19301)):
#0  0x00000033c060daf1 in sem_timedwait () from /lib64/libpthread.so.0
No symbol table info available.
#1  0x00007feeb2baf7d8 in wsem_timedwait (sem=0xcf8b10, timeoutval=500) at common/c_cpp/src/c/wSemaphore.c:56
        ts = {tv_sec = 1506547105, tv_nsec = 873049000}
        tv = {tv_sec = 1506547105, tv_usec = 373049}
#2  0x00007feeb2bad0aa in wombatQueue_dispatchInt (queue=0xcf8b10, data=0x0, closure=0x0, isTimed=1 '\001', timout=500)
    at common/c_cpp/src/c/queue.c:273
        impl = 0xcf8b10
        head = 0x0
        cb = 0x0
        closure_ = 0x0
        data_ = 0x0
#3  0x00007feeb2bad2a2 in wombatQueue_timedDispatch (queue=0xcf8b10, data=0x0, closure=0x0, timeout=500)
    at common/c_cpp/src/c/queue.c:335
No locals.
#4  0x00007feeaaa244b8 in zmqBridgeMamaQueue_dispatch (queue=0xcf8a70) at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/queue.c:253
        status = WOMBAT_QUEUE_TIMEOUT
        impl = 0xcf8a70
#5  0x00007feeb2b8aa72 in mamaQueue_dispatch (queue=0xcf8990) at mama/c_cpp/src/c/queue.c:825
        impl = 0xcf8990
        status = MAMA_STATUS_OK
#6  0x00007feeb2b8b421 in dispatchThreadProc (closure=0xcf9830) at mama/c_cpp/src/c/queue.c:1264
        impl = 0xcf9830
#7  0x00000033c0607aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#8  0x00000033bfee8bcd in clone () from /lib64/libc.so.6
No symbol table info available.

Thread 1 (Thread 0x7feeb26fdc60 (LWP 19291)):
#0  0x00000033bfedf383 in poll () from /lib64/libc.so.6
No symbol table info available.
#1  0x00007feeaa7a5127 in zmq::signaler_t::wait (this=0x1197b48, timeout_=-1)
    at /home/btorpey/work/libzmq/master/src/signaler.cpp:233
        pfd = {fd = 38, events = 1, revents = 0}
        rc = 0
#2  0x00007feeaa7797bb in zmq::mailbox_t::recv (this=0x1197ae0, cmd_=0x7ffc47b96280, timeout_=-1)
    at /home/btorpey/work/libzmq/master/src/mailbox.cpp:81
        rc = -1
        ok = false
#3  0x00007feeaa7aaaee in zmq::socket_base_t::process_commands (this=0x11975f0, timeout_=-1, throttle_=false)
    at /home/btorpey/work/libzmq/master/src/socket_base.cpp:1335
        rc = 0
        cmd = {destination = 0x1193400, type = zmq::command_t::pipe_term_ack, args = {stop = {<No data fields>}, 
            plug = {<No data fields>}, own = {object = 0x7feea7607510}, attach = {engine = 0x7feea7607510}, bind = {pipe = 
    0x7feea7607510}, activate_read = {<No data fields>}, activate_write = {msgs_read = 140662987060496}, hiccup = {
              pipe = 0x7feea7607510}, pipe_term = {<No data fields>}, pipe_term_ack = {<No data fields>}, pipe_hwm = {
              inhwm = -1486850800, outhwm = 32750}, term_req = {object = 0x7feea7607510}, term = {linger = -1486850800}, 
            term_ack = {<No data fields>}, reap = {socket = 0x7feea7607510}, reaped = {<No data fields>}, 
            done = {<No data fields>}}}
#4  0x00007feeaa7aa2e4 in zmq::socket_base_t::send (this=0x11975f0, msg_=0x7ffc47b963f0, flags_=0)
    at /home/btorpey/work/libzmq/master/src/socket_base.cpp:1148
        sync_lock = {mutex = 0x0}
        rc = -1
        timeout = -1
        end = 0
#5  0x00007feeaa7cab76 in s_sendmsg (s_=0x11975f0, msg_=0x7ffc47b963f0, flags_=0)
    at /home/btorpey/work/libzmq/master/src/zmq.cpp:375
        sz = 257
        rc = 0
        max_msgsz = 140663039244940
#6  0x00007feeaa7cacbd in zmq_send (s_=0x11975f0, buf_=0x7ffc47b964f0, len_=257, flags_=0)
    at /home/btorpey/work/libzmq/master/src/zmq.cpp:409
        msg = {
          _ = "\000\000\000\000\000\000\000\000\240q\031\001\000\000\000\000Pd\271G\374\177\000\000\005\235\242\252\356\177\000\000\370\313\323\000\000\000\000\000\360uf\000\000\000\000\000\360u\031\001", '\000' <repeats 11 times>}
        __PRETTY_FUNCTION__ = "int zmq_send(void*, const void*, size_t, int)"
        s = 0x11975f0
        rc = 0
#7  0x00007feeaaa2abd9 in zmqBridgeMamaTransportImpl_sendCommand (impl=0xcaa4b0, msg=0x7ffc47b964f0, msgSize=257)
    at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/transport.c:1656
        status = MAMA_STATUS_OK
        temp = 0x11975f0
        __FUNCTION__ = "zmqBridgeMamaTransportImpl_sendCommand"
        rc = 0
        i = 0
#8  0x00007feeaaa256eb in zmqBridgeMamaSubscriptionImpl_subscribe (transport=0xcaa4b0, topic=0xcf6890 "MXLIB/BU")
    at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/subscription.c:417
        s = MAMA_STATUS_OK
        msg = {command = 83 'S', 
          arg1 = "MXLIB/BU\000\000\000\000\000\000\000\t\000\000\000\000\000\000\000С\312\000\000\000\000\000\020\071\323", '\000' <repeats 21 times>, "\350\303\347\277\063\000\000\000\220h\317\000\000\000\000\000\322\016\350\277\063\000\000\000\020\071\323\000\000\000\000\000\370\313\323\000\000\000\000\000\260e\271G\374\177\000\000g\000\273\262\356\177\000\000\020\071\323", '\000' <repeats 21 times>, "\020/\031\001\000\000\000\000\220h\317\000\000\000\000\000\235\370\272\262\356\177\000\000\036\000\000\000\000\000\000\000\060\267\312\000\000\000\000\000\200\273\312\000\000\000\000\000\060\267\312\000\000\000\000\000\000f\271G\374\177\000\000l\263"...}
        __FUNCTION__ = "zmqBridgeMamaSubscriptionImpl_subscribe"
#9  0x00007feeaaa253f6 in zmqBridgeMamaSubscriptionImpl_create (impl=0xd527e0, source=0x1192600 "MXLIB/BU", symbol=0x0)
    at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/subscription.c:336
        s = MAMA_STATUS_OK
        __FUNCTION__ = "zmqBridgeMamaSubscriptionImpl_create"
#10 0x00007feeaaa24b56 in zmqBridgeMamaSubscription_create (subscriber=0xd52f20, source=0x1192600 "MXLIB/BU", symbol=0x0, 
    tport=0xcaa1d0, queue=0xcf8990, callback=..., subscription=0xd52f10, closure=0xd62f90)
    at /home/btorpey/work/OpenMAMA-zmq/nyfix/src/subscription.c:78
        s = MAMA_STATUS_OK
        __FUNCTION__ = "zmqBridgeMamaSubscription_create"
        impl = 0xd527e0
#11 0x00007feeb2b9561e in mamaSubscriptionImpl_completeBasicInitialisation (subscription=0xd52f10)
    at mama/c_cpp/src/c/subscription.c:3078
        cb = {onCreate = 0x7feeb29345e3 <mamaEnvSubscription_onCreateBasic>, 
          onError = 0x7feeb293471c <mamaEnvSubscription_onErrorBasic>, onMsg = 0x7feeb29347ef <mamaEnvSubscription_onMsgBasic>, 
          onQuality = 0x0, onGap = 0x0, onRecapRequest = 0x0, 
          onDestroy = 0x7feeb2b95c1f <mamaSubscriptionImpl_onSubscriptionDestroyed>}
        impl = 0xd52f10
        ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE
#12 0x00007feeb2b95b70 in mamaSubscriptionImpl_createBasic (subscription=0xd52f10, transport=0xcaa1d0, queue=0xcf8990, 
    callbacks=0x7feeb2b38560 <sg_basicCallbacks>, topic=0xd3cbf8 "MXLIB/BU", closure=0xd62f90)
    at mama/c_cpp/src/c/subscription.c:3254
        impl = 0xd52f10
        ret = MAMA_STATUS_NO_BRIDGE_IMPL
#13 0x00007feeb2b920e8 in mamaSubscription_createBasic (subscription=0xd52f10, transport=0xcaa1d0, queue=0xcf8990, 
    callbacks=0x7feeb2b38560 <sg_basicCallbacks>, topic=0xd3cbf8 "MXLIB/BU", closure=0xd62f90)
    at mama/c_cpp/src/c/subscription.c:905
No locals.
#14 0x00007feeb2934410 in mamaEnvSubscription_create (queue=0xcf8990, source=0x0, subscription=0xd62f90, 
    symbol=0xd3cbf8 "MXLIB/BU", transport=0xcaa1d0, type=Basic) at mamaEnvSubscription.c:124
        ret = MAMA_STATUS_INVALID_ARG
#15 0x00007feeb2933d81 in mamaEnvSession_createSubscription (callback=0x7ffc47b96960, closure=0xd338e0, session=0xcf8740, 
    source=0x0, symbol=0xd3cbf8 "MXLIB/BU", transport=0xcaa1d0, type=Basic, result=0xd33910) at mamaEnvSession.c:533
        localMamaSubscription = 0x0
        subscription = 0xd62f90
        ret = MAMA_STATUS_OK
#16 0x00007feeb2933091 in mamaEnv_createBasicSubscription (callback=0x7ffc47b96a50, closure=0xd338e0, session=0xcf8740, 
    symbol=0xd3cbf8 "MXLIB/BU", transport=0xcaa1d0, subscription=0xd33910) at mamaEnvSession.c:71
        envSession = 0xcf8740
        localCallback = {m_onCreate = 0x7feeb3fc6836 <Transact::MamaListenerImpl::onCreate(mamaSubscriptionImpl_*, void*)>, 
          m_onError = 0x7feeb3fc68ba <Transact::MamaListenerImpl::onError(mamaSubscriptionImpl_*, mama_status, void*, char const*, void*)>, m_onMsgBasic = 0x7feeb3fc61b4 <Transact::MamaListenerImpl::onMsg(mamaSubscriptionImpl_*, mamaMsgImpl_*, void*, void*)>, 
          m_onMsgWildcard = 0x0}
        ret = MAMA_STATUS_NULL_ARG
#17 0x00007feeb3fc5ea8 in Transact::MamaListenerImpl::listen (this=0xd338e0, 
    subject=0x7feeb42a3161 <_ZN8TransactL19bulk_update_subjectE> "MXLIB/BU") at MamaListenerImpl.cpp:131
        cb = {onCreate = 0x7feeb3fc6836 <Transact::MamaListenerImpl::onCreate(mamaSubscriptionImpl_*, void*)>, 
          onError = 0x7feeb3fc68ba <Transact::MamaListenerImpl::onError(mamaSubscriptionImpl_*, mama_status, void*, char const*, void*)>, onMsg = 0x7feeb3fc61b4 <Transact::MamaListenerImpl::onMsg(mamaSubscriptionImpl_*, mamaMsgImpl_*, void*, void*)>, 
          onQuality = 0x0, onGap = 0x0, onRecapRequest = 0x0, 
          onDestroy = 0x7feeb3fc6972 <Transact::MamaListenerImpl::onDestroy(mamaSubscriptionImpl_*, void*)>}
        __PRETTY_FUNCTION__ = "virtual Transact::MXResult Transact::MamaListenerImpl::listen(const char*)"
        result = {m_result = 0}
#18 0x00007feeb428b764 in Transact::BulkUpdateListener::BulkUpdateListener (this=0x1192430, pSession=0xcf8710)
    at SubscriberImpl.cpp:1124
No locals.
#19 0x00007feeb428b5e0 in Transact::BulkUpdateListener::CreateBulkUpdateListener (pSession=0xcf8710) at SubscriberImpl.cpp:1108
No locals.
#20 0x00007feeb4287b97 in Transact::SubscriberImpl::initializeContent (this=0xd21df0) at SubscriberImpl.cpp:314
        locker = {mLocked = true, mLock = @0xd21e40}
#21 0x00007feeb42870ec in Transact::SubscriberImpl::subscribe (this=0xd21df0, source=0x407d79 "SUBSCRIBER_TEST", 
    key=0x407d89 "DELL", initialImage=false, imageTimeout=10, updateContent=Transact::UPDCONTENT_ASSENT, ignoreVersion=false, 
    cacheMode=Transact::CACHEMODE_IFREQUIRED, initialImageHint=false, initialImageOnly=false) at SubscriberImpl.cpp:213
        result = {m_result = 0}
#22 0x0000000000403997 in ISubscriberTest::doSubscribe (this=0x7ffc47b96ea0) at ISubscriberTest.cpp:521
        pSubscriber = {_M_ptr = 0xd21df0}
        cb = {_M_ptr = 0xd1afd0}
        result = {m_result = 0}
#23 0x0000000000403190 in ISubscriberTest::performTest2 (this=0x7ffc47b96ea0) at ISubscriberTest.cpp:391
No locals.
#24 0x0000000000402dbf in ISubscriberTest::run (this=0x7ffc47b96ea0) at ISubscriberTest.cpp:338
        result = true
        mxresult = {m_result = 0}
#25 0x0000000000402b2d in main (argc=3, argv=0x7ffc47b970e8) at ISubscriberTest.cpp:289
        subscriberTest = {m_cmdInsert = <incomplete type>, m_cmdUpdate = <incomplete type>, m_cmdDelete = <incomplete type>, 
          m_TTConn = <incomplete type>, m_subject = 0x407d89 "DELL", m_source = 0x407d79 "SUBSCRIBER_TEST", 
          m_ignoreVersions = false, m_initialImage = false, m_imageTimeout = 10, m_cacheMode = Transact::CACHEMODE_IFREQUIRED, 
          m_updateContent = Transact::UPDCONTENT_ASSENT, m_timeoutExpected = false}
        testResult = false
        progname = 0x33bfa0e420 <_dl_fixup+224> "H\211\303d\213\004%\030"
Detaching from program: /shared/work/transact/openmama/test/bin/ISubscriberTest, process 19291




> On Sep 26, 2017, at 5:50 PM, Bill Torpey <wallstprog at gmail.com> wrote:
> 
> btw, disregard that CALL_ZMQ_FUNC business — that’s just a helper macro that checks rc etc.
> 
>  void* temp = zmq_socket(impl->mZmqContext, ZMQ_PUB);
>  zmq_connect(temp, ZMQ_CONTROL_ENDPOINT);
>  zmq_pollitem_t pollitems [] = { { temp, 0, ZMQ_POLLIN, 0 } };
>  zmq_poll(pollitems, 1, 1);				// see https://github.com/zeromq/libzmq/issues/2267
>  zmq_send(temp, msg, msgSize, 0);
>  zmq_disconnect(temp, ZMQ_CONTROL_ENDPOINT);
>  zmq_close(temp);
> 
> 
>> On Sep 26, 2017, at 5:22 PM, Bill Torpey <wallstprog at gmail.com> wrote:
>> 
>> Hi All:
>> 
>> I don’t see this addressed specifically in the docs, in the GitHub issues, or here, so thought I’d ask.
>> 
>> I bind an inproc sub socket for inter-thread communication, which I include in zmq_poll.  When I want to e.g., subscribe from a different thread I create a pub socket, connect to the first and send it a command message, which then gets executed on the main zmq thread.  So far, so good.
>> 
>> What I’m wondering if it’s OK to immediately disconnect the socket after calling zmq_send, like so:
>> 
>>  void* temp = zmq_socket(impl->mZmqContext, ZMQ_PUB);
>>  zmq_connect(temp, ZMQ_CONTROL_ENDPOINT);
>>  zmq_pollitem_t pollitems [] = { { temp, 0, ZMQ_POLLIN, 0 } };
>>  CALL_ZMQ_FUNC(zmq_poll(pollitems, 1, 1));				// see https://github.com/zeromq/libzmq/issues/2267
>>  zmq_send(temp, msg, msgSize, 0);
>>  zmq_disconnect(temp, ZMQ_CONTROL_ENDPOINT);
>>  zmq_close(temp);
>> 
>> This *seems* to work, but I’m concerned that maybe I’m just getting "lucky”?  It does *not* appear to work with ZMQ_PAIR sockets, for whatever reason, although I’ve seen that approach recommended in a few places. 
>> 
>> Thanks in advance for any assistance!
>> 
>> Regards,
>> 
>> Bill Torpey
> 




More information about the zeromq-dev mailing list