[zeromq-dev] Efficiency for very few short messages

dan smith dan25311 at gmail.com
Wed Jan 23 01:42:42 CET 2013


Jason,

Yes, the code below replicates the problem with some number crunching
instead of equation solving, but the symptom is the same. On my quad-core
laptop, for NEQUATION = 500 the speedup is perfect.

For the very important NEQUATION = 8 case, the serial version is twice
as fast.

We need to solve millions of small equations that change over time, so I
cannot solve more than one of them per thread at a time. Obviously there is
a fixed cost somewhere, but I am not able to figure out what it is — 0mq is
still new to me. It looks like the latency is high when I send just a few
messages, which I do not understand.

I use 64 bit Intel compiler.

I will appreciate any suggestion.



// pairTest.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <zmq.h>
#include <zmq_utils.h>
#include <stdio.h>
#include <stdlib.h>
#include <vector>
#include <windows.h>
#include <process.h>
#include <assert.h>

// Unused leftover, presumably from the inproc_lat template this benchmark
// was adapted from — TODO confirm and remove.
static int roundtrip_count = 0 ;
// Total work items fanned out to the workers; 8 is the problematic small case.
#define NEQUATION 8
// Inner repetitions of the busy-loop per work item (scales per-message work).
#define NCYCLE 1

// One worker thread. It receives work items on a PAIR socket that main()
// binds as inproc://connect_<id>, and reports completion with a zero-length
// message on a second PAIR socket bound here as inproc://terminate_<id>.
//
// The send socket is created and bound in the constructor (on the main
// thread, before the thread starts) so the endpoint exists when main()
// connects to it; the receive socket is created inside worker() because
// 0MQ sockets must only be used from the thread that created them.
class ComputingThread
{

public:

ComputingThread(void * ctx, const int& id) : m_ctx(ctx), m_id(id)
{
char connText[128];

this->m_socketSend = zmq_socket(this->m_ctx, ZMQ_PAIR);
assert(this->m_socketSend != NULL);                 // was silently ignored

// snprintf: bounded, cannot overflow connText (the original sprintf had
// no bound).
snprintf(connText, sizeof connText, "inproc://terminate_%d", this->m_id);

int rc = zmq_bind(this->m_socketSend, connText);
assert(rc == 0);                                    // was silently ignored

}

// Thread trampoline for _beginthreadex: forwards to worker() on the
// ComputingThread instance passed as the thread argument.
static unsigned __stdcall ThreadStaticEntryPoint(void * pThis)
{
ComputingThread * pthX = (ComputingThread*)pThis;
int ret = pthX->worker();
return ret;
}

// Thread body: pump work messages until a zero-length terminator arrives,
// then send a zero-length "done" reply and release both sockets.
// Always returns 0 (the thread exit code).
int worker()
{

int rc;
char connText[128];

// Created here, not in the constructor: 0MQ sockets are not thread-safe
// and must be used on the creating thread.
this->m_socketReceive = zmq_socket(this->m_ctx, ZMQ_PAIR);
assert(this->m_socketReceive != NULL);

snprintf(connText, sizeof connText, "inproc://connect_%d", this->m_id);

rc = zmq_connect(this->m_socketReceive, connText);
assert(rc == 0);

zmq_msg_t msg;

rc = zmq_msg_init(&msg);
assert(rc == 0);

while(true)
{
rc = zmq_recvmsg(this->m_socketReceive, &msg, 0);
// Zero-length message is the shutdown request from main().
size_t messageSize = zmq_msg_size(&msg);            // was int: sign/width mismatch
if(messageSize == 0)
break ;

// Simulated equation solve: fixed-cost number crunching per message.
// NOTE(review): rand() uses shared CRT state; on some C runtimes it
// serializes threads and can dominate this workload — consider a
// per-thread PRNG before trusting the scaling numbers. TODO confirm.
double sum = 0.0 ;
for(int ic1 = 0 ; ic1 < NCYCLE ; ic1++)
{
for(int ic2 = 0 ; ic2 < 100000 ; ic2++)
{
int v = rand() % 100;
sum = sum + sin(float(v)) + sqrt(float(v) + cos(sqrt(float(v)))) ;
}

}


}

// Zero-length reply tells main() this worker has drained its queue.
zmq_msg_t msgTerm;
rc = zmq_msg_init (&msgTerm);
rc = zmq_sendmsg (this->m_socketSend, &msgTerm, 0);
zmq_msg_close(&msgTerm);


rc = zmq_msg_close (&msg);

rc = zmq_close (this->m_socketReceive);
rc = zmq_close (this->m_socketSend);



return 0;

}

private:
void * m_ctx ;          // shared 0MQ context (owned by main())
int m_id ;              // worker index; selects the inproc endpoint names

void *m_socketReceive;  // work items in  (created in worker())
void *m_socketSend;     // completion out (created/bound in the ctor)


};




