=head1 NAME
DynGig::Automata::MapReduce - Sequential map/reduce automation framework.
Extends DynGig::Automata::Serial.
=cut
package DynGig::Automata::MapReduce;
use base DynGig::Automata::Serial;
use warnings;
use strict;
use Carp;
use File::Spec;
use DynGig::Automata::EZDB::Exclude;
sub new
{
my ( $class, $name ) = @_;
croak 'undefined/invalid name' if ! defined $name || ref $name;
bless DynGig::Automata::Serial->new
(
name => $name,
queue => _MapReduce->queue( File::Spec->join( 'conf', $name ) ),
);
}
sub setup
{
my $this = DynGig::Automata::Serial::setup( @_ );
$this->{_run}{exclude} ||= DynGig::Automata::EZDB::Exclude->new
(
$this->file( 'exclude' ),
table => [ DynGig::Automata::Serial::GLOBAL, $this->job() ],
);
return $this;
}
sub run
{
my ( $this, %param ) = @_;
my $error = 'invalid context';
## context
croak "$error: not defined" unless my $context = $param{context};
croak "$error: not HASH" if ref $context ne 'HASH';
for my $key ( 'global', map { $_->{name} } @{ $this->{queue} } )
{
unless ( defined $context->{$key} )
{
$context->{$key} = {};
}
elsif ( ref $context->{$key} ne 'HASH' )
{
croak "$error: $key not HASH";
}
}
$context->{transient} = {};
## prepare
$this->setup();
my $run = $this->{_run};
$run->{logger} ||= DynGig::Util::Logger->new();
$run->{context} = $context;
my $exclude = $run->{exclude};
my $logger = sub { $run->{logger}->write( @_ ) };
my %context = map { $_ => $context->{$_} } qw( transient global );
## sequence
for my $job ( @{ $this->{queue} } )
{
my $name = $job->{name};
my %param =
(
job => $name,
exclude => $exclude,
name => $this->{name},
);
map { $exclude->expire( $_ ) } DynGig::Automata::Serial::GLOBAL, $name;
my ( $status, $result ) = $this->_eval
(
$job,
param => \%param,
logger => $logger,
context => +{ %context, glocal => $context->{ $name } },
);
croak $result if $status != DynGig::Automata::Serial::OK;
}
delete $context->{transient};
}
package _MapReduce;
use warnings;
use strict;
use Carp;
use YAML::XS;
use DynGig::Util::Time;
use DynGig::Util::MapReduce;
use constant { PRECISION => 30 };
sub queue
{
my ( $class, $conf ) = @_;
my ( @queue, %job );
my $error = 'invalid queue config';
for my $param ( YAML::XS::LoadFile $conf )
{
croak $error unless $param && ref $param eq 'HASH';
my $name = $param->{name};
croak "$error: invalid/undefined name" if ! defined $name || ref $name;
croak "$error: name collision '$name'" if $job{$name};
$param->{name} = $job{$name} = "job.$name";
my %param =
(
interval => DynGig::Util::Time->rel2sec( $param->{interval} ),
job => DynGig::Util::MapReduce->new( _param( %$param ) ),
);
push @queue, { param => \%param, name => $job{$name}, code => \&_code };
}
return \@queue;
}
sub _param
{
my %param = @_;
my $error = "invalid job config $param{name}";
for my $key ( qw( batch map reduce ) )
{
my $plugin = $param{$key};
unless ( defined $plugin )
{
next if $key eq 'reduce';
croak "$error: undefined $key";
}
croak "$error: invalid $key " . ( $@ || '' ) if ref $plugin ne 'HASH'
|| ref ( $plugin->{code} = do $plugin->{code} ) ne 'CODE'
|| ref ( $plugin->{param} ||= {} ) ne 'HASH';
$plugin->{timeout} = DynGig::Util::Time->rel2sec( $plugin->{timeout} );
}
return %param;
}
sub _code
{
my %param = @_;
my $job = $param{job};
my $logger = $param{logger};
my $context = $param{context};
my $glocal = $context->{glocal};
my $global = $context->{global};
my $name = $job->name();
my $time = time;
my $delta = 0;
if ( my $interval = $param{interval} )
{
unless ( my $last = $glocal->{timestamp} )
{
$delta = $interval;
}
elsif ( ( $delta = $last + $interval - $time ) > PRECISION )
{
goto DONE;
}
elsif ( $delta < 0 )
{
&$logger( 'OVERDUE: %s for %d seconds', $name, -$delta )
if -$delta > PRECISION;
$delta = 0;
}
}
my %context = ( $name => $glocal, global => $global->{$name} ||= {} );
&$logger( 'START: %s', $name );
$job->run
(
context => \%context,
map { $_ => $param{param} } qw( batch reduce ),
);
&$logger( 'DONE: %s', $name );
$glocal->{timestamp} = $time;
DONE: $glocal->{due} = $delta;
}
=head1 NOTE
See DynGig::Automata
=cut
1;
__END__