# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

BEGIN {
    # The whole file is skipped when the environment does not say where
    # the Kafka server is installed.
    if ( !defined $ENV{KAFKA_BASE_DIR} ) {
        plan skip_all => 'Unknown base directory of Kafka server';
    }
}

#-- verify that the required test modules can be loaded

BEGIN {
    # Try to load Test::Exception at compile time; if that fails, skip
    # every test in this file rather than dying.
    eval 'use Test::Exception';     ## no critic
    if ( $@ ) {
        plan skip_all => "because Test::Exception required for testing";
    }
}

BEGIN {
    # Try to load Test::NoWarnings at compile time; if that fails, skip
    # every test in this file rather than dying.
    eval 'use Test::NoWarnings';    ## no critic
    if ( $@ ) {
        plan skip_all => 'because Test::NoWarnings required for testing';
    }
}

# The number of tests is not declared in advance.
plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Const::Fast;

use Kafka::Cluster;
use Kafka::Connection;
use Kafka::MockIO;
use Kafka::TestInternals qw(
    $topic
);

#-- setting up facilities ------------------------------------------------------

#-- declarations ---------------------------------------------------------------

# WARNING: must match the settings of your system.
# Base directory of the Kafka installation, taken from the environment and
# handed to Kafka::Cluster->new as 'kafka_dir'.
const my $KAFKA_BASE_DIR => $ENV{KAFKA_BASE_DIR};

#-- Global data ----------------------------------------------------------------

# Partition key used by get_leader() when it reads the topic metadata.
my $partition = $Kafka::MockIO::PARTITION;

# INSTRUCTIONS -----------------------------------------------------------------

testing();

# Runs the connection scenario against a Kafka cluster:
#   1. connect to 'localhost' on the first port reported by the cluster;
#   2. stop the current partition leader and check it is reported as neither
#      alive nor connected, while the newly elected leader is both;
#   3. restart the previous leader and check it is alive again;
#   4. close the connection and check no known server is still connected.
# Takes no arguments and returns nothing meaningful; the results are the
# Test::More assertions it emits. Relies on the file-level $KAFKA_BASE_DIR
# and on get_leader().
sub testing {
    #-- Connecting to the Kafka server port (for example for node_id = 0)
    # NOTE(review): 'does_not_start => 1' presumably attaches to an already
    # running cluster instead of launching one - confirm in Kafka::Cluster.
    my $cluster = Kafka::Cluster->new( kafka_dir => $KAFKA_BASE_DIR, does_not_start => 1 );
    my ( $port ) = $cluster->servers;   # first reported server port

#-- simple start
    my $connect = Kafka::Connection->new(
        host    => 'localhost',
        port    => $port,
    );
    isa_ok( $connect, 'Kafka::Connection' );

#-- stop leader
    my ( $leader_server, $leader_port ) = get_leader( $connect );
    ok $connect->is_server_alive( $leader_server ), 'leader is alive';
    ok $connect->is_server_connected( $leader_server ), 'leader is connected';
    $cluster->stop( $leader_port );
    ok !$connect->is_server_alive( $leader_server ), 'leader is not alive';
    # The check is negated, so the description must say "not connected".
    ok !$connect->is_server_connected( $leader_server ), 'leader is not connected';
    my ( $next_leader_server ) = get_leader( $connect );
    ok $connect->is_server_alive( $next_leader_server ), 'new leader is alive';
    ok $connect->is_server_connected( $next_leader_server ), 'new leader is connected';

#-- start previous leader
    # NOTE(review): _remove_log_tree presumably wipes the stopped broker's
    # log directory before the restart - confirm in Kafka::Cluster.
    $cluster->_remove_log_tree( $leader_port );
    $cluster->start( $leader_port );
    # Distinct description so this check is distinguishable from the
    # pre-stop 'leader is alive' assertion in the test output.
    ok $connect->is_server_alive( $leader_server ), 'previous leader is alive again';

#-- close
    $connect->close;
    # grep in scalar context yields the number of servers still connected.
    my $still_connected = grep { $connect->is_server_connected( $_ ) } $connect->get_known_servers();
    is $still_connected, 0, 'server is not connected';
}

# Finds the current leader of the test topic/partition.
#   $_[0] - connection object providing get_metadata() and
#           _find_leader_server()
# Returns the list ( $server, $port ): the leader server as returned by
# _find_leader_server(), and the 1-5 digit number following the last ':' in
# it (undef when the server string does not end that way).
sub get_leader {
    my $connection = shift;

    # Leader id as recorded in the metadata for the file-level $topic and
    # $partition.
    my $topic_metadata = $connection->get_metadata( $topic );
    my $id     = $topic_metadata->{ $topic }->{ $partition }->{Leader};
    my $server = $connection->_find_leader_server( $id );

    # Pull the trailing ":port" off the server string.
    my ( $port ) = ( $server =~ /:(\d{1,5})$/ );

    return( $server, $port );
}

# POSTCONDITIONS ---------------------------------------------------------------