The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
=head1 NAME

DynGig::Automata::MapReduce - Sequential map/reduce automation framework.
Extends DynGig::Automata::Serial.

=cut
package DynGig::Automata::MapReduce;

use base DynGig::Automata::Serial;

use warnings;
use strict;
use Carp;

use File::Spec;

use DynGig::Automata::EZDB::Exclude;

# new( $name )
#
# Constructor. Builds a DynGig::Automata::Serial object called $name whose
# job queue is loaded by _MapReduce->queue() from the file conf/$name (a
# relative path, so it is resolved against the current working directory),
# then blesses the result into the invoking class.
#
# Croaks 'undefined/invalid name' when $name is undef or a reference.
sub new
{
    my ( $class, $name ) = @_;

    croak 'undefined/invalid name' if ! defined $name || ref $name;

    my $this = DynGig::Automata::Serial->new
    (
        name => $name,
        queue => _MapReduce->queue( File::Spec->join( 'conf', $name ) ),
    );

    # Two-argument bless: the former one-argument form always produced a
    # DynGig::Automata::MapReduce, silently ignoring subclasses. Taking
    # ref() first keeps $object->new(...) working as it did before.
    return bless $this, ref( $class ) || $class;
}

# setup()
#
# Runs the parent class's setup (called directly as
# DynGig::Automata::Serial::setup), then makes sure the run state holds an
# exclusion database. That database is created only if one is not already
# present. It is backed by the object's 'exclude' file and covers the
# global table and this job's table. Returns the object.
sub setup
{
    my $self = DynGig::Automata::Serial::setup( @_ );

    unless ( $self->{_run}{exclude} )
    {
        my $db_file = $self->file( 'exclude' );
        my @tables = ( DynGig::Automata::Serial::GLOBAL, $self->job() );

        $self->{_run}{exclude} =
            DynGig::Automata::EZDB::Exclude->new( $db_file, table => \@tables );
    }

    return $self;
}

# run( context => \%context )
#
# Runs every job in the queue once, in queue order.
#
# %context is the caller's state and must be a HASH ref. Its 'global' entry
# and one entry per queued job name are created as empty hashes when
# missing, and must be HASH refs when present. A fresh 'transient' hash is
# shared by all jobs for the duration of this call. It is removed from
# %context again before run() returns, and also before it rethrows.
#
# Each job is passed, via _eval, the shared 'transient' and 'global' hashes
# plus its own entry as 'glocal'.
#
# Croaks on an invalid context. Also croaks with the job's result as the
# message for the first job whose _eval status is not
# DynGig::Automata::Serial::OK.
#
# Returns the removed 'transient' hash.
sub run
{
    my ( $this, %param ) = @_;
    my $error = 'invalid context';
##  context
    my $context = $param{context};

    croak "$error: not defined" unless $context;
    croak "$error: not HASH" if ref $context ne 'HASH';

    for my $key ( 'global', map { $_->{name} } @{ $this->{queue} } )
    {
        if ( ! defined $context->{$key} )
        {
            $context->{$key} = {};
        }
        elsif ( ref $context->{$key} ne 'HASH' )
        {
            croak "$error: $key not HASH";
        }
    }

    $context->{transient} = {};

    # Run inside eval so that 'transient' is removed from the caller's
    # context even when setup() or a job dies; the original exception is
    # rethrown unchanged afterwards.
    my $ok = eval
    {
##  prepare
        $this->setup();

        my $run = $this->{_run};

        $run->{logger} ||= DynGig::Util::Logger->new();
        $run->{context} = $context;

        my $exclude = $run->{exclude};
        my $logger = sub { $run->{logger}->write( @_ ) };
        my %shared = map { $_ => $context->{$_} } qw( transient global );
##  sequence
        for my $job ( @{ $this->{queue} } )
        {
            my $name = $job->{name};
            my %job_param =
            (
                job => $name,
                exclude => $exclude,
                name => $this->{name},
            );

            # Call expire() on the exclude DB for the global table and for
            # this job's table (presumably drops stale exclusions).
            for my $table ( DynGig::Automata::Serial::GLOBAL, $name )
            {
                $exclude->expire( $table );
            }

            my ( $status, $result ) = $this->_eval
            (
                $job,
                param => \%job_param,
                logger => $logger,
                context => +{ %shared, glocal => $context->{$name} },
            );

            croak $result if $status != DynGig::Automata::Serial::OK;
        }

        1;
    };
    my $failure = $@;
    my $transient = delete $context->{transient};

    die $failure unless $ok;

    return $transient;
}

package _MapReduce;

use warnings;
use strict;
use Carp;

use YAML::XS;

use DynGig::Util::Time;
use DynGig::Util::MapReduce;

use constant { PRECISION => 30 };

# queue( $conf )
#
# Class method. Loads the YAML file $conf. Every document in it must be a
# HASH describing one job with a plain-scalar 'name'.
#
# Returns an ARRAY ref with one entry per job, in file order:
#   { name  => "job.<name>",
#     code  => \&_code,
#     param => { interval => <DynGig::Util::Time->rel2sec result>,
#                job      => <DynGig::Util::MapReduce object> } }
#
# Croaks on a non-HASH document, on an undefined or reference name, and on
# a duplicate name. Each document's 'name' is rewritten in place to
# "job.<name>" before _param() validates it.
sub queue
{
    my ( $class, $conf ) = @_;
    my $error = 'invalid queue config';
    my ( %label_of, @entries );

    for my $spec ( YAML::XS::LoadFile $conf )
    {
        croak $error unless $spec && ref $spec eq 'HASH';

        my $name = $spec->{name};

        croak "$error: invalid/undefined name"
            unless defined $name && ! ref $name;

        croak "$error: name collision '$name'" if $label_of{$name};

        # Jobs are known internally by a 'job.'-prefixed name.
        my $label = "job.$name";
        $label_of{$name} = $label;
        $spec->{name} = $label;

        my $interval = DynGig::Util::Time->rel2sec( $spec->{interval} );
        my $mapreduce = DynGig::Util::MapReduce->new( _param( %$spec ) );

        push @entries, +{
            param => { interval => $interval, job => $mapreduce },
            name => $label,
            code => \&_code,
        };
    }

    return \@entries;
}

