The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

BEGIN {
    # Without a Kafka installation directory there is nothing to test against.
    if ( !$ENV{KAFKA_BASE_DIR} ) {
        plan skip_all => 'Unknown base directory of Kafka server';
    }
}

#-- verify that the helper test modules can be loaded

BEGIN {
    # Loaded inside BEGIN so that the exported functions are already known
    # when the block-call syntax (lives_ok { ... }) further down is compiled.
    my $loaded = eval {
        require Test::Exception;
        Test::Exception->import;
        1;
    };
    plan skip_all => "because Test::Exception required for testing"
        unless $loaded;
}

BEGIN {
    my $loaded = eval {
        require Test::NoWarnings;
        Test::NoWarnings->import;
        1;
    };
    plan skip_all => 'because Test::NoWarnings required for testing'
        unless $loaded;
}

# The number of assertions depends on the branches taken below.
plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Config::IniFiles;
use Const::Fast;
use File::Copy;
use FindBin qw(
    $Bin
);
use File::Spec::Functions qw(
    catdir
    catfile
);
use Params::Util qw(
    _ARRAY
    _ARRAY0
    _HASH
);

use Kafka qw(
    $BLOCK_UNTIL_IS_COMMITTED
    $DEFAULT_MAX_BYTES
    $RECEIVE_LATEST_OFFSET
);
use Kafka::Cluster qw(
    $DEFAULT_REPLICATION_FACTOR
);
use Kafka::Connection;
use Kafka::Consumer;
use Kafka::MockIO;
use Kafka::Producer;

#-- setting up facilities ------------------------------------------------------

const my $KAFKA_PROPERTIES_FILE => 'server.properties';

# Locate <t>/config/server.properties and keep a pristine copy of it next to
# the original (same name plus '.bak'), so the file can be put back after the
# tests have rewritten it.
my ( $properties_file, $bak_properties_file );
{
    require File::Spec;

    # The script may be started from inside t/ or from its parent directory.
    # Compare the last path component as a whole: checking only the final
    # character would treat any directory whose name merely ends in 't'
    # (for example '.../project') as the t/ directory.
    my $last_dir            = ( File::Spec->splitdir( $Bin ) )[-1] // q{};
    my $start_dir           = ( $last_dir eq 't' ) ? $Bin : catdir( $Bin, 't' );
    my $kafka_config_dir    = catdir( $start_dir, 'config' );
    $properties_file        = catfile( $kafka_config_dir, $KAFKA_PROPERTIES_FILE );
}
$bak_properties_file = $properties_file.'.bak';

# Drop a stale backup left over from an earlier run (a missing file is fine).
unlink $bak_properties_file;

# Stop right here when the backup cannot be made: continuing would let the
# tests rewrite the only copy of the configuration with nothing to restore.
copy( $properties_file, $bak_properties_file )
    or BAIL_OUT( "Cannot back up $properties_file to $bak_properties_file: $!" );

#-- declarations ---------------------------------------------------------------

# WARNING: must match the settings of your system
const my $KAFKA_BASE_DIR        => $ENV{KAFKA_BASE_DIR};
# Initial topic name; the request helpers below increment it before each use.
const my $TOPIC_PATTERN         => 'stranger0';
# Section name used for the section-less server.properties entries.
const my $INI_SECTION           => 'GENERAL';

# File-scoped state shared between the helper subs and the test loop.
my $cluster;
my $port;
my $connection;
my $topic;
my $partition;
my $producer;
my $response;
my $consumer;
my $offsets;
my $messages;

# Put the original server configuration back from its '.bak' copy.
#
# Returns true when the backup was moved back into place, false otherwise
# (no backup available, or the rename failed).
sub restore_properties {
    # Never remove the live configuration unless there is a backup to replace
    # it with; otherwise a second call (or a failed backup) would leave no
    # configuration file at all.
    return unless -e $bak_properties_file;

    unlink $properties_file;
    my $restored = rename $bak_properties_file, $properties_file;
    diag "Cannot restore $properties_file from $bak_properties_file: $!"
        unless $restored;

    return $restored;
}

# Setting the server configuration file
# Rewrite the server configuration file for one test pass.
#
#   $auto_create_topics_mode - value written to 'auto.create.topics.enable'
#                              (the test loop passes 'true' or 'false')
#
# 'default.replication.factor' is set to $DEFAULT_REPLICATION_FACTOR as well.
# Aborts the whole test run (after restoring the original file) when the
# configuration cannot be read or written back. Returns true on success.
sub setup {
    my ( $auto_create_topics_mode ) = @_;

    my $cfg = Config::IniFiles->new(
        -file       => $properties_file,
        -fallback   => $INI_SECTION,
    );
    if ( !$cfg ) {
        restore_properties();
        my $error = join q{}, map { "\n$_" } @Config::IniFiles::errors;
        BAIL_OUT "$properties_file error: $error";
    }

    $cfg->setval( $INI_SECTION, 'auto.create.topics.enable'     => $auto_create_topics_mode );
    $cfg->setval( $INI_SECTION, 'default.replication.factor'    => $DEFAULT_REPLICATION_FACTOR );

    # A failed write would leave the previous settings in place and make every
    # following assertion test the wrong server mode, so treat it as fatal.
    if ( !$cfg->RewriteConfig( $properties_file ) ) {
        restore_properties();
        BAIL_OUT "$properties_file error: \ncannot write the modified configuration";
    }

    return 1;
}

