The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

#-- verify load the module

# Test::NoWarnings and Test::Exception are loaded at compile time, in that
# order. If either one cannot be loaded, the whole script is skipped rather
# than reported as a failure.
BEGIN {
    foreach my $required ( qw( Test::NoWarnings Test::Exception ) ) {
        eval "use $required";    ## no critic
        plan skip_all => "because $required required for testing" if $@;
    }
}

plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Const::Fast;
use Params::Util qw(
    _ARRAY
    _HASH
);
use Sub::Install;

use Kafka qw(
    $COMPRESSION_GZIP
    $COMPRESSION_NONE
    $COMPRESSION_SNAPPY
    $DEFAULT_MAX_BYTES
    $DEFAULT_MAX_NUMBER_OF_OFFSETS
    $DEFAULT_MAX_WAIT_TIME
    $ERROR_MISMATCH_ARGUMENT
    $MESSAGE_SIZE_OVERHEAD
    $MIN_BYTES_RESPOND_IMMEDIATELY
    $RECEIVE_EARLIEST_OFFSETS
    $RECEIVE_LATEST_OFFSET
);
use Kafka::Cluster;
use Kafka::Connection;
use Kafka::Consumer;
use Kafka::MockIO;
use Kafka::Producer;
use Kafka::TestInternals qw(
    @not_empty_string
    @not_isint
    @not_nonnegint
    @not_posint
    @not_right_object
    $topic
);

#-- setting up facilities ------------------------------------------------------

#-- declarations ---------------------------------------------------------------

# WARNING: must match the settings of your system.
# The value comes from the KAFKA_BASE_DIR environment variable. When it is
# true, testing() is run a second time with it and it is handed to
# Kafka::Cluster->new as 'kafka_dir'.
const my $KAFKA_BASE_DIR    => $ENV{KAFKA_BASE_DIR};

# File-scoped state shared by the helper subs and by testing() below.
my ( $port, $connect, $partition, $consumer, $offsets, $messages );

# Asserts that Kafka::Consumer->new throws Kafka::Exception::Consumer for every
# unacceptable value of one constructor argument.
#
#   $field      - name of the constructor argument under test
#   @bad_values - values that the constructor is expected to reject
#
# One throws_ok assertion is emitted per bad value. The file-scoped $consumer
# is cleared before every attempt.
sub new_ERROR_MISMATCH_ARGUMENT {
    my ( $field, @bad_values ) = @_;

    # Known-good arguments; the pair under test is appended after them,
    # so the argument list is the same as spelling them out in the call.
    my @valid_arguments = (
        Connection          => $connect,
        CorrelationId       => undef,
        ClientId            => undef,
        MaxWaitTime         => $DEFAULT_MAX_WAIT_TIME,
        MinBytes            => $MIN_BYTES_RESPOND_IMMEDIATELY,
        MaxBytes            => $DEFAULT_MAX_BYTES,
        MaxNumberOfOffsets  => $DEFAULT_MAX_NUMBER_OF_OFFSETS,
    );

    for my $rejected ( @bad_values ) {
        $consumer = undef;
        throws_ok {
            $consumer = Kafka::Consumer->new( @valid_arguments, $field => $rejected );
        } 'Kafka::Exception::Consumer', 'error thrown';
    }
}

# Asserts that Kafka::Consumer::fetch throws Kafka::Exception::Consumer when
# called with the given (deliberately invalid) argument combination.
#
#   $topic, $partition, $offset, $max_size - passed to fetch() in this order
#
# A fresh consumer is created on the shared $connect for every call, and the
# file-scoped $messages is cleared beforehand.
sub fetch_ERROR_MISMATCH_ARGUMENT {
    my ( $topic, $partition, $offset, $max_size ) = @_;

    # Copied into a named list because @_ inside the throws_ok block belongs
    # to that block, not to this sub.
    my @fetch_arguments = ( $topic, $partition, $offset, $max_size );

    $consumer = Kafka::Consumer->new( Connection => $connect );
    $messages = undef;

    throws_ok { $messages = $consumer->fetch( @fetch_arguments ) }
        'Kafka::Exception::Consumer', 'error thrown';
}

# Asserts that Kafka::Consumer::offsets throws Kafka::Exception::Consumer when
# called with the given (deliberately invalid) argument combination.
#
#   $topic, $partition, $time, $max_number - passed to offsets() in this order
#
# A fresh consumer is created on the shared $connect for every call, and the
# file-scoped $offsets is cleared beforehand.
sub offsets_ERROR_MISMATCH_ARGUMENT {
    my ( $topic, $partition, $time, $max_number ) = @_;

    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );
    undef $offsets;
    throws_ok {
        # The result belongs in $offsets (the variable reset above), not in
        # $messages, which is reserved for the results of fetch().
        $offsets = $consumer->offsets(
            $topic,
            $partition,
            $time,
            $max_number,
        );
    } 'Kafka::Exception::Consumer', 'error thrown';
}