# _param( %job )
#
# Validates and normalises one job's configuration hash (as built by
# queue()).
#
# 'batch' and 'map' are required; 'reduce' is optional. Each stage that is
# present must be a HASH whose 'code' is a Perl file that evaluates to a
# CODE ref. On success for each stage:
#   - 'code' is replaced by that CODE ref;
#   - 'param' defaults to {} and must be a HASH;
#   - 'timeout' is passed through DynGig::Util::Time->rel2sec.
#
# Returns the (modified) job hash as a flat list. Croaks
# "invalid job config <name>: ..." on any violation.
#
# NOTE(review): 'do FILE' looks a relative path up in @INC unless it starts
# with ./ or ../, and '.' is not in @INC by default since Perl 5.26. A
# plugin path relative to the working directory may therefore fail to
# load; the error message now reports $! to make that visible.
sub _param
{
    my %param = @_;
    my $error = "invalid job config $param{name}";

    for my $key ( qw( batch map reduce ) )
    {
        my $plugin = $param{$key};

        unless ( defined $plugin )
        {
            next if $key eq 'reduce';   # the reduce stage is optional
            croak "$error: undefined $key";
        }

        # No longer appends $@ here: it could be stale from an unrelated eval.
        croak "$error: invalid $key " if ref $plugin ne 'HASH';

        my ( $code, $reason ) = _load_code( $plugin->{code} );

        croak "$error: invalid $key $reason" if ref $code ne 'CODE';

        $plugin->{code} = $code;

        croak "$error: invalid $key "
            if ref ( $plugin->{param} ||= {} ) ne 'HASH';

        $plugin->{timeout} = DynGig::Util::Time->rel2sec( $plugin->{timeout} );
    }

    return %param;
}

# _load_code( $path )
#
# Loads a plugin file with 'do FILE'. Returns ( $value, $reason ): $value is
# whatever the file evaluated to, and $reason is '' when $value is a CODE
# ref, otherwise a short explanation of the failure.
sub _load_code
{
    my $path = shift;

    return ( undef, 'code path undefined' ) unless defined $path;

    # Start from a clean $@ / $! so only this load's errors are reported.
    local ( $@, $! );
    my $code = do $path;

    return ( $code, '' ) if ref $code eq 'CODE';
    return ( $code, "error in '$path': $@" ) if $@;
    return ( $code, "cannot read '$path': $!" ) if ! defined $code && $!;
    return ( $code, "'$path' did not return a CODE ref" );
}

# _code( %param )
#
# Per-job callback installed in every queue entry by queue(). It decides
# whether the job is due, runs it if so, and records scheduling state in
# the job's own context hash.
#
# Reads from %param:
#   job      - object with name() and run() methods (presumably the
#              DynGig::Util::MapReduce built by queue(); TODO confirm how
#              _eval merges the entry's param with run()'s param)
#   logger   - code ref called with a printf-style format plus arguments
#   context  - hash with 'glocal' (this job's hash) and 'global'
#   interval - seconds between runs; false means run on every call
#   param    - handed to the job as both its batch and reduce parameter
#
# A job whose next due time (last start + interval) is more than PRECISION
# seconds away is skipped. One overdue by more than PRECISION is logged as
# OVERDUE and run.
#
# After a run, $glocal->{timestamp} is set to the time this call started.
# $glocal->{due} is always written and is also the return value:
#   - the interval, on a timed job's first run;
#   - 0 when overdue, or when no interval is set;
#   - otherwise last + interval - now.
sub _code
{
    my %param = @_;
    my $job = $param{job};
    my $logger = $param{logger};
    my $context = $param{context};
    my $glocal = $context->{glocal};
    my $global = $context->{global};
    my $name = $job->name();
    my $time = time;
    my $delta = 0;
    my $run_now = 1;

    if ( my $interval = $param{interval} )
    {
        if ( my $last = $glocal->{timestamp} )
        {
            $delta = $last + $interval - $time;

            if ( $delta > PRECISION )
            {
                # Not due yet, even allowing for the tolerance: skip.
                $run_now = 0;
            }
            elsif ( $delta < 0 )
            {
                $logger->( 'OVERDUE: %s for %d seconds', $name, -$delta )
                    if -$delta > PRECISION;

                $delta = 0;
            }
            # 0 <= $delta <= PRECISION: close enough to due, run now.
        }
        else
        {
            # Never run before: run now.
            $delta = $interval;
        }
    }

    if ( $run_now )
    {
        my %context = ( $name => $glocal, global => $global->{$name} ||= {} );

        $logger->( 'START: %s', $name );

        $job->run
        (
            context => \%context,
            map { $_ => $param{param} } qw( batch reduce ),
        );

        $logger->( 'DONE: %s', $name );

        $glocal->{timestamp} = $time;
    }

    return $glocal->{due} = $delta;
}

=head1 NOTE

See DynGig::Automata

=cut

1;

__END__