# Send one message to a topic name that has not been used before.
# Advances the shared $topic (string increment) and returns whatever
# $producer->send returns; any exception from send propagates to the caller.
sub sending {
    ++$topic;                       # move on to the next, still unknown topic

    my $message = 'Single message';

    return $producer->send( $topic, $partition, $message );
}

# Request offsets for a topic name that has not been used before.
# Advances the shared $topic (string increment) and returns whatever
# $consumer->offsets returns; any exception propagates to the caller.
sub getting_offsets {
    ++$topic;                       # move on to the next topic name

    my $time = $RECEIVE_LATEST_OFFSET;

    return $consumer->offsets( $topic, $partition, $time );
}

# Fetch messages from a topic name that has not been used before.
# Advances the shared $topic (string increment) and returns whatever
# $consumer->fetch returns; any exception propagates to the caller.
sub fetching {
    ++$topic;                       # move on to the next topic name

    my $start_offset    = 0;
    my $max_bytes       = $DEFAULT_MAX_BYTES;   # Maximum size of MESSAGE(s) to receive

    return $consumer->fetch( $topic, $partition, $start_offset, $max_bytes );
}

#-- Global data ----------------------------------------------------------------

# Starting topic name (sending(), getting_offsets() and fetching() increment it
# before every request) and the partition used by all requests.
$topic      = $TOPIC_PATTERN;
$partition  = $Kafka::MockIO::PARTITION;

# INSTRUCTIONS -----------------------------------------------------------------

# Outer loop: the server-side 'auto.create.topics.enable' setting.
# Inner loop: the client-side AutoCreateTopicsEnable connection option.
for my $auto_create_topics_enable ( 'true', 'false' ) {
    # Write the setting into the properties file, then create the cluster.
    setup( $auto_create_topics_enable );

    $cluster = Kafka::Cluster->new(
        kafka_dir           => $KAFKA_BASE_DIR,
        replication_factor  => $DEFAULT_REPLICATION_FACTOR,
    );

    #-- Port of the first server reported by the cluster (for example for node_id = 0)
    ( $port ) = $cluster->servers;

    my $server_creates_topics = $auto_create_topics_enable eq 'true';

    for my $AutoCreateTopicsEnable ( 0, 1 ) {
        #-- Connecting to the Kafka server port
        $connection = Kafka::Connection->new(
            host                    => 'localhost',
            port                    => $port,
            AutoCreateTopicsEnable  => $AutoCreateTopicsEnable,
        );
        $producer = Kafka::Producer->new(
            Connection      => $connection,
            # Require verification of the number of messages sent and recorded
            RequiredAcks    => $BLOCK_UNTIL_IS_COMMITTED,
        );
        $consumer = Kafka::Consumer->new(
            Connection  => $connection,
        );

        # Requests must succeed only when both sides allow auto-creation.
        my $expect_success = $server_creates_topics && $AutoCreateTopicsEnable;
        # Requests must fail when the server refuses to create topics.
        # The remaining combination (server allows, client does not) is not
        # asserted because of the
        # Kafka BUG "[KAFKA-1124] - Sending to a new topic (with auto.create.topics.enable) returns ERROR "
        # (Fixed in Kafka 0.8.2).
        my $expect_failure = !$server_creates_topics;

        # Sending a single message
        undef $response;
        if ( $expect_success ) {
            ok $connection->exists_topic_partition( $topic, $partition ), 'existing topic';

            # sending() increments $topic, so this is the name it will use.
            my $next_topic = $topic;
            $next_topic++;

            ok !$connection->exists_topic_partition( $next_topic, $partition ), 'not yet existing topic';
            lives_ok    { $response = sending() } 'expecting to live';
            ok _HASH( $response ), 'response is received';
            $connection->get_metadata( $topic );
            ok $connection->exists_topic_partition( $next_topic, $partition ), 'autocreated topic';
        } elsif ( $expect_failure ) {
            dies_ok     { $response = sending() } 'expecting to die';
            ok !defined( $response ), 'response is not received';
        }

        # Requesting the offsets of the next new topic
        undef $offsets;
        if ( $expect_success ) {
            lives_ok    { $offsets = getting_offsets() } 'expecting to live';
            ok _ARRAY( $offsets ), 'offsets are received';
        } elsif ( $expect_failure ) {
            dies_ok     { $offsets = getting_offsets() } 'expecting to die';
            ok !defined( $offsets ), 'offsets are not received';
        }

        # Consuming messages
        undef $messages;
        if ( $expect_success ) {
            lives_ok    { $messages = fetching() } 'expecting to live';
            ok _ARRAY0( $messages ), 'messages are received';
        } elsif ( $expect_failure ) {
            dies_ok     { $messages = fetching() } 'expecting to die';
            ok !defined( $messages ), 'messages are not received';
        }
    }

    $cluster->close;
}

# POSTCONDITIONS ---------------------------------------------------------------

# Put the original server.properties back from the backup copy made before
# the tests rewrote it.
restore_properties();