The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Algorithm::TokenBucket;

use 5.006;

our $VERSION = 0.36;

use warnings;
use strict;

BEGIN {
    eval { require Time::HiRes; import Time::HiRes 'time' } # use if available
}

=head1 NAME

Algorithm::TokenBucket - Token bucket rate limiting algorithm

=head1 SYNOPSIS
    
    use Algorithm::TokenBucket;

    # configure a bucket to limit a stream up to 100 items per hour
    # with bursts of 5 items max
    my $bucket = new Algorithm::TokenBucket 100 / 3600, 5;

    # wait till we're allowed to process 3 items
    until ($bucket->conform(3)) {
        sleep 0.1;
        # do things
    }
    
    # process 3 items because we now can
    process(3);

    # leak (flush) bucket
    $bucket->count(3);  # or, e.g. $bucket->count(1) for 1..3;

    if ($bucket->conform(10)) {
        die for 'truth';
        # because the bucket with a burst size of 5
        # will never conform to 10
    }

    my $time = Time::HiRes::time;
    while (Time::HiRes::time - $time < 7200) {  # two hours
        # be bursty
        if ($bucket->conform(5)) {
            process(5);
            $bucket->count(5);
        }
    }
    # we're likely to have processed 200 items (and hogged CPU, btw)

    Storable::store $bucket, 'bucket.stored';
    my $bucket1 = new Algorithm::TokenBucket
            @{Storable::retrieve('bucket.stored')};

=head1 DESCRIPTION

Token bucket algorithm is a flexible way of imposing a rate limit
against a stream of items. It is also very easy to combine several
rate-limiters in an C<AND> or C<OR> fashion.

Each bucket has a memory footprint of constant size because the
algorithm is based on C<information rate>. This was my main motivation to
implement it. Other rate limiters on CPAN keep track of I<ALL> incoming
items in memory. It allows them to be precisely accurate.

FYI, C<conform>, C<count>, C<information rate>, C<burst size> terms are
shamelessly borrowed from http://linux-ip.net/gl/tcng/node62.html
page.

=head1 INTERFACE

=cut

use fields qw/info_rate burst_size _tokens _last_check_time/;

=head2 METHODS

=over 4

=item new($$;$$)

The constructor takes as parameters at least C<rate of information> in
items per second and C<burst size> in items. It can also take current
token counter and last check time but this usage is reserved for
restoring a saved bucket, beware. See L</state>.

=cut

sub new {
    my $class   = shift;
    fields::new($class)->_init(@_);
}

sub _init {
    my Algorithm::TokenBucket $self = shift;

    @$self{qw/info_rate burst_size _tokens _last_check_time/} = @_;
    $self->{_last_check_time} ||= time;
    $self->{_tokens} ||= 0;

    return $self;
}

=item state()

This method returns the state of the bucket as a list. Use it for storing purposes.
Buckets also natively support freezing and thawing with L<Storable> by
providing STORABLE_* callbacks.

=cut

sub state {
    my Algorithm::TokenBucket $self = shift;

    return @$self{qw/info_rate burst_size _tokens _last_check_time/};
}

use constant PACK_FORMAT => "d4";   # "F4" is not 5.6 compatible

sub STORABLE_freeze {
    my ( $self, $cloning ) = @_;
    return pack(PACK_FORMAT(),$self->state);
}

sub STORABLE_thaw {
    my ( $self, $cloning, $state ) = @_;
    return $self->_init(unpack(PACK_FORMAT(),$state));
}

sub _token_flow {
    my Algorithm::TokenBucket $self = shift;

    my $time = time;

    $self->{_tokens}        += ($time - $self->{_last_check_time}) * $self->{info_rate};
    $self->{_tokens} > $self->{burst_size} and $self->{_tokens} = $self->{burst_size};

    $self->{_last_check_time}= $time;
}

=item conform($)

This sub checks if the bucket contains at least I<N> tokens. In that
case it is allowed to transmit (or process) I<N> items (not
exactly right because I<N> can be fractional) from the stream. A bucket never
conforms to an I<N> greater than C<burst size>.

