The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

# Nothing can be tested unless the environment says where Kafka lives.
BEGIN {
    unless ( defined $ENV{KAFKA_BASE_DIR} ) {
        plan skip_all => 'Unknown base directory of Kafka server';
    }
}

#-- make sure Test::NoWarnings can be loaded

# Try to load Test::NoWarnings at compile time; skip the file if that fails.
BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    if ( $@ ) {
        plan skip_all => 'because Test::NoWarnings required for testing';
    }
}

# Assertions are emitted once per fetched message further down, so the total
# is not fixed up front.
plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Kafka qw(
    $COMPRESSION_GZIP
    $COMPRESSION_NONE
    $COMPRESSION_SNAPPY
    $DEFAULT_MAX_BYTES
    $DEFAULT_MAX_NUMBER_OF_OFFSETS
    $RECEIVE_LATEST_OFFSET
);
use Kafka::Cluster;
use Kafka::Connection;
use Kafka::Consumer;
use Kafka::MockIO;
use Kafka::Producer;
use Kafka::TestInternals qw(
    $topic
);

#-- setting up facilities ------------------------------------------------------

# Turn on autoflush for STDOUT.
STDOUT->autoflush( 1 );

# Arguments for the cluster object used by this test.
my %cluster_args = (
    # WARNING: must match the settings of your system
    kafka_dir          => $ENV{KAFKA_BASE_DIR},
    replication_factor => 1,
);

my $cluster = Kafka::Cluster->new( %cluster_args );

#-- declarations ---------------------------------------------------------------

# Assigned later in the file, after the consumer has been queried.
my ( $offsets, $messages );

#-- Global data ----------------------------------------------------------------

# Partition number shared with the mock I/O layer.
my $partition = $Kafka::MockIO::PARTITION;

# INSTRUCTIONS -----------------------------------------------------------------

#-- Port of the first server reported by the cluster (list context on purpose)
my ( $port ) = $cluster->servers;

#-- One connection shared by the producer and the consumer
my $connect = Kafka::Connection->new(
    host => 'localhost',
    port => $port,
);

#-- Client objects

my $producer = Kafka::Producer->new( Connection => $connect );

my $consumer = Kafka::Consumer->new( Connection => $connect );

# Payloads for a single send: "Message #1" .. "Message #3".
my $messages_to_send = [ map { "Message #$_" } 1 .. 3 ];

# Each row pairs a codec constant with a printable label.
# The spelling of @compession_codecs is kept as-is because the fetch loop
# later in this file refers to the array by that name.
my @compession_codecs = (
    [ $COMPRESSION_NONE,   'NONE'   ],
    [ $COMPRESSION_GZIP,   'GZIP'   ],
    [ $COMPRESSION_SNAPPY, 'SNAPPY' ],
);

# Send the same payload list once per codec, in table order.
for my $codec_row ( @compession_codecs ) {
    my $codec_id = $codec_row->[0];

    $producer->send( $topic, $partition, $messages_to_send, undef, $codec_id );
}

# Ask the consumer for the offsets of the partition. The fetch loop below uses
# element 1 of the result as its first offset and element 0 as its upper bound.
$offsets = $consumer->offsets(
    $topic, $partition,
    $RECEIVE_LATEST_OFFSET,            # time
    $DEFAULT_MAX_NUMBER_OF_OFFSETS,    # max_number
);

# Log whatever came back so a failing run can be diagnosed from the output.
if ( $offsets ) {
    note "Received offset: $_" for @$offsets;
}

# Fetch starting at every offset in the range, once with return_all = 0 and
# once with return_all = 1, and check the offsets of the returned messages:
#   * offset == start offset  -> pass
#   * offset >  start offset  -> pass
#   * offset <  start offset  -> pass only when return_all is set and the batch
#                                was sent with a compressing codec, fail
#                                otherwise.

# Each send wrote one batch of this many messages, so this (not the number of
# codecs) is the divisor that maps an offset back to the batch it belongs to.
# NOTE(review): assumes the partition was empty before the sends, i.e. the
# first batch starts at offset 0 - confirm for your cluster setup.
my $batch_size = scalar @$messages_to_send;

foreach my $return_all ( 0, 1 ) {
    foreach my $start_offset ( $offsets->[1] .. ( $offsets->[0] - 1 ) ) {
        # Row of @compession_codecs ( [ codec constant, label ] ) that was
        # used when the batch containing $start_offset was sent.
        my $codec_index = int( $start_offset / $batch_size );
        my $codec       = $compession_codecs[ $codec_index ];

        # Consuming messages
        $messages = $consumer->fetch(
            $topic,
            $partition,
            $start_offset,
            $DEFAULT_MAX_BYTES, # Maximum size of MESSAGE(s) to receive
            $return_all,
        );
        next unless $messages;

        note '--------------------';
        note "Start offset = $start_offset, return_all = $return_all, codec when sending = " . $codec->[1];

        foreach my $message ( @$messages ) {
            if ( $message->valid ) {
                note 'consumed offset: ', $message->offset;
            } else {
                diag 'error          : ', $message->error;
            }

            if ( $message->offset == $start_offset ) {
                pass 'Starting Message is present';
            } elsif ( $message->offset > $start_offset ) {
                pass 'additional message is present';
            } elsif ( $return_all && $codec->[0] != $COMPRESSION_NONE ) {
                # $message->offset < $start_offset: tolerated only for a
                # compressed batch fetched with return_all set. Compare the
                # codec constant itself, not its position in the table.
                pass 'returned redundant message';
            } else {
                fail 'returned invalid data';
            }
        }
    }
}

# POSTCONDITIONS ---------------------------------------------------------------

# Release the Kafka::Cluster object created at the top of the test.
# NOTE(review): presumably this also stops the server(s) it manages - confirm
# against Kafka::Cluster.
$cluster->close;