int main(int argc, const char* const argv[])
{
std::vector<ComputingThread * > computingThreads ;
std::vector< HANDLE> handles ;
std::vector<void *> socketsSend ;
std::vector<void *> socketsReceive ;
std::vector<zmq_msg_t> messages ;

void *watch = NULL;

int nequation = NEQUATION ;

int nthread = 8 ;
unsigned long elapsed = 0;

printf("\nSerial Started...\n") ;

watch = zmq_stopwatch_start();

int iequation = 0;
for(iequation = 0 ; iequation < nequation ; iequation++)
{

int ic1 = 0 ;
int ic2 = 0 ;
double sum = 0.0 ;
for(ic1 = 0 ; ic1 < NCYCLE ; ic1++)
{
for(ic2 = 0 ; ic2 < 100000 ; ic2++)
{
int v = rand() % 100;
sum = sum + sin(float(v)) + sqrt(float(v) + cos(sqrt(float(v)))) ;
}

}

}

unsigned long serialElapsed = zmq_stopwatch_stop(watch) / 2 ;

printf("\nSerial Done. Took %ld microsec.\n", serialElapsed) ;


void *ctx = NULL;
void * socket = NULL ;

int rc = 0 ;




ctx = zmq_init(1);

// create sockets
int ithread = 0 ;
for(ithread = 0 ; ithread < nthread ; ithread++)
{
void * socket = zmq_socket(ctx, ZMQ_PAIR);
socketsSend.push_back(socket);

socket = zmq_socket(ctx, ZMQ_PAIR);
socketsReceive.push_back(socket);


}


// connections
for(ithread = 0 ; ithread < nthread ; ithread++)
{
void * socket = socketsSend[ithread];
char connectionString[128];
sprintf(connectionString,"inproc://connect_%d",ithread);
rc = zmq_bind(socket, connectionString);

}


for(ithread = 0 ; ithread < nthread; ithread++)
{
ComputingThread * cthr = new ComputingThread(ctx, ithread);
computingThreads.push_back(cthr);
HANDLE localHandle = (HANDLE) _beginthreadex(NULL, 0,
ComputingThread::ThreadStaticEntryPoint,
cthr, 0, NULL);
if (localHandle == 0)
{
printf ("error in _beginthreadex %d\n",ithread);
return -1;
}
handles.push_back(localHandle);
}


for(ithread = 0 ; ithread < nthread ; ithread++)
{

char connectionString[128];
sprintf(connectionString,"inproc://terminate_%d",ithread);
void * socket = socketsReceive[ithread];
rc = zmq_connect (socket,connectionString);


}



int i = 0 ;
printf("\nParallel Started...\n") ;

int nfinish = nthread ;
zmq_msg_t msgTerm;
rc = zmq_msg_init (&msgTerm);


watch = zmq_stopwatch_start();

// send messages
int messageCounter = 0 ;

for(iequation = 0 ; iequation < nequation ; iequation++)
{
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
rc = zmq_msg_init_size(&msg, 8);
memset (zmq_msg_data (&msg), 'A', 8);
ithread = messageCounter % nthread ;
messageCounter++ ;
void * socket = socketsSend[ithread];
rc = zmq_sendmsg (socket, &msg, 0);
zmq_msg_close(&msg);
}


// terminate threads
for(ithread = 0; ithread < nthread ; ithread++)
{
zmq_msg_t msgTerm;
rc = zmq_msg_init (&msgTerm);
void * socket = socketsSend[ithread];
rc = zmq_sendmsg (socket, &msgTerm, 0);
zmq_msg_close(&msgTerm);

}


int flags[16] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};