It returns a boolean value.

=cut

sub conform {
    my Algorithm::TokenBucket $self = shift;
    my $size = shift;

    $self->_token_flow;

    return $self->{_tokens} >= $size;
}

=item count($)

This sub removes I<N> (or all if there are less than I<N> available) tokens from the bucket.
Does not return a meaningful value.

=cut

sub count {
    my Algorithm::TokenBucket $self = shift;
    my $size = shift;

    $self->_token_flow;

    ($self->{_tokens} -= $size) < 0 and $self->{_tokens} = 0;
}

=item until($)

This sub returns the number of seconds until I<N> tokens can be removed from the bucket.
It's especially useful in multitasking environments like L<POE> where you
cannot busy-wait. One can safely schedule the next conform($N) check in until($N)
seconds instead of checking repeatedly.

Note that until() does not take into account C<burst size>. That means
a bucket will not conform to I<N> even after sleeping for until($N)
seconds if I<N> is greater than C<burst size>.

=cut

sub until {
    my Algorithm::TokenBucket $self = shift;
    my $size = shift;

    $self->_token_flow;

    if ( $self->{_tokens} >= $size ) {
        # can conform() right now
        return 0;
    } else {
        my $needed = $size - $self->{_tokens};
        return ( $needed / $self->{info_rate} );
    }
}

=item get_token_count()

Returns the current number of tokens in the bucket. This method may be
useful for inspection or debugging purposes. You should not examine
the state of the bucket for rate limiting purposes.

This number will frequently be fractional so it's not exactly a
"count".

=cut

sub get_token_count {
    my Algorithm::TokenBucket $self = shift;
    $self->_token_flow;
    return $self->{_tokens};
}

1;
__END__

=back

=head1 EXAMPLES

Think a rate limiter for a mail sending application. We'd like to
allow 2 mails per minute but no more than 20 mails per hour.
Go, go, go!

    my $rl1 = new Algorithm::TokenBucket 2/60, 1;
    my $rl2 = new Algorithm::TokenBucket 20/3600, 10;
        # "bursts" of 10 to ease the lag but $rl1 enforces
        # 2 per minute, so it won't flood

    while (my $mail = get_next_mail) {
        until ($rl1->conform(1) && $rl2->conform(1)) {
            busy_wait;
        }

        $mail->take_off;
        $rl1->count(1); $rl2->count(1);
    }

Now, let's fix the CPU-hogging example from L</SYNOPSIS> using
L</until()> method.

    my $bucket = new Algorithm::TokenBucket 100 / 3600, 5;
    my $time = Time::HiRes::time;
    while (Time::HiRes::time - $time < 7200) {  # two hours
        # be bursty
        Time::HiRes::sleep $bucket->until(5);
        if ($bucket->conform(5)) {  # should always be true
            process(5);
            $bucket->count(5);
        }
    }
    # we're likely to have processed 200 items (without hogging the CPU)

=head1 BUGS

Works unreliably for fractional rates unless Time::HiRes is present.

Documentation lacks the actual algorithm description. See links or read
the source (there are about 20 lines of sparse perl in several subs, trust me).

until($N) does not return infinity if $N is greater than C<burst
size>. Sleeping for infinity seconds is both useless and hard to
debug.

=head1 ACKNOWLEDGMENTS

Yuval Kogman contributed the L</until> method, proper L<Storable> support
and other things.

Alexey Shrub contributed the L</get_token_count> method.

=head1 COPYRIGHT AND LICENSE

This software is copyright (C) 2012 by Alex Kapranoff.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=head1 AUTHOR

Alex Kapranoff, E<lt>kappa@cpan.orgE<gt>

=head1 SEE ALSO

http://www.eecs.harvard.edu/cs143/assignments/pa1/,
http://en.wikipedia.org/wiki/Token_bucket, 
http://linux-ip.net/gl/tcng/node54.html,
http://linux-ip.net/gl/tcng/node62.html,
L<Schedule::RateLimit>, L<Algorithm::FloodControl>.

=cut