## no critic (RequireUseStrict)
package Tapper::MCP::Master;
BEGIN {
$Tapper::MCP::Master::AUTHORITY = 'cpan:AMD';
}
{
$Tapper::MCP::Master::VERSION = '4.1.0';
}
# ABSTRACT: Wait for new testruns and start a new child when needed
use 5.010;
use Moose;
use parent "Tapper::MCP";
use Devel::Backtrace;
use File::Path;
use IO::Select;
use IO::Handle;
use Log::Log4perl;
use POSIX ":sys_wait_h";
use Try::Tiny;
use UNIVERSAL;
use constant HARNESS_ACTIVE => $ENV{HARNESS_ACTIVE};
use Tapper::Cmd::Testrun;
use Tapper::MCP::Child;
use Tapper::MCP::Net;
use Tapper::MCP::Scheduler::Controller;
use Tapper::Model 'model';
# Number of SIGCHLD signals counted by the CHLD handler that have not been
# processed by handle_dead_children yet.
has dead_child => (is => 'rw', default => 0);
# Bookkeeping for running child processes, keyed by system (host) name; each
# entry holds pid, test_run, job and, if a console could be opened, console.
has child => (is => 'rw', isa => 'HashRef', default => sub {{}});
# Console log output file handles, indexed by the file descriptor number of
# the console socket they belong to.
has consolefiles => (is => 'rw', isa => 'ArrayRef', default => sub {[]});
# IO::Select object containing all opened console sockets (set in prepare_server).
has readset => (is => 'rw');
# Scheduler controller used to fetch due jobs and to update job states (set in BUILD).
has scheduler => (is => 'rw', isa => 'Tapper::MCP::Scheduler::Controller');
# Moose construction hook: give every new master object its own scheduler
# controller.
sub BUILD
{
    my ($self) = @_;

    my $controller = Tapper::MCP::Scheduler::Controller->new();
    $self->scheduler($controller);
}
# Install the signal handlers used by the master process.
#
# SIGCHLD only increments the dead_child counter. SIGINT sends signal 15 to
# every child listed in $self->child, prints a stack trace and exits.
#
# @return success - 0
sub set_interrupt_handlers
{
    my ($self) = @_;

    $SIG{CHLD} = sub {
        my $pending = $self->dead_child;
        $self->dead_child($pending + 1);
    };

    # give me a stack trace when ^C
    $SIG{INT} = sub {
        # make handler reentrant, don't handle signal twice
        $SIG{INT}  = 'IGNORE';
        # stop all children; their exit no longer needs to be counted
        $SIG{CHLD} = 'IGNORE';

        my $children = $self->child;
        foreach my $entry (values %{$children}) {
            kill 15, $entry->{pid};
        }

        my $trace = Devel::Backtrace->new(-start => 2, -format => '%I. %s');
        print $trace;
        exit -1;
    };

    return 0;
}
# Open the console connection for a host and the console log file of the
# given testrun. On success the console socket is switched to non-blocking
# mode, added to the read set and its log file handle is stored in
# consolefiles under the socket's file descriptor number.
#
# @param string - system name
# @param int    - testrun id
#
# @retval success - IO::Socket::INET
# @retval error   - error string
sub console_open
{
    my ($self, $system, $testrunid) = @_;

    # Both values are required. The parentheses matter: "not" binds tighter
    # than "and", so without them a missing testrun id is never detected.
    return "Incomplete data given to function console_open"
        if not ($system and defined $testrunid);

    my $path = $self->cfg->{paths}{output_dir}."/$testrunid/";
    if (not -d $path) {
        File::Path::mkpath($path, {error => \my $mkpath_errors});
        # each diagnostic is a one-element hash: file name => message
        foreach my $diag (@{ $mkpath_errors || [] }) {
            my ($file, $message) = %$diag;
            return "general error: $message\n" if $file eq '';
            return "Can't create $file: $message";
        }
    }

    my $net = Tapper::MCP::Net->new();
    my $console;
    eval {
        local $SIG{ALRM} = sub { die 'Timeout'; };
        alarm(5);
        $console = $net->conserver_connect($system);
    };
    my $connect_error = $@;
    alarm 0;
    return "Unable to open console for $system after 5 seconds" if $connect_error;

    # anything but a socket is handed back to the caller unchanged
    return $console if ref($console) ne 'IO::Socket::INET';

    $console->blocking(0);
    $self->readset->add($console);

    $path .= "console";
    my $fh;
    open($fh, ">>", $path) or do {
        # undo the registration, nobody would ever read from this console
        $self->readset->remove($console);
        close $console;
        return "Can't open console log file $path for test on host $system:$!";
    };
    $self->consolefiles->[$console->fileno()] = $fh;
    return $console;
}
# Close a given console connection together with its console log file.
#
# The console is always removed from the read set and disconnected, even
# when its log file is missing or cannot be closed. Otherwise a broken
# console would stay in the select set and be reported as readable forever.
#
# @param IO::Socket::INET - console connection socket
#
# @retval success - 0
# @retval error   - error string
sub console_close
{
    my ($self, $console) = @_;
    return 0 if not ($console and $console->can('fileno'));

    my $error  = 0;
    # fileno is undefined when the socket has already been closed
    my $fileno = $console->fileno();
    if (defined $fileno) {
        my $logfile = $self->consolefiles->[$fileno];
        if (defined $logfile) {
            close $logfile or $error = "Can't close console file:$!";
        }
        $self->consolefiles->[$fileno] = undef;
    }

    $self->readset->remove($console);
    my $net = Tapper::MCP::Net->new();
    $net->conserver_disconnect($console);
    alarm 0; # NOTE(review): presumably cancels a leftover read/connect alarm - confirm
    return $error;
}
# Reap all exited child processes and clean up the bookkeeping for those
# that belong to a test run: the job is marked as finished, the console is
# closed and the entry is removed from $self->child.
#
# The dead_child counter is only a hint. Several children exiting at the
# same time may raise just one SIGCHLD, and qx() or piped opens raise
# SIGCHLD for processes that are waited for elsewhere. Therefore the
# counter is reset and waitpid is called until nothing is left to reap.
sub handle_dead_children
{
    my ($self) = @_;
    $self->log->debug("Number of dead children is ".$self->dead_child);

    # Reset before reaping: a SIGCHLD that arrives while we are in the loop
    # is counted again and at worst triggers one harmless extra call.
    $self->dead_child(0);

    # don't use wait(); it would block when no child has exited yet
    REAP:
    while ((my $dead_pid = waitpid(-1, WNOHANG)) > 0) {
        CHILDREN_CHECK:
        foreach my $this_child (keys %{$self->child}) {
            my $entry = $self->child->{$this_child};
            next CHILDREN_CHECK if not defined $entry->{pid};
            next CHILDREN_CHECK if $entry->{pid} != $dead_pid;

            $self->log->debug("$this_child finished");
            $self->scheduler->mark_job_as_finished( $entry->{job} );
            $self->console_close( $entry->{console} );
            delete $self->child->{$this_child};
            last CHILDREN_CHECK;
        }
    }
    return;
}
# Read console output from a handle and append it to the console log file
# registered for that handle in consolefiles.
#
# A read error, a read timeout and end of file (console closed by the other
# side) are all reported as errors. The error text is also appended to the
# log file. Reporting end of file matters: a closed socket is always
# "readable", so it has to leave the read set.
#
# @param file handle - read from this handle
#
# @retval success - 0
# @retval error   - error string
sub consolelogfrom
{
    my ($self, $handle) = @_;
    my ($buffer, $readsize);
    my $timeout = 2;
    my $maxread = 1024; # XXX configure
    my $errormsg;

    eval {
        local $SIG{ALRM} = sub { die "Timeout ($timeout) reached" };
        alarm $timeout;
        $readsize = sysread($handle, $buffer, $maxread);
    };
    # save both error sources before anything else can overwrite them
    my $eval_error = $@;
    my $os_error   = "$!";
    alarm 0;

    if ($eval_error) {
        # checked first so the timeout is not hidden by the generic message
        $errormsg = "Error while reading console: $eval_error";
    }
    elsif (not defined $readsize) {
        $errormsg = "Can't read from console: $os_error";
    }
    elsif ($readsize == 0) {
        $errormsg = "Console connection closed by remote side";
    }

    my $fileno = $handle->fileno();
    my $file   = defined $fileno ? $self->consolefiles->[$fileno] : undef;
    return "Can't get console file:$!" if not defined $file;

    $buffer = '' if not defined $buffer;
    $buffer .= "*** Tapper: $errormsg ***" if $errormsg;
    my $written = syswrite($file, $buffer);
    return "Can't write console data to file :$!" if not defined $written;
    return $errormsg if $errormsg;
    return 0;
}
# Store a notification event in the reports database so that the
# notification framework can pick it up. Failures are logged, never thrown.
#
# @param string   - event name
# @param hash ref - message
sub notify_event
{
    my ($self, $event, $message) = @_;

    try {
        my %row = (
            type    => $event,
            message => $message,
        );
        my $events = model('ReportsDB')->resultset('NotificationEvent');
        $events->new(\%row)->insert();
    }
    catch {
        my $problem = $_;
        $self->log->error("Unable notify user of event $event: $problem");
    };

    return;
}
# Start (or, in revive mode, re-attach to) the test run of one job.
#
# The job is marked as running unless we are reviving, then a child process
# is forked. The child handles the test run, sends the testrun_finished
# notification, optionally schedules a rerun and exits. The parent opens
# the console for the host and records pid, testrun id, job and console
# in $self->child under the host name.
#
# @param TestrunScheduling - job to run
# @param boolean           - are we in revive mode?
#
# @retval success - 0
sub run_due_tests
{
    my ($self, $job, $revive) = @_;
    $self->log->debug('run_due_test');

    my $hostname   = $job->host->name;
    my $testrun_id = $job->testrun->id;
    $self->log->info("start testrun $testrun_id on $hostname");

    # check if this system is already active, just for error handling
    if ($self->child->{$hostname}) {
        $self->handle_dead_children();
    }
    if (not $revive) {
        $self->scheduler->mark_job_as_running($job);
    }

    my $pid = fork();
    if (not defined $pid) {
        die "fork failed: $!";
    }

    # hello child
    if ($pid == 0) {
        my $runner = Tapper::MCP::Child->new( $testrun_id );
        my $failure;
        eval {
            $failure = $runner->runtest_handling( $hostname, $revive );
        };
        $failure = $@ if $@;

        $self->notify_event('testrun_finished', {testrun_id => $testrun_id});

        my $needs_rerun = ($failure or $runner->rerun);
        if ($needs_rerun and $job->testrun->rerun_on_error) {
            my $testrun_cmd = Tapper::Cmd::Testrun->new();
            my $rerun_id;
            eval {
                $rerun_id = $testrun_cmd->rerun(
                    $testrun_id,
                    {rerun_on_error => $job->testrun->rerun_on_error - 1},
                );
            };
            if ($@) {
                $self->log->error($@);
            }
            else {
                my $reason = "an error occurred and rerun_on_error was ".$job->testrun->rerun_on_error;
                $self->log->debug("Restarted testrun $testrun_id with new id $rerun_id because ".$reason);
            }
        }

        if ($failure) {
            $self->log->error("Testrun $testrun_id ($hostname) error occurred: $failure");
        }
        else {
            $self->log->info("Testrun $testrun_id ($hostname) finished successfully");
        }
        exit 0;
    }

    # only the parent gets here, the child has exited above
    my $console = $self->console_open($hostname, $testrun_id);
    if (ref($console) eq 'IO::Socket::INET') {
        $self->child->{$hostname}->{console} = $console;
    }
    else {
        $self->log->info("Can not open console on $hostname: $console");
    }
    @{ $self->child->{$hostname} }{qw(pid test_run job)} = ($pid, $testrun_id, $job);

    return 0;
}
# One iteration of the main loop: wait for console output (at most until
# the next poll is due), reap dead children, copy console output to the
# log files and, when the poll interval is over or no console had data,
# ask the scheduler for due jobs and start them. The looping itself is
# done by the caller to allow testing.
#
# @param int - time of the last scheduler poll (epoch seconds)
#
# @return time of the last scheduler poll, to be passed to the next call
sub runloop
{
    my ($self, $lastrun) = @_;
    my $timeout = $lastrun + $self->cfg->{times}{poll_intervall} - time();
    $timeout = 0 if $timeout < 0;

    my @ready;
    # if readset is empty, can_read immediately returns with an empty
    # array; this makes runloop a CPU burn loop
    if ($self->readset->count) {
        @ready = $self->readset->can_read( $timeout );
    } else {
        sleep $timeout;
    }
    $self->handle_dead_children() if $self->dead_child;

  HANDLE:
    foreach my $handle (@ready) {
        if (not $handle->opened()) {
            $self->readset->remove($handle);
            next HANDLE;
        }
        my $retval = $self->consolelogfrom($handle);
        if ($retval) {
            $self->log->error($retval);
            $self->console_close($handle);
        }
    }

    if (($timeout <= 0) or (not @ready)) {
        my @jobs;

        # The scheduler runs in a forked helper that prints the due job ids.
        my $pid = open(my $fh, "-|");
        if (not defined $pid) {
            # Without this check undef == 0 would send the master itself
            # down the child branch below and make it exit.
            $self->log->error("Can't fork scheduler process: $!");
            return time(); # try again after the next poll interval
        }
        if ($pid == 0) {
            my @next_jobs = $self->scheduler->get_next_job;
            print join ",", map {$_->id} @next_jobs;
            exit;
        }

        # we may not have ids_joined when no test is due
        my $ids_joined = <$fh> // '';
        # closing a piped handle also waits for the helper process
        close $fh
            or $self->log->error("Scheduler process did not finish cleanly (status $?)");

        foreach my $next_id (split ',', $ids_joined) {
            my $job = model('TestrunDB')->resultset('TestrunScheduling')->find($next_id);
            if (not defined $job) {
                $self->log->error("Scheduled job $next_id not found in database");
                next;
            }
            push @jobs, $job;
        }

        foreach my $job (@jobs) {
            # (WORKAROUND) try to avoid children being started close to
            # each other and trying to reset simultaneously
            sleep 2 unless HARNESS_ACTIVE;
            $self->run_due_tests($job);
        }
        $lastrun = time();
    }
    return $lastrun;
}
# Create communication data structures used in MCP: initialise Log4perl
# from the configured file and create the IO::Select object that is stored
# in readset.
#
# @retval success - 0
# @retval error   - error string
sub prepare_server
{
    my ($self) = @_;
    Log::Log4perl->init($self->cfg->{files}{log4perl_cfg});

    # this set is used by select()
    my $select = IO::Select->new();
    return "Can't create select object:$!" if not $select;
    $self->readset($select);
    return 0;
}
# Restart handling for every job the database still lists as running, i.e.
# the children that were active before MCP was shut down or crashed.
# Expects no parameters.
sub revive_children
{
    my ($self) = @_;

    my $running = model->resultset('TestrunScheduling')->running;
    for my $running_job ($running->all) {
        $self->run_due_tests($running_job, "revive");
    }
}
# Set up all needed data structures, revive the test runs that were active
# before the last shutdown, then wait for new tests forever.
#
# Dies if prepare_server reports an error, because the main loop cannot
# work without the read set it creates.
sub run
{
    my ($self) = @_;
    $self->set_interrupt_handlers();

    my $error = $self->prepare_server();
    die "Can't prepare MCP master: $error\n" if $error;

    $self->revive_children();
    my $lastrun = time();
    while (1) {
        $lastrun = $self->runloop($lastrun);
    }
}
1;
__END__
=pod
=encoding utf-8
=head1 NAME
Tapper::MCP::Master - Wait for new testruns and start a new child when needed
=head1 SYNOPSIS
use Tapper::MCP::Master;
my $mcp = Tapper::MCP::Master->new();
$mcp->run();
=head1 Attributes
=head2 dead_child
Number of pending dead child processes.
=head2 child
Contains all information about all child processes.
=head2 consolefiles
Output files for console logs ordered by file descriptor number.
=head2 readset
IO::Select object containing all opened console file handles.
=head2 scheduler
Associated Scheduler object.
=head1 FUNCTIONS
=head2 set_interrupt_handlers
Set interrupt handlers for important signals. Takes no parameters.
@return success - 0
=head2 console_open
Open console connection for given host and appropriate console log output file
for the testrun on host. Returns console on success or an error string for
failure.
@param string - system name
@param int - testrun id
@retval success - IO::Socket::INET
@retval error - error string
=head2 console_close
Close a given console connection.
@param IO::Socket::INET - console connection socket
@retval success - 0
@retval error - error string
=head2 handle_dead_children
Each test run is handled by a child process. All information needed for
communication with this child process is kept in $self->child. Reset all this
information when the test run is finished and the child process ends.
=head2 consolelogfrom
Read console log from a handle and write it to the appropriate file.
@param file handle - read from this handle
@retval success - 0
@retval error - error string
=head2 notify_event
Inform the notification framework that an event occurred in MCP.
@param string - event name
@param hash ref - message
=head2 run_due_tests
Run the tests that are due.
@param TestrunScheduling - job to run
@param boolean - are we in revive mode?
@retval success - 0
@retval error - error string
=head2 runloop
Main loop of this module. Checks for new tests and runs them. The looping
itself is put outside of function to allow testing.
=head2 prepare_server
Create communication data structures used in MCP.
@retval success - 0
@retval error - error string
=head2 revive_children
Restart the children that were running before MCP was shut
down/crashed. The function expects no parameters and has no return
values.
=head2 run
Set up all needed data structures then wait for new tests.
=head1 AUTHOR
AMD OSRC Tapper Team <tapper@amd64.org>
=head1 COPYRIGHT AND LICENSE
This software is Copyright (c) 2012 by Advanced Micro Devices, Inc..
This is free software, licensed under:
The (two-clause) FreeBSD License
=cut