while(nfinish > 0)
{
for(ithread = 0; ithread < nthread ; ithread++)
{
if(flags[ithread])
{
void * recv = socketsReceive[ithread];
rc = zmq_recvmsg (recv, &msgTerm, 0);
int messageSize = zmq_msg_size (&msgTerm) ;
if(messageSize == 0)
{
flags[ithread] = 0;
nfinish-- ;
break ;
}

}
}

}


unsigned long parallelElapsed = zmq_stopwatch_stop(watch) / 2 ;

printf("\nParallel Done. Took %ld microsec.\n", parallelElapsed) ;

double speedup = float(serialElapsed) / parallelElapsed ;

printf("\nspeedup = %g\n", speedup) ;


DWORD rc2 = WaitForMultipleObjects( nthread, &handles[0], true, INFINITE);
if (rc2 == WAIT_FAILED)
{
printf ("error in WaitForMultipleObject\n");
return -1;
}
assert( ( rc2 >= WAIT_OBJECT_0 ) && ( rc2 <= ( WAIT_OBJECT_0 + nthread - 1
) ) );





//close handles
for(ithread = 0; ithread < nthread ; ithread++)
{
HANDLE localHandle = handles[ithread];
BOOL rcc = CloseHandle (localHandle);
if (rcc == 0)
{
printf ("error in CloseHandle\n");
return -1;
}


}


for(ithread = 0 ; ithread < nthread ; ithread++)
{
void * socket = socketsSend[ithread];
rc = zmq_close (socket);
socket = socketsReceive[ithread];
rc = zmq_close (socket);


}

zmq_term(ctx);

return 0;
}



On Tue, Jan 22, 2013 at 3:39 PM, Jason Smith <jason.nevar.smith at gmail.com>wrote:

> Do you have some code to share? Particularly the ZMQ socket connection and
> creation.
>
> On another thought, how is the "Finish" determined? Do the threads end, or
> do they continue to wait for another message? Is a "finished" message sent
> to the main thread?
>
>
> On 22 January 2013 00:06, dan smith <dan25311 at gmail.com> wrote:
>
>> more precisely: 'I do not know how to debug that further'
>>
>>
>>
>> On Mon, Jan 21, 2013 at 2:49 AM, dan smith <dan25311 at gmail.com> wrote:
>>
>>>
>>> Claudio,
>>>
>>> Thanks for your answer.
>>>
>>> When it comes to 8, the timing changes randomly, sometime times is less
>>> than the time needed to solve 80 equations. I did 4 runs again, the times
>>> were between 115000 and 125000. This program is very simple and each and
>>> every thread does the very same thing using the very same code, no
>>> difference between the threads. I do know how to debug that further. The
>>>  main thread just sends the pointer and the worker thread solves it after
>>> receiving it, 3 lines are relevant.The thread is a basic windows thread
>>> like in inproc_lat.
>>>
>>>
>>> On Mon, Jan 21, 2013 at 2:13 AM, Claudio Carbone <erupter at libero.it>wrote:
>>>
>>>> dan smith <dan25311 at gmail.com> wrote:
>>>> >80 times: 64614micros and 134429micros, the serial is already faster.
>>>> >
>>>> >Going down to 8: 6345 and 328286...
>>>> >
>>>>
>>>> There must be something wrong if it takes less to solve 80eqs than 8,
>>>> no matter what.
>>>> Why don't you save/print split times for each eq and each phase of the
>>>> program?
>>>> With 8 it isn't crazy to analyze the numbers.
>>>>
>>>>
>>>> Claudio
>>>> -- Sent from my ParanoidAndroid Galaxy Nexus with K-9 Mail.
>>>>
>>>> _______________________________________________
>>>> zeromq-dev mailing list
>>>> zeromq-dev at lists.zeromq.org
>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>>
>>>>
>>>
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev at lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>
>>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev at lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20130122/5b31688d/attachment.htm>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: pairTest.cpp
Type: text/x-c++src
Size: 6477 bytes
Desc: not available
URL: <https://lists.zeromq.org/pipermail/zeromq-dev/attachments/20130122/5b31688d/attachment.cpp>


More information about the zeromq-dev mailing list