package ElasticSearch::TestServer;
$ElasticSearch::TestServer::VERSION = '0.68';
use strict;
use warnings;
use ElasticSearch();
use POSIX 'setsid';
use IO::Socket();
use File::Temp 0.22 ();
use File::Spec::Functions qw(catfile);
use YAML qw(DumpFile);
use File::Path qw(rmtree);
use parent 'ElasticSearch';
=head1 NAME
ElasticSearch::TestServer - Start an ElasticSearch cluster for testing
=head1 SYNOPSIS
use ElasticSearch::TestServer;
$ENV{ES_HOME} = '/path/to/elasticsearch';
$ENV{ES_TRANSPORT} = 'http';
my $es = ElasticSearch::TestServer->new(
home => '/path/to/elasticsearch',
instances => 3,
transport => 'http',
ip => '127.0.0.1',
trace_calls => 'logfile',
port => '9200',
config => { values to override}
);
=head1 DESCRIPTION
ElasticSearch::TestServer is a utility module which will start an
ElasticSearch cluster intended for testing, and shut the cluster
down at the end, even if your code exits abnormally.
By default, it uses C<http> transport, the C<local> gateway, and
starts 3 instances on C<localhost>, starting with C<port> 9200 if
the C<transport> is C<http>, C<httplite>, C<httptiny>, C<curl>, C<aehttp>
or 9500 if C<thrift>.
It is a subclass of L<ElasticSearch>, so C<< ElasticSearch::TestServer->new >>
returns an ElasticSearch instance.
=cut
#===================================
sub new {
#===================================
my $class = shift;
my %params = (
home => $ENV{ES_HOME},
transport => $ENV{ES_TRANSPORT} || 'http',
instances => 3,
ip => '127.0.0.1',
ref $_[0] eq 'HASH' ? %{ shift() } : @_
);
my $home = delete $params{home} or die <<NO_HOME;
************************************************************
ElasticSearch home directory not specified
Please either set \$ENV{ES_HOME} or pass a value
for 'home' to new()
************************************************************
NO_HOME
my $transport = $params{transport};
my $port = delete $params{port}
|| ( $transport eq 'thrift' ? 9500 : 9200 );
my $instances = delete $params{instances};
my $plugin = $ElasticSearch::Transport::Transport{$transport}
or die "Unknown transport '$transport'";
eval "require $plugin" or die $@;
$plugin->_make_sync if $plugin->can('_make_sync');
my $protocol = $plugin->protocol;
my %config = (
cluster => { name => 'es_test' },
gateway => { type => 'local', expected_nodes => $instances },
network => { host => 'localhost' },
"$protocol.port" => "$port-" . ( $port + $instances - 1 ),
%{ $params{config} || {} }
);
my $ip = $config{network}{host} = delete $params{ip};
my @servers = map {"$ip:$_"} $port .. $port + $instances - 1;
my @publish = map {"$ip:$_"} 9300 .. 9300 + $instances - 1;
$config{'discovery.zen.ping.unicast.hosts'} = \@publish;
foreach (@servers) {
if ( IO::Socket::INET->new($_) ) {
die <<RUNNING;
************************************************************
There is already a server running on $_.
Please shut it down before starting the test server
************************************************************
RUNNING
}
}
my $server = $servers[0];
print "Starting test server installed in $home\n";
my $cmd = catfile( $home, 'bin', 'elasticsearch' );
my $pid_file = File::Temp->new;
my $blank_config = File::Temp->new( SUFFIX => '.yml' );
my $config_path = $blank_config->filename();
my $dir = '';
my $dirname = '';
my $PIDs = [];
unless ( $config{path}{data} ) {
$dir = File::Temp->newdir(
'elastic_XXXXX',
CLEANUP => 0,
TMPDIR => 1
);
$dirname = $config{path}{data} = $dir->dirname;
}
my $old_SIGINT = $SIG{INT};
my $new_SIGINT = sub {
$class->_shutdown_servers( $PIDs, $dirname );
if ( ref $old_SIGINT eq 'CODE' ) {
return $old_SIGINT->();
}
exit(1);
};
$SIG{INT} = $new_SIGINT;
DumpFile( $blank_config->filename, \%config );
for ( 1 .. $instances ) {
print "Starting test node $_\n";
my $int_caught = 0;
local $SIG{INT} = sub { $int_caught++; };
defined( my $pid = fork ) or die "Couldn't fork a new process: $!";
if ( $pid == 0 ) {
die "Can't start a new session: $!" if setsid == -1;
exec( $cmd, '-p', $pid_file->filename,
'-Des.config=' . $config_path );
}
else {
sleep 1;
open my $pid_fh, '<', $pid_file->filename;
my $pid = <$pid_fh>;
die "ES is running, but no PID found" unless $pid;
chomp $pid;
push @$PIDs, $pid;
}
$new_SIGINT->() if $int_caught;
}
print "Waiting for servers to warm up\n";
my $timeout = 20;
while (@servers) {
if ( IO::Socket::INET->new( $servers[0] ) ) {
print "Node running on $servers[0]\n";
shift @servers;
}
else {
sleep 1;
}
$timeout--;
last if $timeout == 0;
}
if (@servers) {
eval { $class->_shutdown_servers( $PIDs, $dirname ) };
die "Couldn't start $instances nodes for transport $transport";
}
my $es = eval {
$class->SUPER::new(
%params,
servers => $server,
trace_calls => $params{trace_calls},
transport => $transport,
pids => $PIDs,
tmpdir => $dirname,
);
};
unless ($es) {
my $error = $@;
$class->_shutdown_servers( $PIDs, $dirname );
die $error;
}
my $attempts = 20;
while (1) {
eval { @{ $es->refresh_servers } == $instances } && last;
die("**** Couldn't connect to ElasticSearch at $server ****\n")
unless --$attempts;
print "Connection failed. Retrying\n";
sleep 1;
}
print "Connected\n";
return $es;
}
#===================================
sub pids {
#===================================
my $self = shift;
if (@_) {
$self->{_pids} = shift;
}
return $self->{_pids};
}
#===================================
sub tmpdir {
#===================================
my $self = shift;
if (@_) {
$self->{_tmpdir} = shift;
}
return $self->{_tmpdir};
}
#===================================
sub _shutdown_servers {
#===================================
my ( $self, $PIDs, $dir ) = @_;
local $?;
$PIDs = $self->pids unless defined $PIDs;
$dir = $self->tmpdir unless defined $dir;
return unless $PIDs;
kill 9, @$PIDs;
sleep 1;
while (1) { last if wait == -1 }
if ( defined $dir ) {
rmtree( $dir, { error => \my $error } );
}
undef $dir;
}
sub DESTROY { shift->_shutdown_servers; }
=head1 AUTHOR
Clinton Gormley, E<lt>clinton@traveljury.comE<gt>
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2011 by Clinton Gormley
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.7 or,
at your option, any later version of Perl 5 you may have available.
=cut
1