The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 13:02 (EST)
# Function: m/r task (parent side)
#
# $Id: Task.pm,v 1.10 2011/01/14 20:58:26 jaw Exp $

package AC::MrGamoo::Task;
use AC::MrGamoo::Debug 'task';
use AC::MrGamoo::Submit::Compile;
use AC::MrGamoo::Submit::Request;
use AC::MrGamoo::Task::Running;
use AC::MrGamoo::PeerList;
use AC::MrGamoo::Config;
use AC::DC::IO::Forked;
use JSON;
use strict;

my $TSTART  = $^T;
my $TIMEOUT = 3600;
my $MAXREQ  = 2;
my $MAXRUNNING = 7;	# tune me!
my %REGISTRY;
my $msgid = $$;


################################################################

# schedule periodic "cronjob"
AC::DC::Sched->new(
    info	=> "task periodic",
    freq	=> 5,
    func	=> \&periodic,
   );

################################################################

sub new {
    my $class = shift;
    # %ACPMRMTaskCreate

    my $me = bless {
        request		=> { @_ },
    }, $class;
    debug("new task $me->{request}{taskid}");

    my $task = $me->{request}{taskid};
    return problem("cannot create task: no task id")   unless $task;
    if( $REGISTRY{$task} ){
        verbose("ignoring duplicate request task $task");
        # will cause a 200 OK, so the requestor will not retry
        return $REGISTRY{$task};
    }

    $me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};
    $me->{initres} = from_json( $me->{request}{initres}, {allow_nonref => 1} ) if $me->{request}{initres};

    # compile
    eval {
        my $mr = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
        # merge job config + opts.
        $mr->set_config($me->{options});
        $mr->set_initres($me->{initres});
        $me->{R} = AC::MrGamoo::Submit::Request->new( $mr );
        $me->{R}{config}{jobid}  = $me->{request}{jobid};
        $me->{R}{config}{taskid} = $me->{request}{taskid};
        $me->{mr} = $mr;
    };
    if(my $e = $@){
        problem("cannot compile task: $e");
        return;
    }

    # measure
    for my $file (@{$me->{request}{infile}}){
        my $s = (stat(conf_value('basedir') . '/' . $file))[7];
        $me->{_inputsize} += $s
    }

    debug("input size: $me->{_inputsize}");

    # print STDERR "Task: ", dumper($me), "\n";
    $REGISTRY{$task} = $me;
    return $me;
}

sub start {
    my $me = shift;

    # if too many tasks are running, queue
    my $nrun = 0;
    for my $t (values %REGISTRY){
        $nrun ++ if $t->{io};
    }

    if( $nrun >= $MAXRUNNING ){
        $me->{_queueprio}    = $^T - $TSTART + $me->{_inputsize} / 1_000_000;
        $me->{status}{phase} = 'QUEUED';
        $me->{status}{amt}   = 0;
        debug("queue $me->{request}{phase} task $me->{request}{jobid}/$me->{request}{taskid} prio $me->{_queueprio}");
        return 1;
    }

    $me->_start();
}

sub _start {
    my $me = shift;

    debug("start $me->{request}{phase} task $me->{request}{jobid}/$me->{request}{taskid}");

    my $io = AC::DC::IO::Forked->new(
        \&AC::MrGamoo::Task::Running::_start_task, [ $me ],
        info	=> "task $me->{request}{jobid}/$me->{request}{taskid}",
       );

    $me->{io} = $io;
    $io->timeout_rel($TIMEOUT);
    $io->set_callback('timeout',  \&_timeout);
    $io->set_callback('read',     \&_read,     $me);
    $io->set_callback('shutdown', \&_shutdown, $me);

    $io->start();
}

sub abort {
    my $me = _find(shift, @_);

    return unless $me;
    debug("abort task $me->{request}{taskid}");
    $me->{io}->shut() if $me->{io};
    return 1;
}


sub _find {
    my $me = shift;
    return $me if ref $me;

    my %p = @_;
    my $task = $p{taskid};
    $me = $REGISTRY{$task};

    return $me;
}

sub _timeout {
    my $io = shift;
    $io->shut();
}

sub _shutdown {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    # send status to master
    $me->_send_status();

    my $task = $me->{request}{taskid};
    delete $REGISTRY{$task};

    delete $me->{io};

    periodic(1);	# try to start another task
}

sub _send_status_done {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    $me->{_status_underway} --;
}

sub _read {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    debug("read child $me->{request}{taskid}: $evt->{data}.");
    # read status msg from child
    $io->{rbuffer} .= $evt->{data};

    my @l = split /^/m, $io->{rbuffer};
    $io->{rbuffer} = '';
    for my $l (@l){
        unless( $l =~ /\n/ ){
            $io->{rbuffer} = $l;
            last;
        }

        debug("got status $l");
        chomp($l);
        my($phase, $amt) = split /\s+/, $l;

        $me->{status}{phase} = $phase;
        $me->{status}{amt}   = $amt;
        $me->{status}{fail}  = 1 if $phase eq 'FAILED';

        # send status to master
        $me->_send_status();
    }
}

sub _send_status {
    my $me = shift;

    # don't kill the master with too many requests
    # return if $me->{_status_underway} >= $MAXREQ;

    my($addr, $port) = get_peer_addr_from_id( $me->{request}{master} );
    return unless $addr;

    debug("sending task status update $me->{request}{taskid} => $me->{status}{phase}");
    my $x = AC::MrGamoo::API::Client->new( $addr, $port, "task $me->{request}{taskid}", {
        type		=> 'mrgamoo_taskstatus',
        msgidno		=> $msgid++,
        want_reply	=> 0,
    }, {
        jobid		=> $me->{request}{jobid},
        taskid		=> $me->{request}{taskid},
        phase		=> $me->{status}{phase},
        progress	=> $me->{status}{amt},
    } );

    return unless $x;

    $me->{_status_underway} ++;
    $x->set_callback('shutdown', \&_send_status_done, $me);

    $x->start();

}

sub attr {
    my $me = shift;
    my $bk = shift;
    my $p  = shift;

    return $bk->{attr}{$p} if $bk && $bk->{attr}{$p};
    return $me->{options}{$p};
}

sub report {

    my $txt;

    for my $t (values %REGISTRY){
        $txt .= "$t->{request}{jobid} $t->{request}{taskid} $t->{status}{phase}\n";
    }

    return $txt;
}

sub periodic {
    my $quick = shift;

    # how many tasks are running?
    my $nrun = 0;
    for my $t (values %REGISTRY){
        $nrun ++ if $t->{io};
    }

    return if $quick && $nrun >= $MAXRUNNING;

    # queued? send status, maybe start

    for my $t (sort { $a->{_queueprio} <=> $b->{_queueprio} } values %REGISTRY){
        next if $t->{io};

        $t->_send_status() unless $quick;

        if( $nrun < $MAXRUNNING ){
            $t->_start();
            $nrun ++;
        }
    }
}


1;