The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id$

package Mvalve::Reader;
use Moose;
use Mvalve;
use Mvalve::Const;
use Mvalve::Types;

extends 'Mvalve::Base';

# Timeout value handed to q_next() while next() waits for a queue to
# become readable. Defaults to 60.
has timeout => (
    isa      => 'Num',
    is       => 'rw',
    default  => 60,
    required => 1,
);

# Throttler object (must do the Mvalve::Throttler role, coercion enabled).
# Its try_push() and fill() methods are delegated, so they can be called
# directly on the reader.
has throttler => (
    does     => 'Mvalve::Throttler',
    is       => 'rw',
    coerce   => 1,
    required => 1,
    handles  => [ 'try_push', 'fill' ],
);

# When true (the default), an emergency message makes next() record an
# "emergency until" time for its destination; non-emergency messages for
# that destination fetched before that time are dropped. When false,
# next() only updates the throttler for the destination instead.
has drop_on_emergency => (
    isa      => 'Bool',
    is       => 'rw',
    default  => 1,
    required => 1,
);

# Finalize the Moose metaclass now that all attributes are declared.
__PACKAGE__->meta->make_immutable;

# Remove Moose's declarative keywords (has, extends, ...) from this package.
no Moose;

# Waits for the next message across the configured queue set and decides
# whether it may be handed to the caller.
#
# Flow:
#   1. q_next() waits (up to 'timeout') for a readable table; q_fetch()
#      then pulls one message from it.
#   2. A message from an emergency queue is always returned. Depending on
#      drop_on_emergency it either opens an emergency window for its
#      destination or just updates the throttler.
#   3. Any other message is dropped while its destination is inside an
#      emergency window.
#   4. A message from a timed queue is returned, after updating the retry
#      counter and the throttler.
#   5. A message from a normal queue is returned unless the destination is
#      throttled or has pending retries, in which case it is deferred.
#
# Returns the message object when it is deliverable. Returns an empty list
# (undef in scalar context) when nothing was fetched, when the message was
# dropped, or when it was deferred.
sub next
{
    my $self = shift;

    my $qs    = $self->queue_set;
    my @names = $qs->as_q4m_args;

    Mvalve::trace("queue_wait with @names") if Mvalve::Const::MVALVE_TRACE();
    my $table = $self->q_next(
        table_conds => \@names,
        timeout     => $self->timeout + 0
    );

    if (! $table) {
        Mvalve::trace( "q_next did not return a table name, simply returning" ) if Mvalve::Const::MVALVE_TRACE();
        return ();
    }

    Mvalve::trace( "issueing fetch on table '$table'") if Mvalve::Const::MVALVE_TRACE();

    my $message = $self->q_fetch(table => $table);
    if (! $message) {
        Mvalve::trace( "q_fetch did not return a message, simply returning" ) if Mvalve::Const::MVALVE_TRACE();
        return ();
    }

    my $destination = $message->header( Mvalve::Const::DESTINATION_HEADER() );

    if (Mvalve::Const::MVALVE_TRACE() && $qs->is_timed( $table )) {
        Mvalve::trace( "we should have dispatched at " .
            scalar( localtime( $message->header( Mvalve::Const::RETRY_HEADER() )  /100_000 ) ) );
    }

    # The destination is an abstract symbol naming the endpoint service.
    # In this method it is used only as the key for the throttler and for
    # the per-destination state entries ('emergency' and 'retry').

    my $emerg_key = [ $destination, 'emergency' ];
    if ( $qs->is_emergency( $table ) ) {
        # Emergency messages are never dropped or throttled.
        if ($self->drop_on_emergency) {
            # Open an emergency window: until it expires, other messages
            # for this destination are dropped (see the check below).
            my $duration = $message->header( Mvalve::Const::DURATION_HEADER() )
                || $self->throttler->interval;
            $self->state_set($emerg_key, time + $duration);
        }
        else {
            $self->fill( key => $destination );
        }
    }
    else {
        my $avail_time = $self->state_get($emerg_key);
        if ($avail_time && time < $avail_time) {
            return (); # drop message!
        }

        if ( $qs->is_timed( $table ) ) {
            # Messages from a timed queue are always allowed through, but
            # we still update the throttler's count so that the next
            # message from a normal queue is throttled correctly.
            if ($message->header( Mvalve::Const::RETRY_HEADER() )) {
                # Decrement the destination's retry counter, which is the
                # value is_pending() checks.
                $self->state_decr( [ $destination, 'retry' ] );
            }
            $self->fill( key => $destination );
        }
        else {
            # Normal queue: check whether this message must be throttled.
            # try_push() is called unconditionally, as it always has been,
            # so the throttler is updated even when retries are pending.
            my $is_pending   = $self->is_pending( $destination );
            my $is_throttled = ! $self->try_push( key => $destination );
            Mvalve::trace( "checking if message to $destination should be throttled (pending: $is_pending, throttled: $is_throttled)" ) if Mvalve::Const::MVALVE_TRACE();

            if ($is_throttled || $is_pending) {
                Mvalve::trace( "message", $message->id, "is being throttled") if Mvalve::Const::MVALVE_TRACE();
                $self->defer(
                    message => $message,
                    interval => $self->throttler->interval,
                );
                $self->log(
                    action => "throttle",
                    destination => $destination
                );
                return (); # no data for you!
            }
        }
    }

    # if we got here, we can just return the data
    Mvalve::trace( "message", $message->id, "being returned") if Mvalve::Const::MVALVE_TRACE();
    $self->log(
        action => "dequeue",
        destination => $destination,
    );
    return $message;
}

# Reports whether $destination has a non-zero retry counter in the state
# store. Returns 1 if it does, 0 otherwise.
sub is_pending {
    my $self        = shift;
    my $destination = shift;

    # The counter is kept under the composite key [ destination, 'retry' ].
    return 1 if $self->state_get( [ $destination, 'retry' ] );
    return 0;
}

1;

__END__

=head1 NAME

Mvalve::Reader - Mvalve Reader

=head1 METHODS

=head2 next

Fetches the next available message. 

=head2 is_pending( $destination )

Checks whether there are pending retries for that particular destination.

=head2 throttler

C<throttler> holds the Data::Throttler instance that does the dirty work of
determining if a message needs to be throttled or not

  $self->throttler( {
    module => "Data::Throttler::Memcached",
  } );

=head2 timeout

C<timeout> specifies the timeout value while we wait to read from the queue.

=head2 drop_on_emergency

C<drop_on_emergency> specifies whether, given an emergency message, other
messages should be dropped while the emergency message is being displayed.

By default this is C<true>.

=cut