[zeromq-dev] libevent + ZMQ_FD fail!

Martin Sustrik sustrik at 250bpm.com
Thu Dec 16 13:50:48 CET 2010


Hi,

I would say the problem is that you are using ZMQ_FD as if it was 
level-triggered. In fact it is edge triggered.

I.e. you should get ZMQ_EVENTS to find out whether particular 0MQ socket 
is readeable/writeable.

Further on poll(ZMQ_FD) signals event only if the state changes. For 
example if the socket was unreadable and become readable. If the socket 
was readable and remains readable, ZMQ_FD is not signaled.

Martin

On 12/16/2010 11:31 AM, kasicass wrote:
> On 2010-12-6 16:48, Martin Sustrik wrote:
>> Hi,
>>
>>> zmq 2.1.0, ZMQ_FD bug
>>> pull = zmq_bind(), push = zmq_connect(), libevent + pull, ok!
>>> push = zmq_bind(), pull = zmq_connect(), libevent + pull, fail!
>>
>> What does "fail" main?
>>
>>> example:
>>> https://github.com/kasicass/kasicass/blob/master/zmq/test.c
>>
>> Can you provide a minimal test case?
>>
>> Martin
>
> hi Martin,
>
> Here is the detailed description of the problem I encountered. Is it a bug?
>
> //// main thread (ok version) ////
> pull = zmq_socket(ZMQ_PULL)
> zmq_bind(pull)
>
> push = zmq_socket(ZMQ_PUSH)
> zmq_connect(push)
> -> zmq::socket_base_t::connect()
> -> send_bind() // send 'bind' to pull.mailbox
>
> start thread 2
>
> sleep(1)
> while (1)
> {
> zmq_send(push);
> }
>
> //// thread 2 ////
> fd = zmq_getsockopt(pull, ZMQ_FD);
>
> while (1)
> {
> select(fd for read);
> zmq_getsockopt(pull, ZMQ_EVENTS);
> while (1) zmq_recv(pull, ZMQ_NOBLOCK);
> }
>
> // ======== sequence chart ========
> //
> // thread 2 (first loop):
> // select(fd for read); // got 'bind' message
> // zmq_getsockopt(pull, ZMQ_EVENTS); // socket_base_t::process_commands()
> // // -> pull attach
> // // pull_t::xhas_in()
> // // -> fq_t::has_in()
> // // -> ypipe_t::check_read()
> // // -> c.cas (&queue.front (), NULL); // c = NULL
> // zmq_recv(pull, ZMQ_NOBLOCK); // do nothing, goto to select() again
> //
> // main thread:
> // zmq_send(push); // push_t::xsend()
> // // -> lb_t::send()
> // // -> ypipe_t::flush()
> // // -> c.cas (w, f) != w
> // // (Compare-and-swap was unseccessful because 'c' is NULL.)
> // // -> send 'activate_reader' to pull.mailbox
> //
> // second loop:
> // select(fd for read); // got 'activate_reader' message
> // zmq_getsockopt(pull, ZMQ_EVENTS); // socket_base_t::process_commands()
> // // -> fq_t::activated()
> // // pull_t::xhas_in() / set 'ZMQ_POLLIN'
> // zmq_recv(pull, ZMQ_NOBLOCK); // consuming all zmq_msg_t
>
>
> -----------------------------------------------------------------------------
>
>
> //// main thread (fail version) ////
> push = zmq_socket(ZMQ_PUSH)
> zmq_bind(push)
>
> pull = zmq_socket(ZMQ_PULL)
> zmq_connect(pull)
> -> zmq::socket_base_t::connect()
> -> send_bind() // send 'bind' to push.mailbox
>
> start thread 2
>
> sleep(1);
> while (1)
> {
> zmq_send(push); // consume the 'bind' message
> }
>
> //// thread 2 ////
> fd = zmq_getsockopt(pull, ZMQ_FD);
>
> while (1)
> {
> select(fd for read);
> zmq_getsockopt(pull, ZMQ_EVENTS);
> while (1) zmq_recv(pull, ZMQ_NOBLOCK);
> }
>
>
> // ======== sequence chart ========
> //
> // main thread:
> // zmq_send(push); // push_t::xsend()
> // // -> lb_t::send()
> // // -> ypipe_t::flush()
> // // -> c.cas (w, f) != w
> // // (Compare-and-swap was unseccessful because 'c' is NULL.)
> // // (no 'activate_reader' to pull.mailbox)
> //
> // thread 2 (first loop):
> // select(fd for read); // block here, as no 'activate_reader' message
> // ...
>




More information about the zeromq-dev mailing list