[zeromq-dev] rc == 0 (./zmq/mutex.hpp:94)

Aamir M aamirjvm at gmail.com
Mon Jul 13 23:51:45 CEST 2009


Hi Martin,

I applied the patch and it fixed the bug! Thanks!

I also tested this on x86 by adding the C++ compiler flag
-DZMQ_FORCE_MUTEXES. As expected, the error occurs on x86 as well,
and it is resolved by applying your patch.

Attached is a cleaned-up version of the example code. Is there any
practical difference between putting msg_out inside the while loop
versus outside (other than possible performance improvement by putting
it outside)? Is there any pitfall to be aware of if msg_out is
declared outside the while loop in broker_loop and then re-used?

On a related note ....

Do you know off-hand what kind of performance gains are possible by
using the inline atomic assembly instructions instead of pthread
mutexes?

Let us say that the performance gain is significant, and let us further
say that I can find the resources to produce PowerPC lock-free versions
of atomic_bitmap.hpp, atomic_counter.hpp and atomic_ptr.hpp. Would
you have any way of incorporating the PPC inline assembly into
the 0MQ trunk? There could be testing problems if none of the 0MQ
developers has a PowerPC machine to work on. You could potentially use
a PlayStation 3 running Linux for the testing; the PS3 is a relatively
cheap PowerPC environment, though setting up the build environment
might be a pain.

Thanks,
Aamir

On Mon, Jul 13, 2009 at 5:29 PM, Martin Hurton<hurtonm at gmail.com> wrote:
> Hi Aamir,
>
> Could you please try the attached patch?
>
> Regards,
> Martin
>
> On Mon, Jul 13, 2009 at 9:00 PM, Aamir M<aamirjvm at gmail.com> wrote:
>> The attachment got scrubbed out again... Here is the code:
>>
>> // BEGIN CODE
>>
>> #include <zmq.hpp>
>> #include <pthread.h>
>> #include <assert.h>
>> #include <stdio.h>
>> #include <unistd.h>
>>
>> pthread_t listener_thread;
>> pthread_t broker_thread;
>>
>> void* listener_loop( void* arg )
>> {
>>        zmq::dispatcher_t* dispatcher = (zmq::dispatcher_t*)arg;
>>        zmq::locator_t locator( NULL );
>>        zmq::api_thread_t* api = zmq::api_thread_t::create( dispatcher, &locator );
>>
>>        api->create_queue("Q2", zmq::scope_local);
>>        api->bind( "E2", "Q2", NULL, NULL );
>>
>>        zmq::message_t msg;
>>
>>        int qid;
>>        while(true)
>>        {
>>                qid = api->receive( &msg, true );
>>                printf("%s", (char*)msg.data());
>>        }
>> }
>>
>> void* broker_loop( void* arg )
>> {
>>
>>        zmq::dispatcher_t* dispatcher = (zmq::dispatcher_t*)arg;
>>        zmq::locator_t locator( NULL );
>>        zmq::api_thread_t* api = zmq::api_thread_t::create( dispatcher, &locator );
>>
>>
>>        int eid2 = api->create_exchange("E2", zmq::scope_process,
>>                                                                   NULL, NULL, 0, NULL, zmq::style_data_distribution );
>>
>>        api->create_queue("Q1", zmq::scope_local);
>>        api->bind( "E1", "Q1", NULL, NULL );
>>
>>        int qid;
>>        zmq::message_t msg_in;
>>        //zmq::message_t msg_out( 64 );
>>    while(true)
>>        {
>>                qid = api->receive( &msg_in, true );
>>
>>                zmq::message_t msg_out( 64 );
>>                api->send( eid2, msg_out, false );
>>        }
>>
>> }
>>
>>
>> int main()
>> {
>>        zmq::locator_t locator( NULL );
>>        zmq::dispatcher_t dispatcher( 4 );
>>        zmq::api_thread_t* api = zmq::api_thread_t::create( &dispatcher, &locator );
>>
>>        int eid1 = api->create_exchange("E1", zmq::scope_process,
>>                                                                   NULL, NULL, 0, NULL, zmq::style_data_distribution );
>>
>>        int rc;
>>
>>        rc = pthread_create( &broker_thread, NULL, &broker_loop, &dispatcher );
>>        assert( rc == 0 );
>>        sleep(1); /// wait for exchange E2 to be created
>>
>>        rc = pthread_create( &listener_thread, NULL, &listener_loop, &dispatcher );
>>        assert( rc == 0 );
>>
>>        rc = pthread_create( &listener_thread, NULL, &listener_loop, &dispatcher );
>>        assert( rc == 0 );
>>
>>        printf("go\n");
>>        while(true)
>>        {
>>                zmq::message_t msg( 8 );
>>                sprintf( (char*)msg.data(), "%s\n", "hello");
>>                api->send( eid1, msg );
>>        }
>>
>> }
>>
>> // END CODE
>>
>> On Mon, Jul 13, 2009 at 2:57 PM, Aamir M<aamirjvm at gmail.com> wrote:
>>> Hi Martin & Martin,
>>>
>>> I've come up with some stand-alone code that I think reproduces the
>>> mutex error I was describing (please see attached C++ code). If you
>>> compile and run this code (requires Linux, 0MQ 1.0.0), you should
>>> eventually see this output:
>>>
>>> go
>>> Invalid argument (./zmq/mutex.hpp:97)
>>> Invalid argument (./zmq/mutex.hpp:97)
>>> Aborted
>>>
>>> The offending line of code appears to be zmq_mutex_error.cpp:50:
>>>
>>> zmq::message_t msg_out( 64 );
>>>
>>> If I comment out this line and instead uncomment line 45 (which simply
>>> moves the declaration of msg_out to outside of the while loop) then
>>> the mutex error does not occur. I'm a little hazy on why this is
>>> happening and what the proper 0MQ library usage should be in this
>>> case.
>>>
>>> Thanks,
>>> Aamir
>>>
>>>
>>>
>>> On Mon, Jul 13, 2009 at 11:34 AM, Martin Sustrik<sustrik at fastmq.com> wrote:
>>>> Martin Hurton wrote:
>>>>>
>>>>> Hi Aamir,
>>>>>
>>>>> Please apply the attached patch to the 0.6.1 tree and let us know what's
>>>>> printed when the assertion fails.
>>>>
>>>> Actually, I've applied a similar patch to v1.0.0, so you may alternatively use
>>>> version 1.0.0 instead of 0.6.1.
>>>>
>>>> Martin
>>>>
>>>>
>>>
>>
>
-------------- next part --------------
#include <zmq.hpp>
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

