The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package TAP::Harness::ClusterMaster;
use strict;
use vars qw($VERSION @ISA);
use IO::Socket;
use IO::Select;
use TAP::Parser::ResultFactory;
use TAP::Parser::Iterator::ClusterSlave;
use TAP::Harness;
@ISA = qw(TAP::Harness);

=head1 NAME

TAP::Harness::ClusterMaster - Run tests across remote hosts via slaves

=head1 VERSION

Version 0.01

=cut

# Module version; keep in sync with the VERSION section of the POD above.
$VERSION = q{0.01};

=head1 DESCRIPTION

This is a simple test harness that runs tests on remote hosts via slaves,
automatically aggregating the results and writing them to STDOUT.

=head1 SYNOPSIS

 use TAP::Harness::ClusterMaster;
 my $harness = TAP::Harness::ClusterMaster->new( \%args );
 $harness->runtests(@tests);

=cut

# Prefix of the per-run credential string that new() builds and that
# connecting slaves must echo back.
our $COOKIE = 'cookie';

# No-op default hooks; replace them per harness through the
# slave_startup_callback / slave_teardown_callback accessors.
our $DEFAULT_SLAVE_STARTUP_CALLBACK  = sub { return };
our $DEFAULT_SLAVE_TEARDOWN_CALLBACK = sub { return };

# Construct the harness via TAP::Harness::new, then layer on the cluster
# specifics: the socket multiplexer class, the no-op slave hooks and a
# credential string ("$COOKIE - <epoch seconds>") that slaves must send as
# their first line.  The credentials are echoed to STDERR so the operator
# can hand them to the slaves.
sub new {
    my ($class, @args) = @_;

    my $self = $class->SUPER::new(@args);

    @{$self}{qw(multiplexer_class slave_startup_callback slave_teardown_callback)} = (
        'TAP::Parser::Multiplexer::ClusterSockets',
        $DEFAULT_SLAVE_STARTUP_CALLBACK,
        $DEFAULT_SLAVE_TEARDOWN_CALLBACK,
    );

    $self->{credentials} = join ' - ', $COOKIE, time;

    print STDERR "SLAVE CREDENTIALS: '$self->{credentials}'\n";

    return $self;
}

# Get/set the hook run before slaves are awaited.  A true argument replaces
# the stored hook; a false one (undef, 0, '') leaves it unchanged.  Always
# returns the current hook.
sub slave_startup_callback {
    my $self        = shift;
    my $replacement = shift;

    $self->{slave_startup_callback} = $replacement if $replacement;

    return $self->{slave_startup_callback};
}

# Get/set the hook run after all tests finish.  A true argument replaces
# the stored hook; a false one (undef, 0, '') leaves it unchanged.  Always
# returns the current hook.
sub slave_teardown_callback {
    my $self = shift;

    if (my $replacement = shift) {
        $self->{slave_teardown_callback} = $replacement;
    }

    return $self->{slave_teardown_callback};
}

# Open the TCP socket that slaves connect to and record its port in
# $self->{master_listen_port}.
#
# $jobs is used as the listen backlog.  The port is taken from
# $TAP::Harness::ClusterMaster::LISTEN_PORT; when that is undef,
# IO::Socket::INET binds an OS-chosen port, which is why the real port is
# read back with sockport.  The socket is non-blocking with a zero accept
# timeout so that detect_new_slaves can poll it without stalling.
#
# Tries up to five times, pausing one second between failed attempts, and
# dies with the last socket error if every attempt fails.
sub start_listening_for_slaves {
    my ($self, $jobs) = @_;

    my $max_attempts = 5;
    my ($server, $error);

    ATTEMPT:
    for my $attempt (1 .. $max_attempts) {
        $server = IO::Socket::INET->new(
            Proto     => 'tcp',
            LocalPort => $TAP::Harness::ClusterMaster::LISTEN_PORT,
            Listen    => $jobs,
            ReuseAddr => 1,
            Timeout   => 0,
            Blocking  => 0
        );
        last ATTEMPT if $server;

        # IO::Socket::INET reports failures through $@; fall back to $!.
        $error = $@ || "$!";

        # Retrying instantly cannot help with a transient failure (e.g. a
        # port still being released), so wait a moment first.
        sleep 1 if $attempt < $max_attempts;
    }

    if (!$server) {
        die "Unable to create server: $error";
    }

    $self->{'master_listen_port'} = $server->sockport;

    return $server;
}

