[zeromq-dev] Problems with 2.0.6 poll using C++ api

Matthew Giedt mgiedt at gmail.com
Fri Mar 19 16:51:41 CET 2010


Here's my OS:

uname -a
Linux ubuntu 2.6.31-14-generic #48-Ubuntu SMP Fri Oct 16 14:05:01 UTC 2009 x86_64 GNU/Linux

The following shows:

1. The application I'm running
2. The output of that application
3. My modifications to dispatcher.cpp and zmq.cpp

Notice that the dispatcher reports thread_slot == 0 for both sockets, but
inside the poll call the first socket in the pollitem_t array reports a
thread_slot of 19401264. How does it end up with that value?

#include <zmq.hpp>
#include <cstdio>
#include <iostream>

using namespace std;

int main( int argc, char *argv[] )
{
    int signum = 0;
    zmq::message_t msg;
    zmq::socket_t* sock1;
    zmq::socket_t* sock2;
    zmq::context_t ctx ( 1, 1, 0 );

    sock1 = new zmq::socket_t( ctx, ZMQ_SUB );
    sock1->connect( "tcp://127.0.0.1:5550" );
    sock1->setsockopt( ZMQ_SUBSCRIBE, "", 0 );

    sock2 = new zmq::socket_t( ctx, ZMQ_SUB );
    sock2->connect( "tcp://127.0.0.1:5551" );
    sock2->setsockopt( ZMQ_SUBSCRIBE, "", 0 );

    zmq::pollitem_t items [ 2 ];
    items[ 0 ].socket = sock1;
    items[ 0 ].events = ZMQ_POLLIN;
    items[ 1 ].socket = sock2;
    items[ 1 ].events = ZMQ_POLLIN;

    while( 1 )
    {
        sock1->recv( &msg );
        cout << " sock1 recv: " << msg.data() << endl;

        sock2->recv( &msg );
        cout << " sock2 recv: " << msg.data() << endl;

        cout << "poll..." << endl;
        signum = zmq::poll( items, 2 );

        printf( " signum = %i\n", signum );

        if( items[ 0 ].revents == ZMQ_POLLIN )
        {
            cout << "received sock1 event!!!" << endl;
        }

        if( items[ 1 ].revents == ZMQ_POLLIN )
        {
            cout << "received sock2 event!!!" << endl;
        }
    }

    return 0;
}
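One thing worth checking in the program above: zmq::pollitem_t's socket field is a plain void*, and, assuming the 2.0.x zmq.hpp in which zmq::socket_t exposes its raw handle through operator void* (), `items[ 0 ].socket = sock1` stores the address of the C++ wrapper object rather than the 0MQ socket, while `*sock1` would go through the conversion. The bogus value fits that reading: 19401264 is 0x1280a30, which looks like a heap address in the same range as the message pointers printed below. The sketch uses hypothetical stand-in types (engine_socket_t, socket_wrapper_t, poll_item_t), not the real 0MQ ones, to show what each assignment actually stores:

```cpp
#include <cassert>

// Hypothetical stand-ins for illustration only (not 0MQ's real types):
//   engine_socket_t  ~ the library's internal socket object
//   socket_wrapper_t ~ zmq::socket_t, assumed to keep the raw handle in a
//                      void* member and expose it via operator void* ()
//   poll_item_t      ~ zmq::pollitem_t, whose socket field is a plain void*
struct engine_socket_t { int thread_slot; };

struct socket_wrapper_t {
    void *ptr;                          // raw handle owned by the wrapper
    operator void* () { return ptr; }   // conversion used by "*sock"
};

struct poll_item_t { void *socket; short events; short revents; };

// Models "items[ 0 ].socket = sock1;": stores the wrapper's own address.
poll_item_t item_from_pointer (socket_wrapper_t *sock)
{
    poll_item_t item = {};
    item.socket = sock;
    return item;
}

// Models "items[ 0 ].socket = *sock1;": operator void* () yields the handle.
poll_item_t item_from_handle (socket_wrapper_t *sock)
{
    assert (sock != nullptr);           // dereferencing null would be UB
    poll_item_t item = {};
    item.socket = *sock;
    return item;
}
```

Only the second form hands the library a pointer it can legitimately cast back to its internal socket type; the first makes it reinterpret the wrapper's memory, which is undefined behaviour and can produce arbitrary values such as the thread_slot in the output below.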


 app_threads.size (): 1
  current: 0
   app_threads [current].associated: 0
   thread_t::equal (thread_t::id (), app_threads [current].tid): 0
 unused: 0
 current: 1
 current == app_threads.size () ?: 1
 using app_threads [ 0 ] to create socket!
 s->get_thread:thread_slot 0

 app_threads.size (): 1
  current: 0
   app_threads [current].associated: 1
   thread_t::equal (thread_t::id (), app_threads [current].tid): 1
 unused: 1
 current: 0
 current == app_threads.size () ?: 0
 using app_threads [ 0 ] to create socket!
 s->get_thread:thread_slot 0

 sock1 recv: 0x12869e8
 sock2 recv: 0x128ace8
poll...

 s->get_thread:thread_slot 19401264
 s->get_thread:thread_slot 0
 app_thread:thread_slot 19401264

terminate called after throwing an instance of 'zmq::error_t'
  what():  Bad address
Aborted


dispatcher.cpp

zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
{
    app_threads_sync.lock ();

    //  Find whether the calling thread has app_thread_t object associated
    //  already. At the same time find an unused app_thread_t so that it can
    //  be used if there's no associated object for the calling thread.
    //  Check whether thread ID is already assigned. If so, return it.

    printf( " app_threads.size (): %i\n", (int)app_threads.size() );

    app_threads_t::size_type unused = app_threads.size ();
    app_threads_t::size_type current;
    for (current = 0; current != app_threads.size (); current++)
    {
        printf( "  current: %i\n", (int)current );
        printf( "   app_threads [current].associated: %i\n",
            app_threads [current].associated );
        printf( "   thread_t::equal (thread_t::id (), app_threads [current].tid): %i\n",
            thread_t::equal (thread_t::id (), app_threads [current].tid) );

        if (app_threads [current].associated &&
              thread_t::equal (thread_t::id (), app_threads [current].tid))
            break;
        if (!app_threads [current].associated)
            unused = current;
    }

    printf( " unused: %i\n", (int)unused );
    printf( " current: %i\n", (int)current );
    printf( " current == app_threads.size () ?: %i\n",
        ( current == app_threads.size () ) );

    //  If no app_thread_t is associated with the calling thread,
    //  associate it with one of the unused app_thread_t objects.
    if (current == app_threads.size ()) {
        if (unused == app_threads.size ()) {
            app_threads_sync.unlock ();
            errno = EMTHREAD;
            return NULL;
        }
        app_threads [unused].associated = true;
        app_threads [unused].tid = thread_t::id ();
        current = unused;
    }

    printf( " using app_threads [ %i ] to create socket!\n", (int)current );

    app_thread_t *thread = app_threads [current].app_thread;
    app_threads_sync.unlock ();

    socket_base_t *s = thread->create_socket (type_);
    if (!s)
        return NULL;

    term_sync.lock ();
    sockets++;
    term_sync.unlock ();

    printf( " s->get_thread:thread_slot %i\n\n",
        s->get_thread ()->get_thread_slot () );

    return s;
}
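The slot search above can be modelled in isolation. This is a minimal sketch, assuming std::thread::id as a stand-in for thread_t::id (); app_slot_t and pick_app_slot are hypothetical names. With a single app thread it reproduces the debug output: the first call associates slot 0, and the second call from the same thread finds that slot again.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Simplified model of the slot search in dispatcher_t::create_socket ().
// Hypothetical names; std::thread::id stands in for thread_t::id ().
struct app_slot_t {
    bool associated = false;
    std::thread::id tid;   // default-constructed id means "no thread"
};

// Returns the slot index for the calling thread, or -1 when every slot is
// owned by another thread (the point where 0MQ sets errno = EMTHREAD).
int pick_app_slot (std::vector<app_slot_t> &slots)
{
    const std::thread::id self = std::this_thread::get_id ();
    std::size_t unused = slots.size ();
    std::size_t current;
    for (current = 0; current != slots.size (); current++) {
        //  Reuse the slot this thread is already associated with.
        if (slots [current].associated && slots [current].tid == self)
            break;
        //  Otherwise remember a free slot as a fallback.
        if (!slots [current].associated)
            unused = current;
    }

    //  No slot for this thread yet: claim the free one, if there is one.
    if (current == slots.size ()) {
        if (unused == slots.size ())
            return -1;
        slots [unused].associated = true;
        slots [unused].tid = self;
        current = unused;
    }
    return static_cast<int> (current);
}
```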

zmq.cpp

        //  0MQ sockets.
        if (items_ [i].socket) {

            //  Get the app_thread the socket is living in. If there are two
            //  sockets in the same pollset with different app threads, fail.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;

            printf( " s->get_thread:thread_slot %i\n",
                s->get_thread ()->get_thread_slot () );

            if (app_thread) {

                printf( " app_thread:thread_slot %i\n",
                    app_thread->get_thread_slot () );

                if (app_thread != s->get_thread ()) {
                    free (pollfds);
                    errno = EFAULT;
                    return -1;
                }
            }
            else
                app_thread = s->get_thread ();

            nsockets++;
            continue;
        }
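The check in this loop can be sketched with hypothetical minimal types (fake_app_thread_t, fake_socket_t and check_same_app_thread are illustration names, not 0MQ's classes). On Linux, strerror (EFAULT) is "Bad address", the same text as the zmq::error_t in the output above, so this is the branch that fired: the first item fixed the app thread (slot 19401264) and the second item's thread (slot 0) did not match it.

```cpp
#include <cassert>
#include <cerrno>

// Hypothetical minimal types modelling the pollset check in the excerpt;
// they are illustration names, not 0MQ's real classes.
struct fake_app_thread_t { int thread_slot; };

struct fake_socket_t {
    fake_app_thread_t *thread;
    fake_app_thread_t *get_thread () { return thread; }
};

// Returns n when all sockets share one app thread; otherwise sets
// errno = EFAULT and returns -1, mirroring the branch in the excerpt.
int check_same_app_thread (fake_socket_t *const *sockets, int n)
{
    fake_app_thread_t *app_thread = nullptr;
    for (int i = 0; i != n; i++) {
        fake_socket_t *s = sockets [i];
        if (!app_thread)
            app_thread = s->get_thread ();   // first socket fixes the thread
        else if (app_thread != s->get_thread ()) {
            errno = EFAULT;
            return -1;
        }
    }
    return n;
}
```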


On Thu, Mar 18, 2010 at 11:37 PM, Martin Sustrik <sustrik at 250bpm.com> wrote:

> Matthew,
>
> Yes, it looks like 0MQ believes that the two sockets belong to different
> threads. Unfortunately, the problem doesn't reproduce on my box.
>
> 1. What OS are you running on?
>
> 2. Have a look at dispatcher.cpp, lines 146-154. The code there is called
> when a socket is created. It checks whether there's another socket belonging
> to the same thread and, if so, chooses to share the same app_thread_t
> object. Something probably goes wrong there. Can you check it? Maybe the
> current thread ID returned by thread_t::id() differs even when called twice
> from the same thread? Maybe the thread ID comparison function,
> thread_t::equal(), is broken and returns false even when the two thread IDs
> are the same?
>
> Martin
>
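Both of Martin's hypotheses can be checked directly with one comparison. This is a minimal sketch, assuming (not confirmed in this thread) that on Linux 0MQ 2.0's thread_t::id () returns pthread_self () and thread_t::equal () wraps pthread_equal (); thread_id_is_stable is a hypothetical helper. The dispatcher output above already shows thread_t::equal returning 1 when the second socket is created.

```cpp
#include <cassert>
#include <pthread.h>

// Two calls to pthread_self () from the same thread should compare equal
// under pthread_equal (). A false result would mean either the ID changes
// between calls or the comparison is broken. Assumes (not taken from this
// thread) that 0MQ's thread_t::id () and thread_t::equal () wrap these
// POSIX calls on Linux.
bool thread_id_is_stable ()
{
    pthread_t first = pthread_self ();
    pthread_t second = pthread_self ();
    return pthread_equal (first, second) != 0;
}
```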