// Selects where broker_loop() constructs its outgoing message:
//  - defined:     a fresh msg_out is constructed inside the loop on every
//                 iteration;
//  - not defined: a single msg_out is constructed once before the loop and
//                 reused for every send.
// NOTE(review): per the discussion in this thread, the in-loop variant is the
// one that triggered the mutex assertion before the patch - confirm.
#define	BREAK_ME

// Thread handles passed to pthread_create() in main().
pthread_t listener_thread;
pthread_t broker_thread;

void* listener_loop( void* arg )
{
	zmq::dispatcher_t* dispatcher = (zmq::dispatcher_t*)arg;
	zmq::locator_t locator( NULL );
	zmq::api_thread_t* api = zmq::api_thread_t::create( dispatcher, &locator );

	api->create_queue("Q", zmq::scope_local);
	api->bind( "E2", "Q", NULL, NULL );

	zmq::message_t msg;

	int qid;
	while(true)
	{
		qid = api->receive( &msg, true );
		printf("%s", (char*)msg.data());
	}
}

void* broker_loop( void* arg )
{

	zmq::dispatcher_t* dispatcher = (zmq::dispatcher_t*)arg;
	zmq::locator_t locator( NULL );
	zmq::api_thread_t* api = zmq::api_thread_t::create( dispatcher, &locator );
	
	
	int eid2 = api->create_exchange("E2", zmq::scope_process, 
			NULL, NULL, 0, NULL, zmq::style_data_distribution );

	api->create_queue("Q", zmq::scope_local);
	api->bind( "E1", "Q", NULL, NULL );
	
	int qid;
	zmq::message_t msg_in;

#ifndef BREAK_ME
	zmq::message_t msg_out( 64 );
#endif
	while(true)
	{
#ifdef BREAK_ME    
		zmq::message_t msg_out( 64 );
#endif		
		qid = api->receive( &msg_in, true );

		sprintf( (char*)msg_out.data(), "hello %s", (char*)msg_in.data());

		api->send( eid2, msg_out, false );
	}

}


// Entry point of the reproduction program.
//
// Creates exchange "E1", starts one broker thread and two listener threads
// that all share one dispatcher, and then publishes an endless stream of
// numbered messages ("0\n", "1\n", ...) to "E1". Never returns.
int main()
{
	// Room for the largest 32-bit unsigned value in decimal plus the newline
	// and the terminating NUL ("4294967295\n" is 11 characters + NUL). The
	// previous 8-byte buffer overflowed once the counter reached 10000000.
	const size_t msg_size = 16;

	zmq::locator_t locator( NULL );
	zmq::dispatcher_t dispatcher( 4 );
	zmq::api_thread_t* api = zmq::api_thread_t::create( &dispatcher, &locator );

	int eid1 = api->create_exchange("E1", zmq::scope_process,
			NULL, NULL, 0, NULL, zmq::style_data_distribution );

	int rc;

	rc = pthread_create( &broker_thread, NULL, &broker_loop, &dispatcher );
	assert( rc == 0 );
	sleep(1); /// wait for exchange E2 to be created

	rc = pthread_create( &listener_thread, NULL, &listener_loop, &dispatcher );
	assert( rc == 0 );

	rc = pthread_create( &listener_thread, NULL, &listener_loop, &dispatcher );
	assert( rc == 0 );

	printf("go\n");

	// Starts at zero (the counter used to be read uninitialized) and is
	// unsigned so that wrap-around after 2^32 messages is well defined.
	unsigned int i = 0;
	while(true)
	{
		zmq::message_t msg( msg_size );
		snprintf( static_cast<char*>( msg.data() ), msg_size, "%u\n", i);
		api->send( eid1, msg, false );
		i++;
	}

}



More information about the zeromq-dev mailing list