# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Apr-22 10:50 (EDT)
# Function: info about tasks
#
# $Id: TaskInfo.pm,v 1.1 2010/11/01 18:41:57 jaw Exp $

package AC::MrGamoo::Job::TaskInfo;
use AC::MrGamoo::Debug 'job_taskinfo';
use AC::MrGamoo::PeerList;
use AC::Misc;
use strict;

# Parent class: methods not defined in this package are resolved through
# AC::MrGamoo::Job::Info.
our @ISA = 'AC::MrGamoo::Job::Info';

# Retry threshold consulted by failed(). The test there is
# "retries++ > $MAXRETRY", so a task is re-queued on the same server while
# its previous failure count is <= $MAXRETRY and is replanned after that.
my $MAXRETRY = 2;

# Constructor.
#
#   $class - package to bless into
#   $job   - owning job; accepted for call-signature compatibility, not stored
#   @field - key => value pairs that become the object's fields
#
# Returns the new blessed hash.
sub new {
    my( $class, $job, @field ) = @_;

    my %me = @field;
    return bless \%me, $class;
}

# Create a runnable instance of this task and remember it.
#
#   $me  - this task info
#   $job - owning job, handed to the Task constructor
#
# Does nothing when the task has been replaced or has already finished.
# Otherwise builds an AC::MrGamoo::Job::Task for $me->{server} and, if one
# was created, records it in $me->{instance} keyed by its id.
# Always returns nothing.
sub pend {
    my( $me, $job ) = @_;

    return if $me->{replaced} || $me->{finished};

    # create instance, put on pending queue
    my $task = AC::MrGamoo::Job::Task->new($job, $me, $me->{server});
    $me->{instance}{ $task->{id} } = $task if $task;

    return;
}

# Handle successful completion of a task instance.
#
#   $me  - this task info
#   $t   - the task instance that finished (its {id} and {server} are read)
#   $job - owning job; file_info, server_info and tmp_file are updated
#
# Marks the task finished, records every output file as living on the
# server that ran the instance, and creates a pending transfer for each
# destination server that does not already have the file.
sub finished {
    my( $me, $t, $job ) = @_;

    delete $me->{instance}{ $t->{id} };
    $me->{finished} = 1;

    my $server   = $t->{server};
    my $outfiles = $me->{outfile};

    debug("task finished $me->{id} on $server");

    # copy files
    for my $fi (@$outfiles){
        debug("  outfile $fi->{filename}");
        my $name = $fi->{filename};

        # add to file_info - file is now on one server
        my %info = (
            filename => $name,
            location => [ $server ],
        );
        $job->{file_info}{$name} = \%info;
        $job->{server_info}{$server}{has_files}{$name} = 1;

        # QQQ - optionally leave final files?
        push @{$job->{tmp_file}}, { filename => $name, server => $server };

        # add to copy_pending
        foreach my $s ( @{$fi->{dst}} ){
            # destination already holds the file - no copy needed
            next if $job->{server_info}{$s}{has_files}{$name};

            my $c = AC::MrGamoo::Job::XferInfo->new( $job,
                id       => unique(),
                filename => $name,
                dst      => $s,
            );
            if( $c ){
                $c->pend($job);
                debug("    => pending copy for $s");
            }
        }
    }
}

# Handle failure of a task instance.
#
#   $me  - this task info
#   $t   - the task instance that failed
#   $job - owning job
#
# If the task's server does not report status 200, the job is asked to
# replan that server. Otherwise the task is retried in place until its
# previous failure count exceeds $MAXRETRY, after which it is replanned.
sub failed {
    my( $me, $t, $job ) = @_;

    delete $me->{instance}{ $t->{id} };

    my $server = $me->{server};

    unless( get_peer_status_from_id($server) == 200 ){
        # replan tasks
        $job->_replan_server($server, 'task', $me);
        return;
    }

    # NOTE(review): the count is compared before it is incremented, so with
    # $MAXRETRY = 2 the task is retried three times before being replanned -
    # confirm this is the intended limit.
    my $prior = $me->{retries}++;
    if( $prior > $MAXRETRY ){
        # replan tasks
        $me->replan($job);
        return;
    }

    # retry
    debug("retry task");
    $me->pend($job);
}

################################################################

# Pick a recovery strategy for a task that keeps failing.
#
#   $me  - this task info
#   $job - owning job
#
# In order:
#   - already replaced: nothing to do
#   - this task is itself a replacement ({replaces} set): abort the job
#   - an alternate server is known: move the task there
#   - reduce phase without an alternate server: abort the job
#   - otherwise: replan as new map tasks
sub replan {
    my( $me, $job ) = @_;

    return if $me->{replaced};

    if( $me->{replaces} ){
        return $job->abort( reason => "too many failed tasks. out of replan options.");
    }

    if( $me->{altserver} ){
        return $me->_replan_altserver($job);
    }

    unless( $me->{phase} eq 'reduce' ){
        return $me->_replan_map($job);
    }

    verbose("cannot replan task. no altserver");
    $job->abort(reason => "cannot replan task. no alternate server available");
    return;
}

