The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

#-- verify load the module

BEGIN {
    eval 'use Test::Exception';     ## no critic
    plan skip_all => "because Test::Exception required for testing" if $@;
}

BEGIN {
    eval 'use Test::TCP';           ## no critic
    plan skip_all => "because Test::TCP required for testing" if $@;
}

BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    plan skip_all => 'because Test::NoWarnings required for testing' if $@;
}

plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use IO::Socket::INET;
use Net::EmptyPort qw(
    empty_port
);
use POSIX ':signal_h';
use Socket qw(
    AF_INET
    AF_INET6
    PF_INET
    PF_INET6
    inet_aton
    inet_ntop
);
use Sub::Install;
use Sys::SigAction qw(
    set_sig_handler
);
use Time::HiRes qw();

use Kafka qw(
    $IP_V4
    $IP_V6
    $KAFKA_SERVER_PORT
    $REQUEST_TIMEOUT
);
use Kafka::IO;
use Kafka::TestInternals qw(
    @not_posint
    @not_posnumber
    @not_string
);

#-- setting up facilities ------------------------------------------------------

# See Kafka::IO
use constant DEBUG  => 0;
#use constant DEBUG  => 1;
#use constant DEBUG  => 2;

Kafka::IO->debug_level( DEBUG ) if DEBUG;

STDOUT->autoflush;

#-- declarations ---------------------------------------------------------------

my ( $server, $port, $io, $sig_handler, $marker_signal_handling, $original, $timer, $timeout, $sent, $resp, $test_message, $inet_aton, $hostname );

$inet_aton = inet_aton( '127.0.0.1' );  # localhost
$hostname = gethostbyaddr( $inet_aton, AF_INET );

my $server_code = sub {
    my ( $port ) = @_;

    my $sock = IO::Socket::INET->new(
        LocalPort   => $port,
        LocalAddr   => $hostname,
        Proto       => 'tcp',
        Listen      => 5,
        Type        => SOCK_STREAM,
        ReuseAddr   => 1,
    ) or die "Cannot open server socket $hostname:$port : $!";

    $SIG{TERM} = sub { exit };

    while ( my $remote = $sock->accept ) {
        while ( my $line = <$remote> ) {
            print { $remote } $line;
        }
    }
};

sub debug_msg {
    my ( $message ) = @_;

    return if Kafka::IO->debug_level != 2;

    diag '[ time = ', Time::HiRes::time(), ' ] ', $message;
}

#-- Global data ----------------------------------------------------------------

my $server_port = empty_port( $KAFKA_SERVER_PORT );
$server = Test::TCP->new(
    code    => $server_code,
    port    => $server_port,
);
$port = $server->port;
ok $port, "server port = $port";
wait_port( $port );

$test_message = "Test message\n";

# INSTRUCTIONS -----------------------------------------------------------------

# NOTE: In the process of Kafka::IO->new we are working with alarm clock internally

#-- ALRM handler

debug_msg( 'ALRM handler verification' );

# cancel the previous timer
alarm 0;

$sig_handler = set_sig_handler( SIGALRM ,sub {
        ++$marker_signal_handling;
        debug_msg( 'SIGALRM: signal handler triggered' );
    }
);
ok( !defined( $marker_signal_handling ), 'marker signal handling not defined' );

throws_ok {
    debug_msg( "ALRM handler: host => 'something bad'" );
    $io = Kafka::IO->new(
        host    => 'something bad',
        port    => $port,
        timeout => $REQUEST_TIMEOUT,
    );
} 'Kafka::Exception::IO', 'error thrown';