# Verifies that a failure inside a lower-level communication routine surfaces
# from the consumer as Kafka::Exception::Connection.
#
#   $module - package that owns the routine to sabotage
#   $name   - name of the routine inside that package
#
# The routine is temporarily replaced by a stub that calls
# $self->_error( $ERROR_MISMATCH_ARGUMENT ). fetch() and offsets() are then
# each expected to throw, and the original routine is reinstalled at the end.
# A new connection is stored in the file-scoped $connect.
sub communication_error {
    my ( $module, $name ) = @_;

    # Capture the original implementation before it gets overwritten.
    my $qualified_name = "${module}::${name}";
    my $original_code  = \&$qualified_name;

    # Installs the given code reference as ${module}::${name}.
    my $install = sub {
        my ( $code ) = @_;

        Sub::Install::reinstall_sub( {
            code    => $code,
            into    => $module,
            as      => $name,
        } );
    };

    $connect = Kafka::Connection->new(
        host        => 'localhost',
        port        => $port,
    );

    $install->(
        sub {
            my ( $self ) = @_;
            $self->_error( $ERROR_MISMATCH_ARGUMENT );
        }
    );

    # fetch
    $consumer = Kafka::Consumer->new( Connection => $connect );
    $messages = undef;
    throws_ok { $messages = $consumer->fetch( $topic, $partition, 0 ) }
        'Kafka::Exception::Connection', 'error thrown';

    # offsets
    $consumer = Kafka::Consumer->new( Connection => $connect );
    $offsets = undef;
    throws_ok { $offsets = $consumer->offsets( $topic, $partition, $RECEIVE_LATEST_OFFSET ) }
        'Kafka::Exception::Connection', 'error thrown';

    # Put the real implementation back.
    $install->( $original_code );
}

#-- Global data ----------------------------------------------------------------

# Every request in this script targets the partition published by Kafka::MockIO.
$partition = $Kafka::MockIO::PARTITION;

# INSTRUCTIONS -----------------------------------------------------------------

# The first pass takes no argument and runs against Kafka::MockIO. A second
# pass against a Kafka installation happens only when KAFKA_BASE_DIR is set.
testing();
if ( $KAFKA_BASE_DIR ) {
    testing( $KAFKA_BASE_DIR );
}

# Runs the complete Kafka::Consumer test sequence once.
#
#   $kafka_base_dir - optional; base directory of a Kafka installation.
#                     When true, the port is taken from a Kafka::Cluster
#                     object built for that directory (does_not_start => 1).
#                     When false, Kafka::MockIO is switched on for the
#                     duration of the run and switched off again at the end.
#
# Sequence: object construction, rejection of invalid arguments by new(),
# fetch() and offsets(), producing sample messages, a valid offsets() and
# fetch() call, and finally error propagation from the communication layer.
# Uses the file-scoped $port, $connect, $partition, $consumer, $offsets and
# $messages.
sub testing {
    my ( $kafka_base_dir ) = @_;

    if ( $kafka_base_dir ) {
        #-- Connecting to the Kafka server port (for example for node_id = 0)
        # Use the directory this call was given instead of reaching for the
        # file-level constant, so the sub honours its own argument.
        ( $port ) =  Kafka::Cluster->new( kafka_dir => $kafka_base_dir, does_not_start => 1 )->servers;
    }
    else {
        $port = $Kafka::MockIO::KAFKA_MOCK_SERVER_PORT;
        Kafka::MockIO::override();
    }

#-- Connecting to the Kafka server port

    $connect = Kafka::Connection->new(
        host    => 'localhost',
        port    => $port,
    );

#-- simple start

    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );
    isa_ok( $consumer, 'Kafka::Consumer' );

    undef $consumer;
    ok !$consumer, 'consumer object is destroyed';

#-- new

# Connection
    new_ERROR_MISMATCH_ARGUMENT( 'Connection', @not_right_object );

# CorrelationId
    new_ERROR_MISMATCH_ARGUMENT( 'CorrelationId', @not_isint );

# ClientId
    new_ERROR_MISMATCH_ARGUMENT( 'ClientId', @not_empty_string );

# MaxWaitTime
    new_ERROR_MISMATCH_ARGUMENT( 'MaxWaitTime', @not_isint );

# MinBytes
    new_ERROR_MISMATCH_ARGUMENT( 'MinBytes', @not_nonnegint );

# MaxBytes (anything below the per-message overhead is rejected as well)
    new_ERROR_MISMATCH_ARGUMENT( 'MaxBytes', @not_posint, $MESSAGE_SIZE_OVERHEAD - 1 );

# MaxNumberOfOffsets
    new_ERROR_MISMATCH_ARGUMENT( 'MaxNumberOfOffsets', @not_posint );

