dan smith
Wed Jan 23 01:42:42 CET 2013


Yes, the code below replicates the problem with some number crunching
instead of equation solving, but the symptom is the same. On my quad-core
laptop, for NEQUATION = 500, the speedup is perfect.

For the very important NEQUATION = 8 case, however, the serial version is faster.

We need to solve millions of small equations that change over time, so I
cannot batch several of them into one thread; each thread can take just one
at a time. Obviously there is a fixed cost somewhere, but I am not able to
figure out what it is; 0mq is still new to me. It looks like the latency is
high when I send just a few messages, and I do not understand why.

I use 64 bit Intel compiler.

I will appreciate any suggestion.

// pairTest.cpp : Defines the entry point for the console application.

#include "stdafx.h"

#include <zmq.h>
#include <zmq_utils.h>

#include <assert.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <vector>

#include <windows.h>
#include <process.h>

// Not referenced anywhere in the visible source.
static int roundtrip_count = 0 ;
// Number of work items: main copies this into `nequation`, which bounds both
// the serial loop and the number of work messages sent to the workers.
#define NEQUATION 8
// Outer repetition count of the number-crunching loop executed per work item.
#define NCYCLE 1

class ComputingThread


ComputingThread(void * ctx, const int& id) : m_ctx(ctx), m_id(id)
char connText[128];

this->m_socketSend = zmq_socket(this->m_ctx, ZMQ_PAIR);
sprintf(connText,"inproc://terminate_%d", this->m_id);

int rc = zmq_bind(this->m_socketSend,connText);

static unsigned __stdcall ThreadStaticEntryPoint(void * pThis)
ComputingThread * pthX = (ComputingThread*)pThis;
int ret = pthX->worker();
return ret;

int worker()

int rc;
int j;
char connText[128];

this->m_socketReceive = zmq_socket(this->m_ctx, ZMQ_PAIR);

sprintf(connText,"inproc://connect_%d", this->m_id);

rc = zmq_connect(this->m_socketReceive,connText);

zmq_msg_t msg;

rc = zmq_msg_init(&msg);

rc = zmq_recvmsg(this->m_socketReceive, &msg, 0);
int messageSize = zmq_msg_size(&msg) ;
if(messageSize == 0)
break ;
int ic1 = 0 ;
int ic2 = 0 ;
double sum = 0.0 ;
for(ic1 = 0 ; ic1 < NCYCLE ; ic1++)
for(ic2 = 0 ; ic2 < 100000 ; ic2++)
int v = rand() % 100;
sum = sum + sin(float(v)) + sqrt(float(v) + cos(sqrt(float(v)))) ;



zmq_msg_t msgTerm;
rc = zmq_msg_init (&msgTerm);
rc = zmq_sendmsg (this->m_socketSend, &msgTerm, 0);

rc = zmq_msg_close (&msg);

rc = zmq_close (this->m_socketReceive);
rc = zmq_close (this->m_socketSend);

return 0;


void * m_ctx ;
int m_id ;

void *m_socketReceive;
void *m_socketSend;


int main(int argc, const char* const argv[])
std::vector<ComputingThread * > computingThreads ;
std::vector< HANDLE> handles ;
std::vector<void *> socketsSend ;
std::vector<void *> socketsReceive ;
std::vector<zmq_msg_t> messages ;

void *watch = NULL;

int nequation = NEQUATION ;

int nthread = 8 ;
unsigned long elapsed = 0;

printf("\nSerial Started...\n") ;

watch = zmq_stopwatch_start();

int iequation = 0;
for(iequation = 0 ; iequation < nequation ; iequation++)

int ic1 = 0 ;
int ic2 = 0 ;
double sum = 0.0 ;
for(ic1 = 0 ; ic1 < NCYCLE ; ic1++)
for(ic2 = 0 ; ic2 < 100000 ; ic2++)
int v = rand() % 100;
sum = sum + sin(float(v)) + sqrt(float(v) + cos(sqrt(float(v)))) ;



unsigned long serialElapsed = zmq_stopwatch_stop(watch) / 2 ;

printf("\nSerial Done. Took %ld microsec.\n", serialElapsed) ;

void *ctx = NULL;
void * socket = NULL ;

int rc = 0 ;

ctx = zmq_init(1);

// create sockets
int ithread = 0 ;
for(ithread = 0 ; ithread < nthread ; ithread++)
void * socket = zmq_socket(ctx, ZMQ_PAIR);

socket = zmq_socket(ctx, ZMQ_PAIR);


// connections
for(ithread = 0 ; ithread < nthread ; ithread++)
void * socket = socketsSend[ithread];
char connectionString[128];
rc = zmq_bind(socket, connectionString);


for(ithread = 0 ; ithread < nthread; ithread++)
ComputingThread * cthr = new ComputingThread(ctx, ithread);
HANDLE localHandle = (HANDLE) _beginthreadex(NULL, 0,
cthr, 0, NULL);
if (localHandle == 0)
printf ("error in _beginthreadex %d\n",ithread);
return -1;

for(ithread = 0 ; ithread < nthread ; ithread++)

char connectionString[128];
void * socket = socketsReceive[ithread];
rc = zmq_connect (socket,connectionString);


int i = 0 ;
printf("\nParallel Started...\n") ;

int nfinish = nthread ;
zmq_msg_t msgTerm;
rc = zmq_msg_init (&msgTerm);

watch = zmq_stopwatch_start();

// send messages
int messageCounter = 0 ;

for(iequation = 0 ; iequation < nequation ; iequation++)
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
rc = zmq_msg_init_size(&msg, 8);
memset (zmq_msg_data (&msg), 'A', 8);
ithread = messageCounter % nthread ;
messageCounter++ ;
void * socket = socketsSend[ithread];
rc = zmq_sendmsg (socket, &msg, 0);

// terminate threads
for(ithread = 0; ithread < nthread ; ithread++)
zmq_msg_t msgTerm;
rc = zmq_msg_init (&msgTerm);
void * socket = socketsSend[ithread];
rc = zmq_sendmsg (socket, &msgTerm, 0);


int flags[16] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};

while(nfinish > 0)
for(ithread = 0; ithread < nthread ; ithread++)
void * recv = socketsReceive[ithread];
rc = zmq_recvmsg (recv, &msgTerm, 0);
int messageSize = zmq_msg_size (&msgTerm) ;
if(messageSize == 0)
flags[ithread] = 0;
nfinish-- ;
break ;



unsigned long parallelElapsed = zmq_stopwatch_stop(watch) / 2 ;

printf("\nParallel Done. Took %ld microsec.\n", parallelElapsed) ;

double speedup = float(serialElapsed) / parallelElapsed ;

printf("\nspeedup = %g\n", speedup) ;

DWORD rc2 = WaitForMultipleObjects( nthread, &handles[0], true, INFINITE);
if (rc2 == WAIT_FAILED)
printf ("error in WaitForMultipleObject\n");
return -1;
assert( ( rc2 >= WAIT_OBJECT_0 ) && ( rc2 <= ( WAIT_OBJECT_0 + nthread - 1
) ) );

//close handles
for(ithread = 0; ithread < nthread ; ithread++)
HANDLE localHandle = handles[ithread];
BOOL rcc = CloseHandle (localHandle);
if (rcc == 0)
printf ("error in CloseHandle\n");
return -1;


for(ithread = 0 ; ithread < nthread ; ithread++)
void * socket = socketsSend[ithread];
rc = zmq_close (socket);
socket = socketsReceive[ithread];
rc = zmq_close (socket);



return 0;

