[zeromq-dev] Problems with 2.0.6 poll using C++ api
Matthew Giedt
mgiedt at gmail.com
Fri Mar 19 16:51:41 CET 2010
Here's my OS:
uname -a
Linux ubuntu 2.6.31-14-generic #48-Ubuntu SMP Fri Oct 16 14:05:01 UTC 2009
x86_64 GNU/Linux
The following shows:
1. The application I'm running
2. The output of that application
3. My modifications to dispatcher.cpp and zmq.cpp
Notice that both sockets have a thread_slot == 0 from the dispatcher but the
first socket in the pollitem_t array somehow gets this assigned to 19401264?
int main( int argc, char *argv[] )
{
int signum = 0;
zmq::message_t msg;
zmq::socket_t* sock1;
zmq::socket_t* sock2;
zmq::context_t ctx ( 1, 1, 0 );
sock1 = new zmq::socket_t( ctx, ZMQ_SUB );
sock1->connect( "tcp://127.0.0.1:5550" );
sock1->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
sock2 = new zmq::socket_t( ctx, ZMQ_SUB );
sock2->connect( "tcp://127.0.0.1:5551" );
sock2->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
zmq::pollitem_t items [ 2 ];
items[ 0 ].socket = sock1;
items[ 0 ].events = ZMQ_POLLIN;
items[ 1 ].socket = sock2;
items[ 1 ].events = ZMQ_POLLIN;
while( 1 )
{
sock1->recv( &msg );
cout << " sock1 recv: " << msg.data() << endl;
sock2->recv( &msg );
cout << " sock2 recv: " << msg.data() << endl;
cout << "poll..." << endl;
signum = zmq::poll( items, 2 );
printf( " signum = %i\n", signum );
if( items[ 0 ].revents == ZMQ_POLLIN )
{
cout << "received sock1 event!!!" << endl;
}
if( items[ 1 ].revents == ZMQ_POLLIN )
{
cout << "received sock2 event!!!" << endl;
}
}
return 0;
}
app_threads.size (): 1
current: 0
app_threads [current].associated: 0
thread_t::equal (thread_t::id (), app_threads [current].tid): 0
unused: 0
current: 1
current == app_threads.size () ?: 1
using app_threads [ 0 ] to create socket!
s->get_thread:thread_slot 0
app_threads.size (): 1
current: 0
app_threads [current].associated: 1
thread_t::equal (thread_t::id (), app_threads [current].tid): 1
unused: 1
current: 0
current == app_threads.size () ?: 0
using app_threads [ 0 ] to create socket!
s->get_thread:thread_slot 0
sock1 recv: 0x12869e8
sock2 recv: 0x128ace8
poll...
s->get_thread:thread_slot 19401264
s->get_thread:thread_slot 0
app_thread:thread_slot 19401264
terminate called after throwing an instance of 'zmq::error_t'
what(): Bad address
Aborted
dispatcher.cpp
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
{
app_threads_sync.lock ();
// Find whether the calling thread has app_thread_t object associated
// already. At the same time find an unused app_thread_t so that it can
// be used if there's no associated object for the calling thread.
// Check whether thread ID is already assigned. If so, return it.
printf( " app_threads.size (): %i\n", (int)app_threads.size() );
app_threads_t::size_type unused = app_threads.size ();
app_threads_t::size_type current;
for (current = 0; current != app_threads.size (); current++)
{
printf( " current: %i\n", (int)current );
printf( " app_threads [current].associated: %i\n", app_threads
[current].associated );
printf( " thread_t::equal (thread_t::id (), app_threads
[current].tid): %i\n", thread_t::equal (thread_t::id (), app_threads
[current].tid) );
if (app_threads [current].associated &&
thread_t::equal (thread_t::id (), app_threads [current].tid))
break;
if (!app_threads [current].associated)
unused = current;
}
printf( " unused: %i\n", (int)unused );
printf( " current: %i\n", (int)current );
printf( " current == app_threads.size () ?: %i\n", ( current ==
app_threads.size () ) );
// If no app_thread_t is associated with the calling thread,
// associate it with one of the unused app_thread_t objects.
if (current == app_threads.size ()) {
if (unused == app_threads.size ()) {
app_threads_sync.unlock ();
errno = EMTHREAD;
return NULL;
}
app_threads [unused].associated = true;
app_threads [unused].tid = thread_t::id ();
current = unused;
}
printf( " using app_threads [ %i ] to create socket!\n", (int)current );
app_thread_t *thread = app_threads [current].app_thread;
app_threads_sync.unlock ();
socket_base_t *s = thread->create_socket (type_);
if (!s)
return NULL;
term_sync.lock ();
sockets++;
term_sync.unlock ();
printf( " s->get_thread:thread_slot %i\n\n", s->get_thread
()->get_thread_slot() );
return s;
}
zmq.cpp
// 0MQ sockets.
if (items_ [i].socket) {
// Get the app_thread the socket is living in. If there are two
// sockets in the same pollset with different app threads,
fail.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
printf( " s->get_thread:thread_slot %i\n", s->get_thread
()->get_thread_slot() );
if (app_thread) {
printf( " app_thread:thread_slot %i\n",
app_thread->get_thread_slot() );
if (app_thread != s->get_thread ()) {
free (pollfds);
errno = EFAULT;
return -1;
}
}
else
app_thread = s->get_thread ();
nsockets++;
continue;
}
On Thu, Mar 18, 2010 at 11:37 PM, Martin Sustrik <sustrik at 250bpm.com> wrote:
> Matthew,
>
> Yes, it looks like 0MQ believes that the two sockets belong to different
> threads. Unfortunately, the problem doesn't reproduce on my box.
>
> 1. What OS you are running on?
>
> 2. Have a look at dispatcher.cpp, lines 146-154. The code there is called
> when socket is created. It checks whether there's another socket belonging
> to the same thread and if so, it chooses to share the same app_thread_t
> object. Something probably goes wrong there. Can you check it? Maybe current
> thread ID as returned by thread_t::id() is different even if called twice
> from the same thread? Maybe thread ID comparison function, thread_t::equal()
> is broken and returns false even if the two thread IDs are the same?
>
> Martin
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20100319/69589d8b/attachment.htm>
More information about the zeromq-dev
mailing list