# Accept every pending connection on $server and return the ones that
# authenticated.
#
# A connecting slave must send $self->{credentials} as its first line
# (CR/LF stripped) within 3 seconds.  Authenticated connections are
# switched to non-blocking mode and returned; every other connection
# (silent, closed without data, or wrong credentials) is closed.  The loop
# relies on $server->accept returning false once nothing is pending, as it
# does for the zero-timeout socket built by start_listening_for_slaves.
sub detect_new_slaves {
    my ($self, $server) = @_;
    my @new_connections = ();

    while (my $connection = $server->accept) {
        my $select = IO::Select->new($connection);
        my @ready_to_read = $select->can_read(3);

        # Connections should have responded by now
        if (!@ready_to_read) {
            $connection->close;
            next;
        }

        # This isn't perfect, but I want this to be simple and am choosing
        # this risk.  A client could connect and send data without a line
        # return.  That would cause us to hang here.  Also, they could send
        # us an extraordinarily huge amount of data on that first line,
        # which would result in a potential out-of-memory fault.  This is
        # the price of simplistic I/O.
        my $credentials = $connection->getline;

        # getline returns undef when the peer closed without sending a line.
        if (defined $credentials) {
            $credentials =~ s/[\n\r]//g;

            # Only connections with the right credentials are accepted
            if ($credentials eq $self->{credentials}) {
                $connection->blocking(0);
                push @new_connections, $connection;
                next;
            }
        }

        # Missing or wrong credentials: drop the connection now.
        $connection->close;
    }

    return @new_connections;
}

# Run @tests by handing jobs to remote slaves that connect over TCP, feeding
# every result into $aggregate.  Overrides TAP::Harness::aggregate_tests.
#
# Flow:
#   1. Open the listening socket and invoke the slave startup callback with
#      ($self, $aggregate, @tests).  Its return value is passed on to the
#      slave teardown callback, which is registered as an 'after_runtests'
#      callback.
#   2. Poll once per second until at least one slave with valid credentials
#      has connected.  NOTE(review): there is no timeout, so this waits
#      forever if no slave ever connects.
#   3. Loop (via `redo RESULT`) until $mux->next yields no stash.  Each pass
#      accepts extra slaves (up to $self->jobs), starts one parser per idle
#      slave, and dispatches the next result the multiplexer returns.
#
# Returns nothing.
sub aggregate_tests {
    my ( $self, $aggregate, @tests ) = @_;

    my $jobs      = $self->jobs;
    my $scheduler = $self->make_scheduler(@tests);

    # #12458
    # Tell the tests they run verbosely when the formatter is verbose.
    local $ENV{HARNESS_IS_VERBOSE} = 1
      if $self->formatter->verbosity > 0;

    # Formatter gets only names.
    $self->formatter->prepare( map { $_->description } $scheduler->get_all );

    my $server             = $self->start_listening_for_slaves($jobs);
    my $slave_startup_data = $self->slave_startup_callback->($self, $aggregate, @tests);

    $self->callback('after_runtests' => sub {
                        my $aggregate = shift;
                        $self->slave_teardown_callback->($self, $aggregate, $slave_startup_data)
                   });

    # Block until at least one authenticated slave has connected.
    my @slaves;
    while (!(@slaves = $self->detect_new_slaves($server))) {
        sleep(1);
    }
    my $mux = $self->_construct($self->multiplexer_class, @slaves );

    # Reset whenever a result or finished parser is seen; used to decide
    # when to print the "waiting" reminder below.
    my $time_of_last_update = time;

    # Bare block re-entered with `redo RESULT`; control falls out of it once
    # $mux->next returns an undefined stash.
    RESULT: {
        # Nothing heard for over 60 seconds: remind the operator which
        # credentials the slaves must present.
        if (time > $time_of_last_update + 60) {
            $time_of_last_update = time;
            print STDERR "Waiting for response from slaves with credentials " . $self->{credentials} . "\n";
        }

        # Add slave sockets to multiplexer
        # (never more than $jobs slaves in total).
        if (@slaves < $jobs) {
            my @new_slaves = $self->detect_new_slaves($server);
            if (@new_slaves) {
                $mux->add_sockets(@new_slaves);
                push @slaves, @new_slaves;
            }
        }

        # Keep multiplexer topped up
        # (at most one active parser per connected slave).
        FILL:
        while ( $mux->parsers < @slaves ) {
            my $job = $scheduler->get_job;

            # If we hit a spinner stop filling and start running.
            last FILL if !defined $job || $job->is_spinner;

            # Bind the job to an idle slave; _get_parser_args reads
            # $job->{socket} when building the parser's iterator.
            $job->{socket} = $mux->first_free_socket;
            my ( $parser, $session ) = $self->make_parser($job);
            $mux->add( $parser, [ $session, $job ] );
        }

        my ( $parser, $stash, $result ) = $mux->next;
        if (defined($stash)) {
            my ( $session, $job ) = @$stash;

            # The slave running this job went away: forget its socket, mark
            # the parser as exited with status 255, report an error line to
            # the session, and clear $result so the end-of-parser branch
            # below finishes this test.
            if (defined $result && ref $result->raw && $result->raw == TAP::Parser::Iterator::ClusterSlave::SLAVE_DISCONNECTED) {
                $result = undef;
                @slaves = grep {$_ != $parser->{socket}} @slaves;
                $parser->exit(255);
                $session->result(
                    TAP::Parser::ResultFactory->make_result({
                        'type' => 'unknown',
                        'raw' => 'CRITICAL ERROR: Slave process disconnected prematurely!'
                    })
                );
            }
            if ( defined $result ) {
                # Reference-valued raw results that equal the iterator's
                # SLAVE_NOT_READY_FOR_READ / SLAVE_DISCONNECTED sentinels are
                # internal signals and are not passed to the session.
                if (
                    !(
                        ref $result->raw &&
                        (
                            $result->raw == TAP::Parser::Iterator::ClusterSlave::SLAVE_NOT_READY_FOR_READ ||
                            $result->raw == TAP::Parser::Iterator::ClusterSlave::SLAVE_DISCONNECTED
                        )
                     )
                ) {
                    $time_of_last_update = time;
                    $self->_do_with_autoflush_on( sub { $session->result($result) } );
                    $self->_bailout($result) if $result->is_bailout;
                }
            }
            else {
                # End of parser. Automatically removed from the mux.
                $time_of_last_update = time;
                $self->_do_with_autoflush_on( sub { $self->finish_parser( $parser, $session ) } );
                $self->_after_test( $aggregate, $job, $parser );
                $job->finish;
            }
            redo RESULT;
        }
    }

    # NOTE(review): if every slave disconnects while jobs are still queued,
    # @slaves shrinks to nothing and FILL stops starting parsers; whether the
    # remaining jobs are then silently skipped depends on how $mux->next
    # behaves with no parsers — confirm.
    return;
}

