#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

#-- verify that the test prerequisites are available

BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    plan skip_all => 'Test::NoWarnings is required for this test' if $@;
}

BEGIN {
    eval 'use Test::Exception';     ## no critic
    plan skip_all => 'Test::Exception is required for this test' if $@;
}

plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Const::Fast;
#use Data::Dumper;
use Sub::Install;

use Kafka qw(
    %ERROR

    $ERROR_NO_ERROR
    $ERROR_UNKNOWN
    $ERROR_OFFSET_OUT_OF_RANGE
    $ERROR_INVALID_MESSAGE
    $ERROR_UNKNOWN_TOPIC_OR_PARTITION
    $ERROR_INVALID_MESSAGE_SIZE
    $ERROR_LEADER_NOT_AVAILABLE
    $ERROR_NOT_LEADER_FOR_PARTITION
    $ERROR_REQUEST_TIMED_OUT
    $ERROR_BROKER_NOT_AVAILABLE
    $ERROR_REPLICA_NOT_AVAILABLE
    $ERROR_MESSAGE_SIZE_TOO_LARGE
    $ERROR_STALE_CONTROLLER_EPOCH_CODE
    $ERROR_OFFSET_METADATA_TOO_LARGE_CODE

    $ERROR_CANNOT_GET_METADATA
    $ERROR_LEADER_NOT_FOUND
    $ERROR_MISMATCH_ARGUMENT

    $ERROR_CANNOT_BIND
    $ERROR_CANNOT_RECV
    $ERROR_CANNOT_SEND
    $ERROR_SEND_NO_ACK
    $MIN_BYTES_RESPOND_HAS_DATA
    $RECEIVE_EARLIEST_OFFSETS
    $REQUEST_TIMEOUT
    $SEND_MAX_RETRIES
    $WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Connection qw(
    %RETRY_ON_ERRORS
);
use Kafka::Consumer;
use Kafka::Producer;

use Kafka::Internals qw(
    $APIKEY_OFFSET
    $APIKEY_PRODUCE
    $PRODUCER_ANY_OFFSET
);
use Kafka::MockProtocol qw(
    encode_metadata_response
    encode_offset_response
    encode_produce_response
);
use Kafka::Protocol qw(
    encode_metadata_request
    encode_offset_request
    encode_produce_request
);

use Kafka::MockIO;

#-- setting up facilities ------------------------------------------------------

Kafka::MockIO::override();
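# Kafka::MockIO::override() replaces the Kafka::IO methods with mock implementations,
# so the tests below talk to an emulated broker instead of a live Kafka server.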
#$Kafka::Connection::DEBUG = 1;

#-- declarations ---------------------------------------------------------------

const my $host              => $Kafka::MockIO::KAFKA_MOCK_HOSTNAME;
const my $port              => $Kafka::MockIO::KAFKA_MOCK_SERVER_PORT;
const my $topic             => $Kafka::MockIO::TOPIC;
const my $partition         => $Kafka::MockIO::PARTITION;
const my $CorrelationId     => 0;

my $decoded_produce_request = {
    ApiKey                              => $APIKEY_PRODUCE,
    CorrelationId                       => $CorrelationId,
    ClientId                            => 'producer',
    RequiredAcks                        => $WAIT_WRITTEN_TO_LOCAL_LOG,  # ack once written to the leader's local log
    Timeout                             => $REQUEST_TIMEOUT * 1000, # ms
    topics                              => [
        {
            TopicName                   => $topic,
            partitions                  => [
                {
                    Partition           => $partition,
                    MessageSet          => [
                        {
                            Offset          => $PRODUCER_ANY_OFFSET,
                            Key             => q{},
                            Value           => 'Hello!',
                        },
                    ],
                },
            ],
        },
    ],
};
my $normal_encoded_produce_request = encode_produce_request( $decoded_produce_request );
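# The encoded form of this request is used below as the key when registering
# mock responses via Kafka::MockIO::add_special_case().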

my $decoded_offset_request = {
    ApiKey                              => $APIKEY_OFFSET,
    CorrelationId                       => $CorrelationId,
    ClientId                            => 'consumer',
    topics                              => [
        {
            TopicName                   => $topic,
            partitions                  => [
                {
                    Partition           => $partition,
                    Time                => $RECEIVE_EARLIEST_OFFSETS,
                    MaxNumberOfOffsets  => 1,
                },
            ],
        },
    ],
};
my $normal_encoded_offset_request = encode_offset_request( $decoded_offset_request );
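# The offset request above asks only for the single earliest offset of the mock topic and partition.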

my $data_exchange;
$data_exchange->{ $ERROR_NO_ERROR } = {
    decoded_metadata_request    => {
        CorrelationId               => $CorrelationId,
        ClientId                    => q{},
        topics                      => [
            $topic,
        ],
    },
    decoded_metadata_response   => {
        CorrelationId               => $CorrelationId,
        Broker                      => [
            {
                NodeId                  => 2,
                Host                    => $host,
                Port                    => $port + 2,
            },
            {
                NodeId                  => 0,
                Host                    => $host,
                Port                    => $port,
            },
            {
                NodeId                  => 1,
                Host                    => $host,
                Port                    => $port + 1,
            },
        ],
        TopicMetadata               => [
            {
                ErrorCode               => 0,
                TopicName               => $topic,
                PartitionMetadata       => [
                    {
                        ErrorCode           => 0,
                        Partition           => $partition,
                        Leader              => 2,
                        Replicas            => [
                                                   2,
                                                   0,
                                                   1,
                        ],
                        Isr                 => [
                                                   2,
                                                   0,
                                                   1,
                        ],
                    },
                ],
            },
        ],
    },
};
my $normal_encoded_metadata_request  = encode_metadata_request( $data_exchange->{ $ERROR_NO_ERROR }->{decoded_metadata_request} );
my $normal_encoded_metadata_response = encode_metadata_response( $data_exchange->{ $ERROR_NO_ERROR }->{decoded_metadata_response} );
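# This request/response pair is registered with the mock as the normal (error-free)
# metadata exchange; individual tests temporarily replace it with broken variants.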

#-- NON-FATAL errors

$data_exchange->{ $ERROR_LEADER_NOT_FOUND } = {
    decoded_metadata_response   => {
        CorrelationId               => $CorrelationId,
        Broker                      => [
            {
                NodeId                  => 2,
                Host                    => $host,
                Port                    => $port + 2,
            },
            {
                NodeId                  => 0,
                Host                    => $host,
                Port                    => $port,
            },
            {
                NodeId                  => 1,
                Host                    => $host,
                Port                    => $port + 1,
            },
        ],
        TopicMetadata               => [
            {
                ErrorCode               => 0,
                TopicName               => $topic,
                PartitionMetadata       => [
                    {
                        ErrorCode           => 0,
                        Partition           => $partition,
# reason for error $ERROR_LEADER_NOT_FOUND: leader NodeId 3 is not among the brokers listed above
                        Leader              => 3,
                        Replicas            => [
                                                   2,
                                                   0,
                                                   1,
                        ],
                        Isr                 => [
                                                   2,
                                                   0,
                                                   1,
                        ],
                    },
                ],
            },
        ],
    },
};

$data_exchange->{RETRY_ON_ERRORS} = {
    decoded_produce_response   => {
        CorrelationId                           => $CorrelationId,
        topics                                  => [
            {
                TopicName                       => $topic,
                partitions                      => [
                    {
                        Partition               => $partition,
# ErrorCode is overwritten below with each code tested against %RETRY_ON_ERRORS
                        ErrorCode               => 0,
                        Offset                  => 0,
                    },
                ],
            },
        ],
    },
};

our ( $replaced_method, $skip_calls );
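# Kafka_IO_error() temporarily replaces the named Kafka::IO method with a stub that
# passes the first $skip_calls calls through to the original (covering the initial
# metadata exchange) and then raises an error. It creates a fresh connection, sends
# the given request, checks the resulting exception code and the number of recorded
# non-fatal errors, and finally restores the original method.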
sub Kafka_IO_error {
    my $method_name         = shift;
    $skip_calls             = shift;
    my $expected_error_code = shift;
    my $expected_nonfatals  = shift;
    my $decoded_request     = shift;

    my $replaced_method_name = 'Kafka::IO::'.$method_name;
    $replaced_method = \&$replaced_method_name;

    Sub::Install::reinstall_sub(
        {
            code    => sub {
                if ( $main::skip_calls ) {
                    --$main::skip_calls;
                    return $main::replaced_method->( @_ );
                } else {
                    my ( $self ) = @_;
                    $self->_error( $ERROR_MISMATCH_ARGUMENT );  # any exception
                }
            },
            into    => 'Kafka::IO',
            as      => $method_name,
        }
    );

    my $connection = Kafka::Connection->new(
        host                => $host,
        port                => $port,
        CorrelationId       => $CorrelationId,
    );

    is scalar( @{ $connection->nonfatal_errors } ), 0, 'no non-fatal errors recorded yet';
    eval { $connection->receive_response_to_request( $decoded_request ); };
    my $result_error = $@;
    isa_ok( $result_error, 'Kafka::Exception::Connection' );
    is $result_error->code, $expected_error_code, 'non-fatal error: '.$ERROR{ $expected_error_code };
    # the number of recorded non-fatal errors depends on the failure scenario under test
    is scalar( @{ $connection->nonfatal_errors } ), $expected_nonfatals, "$expected_nonfatals non-fatal errors recorded";

    Sub::Install::reinstall_sub(
        {
            code    => $replaced_method,
            into    => 'Kafka::IO',
            as      => $method_name,
        }
    );
}

#-- Global data ----------------------------------------------------------------

my ( $connection, $error );

# INSTRUCTIONS -----------------------------------------------------------------

#-- Connecting to the Kafka mocked server port

#-- Connection

$connection = Kafka::Connection->new(
    host                => $host,
    port                => $port,
    CorrelationId       => $CorrelationId,
);

#-- $ERROR_LEADER_NOT_FOUND

Kafka::MockIO::add_special_case(
    {
        $normal_encoded_metadata_request => encode_metadata_response( $data_exchange->{ $ERROR_LEADER_NOT_FOUND }->{decoded_metadata_response} ),
    }
);
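# The metadata response registered above names a non-existent leader, so the produce
# request below is retried and finally fails with $ERROR_LEADER_NOT_FOUND.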

is scalar( @{ $connection->nonfatal_errors } ), 0, 'no non-fatal errors recorded yet';
eval { $connection->receive_response_to_request( $decoded_produce_request ); };
$error = $@;
isa_ok( $error, 'Kafka::Exception::Connection' );
is $error->code, $ERROR_LEADER_NOT_FOUND, 'non-fatal error: '.$ERROR{ $ERROR_LEADER_NOT_FOUND };
is scalar( @{ $connection->nonfatal_errors } ), $SEND_MAX_RETRIES, 'a non-fatal error recorded for each retry';

is scalar( @{ $connection->clear_nonfatals } ), 0, 'non-fatal errors cleared';
is scalar( @{ $connection->nonfatal_errors } ), 0, 'no non-fatal errors recorded after clearing';

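# restore the normal metadata response before the IO error tests below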
Kafka::MockIO::add_special_case( { $normal_encoded_metadata_request => $normal_encoded_metadata_response, } );

#-- connect IO

Kafka_IO_error(
    'new',              # method that causes the error
    # skip the call made during the initial metadata preparation
    1,                  # skip calls
    $ERROR_CANNOT_BIND, # expected error code
    # because the connection is not available
    $SEND_MAX_RETRIES,  # expected non-fatal errors
    $decoded_produce_request,
);

#-- send IO

Kafka_IO_error(
    'send',             # method name
    # skip the send made during the initial metadata preparation
    1,                  # skip calls
    $ERROR_CANNOT_SEND, # expected error code
    # because the connection is available, but the metadata request cannot be sent
    1,                  # expected non-fatal errors
    $decoded_offset_request,
);

#-- receive IO

Kafka_IO_error(
    'receive',          # method name
    # skip receiving the initial metadata response (two consecutive reads from the socket)
    2,                  # skip calls
    $ERROR_SEND_NO_ACK, # expected error code
    # because the connection is available, but the metadata response cannot be received
    0,                  # expected non-fatal errors
    $decoded_produce_request,
);

Kafka_IO_error(
    'receive',          # method name
    # skip receiving the initial metadata response (two consecutive reads from the socket)
    2,                  # skip calls
    $ERROR_CANNOT_RECV, # expected error code
    # because the connection is available, but the metadata response cannot be received
    1,                  # expected non-fatal errors
    $decoded_offset_request,
);

#-- %Kafka::Connection::RETRY_ON_ERRORS
my $partition_data = $data_exchange->{RETRY_ON_ERRORS}->{decoded_produce_response}->{topics}->[0]->{partitions}->[0];
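# Each Kafka error code is injected into the mock produce response in turn: codes
# present in %Kafka::Connection::RETRY_ON_ERRORS are retried and recorded as
# non-fatal, while any other code is reported as fatal without being recorded.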
foreach my $ErrorCode (
        $ERROR_NO_ERROR,
        $ERROR_UNKNOWN,
        $ERROR_OFFSET_OUT_OF_RANGE,
        $ERROR_INVALID_MESSAGE,
        $ERROR_UNKNOWN_TOPIC_OR_PARTITION,
        $ERROR_INVALID_MESSAGE_SIZE,
        $ERROR_LEADER_NOT_AVAILABLE,
        $ERROR_NOT_LEADER_FOR_PARTITION,
        $ERROR_REQUEST_TIMED_OUT,
        $ERROR_BROKER_NOT_AVAILABLE,
        $ERROR_REPLICA_NOT_AVAILABLE,
        $ERROR_MESSAGE_SIZE_TOO_LARGE,
        $ERROR_STALE_CONTROLLER_EPOCH_CODE,
        $ERROR_OFFSET_METADATA_TOO_LARGE_CODE,
    ) {

    $partition_data->{ErrorCode} = $ErrorCode;
    Kafka::MockIO::add_special_case(
        {
            $normal_encoded_produce_request => encode_produce_response( $data_exchange->{RETRY_ON_ERRORS}->{decoded_produce_response} ),
        }
    );

    $connection = Kafka::Connection->new(
        host                => $host,
        port                => $port,
        CorrelationId       => $CorrelationId,
    );

    is scalar( @{ $connection->nonfatal_errors } ), 0, 'no non-fatal errors recorded yet';

    if ( $ErrorCode == $ERROR_NO_ERROR ) {
        lives_ok { $connection->receive_response_to_request( $decoded_produce_request ); } 'expecting to live';
    } else {
        eval { $connection->receive_response_to_request( $decoded_produce_request ); };
        $error = $@;
        isa_ok( $error, 'Kafka::Exception::Connection' );

        if ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
            is $error->code, $ErrorCode, 'non-fatal error: '.$ERROR{ $ErrorCode };
            is scalar( @{ $connection->nonfatal_errors } ), $SEND_MAX_RETRIES, 'a non-fatal error recorded for each retry';
        } else {
            is $error->code, $ErrorCode, 'FATAL error: '.$ERROR{ $ErrorCode };
            is scalar( @{ $connection->nonfatal_errors } ), 0, 'no non-fatal errors recorded';
        }
    }
}

$partition_data->{ErrorCode} = $ERROR_NO_ERROR;
Kafka::MockIO::add_special_case(
    {
        $normal_encoded_produce_request => encode_produce_response( $data_exchange->{RETRY_ON_ERRORS}->{decoded_produce_response} ),
    }
);

#-- Closes and cleans up

$connection->close;

# POSTCONDITIONS ---------------------------------------------------------------

Kafka::MockIO::restore();