[zeromq-dev] Bug in rate control?
Martin Sustrik
sustrik at 250bpm.com
Sun Dec 27 14:59:08 CET 2009
Hi Steven,
> 2009/12/26 Martin Sustrik <sustrik at 250bpm.com>
>
> Hi Steven,
>
> It seems there's a problem with OpenPGM rate control. When I am
> calling pgm_send in a tight loop everything works well till rate
> limit is reached. From that point on rate limit is always
> reported as exceeded. However, if I pause the execution for a
> while, everything works OK afterwards. Maybe a rounding error?
>
>
> I updated the unit tests for the rate control engine and things seem
> to be ok for both second and millisecond resolution bucket fill
> rates. Are there specific values you are seeing problems with?
>
>
> Or it could be a very unstable clock; try using the TSC instead of
> gettimeofday().
I don't know exactly what the problem is.
I suspect that when pgm_rate_check is called in a very tight loop and
the limit is exceeded (i.e. the function returns FALSE), the bucket is
filled in increments of zero, presumably because of a rounding issue,
and thus it is never refilled.
Calculating the refill increment from the time of the last _successful_
call to pgm_rate_check seems to solve the issue:
gboolean
pgm_rate_check (
    rate_t*         bucket,
    const guint     data_size,
    const gboolean  is_nonblocking
    )
{
    /* pre-conditions */
    g_assert (NULL != bucket);
    g_assert (data_size > 0);

    if (0 == bucket->rate_per_sec)
        return TRUE;

    g_static_mutex_lock (&bucket->mutex);
    pgm_time_t now = pgm_time_update_now();
    pgm_time_t time_since_last_rate_check = now - bucket->last_rate_check;
    gint old_rate_limit = bucket->rate_limit;
    bucket->rate_limit += (double)bucket->rate_per_sec *
        pgm_to_secsf((double)time_since_last_rate_check);

    if (RESOLUTION_SECOND == bucket->rate_resolution) {
        /* per second */
        if (bucket->rate_limit > bucket->rate_per_sec)
            bucket->rate_limit = bucket->rate_per_sec;
    } else {
        /* per milli-second */
        if (bucket->rate_limit > (bucket->rate_per_sec / 1000))
            bucket->rate_limit = bucket->rate_per_sec / 1000;
    }

    const gint new_rate_limit = bucket->rate_limit -
        ( bucket->iphdr_len + data_size );
    if (is_nonblocking && new_rate_limit < 0) {
        bucket->rate_limit = old_rate_limit;
        g_static_mutex_unlock (&bucket->mutex);
        return FALSE;
    }

    bucket->last_rate_check = now;
    bucket->rate_limit = new_rate_limit;
    if (bucket->rate_limit < 0) {
        gint sleep_amount;
        do {
            g_thread_yield();
            now = pgm_time_update_now();
            time_since_last_rate_check = now - bucket->last_rate_check;
            sleep_amount = (double)bucket->rate_per_sec *
                (double)pgm_to_secs((double)time_since_last_rate_check);
        } while (sleep_amount + bucket->rate_limit < 0);
        bucket->rate_limit += sleep_amount;
        bucket->last_rate_check = now;
    }

    g_static_mutex_unlock (&bucket->mutex);
    return TRUE;
}
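
To illustrate the suspected rounding effect, here is a minimal standalone
sketch. It is not OpenPGM code: demo_bucket_t, demo_rate_check and
demo_count_sends are hypothetical names, the clock is simulated in
microseconds, and integer division stands in for the truncation that
happens when the refill amount is stored in an integer bucket. It
assumes the failing variant advances the timestamp on every call, which
is only my suspicion, not something I have confirmed.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified bucket; NOT the OpenPGM rate_t. */
typedef struct {
    int64_t rate_per_sec;   /* refill rate, bytes per second */
    int64_t rate_limit;     /* tokens currently in the bucket (integer) */
    int64_t last_check_us;  /* simulated timestamp, microseconds */
} demo_bucket_t;

/* Non-blocking check: returns 1 if `size` bytes may be sent, 0 otherwise.
 * If stamp_on_failure is non-zero, a rejected call still advances the
 * timestamp (the suspected faulty behaviour); if zero, only a successful
 * call does (the approach taken in the patch above). */
static int demo_rate_check(demo_bucket_t *b, int64_t now_us, int64_t size,
                           int stamp_on_failure)
{
    assert(b != 0);
    assert(size > 0);

    int64_t elapsed_us = now_us - b->last_check_us;
    /* Integer division truncates: a short interval refills zero tokens. */
    int64_t refill = b->rate_per_sec * elapsed_us / 1000000;
    int64_t limit = b->rate_limit + refill;
    if (limit > b->rate_per_sec)
        limit = b->rate_per_sec;

    if (limit - size < 0) {
        if (stamp_on_failure) {
            /* The fractional refill is discarded along with the interval. */
            b->rate_limit = limit;
            b->last_check_us = now_us;
        }
        return 0;
    }

    b->rate_limit = limit - size;
    b->last_check_us = now_us;
    return 1;
}

/* Calls the check once per simulated microsecond for one simulated second,
 * starting from an empty bucket, and counts the accepted 1000-byte sends. */
static int64_t demo_count_sends(int stamp_on_failure)
{
    demo_bucket_t b = { 100000, 0, 0 };  /* 100 kB/s, empty bucket */
    int64_t sent = 0;
    for (int64_t now = 1; now <= 1000000; now++)
        sent += demo_rate_check(&b, now, 1000, stamp_on_failure);
    return sent;
}
```

With these numbers each 1 us interval is worth 0.1 byte, which truncates
to zero. demo_count_sends(1) therefore returns 0, because the bucket
never refills, while demo_count_sends(0) returns 100, because the
interval keeps growing until a whole packet's worth of tokens has
accumulated.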
Martin