The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: Throttle.pm,v 1.4 2010/03/12 22:10:11 dk Exp $
package IO::Lambda::Throttle;
use strict;
use warnings;
use Exporter;
use IO::Lambda qw(:all);
use IO::Lambda::Mutex qw(mutex);
use Time::HiRes qw(time);
use Scalar::Util qw(weaken);
our $DEBUG = $IO::Lambda::DEBUG{throttle} || 0;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(throttle);
our %EXPORT_TAGS = ( all => \@EXPORT_OK);

sub new
{
	my ($class, $rate, $strict) = @_;
	my $self = bless {
		mutex     => IO::Lambda::Mutex-> new,
		last      => 0,
		low       => 0,
		high      => 0,
		strict    => $strict || 0,
	}, $class;
	$self-> rate($rate);
	return $self;
}

sub rate
{
	return $_[0]-> {rate} unless $#_;
	my ( $self, $rate) = @_;
	die "negative rate" if defined($rate) and $rate < 0;
	$self-> {rate} = $rate;
}

sub strict { $#_ ? $_[0]-> {strict} = $_[1] : $_[0]-> {strict} }

# warning: when called, changes internal state of an object
# returns 0 if rate limitter thinks it's ok to run now,
# otherwise returns number of seconds needed to sleep
sub next_timeout
{
	my $self = shift;
	unless ( $self-> {rate}) {
		# special case
		return 0;
	}

	my $ts = time;
	if ( $ts < $self-> {last}) {
		# warn "negative time detected\n";
		my $delta = $self-> {last} - $ts;
		$self-> {low}  -= $delta;
		$self-> {high} -= $delta;
	}
	$self-> {last} = $ts;
	warn "$ts: $self->{low}/$self->{high}\n" if $DEBUG;

	if ( $self-> {low} < $self-> {high}) {
		$self-> {low} += 1 / $self-> {rate};
		warn "case1\n" if $DEBUG;
		return 0;
	} elsif ( $self-> {low} < $ts) {
		$self-> {low}  = $ts + 1 / $self-> {rate};
		$self-> {high} = $ts + ($self->{strict} ? 1 / $self-> {rate} : 1);
		warn "case2\n" if $DEBUG;
		return 0;
	} else {
		warn "wait ", $self->{low}-$ts, "\n" if $DEBUG;
		return $self-> {low} - $ts;
	}
}

# Returns a lambda that finishes until rate-limitter allows further run.
sub lock
{
	my $self = shift;
	weaken $self;
	return $self-> {mutex}-> pipeline( 
		lambda {
			my $timeout = $self-> next_timeout;
			return unless $timeout;
			context $timeout;
			timeout {
				die "something wrong, non-zero timeout"
					if $self-> next_timeout;
				return;
			};
		} 
	);
}

# returns a lambda that is finished when all lambdas, one by one,
# are passed through a rate limitter
sub ratelimit
{
	my ($self) = @_;
	my @ret;
	return lambda {
		my @lambdas = @_;
		return unless @lambdas;
		context $self-> lock;
		tail {
			context shift @lambdas;
		tail {
			push @ret, @_;
			if ( @lambdas) {
				this-> call(@lambdas)-> start;
			} else {
				my @r = @ret;
				@ret = ();
				return @r;
			}
		}}
	};
}


sub throttle { __PACKAGE__-> new(@_)-> ratelimit }

1;

=pod

=head1 NAME

IO::Lambda::Throttle - rate-limiting facility

=head1 DESCRIPTION

Provides several interfaces for throttling control flow by imposing rate limit.

=head1 SYNPOSIS

   use IO::Lambda qw(:lambda);
   use IO::Lambda::Throttle qw(throttle);

   # execute 2 lambdas a sec - blocking
   throttle(2)-> wait(@lambdas);
   # non-blocking
   lambda {
   	context throttle(2), @lambdas;
   	tail {};
   };

   # share a rate-limiter between two sets of lambdas running in parallel
   # strictly 1 lambda in 10 seconds will be executed
   my $t = IO::Lambda::Throttle-> new(0.1);

   # throttle lambdas sequentially
   sub track
   {
      my @lambdas = @_;
      return lambda {
         context $t-> ratelimit, @lambdas; 
         tail {};
      };
   }

   # parallel two tracks - execution order will be
   # $a[0], $b[0], $a[1], $b[1], etc
   lambda {
   	context track(@a), track(@b);
	tails {}
   }-> wait;

=head1 API

=over

=item new($rate = 0, $strict = 0)

The constructor creates a new rate-limiter object. The object methods (see
below) generate lambdas that allow to execute lambdas with a given rate and
algorithm. See L<rate> and C<strict> for description.

=item rate INT

C<$rate> is given in lambda/seconds, and means infinity if is 0.

=item strict BOOL

C<$strict> selects between fair and aggressive throttling . For example, if
rate is 5 l/s, and first 5 lambdas all come within first 0.1 sec. With
C<$strict> 0, all af them will be scheduled to execution immediately, but the
6th lambda will be delayed to 1.2 sec. With C<$strict> 1, all lambdas will be
scheduled to be executed evenly with 0.2 sec delay.

=item next_timeout :: TIMEOUT

Internal function, called when code needs to determine whether lambda is
allowed to run immediately (function returns 0) or after a timeout (returns
non-zero value).  If a non-zero value is returned, it is guaranteed that after
sleeping this time, next invocation of the function will return 0.

Override the function for your own implementation of rate-limiting function.

=item lock

Returns a lambda that will execute when rate-limiter allows next execution:

    context $throttle-> lock;
    tail {
         ... do something ...
    }

The lambda can be reused.

=item ratelimit :: @lambdas -> @ret

Returns a lambda that finishes when all passed @lambdas are finished.
Executes them one by one, imposing a rate limit. Returns results of lambdas
accumulated in a list.

=item throttle($rate,$strict)

Condition version of C<< new($rate,$strict)-> ratelimit >>

=back

=head1 AUTHOR

Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>.

=cut