debug_msg( "ALRM handler: host => $hostname" );
eval {
    $io = Kafka::IO->new(
        host    => $hostname,
        port    => $port,
        timeout => $REQUEST_TIMEOUT,
    );
};
SKIP: {
    skip "gethostbyname( '$hostname' ) takes too long: $@" if $@;

    isa_ok( $io, 'Kafka::IO' );

    ok( !defined( $marker_signal_handling ), 'marker signal handling not defined' );
    # signal handler triggered
    kill ALRM => $$;
    is $marker_signal_handling, 1, 'the signal handler to be reset to the previous value';

    #-- ALRM timer

    # Kafka::IO->new is badly ended before 'timer' and before 'timeout'

    # cancel the previous timer
    alarm 0;

    $SIG{ALRM} = sub {
        ++$marker_signal_handling;
        debug_msg( 'SIGALRM: signal handler triggered' );
    };
    $timer      = 10;
    $timeout    = $timer;

    debug_msg( "Kafka::IO->new is badly ended before 'timer' and before 'timeout'" );
    debug_msg( "timer = $timer, timeout = $timeout, host => 'something bad'" );
    $marker_signal_handling = 0;
    eval {
        alarm $timer;
        eval {
            $io = Kafka::IO->new(
                host    => 'something bad',
                port    => $port,
                timeout => $timeout,
            );
        };
        alarm 0;
    };
    ok !$marker_signal_handling, 'signal handler is not triggered';

    # Kafka::IO->new is correctly ended before 'timer' and before 'timeout'

    # cancel the previous timer
    alarm 0;

    $SIG{ALRM} = sub {
        ++$marker_signal_handling;
        debug_msg( 'SIGALRM: signal handler triggered' );
    };
    $timer      = 10;
    $timeout    = $timer + 5;

    $original = \&Kafka::IO::_gethostbyname;
    Sub::Install::reinstall_sub( {
        code    => sub {
            my $self = shift;

            $self->{af} = AF_INET;
            $self->{pf} = PF_INET;
            $self->{ip} = '127.0.0.1';

            debug_msg( '_gethostbyname called (without sleep)' );

            return $self->{ip};
        },
        into    => 'Kafka::IO',
        as      => '_gethostbyname',
    } );

    debug_msg( "Kafka::IO->new is correctly ended before 'timer' and before 'timeout'" );
    debug_msg( "timer = $timer, timeout = $timeout, host => 'something bad'" );
    $marker_signal_handling = 0;
    eval {
        alarm $timer;
        eval {
            $io = Kafka::IO->new(
                host    => 'something bad',
                port    => $port,
                timeout => $timeout,
            );

            ok !$marker_signal_handling, 'signal handler is not triggered yet';
            # 'sleep' to be interrupted by an external signal
            sleep $timeout * 2;
        };
        alarm 0;
    };
    is $marker_signal_handling, 1, 'signal handler is triggered';

    Sub::Install::reinstall_sub( {
        code    => $original,
        into    => 'Kafka::IO',
        as      => '_gethostbyname',
    } );

    # Kafka::IO->new is correctly ended after 'timer' and before 'timeout'

    # cancel the previous timer
    alarm 0;

    $SIG{ALRM} = sub {
        ++$marker_signal_handling;
        debug_msg( 'SIGALRM: signal handler triggered' );
    };
    $timer      = 10;
    $timeout    = $timer + 10;

    Sub::Install::reinstall_sub( {
        code    => sub {
            my $self = shift;

            $self->{af} = AF_INET;
            $self->{pf} = PF_INET;
            $self->{ip} = '127.0.0.1';

            debug_msg( '_gethostbyname called (sleep ', $timer + 5, ')' );
            # 'sleep' should not be interrupted by an external signal
            sleep $timer + 5;

            return $self->{ip};
        },
        into    => 'Kafka::IO',
        as      => '_gethostbyname',
    } );

    debug_msg( "Kafka::IO->new is correctly ended after 'timer' and before 'timeout'" );
    debug_msg( "timer = $timer, timeout = $timeout, host => $hostname" );
    $marker_signal_handling = 0;
    eval {
        alarm $timer;
        eval {
            $io = Kafka::IO->new(
                host    => $hostname,
                port    => $port,
                timeout => $timeout,
            );
        };
        alarm 0;
    };
    is $marker_signal_handling, 1, 'signal handler is triggered';

    Sub::Install::reinstall_sub( {
        code    => $original,
        into    => 'Kafka::IO',
        as      => '_gethostbyname',
    } );

    debug_msg( "external 'alarm' tested" );

    #-- _is_alive

    $io = Kafka::IO->new(
        host    => $hostname,
        port    => $port,
        timeout => $REQUEST_TIMEOUT,
    );

    ok $io->_is_alive, 'socket alive';

    #-- close

    ok $io->{socket}, 'socket defined';
    $io->close;
    ok !$io->{socket}, 'socket not defined';

    #-- _is_alive

    ok !$io->_is_alive, 'socket not alive';

    #-- send

    $io = Kafka::IO->new(
        host    => $hostname,
        port    => $port,
        timeout => $REQUEST_TIMEOUT,
    );

    lives_ok { $sent = $io->send( $test_message ); } 'expecting to live';
    is $sent, length( $test_message ), 'sent '.length( $test_message ).' bytes';

    #-- receive

    lives_ok { $resp = $io->receive( length( $test_message ) ); } 'expecting to live';
    is( $$resp, $test_message, 'receive OK' );

    # ip_version

    foreach my $ip_version ( undef, $IP_V4 ) {
        $io = Kafka::IO->new(
            host        => '127.0.0.1',
            port        => $port,
            ip_version  => $ip_version,
        );
        is $io->{af}, AF_INET, 'af OK';
        is $io->{pf}, PF_INET, 'pf OK';
        is $io->{ip}, '127.0.0.1', 'ip OK';
    }

    foreach my $ip_version ( undef, $IP_V4 ) {
        $io = Kafka::IO->new(
            host        => 'localhost',
            port        => $port,
            ip_version  => $ip_version,
        );
        is $io->{af}, AF_INET, 'af OK';
        is $io->{pf}, PF_INET, 'pf OK';
        is $io->{ip}, inet_ntop( AF_INET, scalar( gethostbyname( 'localhost' ) ) ), 'ip OK';
    }

    my $host = '127.0.0.1';
    throws_ok {
        Kafka::IO->new(
            host        => $host,
            port        => $port,
            ip_version  => $IP_V6,
        );
    } 'Kafka::Exception::IO', "bad ip_version for $host";

    $host = 'localhost';
    dies_ok {
        Kafka::IO->new(
            host        => $host,
            port        => $port,
            ip_version  => $IP_V6,
        );
    } "bad ip_version for $host";

    #-- close connection

    undef $server;
    ok $io, 'IO exists';
    ok !$io->_is_alive, 'socket not alive';
#    throws_ok { $sent = $io->send( $test_message ); } 'Kafka::Exception::IO', 'error thrown';
}

# POSTCONDITIONS ---------------------------------------------------------------

undef $server;