package Makefile::Parallel::Scheduler::PBS;
use base qw(Makefile::Parallel::Scheduler);
use strict;
use warnings;
use Cwd;
use Proc::Reliable;
use Data::Dumper;
use Time::Interval;
sub launch{
my ($self, $job, $debug) = @_;
my $me = `whoami`;
chomp($me);
# Create the PBS script
open F, ">/tmp/$me#$job->{rule}{id}.sh";
# Number of cpus
$job->{cpus} ||= 1;
if ($job->{action}[0]{shell}) {
print F "#!/bin/sh\n";
if ($self->{mail}) {
print F "#PBS -m e -M $self->{mail}\n";
}
print F "#PBS -l walltime=$job->{walltime}\n";
print F "#PBS -l nodes=1:ppn=$job->{cpus}\n";
print F "#\n";
print F "cd " . cwd() . "\n";
print F join("\n",map { $_->{shell}?$_->{shell}:() } @{$job->{action} });
close F;
}
elsif ($job->{action}[0]{perl}) {
print F "#!/usr/bin/perl\n";
print F "#PBS -l walltime=$job->{walltime}\n";
print F "#PBS -l nodes=1:ppn=$job->{cpus}\n";
print F "#\n";
print F "chdir qq{" . cwd() . "};\n";
print F $job->{perl};
print F $job->{action}[0]{perl};
close F;
}
else {
die "Action type not supported.\n";
}
# If we are in debug mode, copy the file to log dir
`cp /tmp/$me#$job->{rule}{id}.sh log/$me#$job->{rule}{id}.sh` if $debug;
# Launch the process
sleep 1;
my $proc = Proc::Reliable->new();
my ($stdout, $stderr, $status, undef) = $proc->run("qsub /tmp/$me#$job->{rule}{id}.sh");
# Only save the id
$stdout =~ /^(\d+)/;
$job->{proc} = $1;
}
sub poll {
my ($self, $job, $logger) = @_;
# I do not have any idea how we can have a job running without
# this proc information, but the truth is that that is
# happening :-S
return 0 unless $job->{proc};
my $me = `whoami`;
chomp($me);
my $proc = Proc::Reliable->new();
my (undef, undef, $status, undef) = $proc->run("qstat $job->{proc}");
if($status == 39168){
unlink "/tmp/$me#$job->{rule}{id}.sh";
return 0;
}
$logger->warn("qstat failed with code $status while polling for $job->{rule}{id}") if $status;
return 1;
}
sub interrupt {
my ($self, $job) = @_;
`qdel $job->{proc}`;
}
sub get_id {
my ($self, $job) = @_;
$job->{proc};
}
sub can_run {
my ($self) = @_;
return 1;
}
sub clean {
my ($self, $queue) = @_;
my $me = `whoami`;
chomp($me);
for my $job (@{$queue}) {
my $id = $job->{rule}->{id};
# If there are expands, expand it in the lazy way :D
$id .= '*' if($job->{rule}{vars});
# Remove the remporary files
`rm -f $me#$id.sh.o*`;
`rm -f $me#$id.sh.e*`;
}
}
sub get_dead_job_info {
my ($self, $job) = @_;
my $proc = Proc::Reliable->new();
$proc->num_tries(3);
$proc->time_btw_tries(1);
my ($stdout, $stderr, $status, undef) = $proc->run("tracejob $job->{proc}");
# Parse exit_status
if($stdout =~ /Exit_status\=(-?\d+)/m) {
$job->{exitstatus} = $1;
}
if($stdout =~ /resources_used\.walltime\=(\d+)\:(\d+)\:(\d+)/m) {
my $time = $1 * 3600;
$time += $2 * 60;
$time += $3;
$job->{realtime} = parseInterval(seconds => $time, Small => 1);
}
}
1;