package Algorithm::TokenBucket;
use 5.006;
our $VERSION = 0.32;
use warnings;
use strict;
BEGIN {
eval { require Time::HiRes; import Time::HiRes 'time' } # use if available
}
=head1 NAME
Algorithm::TokenBucket - Token bucket rate limiting algorithm
=head1 SYNOPSIS
use Algorithm::TokenBucket;
# configure a bucket to limit a stream up to 100 items per hour
# with bursts of 5 items max
my $bucket = new Algorithm::TokenBucket 100 / 3600, 5;
# wait till we're allowed to process 3 items
until ($bucket->conform(3)) {
Time::HiRes::sleep 0.1; # the built-in sleep would truncate 0.1 to 0
# do things
}
# process 3 items because we now can
process(3);
# leak (flush) bucket
$bucket->count(3); # or, e.g. $bucket->count(1) for 1..3;
if ($bucket->conform(10)) {
die for 'truth';
# because the bucket with a burst size of 5
# will never conform to 10
}
my $time = Time::HiRes::time;
while (Time::HiRes::time - $time < 7200) { # two hours
# be bursty
if ($bucket->conform(5)) {
process(5);
$bucket->count(5);
}
}
# we're likely to have processed 200 items (and hogged CPU, btw)
Storable::store $bucket, 'bucket.stored';
my $bucket1 = new Algorithm::TokenBucket
@{Storable::retrieve('bucket.stored')};
=head1 DESCRIPTION
The token bucket algorithm is a flexible way of imposing a rate limit
on a stream of items. It is also very easy to combine several
rate limiters in an C<AND> or C<OR> fashion.
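A minimal sketch of such combinations follows. The helper names C<conform_all> and C<conform_any> are hypothetical and not part of this module; they only assume objects that provide C<conform>.

```perl
use strict;
use warnings;

# Hypothetical helpers, not part of Algorithm::TokenBucket.
# Each takes a size N followed by any objects that provide conform().

# AND: every bucket must conform before N items may pass.
sub conform_all {
    my ($n, @buckets) = @_;
    for my $bucket (@buckets) {
        return 0 unless $bucket->conform($n);
    }
    return 1;
}

# OR: one conforming bucket is enough.
sub conform_any {
    my ($n, @buckets) = @_;
    for my $bucket (@buckets) {
        return 1 if $bucket->conform($n);
    }
    return 0;
}
```

After a successful AND check, call C<count> on every bucket. Which bucket to charge in the OR case is a policy decision that this sketch leaves open.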
Each bucket has a memory footprint of constant size because the
algorithm is based on the C<information rate>. This was my main motivation to
implement it. Other rate limiters on CPAN keep track of I<ALL> incoming
items in memory, which allows them to be exact.
The terms C<conform>, C<count>, C<information rate> and C<burst size> are
shamelessly borrowed from the page at
http://linux-ip.net/gl/tcng/node62.html.
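In those terms, the refill step that keeps the footprint constant can be sketched as below. This is a standalone illustration with the clock passed in explicitly; the function name C<refill> and its argument order are assumptions made for the example.

```perl
use strict;
use warnings;

# Standalone sketch of the refill step; the clock is passed in explicitly
# so the arithmetic is easy to follow. Names are illustrative.
sub refill {
    my ($tokens, $last_check, $now, $info_rate, $burst_size) = @_;

    # Tokens accumulate at info_rate per second since the last check...
    $tokens += ($now - $last_check) * $info_rate;

    # ...but never beyond the burst size.
    $tokens = $burst_size if $tokens > $burst_size;

    return ($tokens, $now);
}

# 100 items per hour, bursts of 5: 90 seconds refill 2.5 tokens.
my ($tokens, $checked) = refill(0, 0, 90, 100 / 3600, 5);
printf "%.2f tokens at t=%d\n", $tokens, $checked;

# After another hour of idling the bucket is capped at the burst size.
($tokens, $checked) = refill($tokens, $checked, 3690, 100 / 3600, 5);
printf "%.2f tokens at t=%d\n", $tokens, $checked;
```

Only the token count and the time of the last check change between calls, which is why the state never grows with traffic.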
=head1 INTERFACE
=cut
use fields qw/info_rate burst_size _tokens _last_check_time/;
=head2 METHODS
=over 4
=item new($$;$$)
The constructor requires at least the C<rate of information> in
items per second and the C<burst size> in items. It can also take the current
token counter and the last check time, but beware: this usage is reserved for
restoring a saved bucket. See L</state>.
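Unless a token counter is passed in, a new bucket starts empty, so the first full burst is not available right away. A small worked example of that delay, using a hypothetical helper C<seconds_to_fill> that is not part of this module:

```perl
use strict;
use warnings;

# Hypothetical helper: seconds an idle, empty bucket needs to reach
# its full burst size at the given rate (items per second).
sub seconds_to_fill {
    my ($info_rate, $burst_size) = @_;
    return $burst_size / $info_rate;
}

my $rate = 100 / 3600;    # 100 items per hour
printf "%.0f seconds\n", seconds_to_fill($rate, 5);
```

With the numbers from the synopsis this comes to three minutes.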
=cut
sub new {
my $class = shift;
fields::new($class)->_init(@_);
}
sub _init {
my Algorithm::TokenBucket $self = shift;
@$self{qw/info_rate burst_size _tokens _last_check_time/} = @_;
$self->{_last_check_time} ||= time;
$self->{_tokens} ||= 0;
return $self;
}
=item state()
This method returns the state of the bucket as a list. Use it for storing purposes.
Buckets also natively support freezing and thawing with L<Storable> by
providing STORABLE_* callbacks.
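The callbacks pack the four state values with the C<d4> template. A standalone sketch of that round trip, with made-up state values:

```perl
use strict;
use warnings;

# Illustrative state list: info_rate, burst_size, tokens, last check time.
my @state = (100 / 3600, 5, 2.5, 1_700_000_000.25);

# "d4" packs four double-precision floats in the machine's native format.
my $frozen = pack('d4', @state);
my @thawed = unpack('d4', $frozen);

printf "%d bytes, %d values restored\n", length($frozen), scalar @thawed;
```

Because the format is native, the packed string should not be assumed portable between machines with different floating point layouts.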
=cut
sub state {
my Algorithm::TokenBucket $self = shift;
return @$self{qw/info_rate burst_size _tokens _last_check_time/};
}
use constant PACK_FORMAT => "d4"; # "F4" is not 5.6 compatible
sub STORABLE_freeze {
my ( $self, $cloning ) = @_;
return pack(PACK_FORMAT(),$self->state);
}
sub STORABLE_thaw {
my ( $self, $cloning, $state ) = @_;
return $self->_init(unpack(PACK_FORMAT(),$state));
}
sub _token_flow {
my Algorithm::TokenBucket $self = shift;
my $time = time;
$self->{_tokens} += ($time - $self->{_last_check_time}) * $self->{info_rate};
$self->{_tokens} > $self->{burst_size} and $self->{_tokens} = $self->{burst_size};
$self->{_last_check_time}= $time;
}
=item conform($)
This sub checks whether the bucket contains at least I<N> tokens. If it
does, I<N> items from the stream may be transmitted (or processed);
"items" is a loose term here because I<N> can be fractional. The check
itself does not remove any tokens. A bucket never
conforms to an I<N> greater than C<burst size>.
It returns a boolean value.
=cut
sub conform {
my Algorithm::TokenBucket $self = shift;
my $size = shift;
$self->_token_flow;
return $self->{_tokens} >= $size;
}
=item count($)
This sub removes I<N> tokens from the bucket, or all of them if fewer than I<N> are available.
It does not return a meaningful value.
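Since C<conform> only checks and C<count> only removes, callers usually pair them. A minimal sketch of that pairing, using a hypothetical helper C<try_consume> that is not part of this module:

```perl
use strict;
use warnings;

# Hypothetical helper, not part of the module: take N tokens only if the
# bucket conforms, and report whether the caller may proceed.
sub try_consume {
    my ($bucket, $n) = @_;
    return 0 unless $bucket->conform($n);
    $bucket->count($n);
    return 1;
}
```

The helper works with any object that provides both methods, so C<process($n) if try_consume($bucket, $n)> is enough at the call site.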
=cut
sub count {
my Algorithm::TokenBucket $self = shift;
my $size = shift;
$self->_token_flow;
($self->{_tokens} -= $size) < 0 and $self->{_tokens} = 0;
}
=item until($)
This sub returns the number of seconds until I<N> tokens can be removed from the bucket.
It's especially useful in multitasking environments like L<POE> where you
cannot busy-wait. One can safely schedule the next conform($N) check in until($N)
seconds instead of checking repeatedly.
Note that until() does not take into account C<burst size>. That means
a bucket will not conform to I<N> even after sleeping for until($N)
seconds if I<N> is greater than C<burst size>.
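The arithmetic behind the returned value is simple. A standalone sketch with illustrative argument names:

```perl
use strict;
use warnings;

# Standalone version of the arithmetic; argument names are illustrative.
sub seconds_until {
    my ($tokens, $size, $info_rate) = @_;
    return 0 if $tokens >= $size;             # enough tokens already
    return ($size - $tokens) / $info_rate;    # time to accumulate the rest
}

# 100 items per hour, 2 tokens in the bucket, 5 wanted:
# 3 missing tokens at one token per 36 seconds.
printf "%.0f seconds\n", seconds_until(2, 5, 100 / 3600);
```

Nothing in this calculation looks at the burst size, which is exactly the caveat described above.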
=cut
sub until {
my Algorithm::TokenBucket $self = shift;
my $size = shift;
$self->_token_flow;
if ( $self->{_tokens} >= $size ) {
# can conform() right now
return 0;
} else {
my $needed = $size - $self->{_tokens};
return ( $needed / $self->{info_rate} );
}
}
1;
__END__
=back
=head1 EXAMPLES
Imagine a rate limiter for a mail sending application. We'd like to
allow 2 mails per minute but no more than 20 mails per hour.
Go, go, go!
my $rl1 = new Algorithm::TokenBucket 2/60, 1;
my $rl2 = new Algorithm::TokenBucket 20/3600, 10;
# "bursts" of 10 to ease the lag but $rl1 enforces
# 2 per minute, so it won't flood
while (my $mail = get_next_mail) {
until ($rl1->conform(1) && $rl2->conform(1)) {
busy_wait;
}
$mail->take_off;
$rl1->count(1); $rl2->count(1);
}
Now, let's fix the CPU-hogging example from L</SYNOPSIS> using
the L</until> method.
my $bucket = new Algorithm::TokenBucket 100 / 3600, 5;
my $time = Time::HiRes::time;
while (Time::HiRes::time - $time < 7200) { # two hours
# be bursty
Time::HiRes::sleep $bucket->until(5);
if ($bucket->conform(5)) { # should always be true
process(5);
$bucket->count(5);
}
}
# we're likely to have processed 200 items (without hogging the CPU)
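The same idea carries over to several buckets at once: sleep until the slowest of them is ready. A minimal sketch, assuming a hypothetical helper C<until_all> that is not part of this module:

```perl
use strict;
use warnings;
use List::Util qw(max);

# Hypothetical helper: how long to sleep before N items can conform to
# every bucket. Works with any objects that provide until().
sub until_all {
    my ($n, @buckets) = @_;
    return max(0, map { $_->until($n) } @buckets);
}
```

In the mail example, C<busy_wait> could then become C<Time::HiRes::sleep until_all(1, $rl1, $rl2)>, with the C<conform> checks left in place to make the final decision.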
=head1 BUGS
Works unreliably for fractional rates unless Time::HiRes is present.
The documentation lacks a full description of the actual algorithm. See the links or read
the source (there are about 20 lines of sparse Perl in several subs, trust me).
until($N) does not return infinity if $N is greater than C<burst
size>. Sleeping for infinity seconds is both useless and hard to
debug.
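Callers who need to detect that case can guard the call themselves. A minimal sketch, assuming a hypothetical wrapper C<until_or_never> that reads the burst size from C<state>:

```perl
use strict;
use warnings;

# Hypothetical wrapper, not part of the module: returns undef when N can
# never conform, otherwise whatever until() reports.
sub until_or_never {
    my ($bucket, $n) = @_;

    # state() returns (info_rate, burst_size, tokens, last_check_time).
    my (undef, $burst_size) = $bucket->state;

    return if $n > $burst_size;    # undef in scalar context
    return $bucket->until($n);
}
```

A scheduler can then treat an undefined result as "never" instead of sleeping for a duration after which the bucket still would not conform.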
=head1 ACKNOWLEDGMENTS
Yuval Kogman contributed the L</until> method, proper L<Storable> support
and other things.
=head1 AUTHOR
Alex Kapranoff, E<lt>kappa@rambler-co.ruE<gt>
=head1 SEE ALSO
http://www.eecs.harvard.edu/cs143/assignments/pa1/,
http://en.wikipedia.org/wiki/Token_bucket,
http://linux-ip.net/gl/tcng/node54.html,
http://linux-ip.net/gl/tcng/node62.html,
L<Schedule::RateLimit>, L<Algorithm::FloodControl>.
=cut