# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Apr-22 10:50 (EDT)
# Function: info about tasks
#
# $Id: TaskInfo.pm,v 1.1 2010/11/01 18:41:57 jaw Exp $
package AC::MrGamoo::Job::TaskInfo;
use AC::MrGamoo::Debug 'job_taskinfo';
use AC::MrGamoo::PeerList;
use AC::Misc;
use strict;
our @ISA = 'AC::MrGamoo::Job::Info';
my $MAXRETRY = 2;
sub new {
my $class = shift;
my $job = shift;
return bless { @_ }, $class;
}
sub pend {
my $me = shift;
my $job = shift;
return if $me->{replaced};
return if $me->{finished};
# create instance, put on pending queue
my $t = AC::MrGamoo::Job::Task->new($job, $me, $me->{server});
return unless $t;
$me->{instance}{ $t->{id} } = $t;
return;
}
sub finished {
my $me = shift;
my $t = shift;
my $job = shift;
delete $me->{instance}{ $t->{id} };
$me->{finished} = 1;
my $outfiles = $me->{outfile};
my $server = $t->{server};
debug("task finished $me->{id} on $server");
# copy files
for my $fi (@$outfiles){
# add to file_info - file is now on one server
debug(" outfile $fi->{filename}");
$job->{file_info}{ $fi->{filename} } = {
filename => $fi->{filename},
location => [ $server ],
};
$job->{server_info}{$server}{has_files}{$fi->{filename}} = 1;
# QQQ - optionally leave final files?
push @{$job->{tmp_file}}, { filename => $fi->{filename}, server => $server };
# add to copy_pending
foreach my $s ( @{$fi->{dst}} ){
next if $job->{server_info}{$s}{has_files}{$fi->{filename}};
my $c = AC::MrGamoo::Job::XferInfo->new( $job,
id => unique(),
filename => $fi->{filename},
dst => $s,
);
next unless $c;
$c->pend($job);
debug(" => pending copy for $s");
}
}
}
sub failed {
my $me = shift;
my $t = shift;
my $job = shift;
delete $me->{instance}{ $t->{id} };
my $server = $me->{server};
my $status = get_peer_status_from_id($server);
if( $status != 200 ){
# replan tasks
$job->_replan_server($server, 'task', $me);
return;
}
if( $me->{retries} ++ > $MAXRETRY ){
# replan tasks
$me->replan($job);
return;
}
# retry
debug("retry task");
$me->pend($job);
}
################################################################
sub replan {
my $me = shift;
my $job = shift;
return if $me->{replaced};
return $job->abort( reason => "too many failed tasks. out of replan options.")
if $me->{replaces};
return $me->_replan_altserver($job) if $me->{altserver};
if( $me->{phase} eq 'reduce' ){
verbose("cannot replan task. no altserver");
$job->abort(reason => "cannot replan task. no alternate server available");
return;
}
$me->_replan_map($job);
}
sub _replan_altserver {
my $me = shift;
my $job = shift;
$me->{server} = $me->{altserver};
delete $me->{retries};
delete $me->{altserver};
debug("replanning task to new server");
$me->pend($job);
}
sub _replan_map {
my $me = shift;
my $job = shift;
# remove task
# divy files among servers
# create new tasks
# rediddle next phase
my %newplan; # server => @files
$me->{replaced} = 1;
unless( $me->{altplan} ){
verbose("no alt task available - aborting");
$job->abort(reason => "cannot replan task. no alternate available");
return;
}
# divy files
for my $f (@{$me->{infile}}){
# alt loc for this file?
my $loc = $me->{altplan}{$f};
unless($loc){
verbose("file unavailable - aborting ($f)");
$job->abort(reason => "file unavailable: $f");
return;
}
push @{$newplan{$loc}}, $f;
}
my @new;
for my $as (keys %newplan){
my $newid = unique();
my $oldid = $me->{id};
my $new = AC::MrGamoo::Job::TaskInfo->new($job,
id => $newid,
phase => $me->{phase},
infile => $newplan{$as},
replaces => $oldid,
outfile => [ map {
(my $f = $_->{filename}) =~ s/$oldid/$newid/;
{ dst => $_->{dst}, filename => $f, }
} @{$me->{outfile}} ],
server => $as,
);
debug("replan map $oldid => $newid on $as");
# keep plan up to date
$job->{plan}{taskidx}{$newid} = $new;
push @{$job->{plan}{taskplan}[0]{task}}, $new;
# move to pending queue
$new->pend($job) if $job->{phase_no} == 0;
push @new, $new;
}
$me->_replan_replace_files( $job, @new );
}
sub _replan_replace_files {
my $me = shift;
my $job = shift;
my @new = shift;
my $oldid = $me->{id};
my $curphase = 0; # map
my $nxtphase = 1; # reduce/0
# remove old task's files, add new tasks' files
for my $ti ( @{$job->{plan}{taskplan}[$nxtphase]{task}} ){
my @infile;
for my $file (@{$ti->{infile}}){
if( $file =~ /$oldid/ ){
for my $new (@new){
my $newid = $new->{id};
(my $n = $file) =~ s/$oldid/$newid/;
push @infile, $n;
}
}else{
push @infile, $file;
}
}
$ti->{infile} = \@infile;
}
}
1;