# Move this task to its alternate server and queue it again.
#
#   $me  - this task info; {server} takes the value of {altserver}, and
#          {altserver} and {retries} are removed
#   $job - owning job, passed on to pend()
sub _replan_altserver {
    my( $me, $job ) = @_;

    # the alternate becomes the server; the retry count starts over
    $me->{server} = delete $me->{altserver};
    delete $me->{retries};

    debug("replanning task to new server");
    return $me->pend($job);
}

# Replace this map task with new map tasks that read the same input files
# from their alternate locations.
#
#   $me  - the task info being replaced; always marked {replaced}
#   $job - owning job; its plan is updated in place
#
# Each input file is assigned to the server named in $me->{altplan}{$file},
# and one new TaskInfo is created per server. Output filenames are derived
# from the old ones by swapping the old task id for the new one. The new
# tasks are added to the plan (phase 0), queued if the job is still in
# phase 0, and the next phase's input lists are rewritten to match.
#
# Aborts the job (and returns nothing) if there is no altplan, or if any
# input file has no alternate location.
sub _replan_map {
    my $me  = shift;
    my $job = shift;

    # remove task
    # divy files among servers
    # create new tasks
    # rediddle next phase

    my %newplan;	# server => [ files ]

    $me->{replaced} = 1;

    unless( $me->{altplan} ){
        verbose("no alt task available - aborting");
        $job->abort(reason => "cannot replan task. no alternate available");
        return;
    }

    # divy files
    for my $f (@{$me->{infile}}){
        # alt loc for this file?
        my $loc = $me->{altplan}{$f};

        unless($loc){
            verbose("file unavailable - aborting ($f)");
            $job->abort(reason => "file unavailable: $f");
            return;
        }
        push @{$newplan{$loc}}, $f;
    }

    my $oldid = $me->{id};
    my @new;
    for my $as (keys %newplan){
        my $newid = unique();

        # same outputs as the old task, renamed for the new task id.
        # \Q..\E: the id is data, not a pattern - match it literally.
        # +{ }: make sure perl parses an anonymous hash, not a block.
        my @outfile = map {
            (my $f = $_->{filename}) =~ s/\Q$oldid\E/$newid/;
            +{ dst => $_->{dst}, filename => $f };
        } @{$me->{outfile}};

        my $new = AC::MrGamoo::Job::TaskInfo->new($job,
            id		=> $newid,
            phase	=> $me->{phase},
            infile	=> $newplan{$as},
            replaces	=> $oldid,
            outfile	=> \@outfile,
            server	=> $as,
        );
        debug("replan map $oldid => $newid on $as");

        # keep plan up to date
        $job->{plan}{taskidx}{$newid} = $new;
        push @{$job->{plan}{taskplan}[0]{task}}, $new;

        # move to pending queue
        $new->pend($job) if $job->{phase_no} == 0;

        push @new, $new;
    }

    $me->_replan_replace_files( $job, @new );

}

# Rewrite the next phase's input lists after a map task has been replaced.
#
#   $me  - the replaced task info; its {id} identifies the files to swap out
#   $job - owning job; $job->{plan}{taskplan}[1]{task}[*]{infile} is rewritten
#   @new - ALL replacement task infos (each must have an {id})
#
# Every input filename containing the old task id is replaced by one
# filename per replacement task, with the old id swapped for that task's id.
# Other filenames are kept unchanged and in order.
# Returns nothing.
sub _replan_replace_files {
    my $me  = shift;
    my $job = shift;
    my @new = @_;	# every replacement task, not just the first

    my $oldid    = $me->{id};
    my $nxtphase = 1;	# reduce/0

    # remove old task's files, add new tasks' files
    for my $ti ( @{$job->{plan}{taskplan}[$nxtphase]{task}} ){
        my @infile;
        for my $file (@{$ti->{infile}}){
            # \Q..\E: the id is data, not a pattern - match it literally
            if( $file =~ /\Q$oldid\E/ ){
                for my $new (@new){
                    my $newid = $new->{id};
                    (my $n = $file) =~ s/\Q$oldid\E/$newid/;
                    push @infile, $n;
                }
            }else{
                push @infile, $file;
            }
        }
        $ti->{infile} = \@infile;
    }

    return;
}

1;