# Run $sub with autoflush enabled on the formatter's output handle, then
# restore that handle's previous autoflush setting (also when $sub dies).
#
# $| always applies to the *currently selected* handle.  Writing
# `local $| = 1` after selecting the output handle would restore the saved
# value onto whatever handle is selected when the scope exits, not onto the
# output handle.  That leaves the output handle autoflushing forever and
# clobbers the other handle's setting.  Hence the explicit
# select / save / restore below.
#
# $sub is called without arguments in the caller's context, and its return
# value is passed back.  Exceptions from $sub are rethrown after the restore.
sub _do_with_autoflush_on {
    my ($self, $sub) = @_;

    my $output_fh = $self->formatter->stdout;

    my $orig_fh        = select $output_fh;
    my $orig_autoflush = $|;
    $| = 1;
    select $orig_fh;

    my $want = wantarray;
    my @result;
    my $ok = eval {
        if ($want) {
            @result = $sub->();
        }
        elsif (defined $want) {
            $result[0] = $sub->();
        }
        else {
            $sub->();
        }
        1;
    };
    my $error = $@;

    # Restore the output handle's own setting without disturbing whichever
    # handle is selected now.
    my $current_fh = select $output_fh;
    $| = $orig_autoflush;
    select $current_fh;

    die $error unless $ok;

    return $want ? @result : $result[0];
}

# Take the parser arguments built by TAP::Harness and replace the test
# source with a ClusterSlave iterator.  The iterator talks to the slave on
# $job->{socket} and authenticates with this harness's credentials.  The
# 'source' entry is moved out of the parser args into the iterator.
sub _get_parser_args {
    my ($self, @rest) = @_;
    my $job = $rest[0];

    my $args   = $self->SUPER::_get_parser_args(@rest);
    my $source = delete $args->{source};

    $args->{iterator} = TAP::Parser::Iterator::ClusterSlave->new(
        socket      => $job->{socket},
        credentials => $self->{credentials},
        source      => $source,
        switches    => $args->{switches},
    );

    return $args;
}

1;

__END__