The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

#-- verify that the optional test modules can be loaded

# Test::Exception is loaded through a string eval so that its absence is
# reported as a skipped test file instead of a compilation failure. The BEGIN
# block makes the import happen while the file is still being compiled, so the
# functions it exports are known to the code further down.
BEGIN {
    eval 'use Test::Exception';     ## no critic
    plan skip_all => "because Test::Exception required for testing" if $@;
}

# Same guard for Test::NoWarnings: skip the whole file when it is not installed.
BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    plan skip_all => 'because Test::NoWarnings required for testing' if $@;
}

# No fixed number of tests is declared; the count depends on the sizes of the
# value lists imported from Kafka::TestInternals.
plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Const::Fast;
use Params::Util qw(
    _ARRAY0
    _HASH
);
use Sub::Install;

use Kafka qw(
    $BLOCK_UNTIL_IS_COMMITTED
    $COMPRESSION_GZIP
    $COMPRESSION_NONE
    $COMPRESSION_SNAPPY
    $ERROR_MISMATCH_ARGUMENT
    $NOT_SEND_ANY_RESPONSE
    $REQUEST_TIMEOUT
    $WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Cluster;
use Kafka::Connection;
use Kafka::MockIO;
use Kafka::Producer;
use Kafka::TestInternals qw(
    @not_empty_string
    @not_isint
    @not_nonnegint
    @not_number
    @not_right_object
    @not_string_array
    $topic
);

#-- setting up facilities ------------------------------------------------------

#-- declarations ---------------------------------------------------------------

# WARNING: must match the settings of your system.
# Read from the KAFKA_BASE_DIR environment variable; when it holds a true value
# the scenario is run a second time with that directory passed to testing().
const my $KAFKA_BASE_DIR => $ENV{KAFKA_BASE_DIR};

# State shared by the helper subs and testing().
my $port;       # port handed to Kafka::Connection->new
my $connect;    # current Kafka::Connection object
my $partition;  # partition passed to every send
my $producer;   # current Kafka::Producer object
my $response;   # result of the most recent send

# Asserts that Kafka::Producer->new() throws for each unacceptable value of
# one constructor argument.
#
#   $field      - name of the constructor argument under test
#   @bad_values - values to try; one assertion is made per value, each
#                 expecting a Kafka::Exception::Producer
#
# Uses the file-level $connect and overwrites the file-level $producer.
sub new_ERROR_MISMATCH_ARGUMENT {
    my ( $field, @bad_values ) = @_;

    for my $rejected ( @bad_values ) {
        # Start every attempt without a leftover object from the previous one.
        $producer = undef;

        throws_ok(
            sub {
                $producer = Kafka::Producer->new(
                    Connection    => $connect,
                    CorrelationId => undef,
                    ClientId      => 'producer',
                    RequiredAcks  => $WAIT_WRITTEN_TO_LOCAL_LOG,
                    # Maximum time (ms) the server can await the receipt of the
                    # number of acknowledgements in RequiredAcks.
                    Timeout       => $REQUEST_TIMEOUT * 1000,
                    # Listed last; presumably replaces the same-named pair
                    # above when $field is one of those names.
                    $field        => $rejected,
                );
            },
            'Kafka::Exception::Producer',
            'error thrown',
        );
    }
}

# Asserts that Kafka::Producer->send() throws a Kafka::Exception for the
# given combination of arguments.
#
#   $topic, $partition, $messages, $key, $compression_codec
#       - passed to send() positionally and unchanged; arguments the caller
#         omitted are passed as undef
#
# A fresh producer on the file-level $connect is stored in the file-level
# $producer; the file-level $response is cleared before the attempt.
sub send_ERROR_MISMATCH_ARGUMENT {
    my ( $topic, $partition, $messages, $key, $compression_codec ) = @_;

    $producer = Kafka::Producer->new( Connection => $connect );
    $response = undef;

    # Always five positional arguments, exactly as received.
    my @send_args = ( $topic, $partition, $messages, $key, $compression_codec );

    throws_ok(
        sub { $response = $producer->send( @send_args ) },
        'Kafka::Exception',
        'error thrown',
    );
}

# Asserts that Kafka::Producer->send() throws a Kafka::Exception when a
# lower-level routine reports an error.
#
#   $module - package that owns the routine to replace
#   $name   - name of the routine inside that package
#
# The routine is temporarily replaced by a stub that calls
# $self->_error( $ERROR_MISMATCH_ARGUMENT ), one message is sent, and the
# original routine is installed again afterwards. The file-level $connect,
# $producer and $response are overwritten.
sub communication_error {
    my ( $module, $name ) = @_;

    # Keep a reference to the current implementation so it can be put back.
    my $original = \&{ "${module}::${name}" };

    $connect  = Kafka::Connection->new( host => 'localhost', port => $port );
    $producer = Kafka::Producer->new( Connection => $connect );

    # Where the stub goes, and where the original is restored to.
    my %target = (
        into => $module,
        as   => $name,
    );

    Sub::Install::reinstall_sub( {
        %target,
        code => sub {
            my ( $self ) = @_;
            $self->_error( $ERROR_MISMATCH_ARGUMENT );
        },
    } );

    $response = undef;
    throws_ok(
        sub {
            $response = $producer->send( $topic, $partition, 'Single message' );
        },
        'Kafka::Exception',
        'error thrown',
    );

    Sub::Install::reinstall_sub( { %target, code => $original } );
}

#-- Global data ----------------------------------------------------------------

# Every send in this file targets the partition published by Kafka::MockIO.
$partition = $Kafka::MockIO::PARTITION;

# INSTRUCTIONS -----------------------------------------------------------------

# The scenario always runs once without a base directory; it runs a second
# time with the directory when KAFKA_BASE_DIR was supplied.
testing();

if ( $KAFKA_BASE_DIR ) {
    testing( $KAFKA_BASE_DIR );
}

# Runs the complete Kafka::Producer scenario once.
#
#   $kafka_base_dir - optional. When true, the port is taken from the
#                     Kafka::Cluster object built for that directory.
#                     When false, Kafka::MockIO::override() is called first
#                     and Kafka::MockIO::restore() is called at the end.
#
# Sets the file-level $port, $connect, $producer and $response.
sub testing {
    my ( $kafka_base_dir ) = @_;

    if ( $kafka_base_dir ) {
        #-- Connecting to the Kafka server port (for example for node_id = 0)
        # The directory comes from this sub's own argument, so the sub does not
        # depend on the file-level constant. Only the first entry of the list
        # returned by servers() is kept.
        ( $port ) =  Kafka::Cluster->new( kafka_dir => $kafka_base_dir, does_not_start => 1 )->servers;
    } else {
        $port = $Kafka::MockIO::KAFKA_MOCK_SERVER_PORT;
        Kafka::MockIO::override();
    }

#-- Connecting to the Kafka server port

    $connect = Kafka::Connection->new(
        host    => 'localhost',
        port    => $port,
    );

#-- simple start

    $producer = Kafka::Producer->new(
        Connection  => $connect,
    );
    isa_ok( $producer, 'Kafka::Producer' );

    undef $producer;
    ok !$producer, 'producer object is destroyed';

#-- new
# Each constructor argument in turn is given every value from the matching
# list of unacceptable values; all of them must make new() throw.

# Connection
    new_ERROR_MISMATCH_ARGUMENT( 'Connection', @not_right_object );

# CorrelationId
    new_ERROR_MISMATCH_ARGUMENT( 'CorrelationId', @not_isint );

# ClientId
    new_ERROR_MISMATCH_ARGUMENT( 'ClientId', @not_empty_string );

# RequiredAcks
    new_ERROR_MISMATCH_ARGUMENT( 'RequiredAcks', @not_isint );

# Timeout
    new_ERROR_MISMATCH_ARGUMENT( 'Timeout', @not_number );

#-- send
# One positional argument at a time is replaced by an unacceptable value while
# the others stay valid; every such call must throw.

# topic
    send_ERROR_MISMATCH_ARGUMENT( $_, $partition, 'Some value', 'Some key' )
        foreach @not_empty_string;

# partition
    send_ERROR_MISMATCH_ARGUMENT( $topic, $_, 'Some value', 'Some key' )
        foreach @not_isint;

# messages
    # Entries of @not_empty_string that _ARRAY0() accepts are left out here;
    # @not_string_array supplies the unacceptable values for this argument.
    foreach my $bad_message (
            grep( { !_ARRAY0( $_ ) } @not_empty_string ),
            @not_string_array,
        ) {
        send_ERROR_MISMATCH_ARGUMENT( $topic, $partition, $bad_message, 'Some key' );
    }

# key
    send_ERROR_MISMATCH_ARGUMENT( $topic, $partition, 'Some value', $_ )
        foreach @not_empty_string;

# compression_codec
    # Besides the non-integers, try one value below $COMPRESSION_NONE and one
    # above $COMPRESSION_SNAPPY.
    send_ERROR_MISMATCH_ARGUMENT( $topic, $partition, 'Some value', 'Some key', $_ )
        foreach ( @not_isint, $COMPRESSION_NONE - 1, $COMPRESSION_SNAPPY + 1 );
# Valid values for $compression_codec checked in the test *_consumer.t

#-- ProduceRequest
# For every RequiredAcks mode, a single message and a series of messages are
# sent; each send must return a hash reference (checked with _HASH).

    for my $mode (
        $NOT_SEND_ANY_RESPONSE,
        $WAIT_WRITTEN_TO_LOCAL_LOG,
        $BLOCK_UNTIL_IS_COMMITTED,
        ) {

        $producer = Kafka::Producer->new(
            Connection      => $connect,
            RequiredAcks    => $mode,
        );
        isa_ok( $producer, 'Kafka::Producer' );

        # Sending a single message
        $response = $producer->send(
            $topic,
            $partition,
            'Single message'            # message
        );
        ok _HASH( $response ), 'response is received';

        # Sending a series of messages
        $response = $producer->send(
            $topic,
            $partition,
            [                           # messages
                'The first message',
                'The second message',
                'The third message',
            ]
        );
        ok _HASH( $response ), 'response is received';
    }

#-- Response to errors in communication modules

# Kafka::IO
    communication_error( 'Kafka::IO', 'send' );

# Kafka::Connection
    communication_error( 'Kafka::Connection', 'receive_response_to_request' );

#-- finish
    # Undo the override made at the top of this sub; nothing was overridden
    # when a base directory was given.
    Kafka::MockIO::restore()
        unless $kafka_base_dir;
}

# POSTCONDITIONS ---------------------------------------------------------------