#!/usr/bin/perl

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

use Test::More;

BEGIN {
    eval 'use Test::Exception';     ## no critic
    plan skip_all => 'because Test::Exception is required for testing' if $@;
}

BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    plan skip_all => 'because Test::NoWarnings is required for testing' if $@;
}
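
# Both modules above are optional, test-only dependencies; if either one is
# missing, the whole test file is skipped instead of failing.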

plan 'no_plan';

use Const::Fast;
use Params::Util qw(
    _ARRAY
);
use Sub::Install;

use Kafka qw(
    $COMPRESSION_GZIP
    $COMPRESSION_NONE
    $COMPRESSION_SNAPPY
    $DEFAULT_MAX_BYTES
    $DEFAULT_MAX_NUMBER_OF_OFFSETS
    $DEFAULT_MAX_WAIT_TIME
    $ERROR_CANNOT_RECV
    $MESSAGE_SIZE_OVERHEAD
    $MIN_BYTES_RESPOND_IMMEDIATELY
    $RECEIVE_EARLIEST_OFFSET
    $RECEIVE_LATEST_OFFSETS
    $RETRY_BACKOFF
);
use Kafka::Cluster;
use Kafka::Connection;
use Kafka::Consumer;
use Kafka::MockIO;
use Kafka::Producer;
use Kafka::TestInternals qw(
    @not_empty_string
    @not_isint
    @not_nonnegint
    @not_posint
    @not_posnumber
    @not_right_object
    $topic
);



STDOUT->autoflush;

# WARNING: KAFKA_BASE_DIR must point to the Kafka installation on your system
const my $KAFKA_BASE_DIR => $ENV{KAFKA_BASE_DIR};
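
# When KAFKA_BASE_DIR is not set, the tests below run against Kafka::MockIO
# only; when it is set, testing() is invoked a second time against a real
# Kafka cluster (see the calls near the bottom of this file).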

my ( $port, $connect, $partition, $consumer, $offsets, $messages );

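# Each *_ERROR_MISMATCH_ARGUMENT helper below makes a request with a single
# deliberately invalid argument and checks that the expected exception class
# is thrown.
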
sub new_ERROR_MISMATCH_ARGUMENT {
    my ( $field, @bad_values ) = @_;

    foreach my $bad_value ( @bad_values ) {
        undef $consumer;
        throws_ok {
            $consumer = Kafka::Consumer->new(
                Connection          => $connect,
                ClientId            => undef,
                MaxWaitTime         => $DEFAULT_MAX_WAIT_TIME,
                MinBytes            => $MIN_BYTES_RESPOND_IMMEDIATELY,
                MaxBytes            => $DEFAULT_MAX_BYTES,
                MaxNumberOfOffsets  => $DEFAULT_MAX_NUMBER_OF_OFFSETS,
                $field              => $bad_value,
            );
        } 'Kafka::Exception::Consumer', "new() throws on bad $field";
    }
}

sub fetch_ERROR_MISMATCH_ARGUMENT {
    my ( $topic, $partition, $offset, $max_size ) = @_;

    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );
    undef $messages;
    throws_ok {
        $messages = $consumer->fetch(
            $topic,
            $partition,
            $offset,
            $max_size,
        );
    } 'Kafka::Exception::Consumer', 'fetch() throws on an invalid argument';
}

sub offsets_ERROR_MISMATCH_ARGUMENT {
    my ( $topic, $partition, $time, $max_number ) = @_;

    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );
    undef $offsets;
    throws_ok {
        $offsets = $consumer->offsets(
            $topic,
            $partition,
            $time,
            $max_number,
        );
    } 'Kafka::Exception::Consumer', 'offsets() throws on an invalid argument';
}

sub communication_error {
    my ( $module, $name ) = @_;

    my $method_name = "${module}::${name}";
    # keep a reference to the original sub so that it can be restored later
    my $method = \&$method_name;
    $connect = Kafka::Connection->new(
        host            => 'localhost',
        port            => $port,
        RETRY_BACKOFF   => $RETRY_BACKOFF * 2,
    );

    Sub::Install::reinstall_sub( {
        code    => sub {
            my ( $self ) = @_;
            $self->_error( $ERROR_CANNOT_RECV );
        },
        into    => $module,
        as      => $name,
    } );
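
    # From here on, every call to the replaced method reports
    # $ERROR_CANNOT_RECV, so the requests below must fail with a
    # connection-level exception.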

    # fetch
    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );

    undef $messages;
    throws_ok {
        $messages = $consumer->fetch(
            $topic,
            $partition,
            0,
        );
    } 'Kafka::Exception::Connection', 'fetch() reports the communication error';

    # offsets
    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );

    undef $offsets;
    throws_ok {
        $offsets = $consumer->offsets(
            $topic,
            $partition,
            $RECEIVE_LATEST_OFFSETS,
        );
    } 'Kafka::Exception::Connection', 'offsets() reports the communication error';

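    # Put the original implementation back so that later tests are unaffected.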
    Sub::Install::reinstall_sub( {
        code    => $method,
        into    => $module,
        as      => $name,
    } );
}

$partition = $Kafka::MockIO::PARTITION;

testing();
testing( $KAFKA_BASE_DIR ) if $KAFKA_BASE_DIR;

sub testing {
    my ( $kafka_base_dir ) = @_;

    my $no_api_versions = 0;

    if ( $kafka_base_dir ) {
        #-- Connect to the port of a Kafka server in the cluster (for example, the one with node_id = 0)
        ( $port ) = Kafka::Cluster->new( kafka_dir => $kafka_base_dir, reuse_existing => 1 )->servers;
    } else {
        $port = $Kafka::MockIO::KAFKA_MOCK_SERVER_PORT;
        Kafka::MockIO::override();
        $no_api_versions = 1; # the mock protocol does not support API version requests
    }

    $connect = Kafka::Connection->new(
        host            => 'localhost',
        port            => $port,
        RETRY_BACKOFF   => $RETRY_BACKOFF * 2,
        dont_load_supported_api_versions => $no_api_versions,
    );

    #-- simple start

    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );
    isa_ok( $consumer, 'Kafka::Consumer' );

    undef $consumer;
    ok !$consumer, 'consumer object is destroyed';

    #-- new

    new_ERROR_MISMATCH_ARGUMENT( 'Connection', @not_right_object );
    new_ERROR_MISMATCH_ARGUMENT( 'ClientId', @not_empty_string );
    new_ERROR_MISMATCH_ARGUMENT( 'MaxWaitTime', @not_posnumber );
    new_ERROR_MISMATCH_ARGUMENT( 'MinBytes', @not_nonnegint );
    new_ERROR_MISMATCH_ARGUMENT( 'MaxBytes', @not_posint, $MESSAGE_SIZE_OVERHEAD - 1 );
    new_ERROR_MISMATCH_ARGUMENT( 'MaxNumberOfOffsets', @not_posint );

    #-- fetch

    fetch_ERROR_MISMATCH_ARGUMENT( $_, $partition, 0, $DEFAULT_MAX_BYTES )
        foreach @not_empty_string;

    fetch_ERROR_MISMATCH_ARGUMENT( $topic, $_, 0, $DEFAULT_MAX_BYTES )
        foreach @not_isint;

    fetch_ERROR_MISMATCH_ARGUMENT( $topic, $partition, $_, $DEFAULT_MAX_BYTES )
        foreach @not_nonnegint;

    fetch_ERROR_MISMATCH_ARGUMENT( $topic, $partition, 0, $_ )
        foreach grep( { defined $_ } @not_nonnegint ), $MESSAGE_SIZE_OVERHEAD - 1;

    #-- offsets

    offsets_ERROR_MISMATCH_ARGUMENT( $_, $partition, $RECEIVE_EARLIEST_OFFSET, $DEFAULT_MAX_NUMBER_OF_OFFSETS )
        foreach @not_empty_string;

    offsets_ERROR_MISMATCH_ARGUMENT( $topic, $_, $RECEIVE_EARLIEST_OFFSET, $DEFAULT_MAX_NUMBER_OF_OFFSETS )
        foreach @not_isint;

    offsets_ERROR_MISMATCH_ARGUMENT( $topic, $partition, $_, $DEFAULT_MAX_NUMBER_OF_OFFSETS )
        foreach @not_isint, -3;

    offsets_ERROR_MISMATCH_ARGUMENT( $topic, $partition, $RECEIVE_EARLIEST_OFFSET, $_ )
        foreach grep( { defined $_ } @not_posint );

    #-- Preparing data

    my $producer = Kafka::Producer->new(
        Connection      => $connect,
    );

    foreach my $compression_codec (
            $COMPRESSION_GZIP,
            $COMPRESSION_NONE,
            $COMPRESSION_SNAPPY,
        )
    {
        # Sending a series of messages
        $producer->send(
            $topic,
            $partition,
            [                           # messages
# WARN: Next commented messages lead to error under kafka 0.10.0.0 with $COMPRESSION_SNAPPY
# also look at https://issues.apache.org/jira/browse/KAFKA-3789
#                'The first message',
#                'The second message',
#                'The third message',
                'Message #1',
                'Message #2',
                'Message #3',
            ],
            undef,                      # key
            $compression_codec,         # compression codec
        );
    }
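
    # Three messages have been sent for each compression codec (nine in
    # total); they provide the data for the offset and fetch requests below.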

    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );
    isa_ok( $consumer, 'Kafka::Consumer' );

    #-- OffsetRequest

# According to Apache Kafka documentation:
# The Offset API describes the valid offset range available for a set of topic-partitions.
# The response contains the starting offset of each segment for the requested partition
# as well as the "log end offset" i.e. the offset of the next message that would be appended to the given partition.
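#
# A sketch of the expected result here (assuming the nine messages produced
# above are the only ones in this partition): a $RECEIVE_LATEST_OFFSETS
# request returns an array reference with a single element, e.g. [ 9 ],
# holding the log end offset.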

    # Get a list of up to max_number valid offsets before the given time
    $offsets = $consumer->offsets(
        $topic,
        $partition,
# According to Apache Kafka documentation:
# There are two special values.
# Specify -1 ($RECEIVE_LATEST_OFFSETS) to receive the latest offset (this will only ever return one offset).
# Specify -2 ($RECEIVE_EARLIEST_OFFSET) to receive the earliest available offsets.
#        $RECEIVE_EARLIEST_OFFSET,      # time
        $RECEIVE_LATEST_OFFSETS,         # time
        $DEFAULT_MAX_NUMBER_OF_OFFSETS, # max_number
    );
    ok _ARRAY( $offsets ), 'offsets are received';
    if ( $offsets ) {
        foreach my $offset ( @$offsets ) {
            note "Received offset: $offset";
        }
    }

    #-- FetchRequest

    # Consuming messages
    $messages = $consumer->fetch(
        $topic,
        $partition,
        0,                  # offset
        $DEFAULT_MAX_BYTES  # Maximum size of MESSAGE(s) to receive
    );
    ok _ARRAY( $messages ), 'messages are received';
    if ( $messages ) {
        foreach my $message ( @$messages ) {
#            note '--------------------';
            if ( $message->valid ) {
#                note 'key                : ', $message->key;
#                note 'payload            : ', $message->payload;
#                note 'offset             : ', $message->offset;
#                note 'next_offset        : ', $message->next_offset;
#                note 'HighwaterMarkOffset: ', $message->HighwaterMarkOffset;
            } else {
                diag 'error              : ', $message->error;
            }
        }
    }

    #-- Response to errors in communication modules

    communication_error( 'Kafka::IO', 'send' );

    communication_error( 'Kafka::Connection', 'receive_response_to_request' );

    Kafka::MockIO::restore()
        unless $kafka_base_dir;
}