The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package ElasticSearch::TestServer;
$ElasticSearch::TestServer::VERSION = '0.68';
use strict;
use warnings;
use ElasticSearch();
use POSIX 'setsid';
use IO::Socket();
use File::Temp 0.22 ();
use File::Spec::Functions qw(catfile);
use YAML qw(DumpFile);
use File::Path qw(rmtree);

use parent 'ElasticSearch';

=head1 NAME

ElasticSearch::TestServer - Start an ElasticSearch cluster for testing

=head1 SYNOPSIS

    use ElasticSearch::TestServer;

    $ENV{ES_HOME} = '/path/to/elasticsearch';
    $ENV{ES_TRANSPORT} = 'http';

    my $es = ElasticSearch::TestServer->new(
        home        => '/path/to/elasticsearch',
        instances   => 3,
        transport   => 'http',
        ip          => '127.0.0.1',
        trace_calls => 'logfile',
        port        => '9200',
        config      => { values to override}
    );

=head1 DESCRIPTION

ElasticSearch::TestServer is a utility module which will start an
ElasticSearch cluster intended for testing, and shut the cluster
down at the end, even if your code exits abnormally.

By default, it uses C<http> transport, the C<local> gateway, and
starts 3 instances on C<localhost>, starting with C<port> 9200 if
the C<transport> is C<http>, C<httplite>, C<httptiny>, C<curl>, C<aehttp>
 or 9500 if C<thrift>.

It is a subclass of L<ElasticSearch>, so C<< ElasticSearch::TestServer->new >>
returns an ElasticSearch instance.

=cut

#===================================
sub new {
#===================================
    my $class  = shift;
    my %params = (
        home      => $ENV{ES_HOME},
        transport => $ENV{ES_TRANSPORT} || 'http',
        instances => 3,
        ip        => '127.0.0.1',
        ref $_[0] eq 'HASH' ? %{ shift() } : @_
    );

    my $home = delete $params{home} or die <<NO_HOME;

************************************************************
    ElasticSearch home directory not specified

    Please either set \$ENV{ES_HOME} or pass a value
    for 'home' to new()

************************************************************

NO_HOME

    my $transport = $params{transport};
    my $port      = delete $params{port}
        || ( $transport eq 'thrift' ? 9500 : 9200 );
    my $instances = delete $params{instances};
    my $plugin    = $ElasticSearch::Transport::Transport{$transport}
        or die "Unknown transport '$transport'";
    eval "require  $plugin" or die $@;
    $plugin->_make_sync if $plugin->can('_make_sync');
    my $protocol = $plugin->protocol;

    my %config = (
        cluster => { name => 'es_test' },
        gateway => { type => 'local', expected_nodes => $instances },
        network => { host => 'localhost' },
        "$protocol.port" => "$port-" . ( $port + $instances - 1 ),
        %{ $params{config} || {} }
    );

    my $ip = $config{network}{host} = delete $params{ip};
    my @servers = map {"$ip:$_"} $port .. $port + $instances - 1;
    my @publish = map {"$ip:$_"} 9300 .. 9300 + $instances - 1;
    $config{'discovery.zen.ping.unicast.hosts'} = \@publish;

    foreach (@servers) {
        if ( IO::Socket::INET->new($_) ) {
            die <<RUNNING;

************************************************************

    There is already a server running on $_.
    Please shut it down before starting the test server

************************************************************
RUNNING
        }
    }

    my $server = $servers[0];

    print "Starting test server installed in $home\n";

    my $cmd          = catfile( $home, 'bin', 'elasticsearch' );
    my $pid_file     = File::Temp->new;
    my $blank_config = File::Temp->new( SUFFIX => '.yml' );
    my $config_path  = $blank_config->filename();

    my $dir     = '';
    my $dirname = '';
    my $PIDs    = [];

    unless ( $config{path}{data} ) {
        $dir = File::Temp->newdir(
            'elastic_XXXXX',
            CLEANUP => 0,
            TMPDIR  => 1
        );
        $dirname = $config{path}{data} = $dir->dirname;
    }

    my $old_SIGINT = $SIG{INT};
    my $new_SIGINT = sub {
        $class->_shutdown_servers( $PIDs, $dirname );
        if ( ref $old_SIGINT eq 'CODE' ) {
            return $old_SIGINT->();
        }
        exit(1);
    };
    $SIG{INT} = $new_SIGINT;

    DumpFile( $blank_config->filename, \%config );

    for ( 1 .. $instances ) {
        print "Starting test node $_\n";
        my $int_caught = 0;
        local $SIG{INT} = sub { $int_caught++; };
        defined( my $pid = fork ) or die "Couldn't fork a new process: $!";
        if ( $pid == 0 ) {
            die "Can't start a new session: $!" if setsid == -1;
            exec( $cmd, '-p', $pid_file->filename,
                '-Des.config=' . $config_path );
        }
        else {
            sleep 1;
            open my $pid_fh, '<', $pid_file->filename;
            my $pid = <$pid_fh>;
            die "ES is running, but no PID found" unless $pid;
            chomp $pid;
            push @$PIDs, $pid;
        }
        $new_SIGINT->() if $int_caught;
    }

    print "Waiting for servers to warm up\n";

    my $timeout = 20;
    while (@servers) {
        if ( IO::Socket::INET->new( $servers[0] ) ) {
            print "Node running on $servers[0]\n";
            shift @servers;
        }
        else {
            sleep 1;
        }
        $timeout--;
        last if $timeout == 0;
    }
    if (@servers) {
        eval { $class->_shutdown_servers( $PIDs, $dirname ) };
        die "Couldn't start $instances nodes for transport $transport";
    }

    my $es = eval {
        $class->SUPER::new(
            %params,
            servers     => $server,
            trace_calls => $params{trace_calls},
            transport   => $transport,
            pids        => $PIDs,
            tmpdir      => $dirname,
        );
    };
    unless ($es) {
        my $error = $@;
        $class->_shutdown_servers( $PIDs, $dirname );
        die $error;
    }

    my $attempts = 20;
    while (1) {
        eval { @{ $es->refresh_servers } == $instances } && last;
        die("**** Couldn't connect to ElasticSearch at $server ****\n")
            unless --$attempts;
        print "Connection failed. Retrying\n";
        sleep 1;
    }
    print "Connected\n";

    return $es;
}

#===================================
sub pids {
#===================================
    my $self = shift;
    if (@_) {
        $self->{_pids} = shift;
    }
    return $self->{_pids};
}

#===================================
sub tmpdir {
#===================================
    my $self = shift;
    if (@_) {
        $self->{_tmpdir} = shift;
    }
    return $self->{_tmpdir};
}

#===================================
sub _shutdown_servers {
#===================================
    my ( $self, $PIDs, $dir ) = @_;

    local $?;

    $PIDs = $self->pids   unless defined $PIDs;
    $dir  = $self->tmpdir unless defined $dir;

    return unless $PIDs;

    kill 9, @$PIDs;
    sleep 1;

    while (1) { last if wait == -1 }
    if ( defined $dir ) {
        rmtree( $dir, { error => \my $error } );
    }
    undef $dir;
}

sub DESTROY { shift->_shutdown_servers; }

=head1 AUTHOR

Clinton Gormley, E<lt>clinton@traveljury.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2011 by Clinton Gormley

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.7 or,
at your option, any later version of Perl 5 you may have available.


=cut

1