#-- fetch

# topic
    fetch_ERROR_MISMATCH_ARGUMENT( $_, $partition, 0, $DEFAULT_MAX_BYTES )
        foreach @not_empty_string;

# partition
    fetch_ERROR_MISMATCH_ARGUMENT( $topic, $_, 0, $DEFAULT_MAX_BYTES )
        foreach @not_isint;

# offset
    fetch_ERROR_MISMATCH_ARGUMENT( $topic, $partition, $_, $DEFAULT_MAX_BYTES )
        foreach @not_nonnegint;

# max_size (undef is filtered out of the bad values here)
    fetch_ERROR_MISMATCH_ARGUMENT( $topic, $partition, 0, $_ )
        foreach grep( { defined $_ } @not_nonnegint ), $MESSAGE_SIZE_OVERHEAD - 1;

#-- offsets

# topic
    offsets_ERROR_MISMATCH_ARGUMENT( $_, $partition, $RECEIVE_EARLIEST_OFFSETS, $DEFAULT_MAX_NUMBER_OF_OFFSETS )
        foreach @not_empty_string;

# partition
    offsets_ERROR_MISMATCH_ARGUMENT( $topic, $_, $RECEIVE_EARLIEST_OFFSETS, $DEFAULT_MAX_NUMBER_OF_OFFSETS )
        foreach @not_isint;

# time
    offsets_ERROR_MISMATCH_ARGUMENT( $topic, $partition, $_, $DEFAULT_MAX_NUMBER_OF_OFFSETS )
        foreach @not_isint, -3;

# max_number (undef is filtered out of the bad values here)
    offsets_ERROR_MISMATCH_ARGUMENT( $topic, $partition, $RECEIVE_EARLIEST_OFFSETS, $_ )
        foreach grep( { defined $_ } @not_posint );

#-- Preparing data

    my $producer = Kafka::Producer->new(
        Connection      => $connect,
    );

    # The same three messages are sent once per compression codec.
    foreach my $compression_codec (
            $COMPRESSION_GZIP,
            $COMPRESSION_NONE,
            $COMPRESSION_SNAPPY,
        )
    {
        # Sending a series of messages
        $producer->send(
            $topic,
            $partition,
            [                           # messages
                'The first message',
                'The second message',
                'The third message',
            ],
            undef,
            $compression_codec,
        );
    }

    $consumer = Kafka::Consumer->new(
        Connection  => $connect,
    );
    isa_ok( $consumer, 'Kafka::Consumer' );

#-- OffsetRequest

# According to Apache Kafka documentation:
# The Offset API describes the valid offset range available for a set of topic-partitions.
# The response contains the starting offset of each segment for the requested partition
# as well as the "log end offset" i.e. the offset of the next message that would be appended to the given partition.

    # Get a list of valid offsets up max_number before the given time
    $offsets = $consumer->offsets(
        $topic,
        $partition,
# According to Apache Kafka documentation:
# There are two special values.
# Specify -1 ($RECEIVE_LATEST_OFFSET) to receive the latest offset (this will only ever return one offset).
# Specify -2 ($RECEIVE_EARLIEST_OFFSETS) to receive the earliest available offsets.
#        $RECEIVE_EARLIEST_OFFSETS,      # time
        $RECEIVE_LATEST_OFFSET,         # time
        $DEFAULT_MAX_NUMBER_OF_OFFSETS, # max_number
    );
    ok _ARRAY( $offsets ), 'offsets are received';
    if( $offsets ) {
        foreach my $offset ( @$offsets ) {
            note "Received offset: $offset";
        }
    }

#-- FetchRequest

    # Consuming messages
    $messages = $consumer->fetch(
        $topic,
        $partition,
        0,                  # offset
        $DEFAULT_MAX_BYTES  # Maximum size of MESSAGE(s) to receive
    );
    ok _ARRAY( $messages ), 'messages are received';
    if ( $messages ) {
        foreach my $message ( @$messages ) {
#            note '--------------------';
            if( $message->valid ) {
#                note 'key                : ', $message->key;
#                note 'payload            : ', $message->payload;
#                note 'offset             : ', $message->offset;
#                note 'next_offset        : ', $message->next_offset;
#                note 'HighwaterMarkOffset: ', $message->HighwaterMarkOffset;
            } else {
                # Only invalid messages are reported.
                diag 'error              : ', $message->error;
            }
        }
    }

#-- Response to errors in communication modules

# Kafka::IO
    communication_error( 'Kafka::IO', 'send' );

# Kafka::Connection
    communication_error( 'Kafka::Connection', 'receive_response_to_request' );

#-- finish
    # Undo the override made at the top of this run (mock mode only).
    Kafka::MockIO::restore()
        unless $kafka_base_dir;
}

# POSTCONDITIONS ---------------------------------------------------------------