# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

#-- verify that the optional test module can be loaded

# Test::Exception is loaded with a string eval inside BEGIN so that a missing
# module is detected at compile time; in that case the whole file is skipped
# instead of failing to compile.
BEGIN {
    eval 'use Test::Exception';     ## no critic
    plan skip_all => "because Test::Exception required for testing" if $@;
}

# The number of tests is not declared in advance.
plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Const::Fast;
#use File::HomeDir;
use Params::Util qw(
    _HASH
    _INSTANCE
);
use Sub::Install;

use Kafka qw(
    $BLOCK_UNTIL_IS_COMMITTED
    $DEFAULT_MAX_NUMBER_OF_OFFSETS
    $DEFAULT_MAX_BYTES
    $DEFAULT_MAX_WAIT_TIME
    $ERROR_MISMATCH_ARGUMENT
    $ERROR_RESPOSEMESSAGE_NOT_RECEIVED
    $KAFKA_SERVER_PORT
    $MIN_BYTES_RESPOND_IMMEDIATELY
    $MIN_BYTES_RESPOND_HAS_DATA
    $NOT_SEND_ANY_RESPONSE
    $RECEIVE_EARLIEST_OFFSETS
    $RECEIVE_LATEST_OFFSET
    $RECEIVE_MAX_ATTEMPTS
    $REQUEST_TIMEOUT
    $RETRY_BACKOFF
    $SEND_MAX_ATTEMPTS
    $WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Cluster;
use Kafka::Connection;
use Kafka::Internals qw(
    $APIKEY_FETCH
    $APIKEY_PRODUCE
    $APIKEY_OFFSET
    $PRODUCER_ANY_OFFSET
);
use Kafka::MockIO;
use Kafka::TestInternals qw(
    @not_array0
    @not_hash
    @not_is_like_server_list
    @not_isint
    @not_nonnegint
    @not_number
    @not_posint
    @not_string
    @not_topics_array
    $topic
);

#-- setting up facilities ------------------------------------------------------

#-- declarations ---------------------------------------------------------------

# WARNING: must match the settings of your system
#const my $KAFKA_BASE_DIR    => $ENV{KAFKA_BASE_DIR} || File::Spec->catdir( File::HomeDir->my_home, 'kafka' );
# Taken from the KAFKA_BASE_DIR environment variable; there is no fallback, so
# the value is undefined when the variable is not set.
const my $KAFKA_BASE_DIR    => $ENV{KAFKA_BASE_DIR};

# File-level state shared by the helper subs and the test routines below.
my ( $port, $connect, $server, $request, $response, $tmp );

# Checks that Kafka::Connection->new rejects each bad value for one argument.
#
# Arguments:
#   $field      - name of the constructor argument under test
#   @bad_values - values that are expected to be rejected for that argument
#
# For every bad value the constructor must throw Kafka::Exception::Connection
# and the file-level $connect must stay undefined.
sub new_ERROR_MISMATCH_ARGUMENT {
    my $field      = shift;
    my @bad_values = @_;

    # Baseline constructor arguments. Kept as a list (not a hash) so that the
    # argument under test can follow them in the same order as before, even
    # when it repeats one of these names.
    my @baseline = (
        host                 => 'localhost',
        port                 => $port,
        CorrelationId        => undef,
        SEND_MAX_ATTEMPTS    => $SEND_MAX_ATTEMPTS,
        RECEIVE_MAX_ATTEMPTS => $RECEIVE_MAX_ATTEMPTS,
        RETRY_BACKOFF        => $RETRY_BACKOFF,
    );

    for my $bad_value ( @bad_values ) {
        # Drop any connection left over from a previous check.
        if ( $connect ) {
            $connect->close;
        }
        $connect = undef;

        throws_ok(
            sub {
                $connect = Kafka::Connection->new( @baseline, $field => $bad_value );
            },
            'Kafka::Exception::Connection',
            'error thrown',
        );
        ok( !defined( $connect ), 'connection object is not created' );
    }
}

# Checks that a Kafka::Connection method rejects every malformed server name.
#
# Arguments:
#   $function - name of the Kafka::Connection method to call
#
# Each element of @not_is_like_server_list is dereferenced as an array and its
# first item is passed to the method, which must throw
# Kafka::Exception::Connection.
#
# Side effect: the file-level $connect is replaced by a new connection on
# every iteration and the last one is left in place on return.
sub is_ERROR_MISMATCH_ARGUMENT {
    my $method_name = shift;

    for my $candidate ( @not_is_like_server_list ) {
        $connect = Kafka::Connection->new( host => 'localhost', port => $port );

        throws_ok(
            sub { $connect->$method_name( $candidate->[0] ) },
            'Kafka::Exception::Connection',
            'error thrown',
        );
    }
}

#-- Global data ----------------------------------------------------------------

# INSTRUCTIONS -----------------------------------------------------------------

# First pass: no base directory, so testing() runs on top of Kafka::MockIO.
testing();

# Second pass: only when KAFKA_BASE_DIR was supplied in the environment.
if ( $KAFKA_BASE_DIR ) {
    testing( $KAFKA_BASE_DIR );
}

# Finally, the simulated send/receive failure checks.
communication_error();

# Runs the Kafka::Connection API checks, either on top of Kafka::MockIO or
# using a port obtained from Kafka::Cluster.
#
# Arguments:
#   $kafka_base_dir - optional. When true, the port is taken from
#                     Kafka::Cluster for that directory and the additional
#                     all-topics metadata checks are run. When false,
#                     Kafka::MockIO::override() is called at the start and
#                     Kafka::MockIO::restore() at the end.
#
# Side effects: assigns the file-level $port, $connect, $server, $request,
# $response and $tmp. The last $request built here (the OffsetRequest) is
# still set when the sub returns.
sub testing {
    my ( $kafka_base_dir ) = @_;

    if ( $kafka_base_dir ) {
        #-- Connecting to the Kafka server port (for example for node_id = 0)
        # Use the directory passed by the caller (not the file-level constant);
        # the first value returned by servers() is used as the port.
        ( $port ) = Kafka::Cluster->new( kafka_dir => $kafka_base_dir, does_not_start => 1 )->servers;
    } else {
        $port = $Kafka::MockIO::KAFKA_MOCK_SERVER_PORT;
        Kafka::MockIO::override();
    }

#-- simple start

    $connect = Kafka::Connection->new(
        host    => 'localhost',
        port    => $port,
    );
    isa_ok( $connect, 'Kafka::Connection' );

#-- get_known_servers
    is scalar( $connect->get_known_servers() ), 1, 'Known only one server';
    ( $server ) = $connect->get_known_servers();
    ok $connect->is_server_known( $server ), 'known server';
    # No request has been sent to the server yet
    ok !$connect->is_server_connected( $server ), 'server is not alive';

    $connect->close;
    undef $connect;
    ok !$connect, 'connection object is destroyed';

#-- new

# host
    new_ERROR_MISMATCH_ARGUMENT( 'host', @not_string );

# port
    new_ERROR_MISMATCH_ARGUMENT( 'port', @not_posint );

# timeout
    # undefined values are filtered out and not tested as bad timeouts
    new_ERROR_MISMATCH_ARGUMENT( 'timeout', grep { defined $_ } @not_number );

# broker_list
    new_ERROR_MISMATCH_ARGUMENT( 'broker_list', @not_array0 );
    new_ERROR_MISMATCH_ARGUMENT( 'broker_list', @not_is_like_server_list );

# CorrelationId
    new_ERROR_MISMATCH_ARGUMENT( 'CorrelationId', @not_isint );

# SEND_MAX_ATTEMPTS
    new_ERROR_MISMATCH_ARGUMENT( 'SEND_MAX_ATTEMPTS', @not_posint );

    lives_ok {
        $connect = Kafka::Connection->new(
            host                => 'localhost',
            port                => $port,
            # legacy, will be removed in future releases
            SEND_MAX_RETRIES    => 5,
        );
    } 'SEND_MAX_RETRIES OK';

# RECEIVE_MAX_ATTEMPTS
    new_ERROR_MISMATCH_ARGUMENT( 'RECEIVE_MAX_ATTEMPTS', @not_posint );

    lives_ok {
        $connect = Kafka::Connection->new(
            host                => 'localhost',
            port                => $port,
            # legacy, will be removed in future releases
            RECEIVE_MAX_RETRIES => 5,
        );
    } 'RECEIVE_MAX_RETRIES OK';

# RETRY_BACKOFF
    new_ERROR_MISMATCH_ARGUMENT( 'RETRY_BACKOFF', @not_posint );

#-- receive_response_to_request

#-- ProduceRequest

    $connect = Kafka::Connection->new(
        host    => 'localhost',
        port    => $port,
    );

# Here and below each request states its ApiKey explicitly

    # One produce request per RequiredAcks value, including the value 99999.
    for my $mode (
            $NOT_SEND_ANY_RESPONSE,
            $WAIT_WRITTEN_TO_LOCAL_LOG,
            $BLOCK_UNTIL_IS_COMMITTED,
            99999,
        ) {

        $request = {
            ApiKey                              => $APIKEY_PRODUCE,
            CorrelationId                       => 4,   # for example
            ClientId                            => 'producer',
            RequiredAcks                        => $mode,
            Timeout                             => $REQUEST_TIMEOUT * 1000, # ms
            topics                              => [
                {
                    TopicName                   => $topic,
                    partitions                  => [
                        {
                            Partition           => 0,
                            MessageSet              => [
                                {
                                    Offset          => $PRODUCER_ANY_OFFSET,
                                    Key             => q{},
                                    Value           => 'Hello!',
                                },
                            ],
                        },
                    ],
                },
            ],
        };

        $response = $connect->receive_response_to_request( $request );
        ok _HASH( $response ), 'response is received';
#use Data::Dumper;
#say Data::Dumper->Dump( [ $response ], [ 'produce_response' ] );
    }

#-- FetchRequest

    # One fetch request per MinBytes value.
    for my $mode (
        $MIN_BYTES_RESPOND_IMMEDIATELY,
        $MIN_BYTES_RESPOND_HAS_DATA,
        ) {

        $request = {
            ApiKey                              => $APIKEY_FETCH,
            CorrelationId                       => 0,
            ClientId                            => 'consumer',
            MaxWaitTime                         => $DEFAULT_MAX_WAIT_TIME,
            MinBytes                            => $mode,
            topics                              => [
                {
                    TopicName                   => $topic,
                    partitions                  => [
                        {
                            Partition           => 0,
                            FetchOffset         => 0,
                            MaxBytes            => $DEFAULT_MAX_BYTES,
                        },
                    ],
                },
            ],
        };

        $response = $connect->receive_response_to_request( $request );
        ok _HASH( $response ), 'response is received';
#use Data::Dumper;
#say Data::Dumper->Dump( [ $response ], [ 'fetch_response' ] );
    }

#-- OffsetRequest

    # One offset request per Time value.
    for my $mode (
        $RECEIVE_EARLIEST_OFFSETS,
        $RECEIVE_LATEST_OFFSET,
        ) {

        $request = {
            ApiKey                              => $APIKEY_OFFSET,
            CorrelationId                       => 0,
            ClientId                            => 'consumer',
            topics                              => [
                {
                    TopicName                   => $topic,
                    partitions                  => [
                        {
                            Partition           => 0,
                            Time                => $mode,
                            MaxNumberOfOffsets  => $DEFAULT_MAX_NUMBER_OF_OFFSETS,
                        },
                    ],
                },
            ],
        };

        $response = $connect->receive_response_to_request( $request );
        ok _HASH( $response ), 'response is received';
#use Data::Dumper;
#say Data::Dumper->Dump( [ $response ], [ 'offset_response' ] );
    }

#-- get_known_servers
    ok scalar( $connect->get_known_servers() ), 'Known some servers';

#-- is_server_connected
    # Only servers currently reported as connected are closed and re-checked.
    foreach my $server ( $connect->get_known_servers() ) {
        if ( $connect->is_server_connected( $server ) ) {
            ok $connect->is_server_connected( $server ), 'server is connected';
            ok $connect->close_connection( $server ), 'close connection';
            ok !$connect->is_server_connected( $server ), 'server is not connected';
        }
    }
    # NOTE: this helper replaces $connect with a new connection object.
    is_ERROR_MISMATCH_ARGUMENT( 'is_server_connected' );

#-- is_server_alive
    # is_server_alive is expected to stay true after close_connection.
    foreach my $server ( $connect->get_known_servers() ) {
        ok $connect->is_server_alive( $server ), 'server is alive';
        ok $connect->close_connection( $server ), 'close connection';
        ok $connect->is_server_alive( $server ), 'server is alive';
    }
    is_ERROR_MISMATCH_ARGUMENT( 'is_server_alive' );
    throws_ok { $connect->is_server_alive( 'nothing:9999' ) } 'Kafka::Exception::Connection', 'error thrown';

#-- get_metadata
    my $metadata = $connect->get_metadata( $topic );
    ok $metadata, 'metadata received';
    ok scalar( keys %$metadata ) == 1 && exists( $metadata->{ $topic } ), "metadata for '$topic' only";
    # The all-topics checks below expect more than one topic, so they are run
    # only when a base directory was given.
    if ( $kafka_base_dir ) {
        $metadata = $connect->get_metadata();
        ok $metadata, 'metadata received (all topics)';
        ok scalar( keys %$metadata ) > 1 && exists( $metadata->{ $topic } ), 'metadata for all topics';
        $metadata = $connect->get_metadata( $topic );
        ok $metadata, 'metadata received';
        ok scalar( keys %$metadata ) == 1 && exists( $metadata->{ $topic } ), "metadata for '$topic' only";
        # Inspect the object's internal _metadata hash directly.
        ok scalar( keys %{ $connect->{_metadata} } ) > 1 && exists( $connect->{_metadata}->{ $topic } ), 'metadata for all topics present';
        # Remove one topic from the internal hash and check that a new
        # get_metadata call for that topic puts it back.
        delete $connect->{_metadata}->{ $topic };
        $metadata = $connect->get_metadata( $topic );
        ok scalar( keys %{ $connect->{_metadata} } ) > 1 && exists( $connect->{_metadata}->{ $topic } ), 'metadata for all topics';
        ok $metadata, 'metadata received';
        throws_ok { $connect->get_metadata( '' ) } 'Kafka::Exception::Connection', 'error thrown';
    }

#-- exists_topic_partition
    ok $connect->exists_topic_partition( $topic, 0 ), 'existing topic';
    ok !$connect->exists_topic_partition( 99999, 0 ), 'not yet existing topic';
    ok !$connect->exists_topic_partition( $topic, 99999 ), 'not yet existing topic';

#-- is_server_known
    is_ERROR_MISMATCH_ARGUMENT( 'is_server_known' );

    foreach my $server ( $connect->get_known_servers() ) {
        ok $connect->is_server_known( $server ), 'known server';
    }

#-- close_connection
    is_ERROR_MISMATCH_ARGUMENT( 'close_connection' );

#-- close
    # Send the last request again so that at least one server is connected
    # before close() is checked.
    $connect->receive_response_to_request( $request );
    $tmp = 0;
    foreach my $server ( $connect->get_known_servers() ) {
        ++$tmp if $connect->is_server_connected( $server );
    }
    ok( $tmp, 'server is alive' );
    $connect->close;
    $tmp = 0;
    foreach my $server ( $connect->get_known_servers() ) {
        ++$tmp if $connect->is_server_connected( $server );
    }
    ok !$tmp, 'server is not connected';

#-- finish
    Kafka::MockIO::restore()
        unless $kafka_base_dir;
}

# Checks how Kafka::Connection reports failures of the underlying Kafka::IO
# send and receive calls, using Kafka::MockIO plus temporary replacements of
# Kafka::IO::send and Kafka::IO::receive.
#
# Takes no arguments. Reads the file-level $request as it was left by the
# caller for the first check, and assigns the file-level $port, $connect,
# $request and $response. Both replaced subs and the Kafka::MockIO override
# are restored before returning.
sub communication_error {

    $port = $Kafka::MockIO::KAFKA_MOCK_SERVER_PORT;
    Kafka::MockIO::override();

#-- send failure

    # Keep the currently installed send() so it can be delegated to and
    # restored afterwards.
    my $original_send = \&Kafka::IO::send;

    # Call counter private to the replacement below: the first call is passed
    # through to the original send(), every later call reports an error.
    my $attempt = 0;
    Sub::Install::reinstall_sub( {
        code    => sub {
            my ( $self ) = @_;
            if ( $attempt++ ) {
                return $self->_error( $ERROR_MISMATCH_ARGUMENT );
            } else {
                return $original_send->( @_ );
            }
        },
        into    => 'Kafka::IO',
        as      => 'send',
    } );

    $connect = Kafka::Connection->new(
        host        => 'localhost',
        port        => $port,
    );

    my $errors = $connect->cluster_errors;
    ok !%$errors, 'no errors';
    eval { $response = $connect->receive_response_to_request( $request ); };
    # Copy $@ immediately: it is a global that later code may overwrite.
    my $send_error = $@;
    isa_ok( $send_error, 'Kafka::Exception' );
    $errors = $connect->cluster_errors;
    is scalar( keys %$errors ), scalar( $connect->get_known_servers ), 'communication errors';

    Sub::Install::reinstall_sub( {
        code    => $original_send,
        into    => 'Kafka::IO',
        as      => 'send',
    } );

#-- $ERROR_RESPOSEMESSAGE_NOT_RECEIVED

    my $original_receive = \&Kafka::IO::receive;

    # Replacement receive(): a 4-byte read yields a packed 32-bit big-endian
    # zero, any other length yields an empty string.
    Sub::Install::reinstall_sub( {
        code    => sub {
            my ( $self, $length ) = @_;
            my $only_MessageSize = $length == 4
                ? pack( q{l>}, 0 )
                : q{};
            return \$only_MessageSize;
        },
        into    => 'Kafka::IO',
        as      => 'receive',
    } );

#    $connect = Kafka::Connection->new(
#        host        => 'localhost',
#        port        => $port,
#    );

    $request = {
        ApiKey                              => $APIKEY_FETCH,
        CorrelationId                       => 0,
        ClientId                            => 'consumer',
        MaxWaitTime                         => $DEFAULT_MAX_WAIT_TIME,
        MinBytes                            => $MIN_BYTES_RESPOND_IMMEDIATELY,
        topics                              => [
            {
                TopicName                   => $topic,
                partitions                  => [
                    {
                        Partition           => 0,
                        FetchOffset         => 0,
                        MaxBytes            => $DEFAULT_MAX_BYTES,
                    },
                ],
            },
        ],
    };

    eval { $response = $connect->receive_response_to_request( $request ); };
    my $receive_error = $@;
    isa_ok( $receive_error, 'Kafka::Exception' );
    # Call ->code only on a real exception object; otherwise compare undef so
    # the test fails instead of dying before the overrides are restored.
    my $error_code = _INSTANCE( $receive_error, 'Kafka::Exception' )
        ? $receive_error->code
        : undef;
    is $error_code, $ERROR_RESPOSEMESSAGE_NOT_RECEIVED, '$ERROR_RESPOSEMESSAGE_NOT_RECEIVED OK';

    Sub::Install::reinstall_sub( {
        code    => $original_receive,
        into    => 'Kafka::IO',
        as      => 'receive',
    } );

    Kafka::MockIO::restore();
}

# POSTCONDITIONS ---------------------------------------------------------------