The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

BEGIN {
    plan skip_all => 'Unknown base directory of Kafka server'
        unless $ENV{KAFKA_BASE_DIR};
}

#-- verify load the module

BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    plan skip_all => 'because Test::NoWarnings required for testing' if $@;
}

plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Kafka::Cluster;
use Kafka::Connection;

#-- setting up facilities ------------------------------------------------------

my $CLUSTER = Kafka::Cluster->new(
    kafka_dir           => $ENV{KAFKA_BASE_DIR},    # WARNING: must match the settings of your system
    replication_factor  => 1,
);

#-- Global data ----------------------------------------------------------------

#-- Connecting to the Kafka server port (for example for node_id = 0)
my ( $PORT ) =  $CLUSTER->servers;

#-- declarations ---------------------------------------------------------------

my ( $CONNECTION, $HOSTS, $IO_CACHE );

sub new_connection {
    my ( $port ) = @_;

    # connecting to the Kafka server port
    my $connection = Kafka::Connection->new(
        host    => 'localhost',
        port    => $port,
    );
    isa_ok( $connection, 'Kafka::Connection' );

    # simple communication
    is scalar( $connection->get_known_servers() ), 1, 'Known only one server';
    my ( $server ) = $connection->get_known_servers();
    ok $connection->is_server_known( $server ), 'known server';
    my $metadata = $connection->get_metadata;
    ok $metadata, 'communication OK';

    my $IO_cache = $connection->{_IO_cache};
    my @hosts = keys %$IO_cache;
    ok scalar( @hosts ), 'IO cache filled';

    is_sockets_opened( \@hosts, $IO_cache );

    return( $connection, \@hosts, $IO_cache );
}

sub is_sockets_opened {
    my ( $hosts, $IO_cache ) = @_;

    foreach my $host_port ( @$hosts ) {
        my $io = $IO_cache->{ $host_port }->{IO};
        is ref( $io ), 'Kafka::IO', 'Kafka::IO';
        my $socket = $io->{socket};
        ok defined( $socket ), 'socket exists';
        my $fn = fileno $socket;
        ok defined( $fn ), 'socket opened';
    }
}

sub is_sockets_closed {
    my ( $hosts, $IO_cache ) = @_;

    foreach my $host_port ( @$hosts ) {
        my $io = $IO_cache->{ $host_port }->{IO};
        ok !defined( $io ), 'IO (socket) closed';
    }
}

# INSTRUCTIONS -----------------------------------------------------------------

( $CONNECTION, $HOSTS, $IO_CACHE ) = new_connection( $PORT );

undef $CONNECTION;
is_sockets_opened( $HOSTS, $IO_CACHE );

( $CONNECTION, $HOSTS, $IO_CACHE ) = new_connection( $PORT );

$CONNECTION->close;
is_sockets_closed( $HOSTS, $IO_CACHE );

# POSTCONDITIONS ---------------------------------------------------------------

$CLUSTER->close;