The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

use Test::More;

# The test drives a real Kafka installation; skip the whole file when its
# location is not given in the environment.
BEGIN {
    plan skip_all => 'Unknown base directory of Kafka server'
        unless defined $ENV{KAFKA_BASE_DIR};
}

#-- verify that Test::NoWarnings can be loaded

# Loaded at compile time (BEGIN) so it is active before the rest of the file is
# compiled. The string eval turns a missing module into a skip of the whole
# file instead of a compilation failure.
BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    plan skip_all => 'because Test::NoWarnings required for testing' if $@;
}

# The number of tests depends on timing (periodic 'pass' calls while clients
# are working), so no fixed plan can be declared.
plan 'no_plan';

#-- load the modules -----------------------------------------------------------

use Config::IniFiles;
use Const::Fast;
use Data::Dumper;
use File::Copy;
use FindBin qw(
    $Bin
);
use File::Spec::Functions qw(
    catdir
    catfile
);
use List::Util qw(
    min
    shuffle
);
use POSIX ':sys_wait_h';

use Kafka qw(
    $BLOCK_UNTIL_IS_COMMITTED
    $MESSAGE_SIZE_OVERHEAD
    $RECEIVE_LATEST_OFFSET
);
use Kafka::Cluster;
use Kafka::Connection;
use Kafka::Consumer;
use Kafka::Internals qw(
    $MAX_CORRELATIONID
);
use Kafka::MockIO;
use Kafka::Producer;

#-- setting up facilities ------------------------------------------------------

STDOUT->autoflush;

const my $KAFKA_PROPERTIES_FILE => 'server.properties';

# Kafka server configuration used by the test cluster, and the backup copy that
# restore_properties() puts back at the end of the run.
my ( $properties_file, $bak_properties_file );
{
    # Treat $Bin as the test directory only when its last path component is
    # exactly 't'; a plain "ends with the letter t" check would also accept
    # directories such as '.../kafka-test'.
    my $start_dir           = ( $Bin =~ m{ (?: \A | [/\\] ) t \z }x ) ? $Bin : catdir( $Bin, 't' );
    my $kafka_config_dir    = catdir( $start_dir, 'config' );
    $properties_file        = catfile( $kafka_config_dir, $KAFKA_PROPERTIES_FILE );
}
$bak_properties_file = $properties_file.'.bak';

# setup() rewrites the properties file in place, so keep a pristine copy first.
# Without a backup there would be nothing to restore from, so stop right here.
unlink $bak_properties_file;
copy( $properties_file, $bak_properties_file )
    or BAIL_OUT "cannot back up $properties_file to $bak_properties_file: $!";

#-- declarations ---------------------------------------------------------------

# Restrictions:
# All client pairs work with a common topic.
# Each pair of clients (Producer + Consumer) works with a separate partition.
# All messages have the same length.

const my $PAIRS_CLIENTS         => 10;
const my $MAX_DATA              => 20 * 1024 * 1024;    # 20 MiB, in bytes
const my $MAX_DATA_RECORDS      => 1_000_000;
const my $MSG_LEN               => int( $MAX_DATA / $MAX_DATA_RECORDS );
# Size of the message pool and upper bound of messages per send() call.
# NOTE(review): $MAX_CORRELATIONID is used here as a size budget - confirm intent.
const my $MAX_MSGS_SENT         => min( 10_000, int( $MAX_CORRELATIONID / ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD ) ) );
# Upper bound of messages requested per fetch() call.
const my $MAX_MSGS_RECV         => min( 10_000, int( $MAX_CORRELATIONID / ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD ) ) );
# Number of messages each client handles before it is finished.
const my $CLIENT_MSGS           => int( $MAX_DATA_RECORDS / $PAIRS_CLIENTS );
# Timeout, in one-second polls, for the parent/child synchronisation waits.
const my $SECS_TO_WAIT          => 300;

const my $AUTO_CREATE_TOPICS    => 'true';
const my $REPLICATION_FACTOR    => 3;

const my $KAFKA_BASE_DIR        => $ENV{KAFKA_BASE_DIR};    # WARNING: must match the settings of your system
const my $TOPIC                 => $Kafka::MockIO::TOPIC;
const my $HOST                  => $Kafka::Cluster::HOST;

const my $INI_SECTION           => 'GENERAL';

my ( @msg_pool, $cluster, $port, @clients, $pid, $ppid, $connection, $producer, $consumer, $is_ready );

# The parent sends SIGUSR1 to the forked clients as their start signal;
# wait_parent_ready() polls $is_ready for it.
$SIG{USR1} = sub { ++$is_ready };

# Fill @msg_pool with $MAX_MSGS_SENT random strings of exactly $MSG_LEN
# characters drawn from a fixed printable alphabet. sending() picks contiguous
# slices of this pool as message batches.
sub create_msg_pool {
    my @chars = ( q{ }, 'A' .. 'Z', 'a' .. 'z', 0 .. 9, qw(! @ $ % ^ & *) );

    note 'generation of messages can take a while';
    @msg_pool = ();
    for ( 1 .. $MAX_MSGS_SENT ) {
        # rand() yields a fractional index; array subscripts truncate it.
        push @msg_pool, join( q{}, map { $chars[ rand @chars ] } 1 .. $MSG_LEN );
    }
    note 'generation of messages completed';
}

# Rewrite the Kafka server properties file so the test cluster auto-creates
# topics and uses $REPLICATION_FACTOR. If the file cannot be read or written,
# the original is restored from the backup and the whole test run is aborted.
sub setup {
    my $cfg = Config::IniFiles->new(
        -file       => $properties_file,
        # Collects key=value lines that appear outside any [section] header.
        -fallback   => $INI_SECTION,
    );
    unless ( $cfg ) {
        restore_properties();
        my $error = join q{}, map { "\n$_" } @Config::IniFiles::errors;
        BAIL_OUT "$properties_file error: $error";
    }

    # newval() creates a key that is absent from the file (setval() would
    # silently leave it unset) and overwrites it when present.
    $cfg->newval( $INI_SECTION, 'auto.create.topics.enable'     => $AUTO_CREATE_TOPICS );
    $cfg->newval( $INI_SECTION, 'default.replication.factor'    => $REPLICATION_FACTOR );

    unless ( $cfg->RewriteConfig ) {
        restore_properties();
        BAIL_OUT "cannot rewrite $properties_file";
    }
}

# Put the original properties file back from the backup made at start-up.
# When no backup exists the current file is kept (with a diagnostic) instead of
# being deleted, so a failed backup never leaves the test without any config.
sub restore_properties {
    unless ( -e $bak_properties_file ) {
        diag "no backup $bak_properties_file, $properties_file is left as is";
        return;
    }
    unlink $properties_file;
    rename( $bak_properties_file, $properties_file )
        or diag "cannot restore $properties_file from $bak_properties_file: $!";
    return;
}

# Send one randomly sized, randomly placed contiguous slice of @msg_pool to
# $partition of $TOPIC through the global $producer.
# Returns the number of messages sent, or 0 (after reporting a failed test)
# when send() dies.
sub sending {
    my ( $partition ) = @_;

    my $batch_size = int( rand( $MAX_MSGS_SENT ) + 1 );
    my $from       = int( rand( $MAX_MSGS_SENT - $batch_size ) );
    my $to         = $from + $batch_size - 1;
    my $batch      = [ @msg_pool[ $from .. $to ] ];

    eval { $producer->send( $TOPIC, $partition, $batch ); };
    return $batch_size unless $@;

    fail "'send' FATAL error: $@";
    return 0;
}

# Return the latest offset of $partition of $TOPIC as reported by the global
# $consumer (the first element of the offsets list).
# Returns an empty list / undef, after reporting a failed test, when the
# request dies or yields no offsets at all.
sub next_offset {
    my ( $partition ) = @_;

    my $offsets;
    eval {
        $offsets = $consumer->offsets(
            $TOPIC,
            $partition,
            $RECEIVE_LATEST_OFFSET,             # time
        );
    };
    if ( $@ ) {
        fail "'offsets' FATAL error: $@";
        return;
    }
    # An empty array reference carries no offset either; treat it as missing
    # rather than silently returning undef.
    unless ( $offsets && @$offsets ) {
        fail 'offsets are not received';
        return;
    }
    return $offsets->[0];
}

# Request a random run of already-produced messages from $partition of $TOPIC
# through the global $consumer.
# Returns the number of messages requested, or 0 when nothing has been
# produced yet or the fetch died (the latter is also reported as a failure).
sub fetching {
    my ( $partition ) = @_;

    my $next_offset = next_offset( $partition );
    return 0 unless $next_offset;

    # Never ask for more messages than the partition holds and keep the
    # requested run inside [0, $next_offset) (rand(0) behaves like rand(1)).
    my $msgs_to_receive     = min( $next_offset, int( rand( $MAX_MSGS_RECV ) + 1 ) );
    my $start_offset        = int( rand( $next_offset - $msgs_to_receive ) );
    my $bytes_to_receive    = $msgs_to_receive * ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD );

    eval {
        $consumer->fetch(
            $TOPIC,
            $partition,
            $start_offset,
            $bytes_to_receive,  # Maximum size of MESSAGE(s) to receive
        );
    };
    if ( $@ ) {
        fail "'fetch' FATAL error: $@";
        # An explicit 0 (like sending()): callers do '$count += fetching(...)'
        # and adding undef would emit an 'uninitialized' warning.
        return 0;
    }
    return $msgs_to_receive;
}

# Non-blocking check whether child process $pid has terminated.
# Returns 0 while waitpid() reports the child as still running; returns 1 when
# it has exited (waitpid reaps it) or is not a waitable child (waitpid -1,
# e.g. already reaped).
sub is_died {
    my ( $child_pid ) = @_;

    my $rc = waitpid( $child_pid, WNOHANG );
    return $rc == 0 ? 0 : 1;
}

# Check that every forked client in @clients is still alive before the parent
# signals them to start.
# A client found dead has already been reaped by is_died() and can never
# become alive again, so there is nothing to wait for: all clients are killed
# and a failed test is reported at once.
# Returns 1 when all clients are alive, 0 otherwise.
sub wait_child_ready {
    foreach my $client_pid ( @clients ) {
        next unless is_died( $client_pid );

        kill 'KILL' => @clients;
        fail "client $client_pid exited before becoming ready";
        return 0;
    }
    return 1;
}

# Child side: wait until the parent's SIGUSR1 handler has bumped $is_ready,
# checking once per second for at most $SECS_TO_WAIT seconds.
# Returns 1 once the signal has arrived, or 0 (after reporting a failed test)
# on timeout.
sub wait_parent_ready {
    my $seconds_left = $SECS_TO_WAIT;

    while ( !$is_ready ) {
        if ( $seconds_left-- == 0 ) {
            fail 'too long a wait for a parent';
            return 0;
        }
        sleep 1;
    }
    return 1;
}

# Parent side: poll once per second, for at most $SECS_TO_WAIT seconds, until
# every client in @clients has exited. Each poll that still finds a running
# client emits a passing test. On timeout all clients are killed and a failed
# test is reported.
# Returns true when all clients finished in time.
sub wait_child_success {
    my $seconds_left = $SECS_TO_WAIT;
    my $all_died;
    my $alive_pid;

    POLL:
    while ( $seconds_left-- ) {
        $all_died = 1;

        CLIENT:
        for my $client_pid ( @clients ) {
            next CLIENT if is_died( $client_pid );

            ( $all_died, $alive_pid ) = ( 0, $client_pid );
            pass 'There are clients working';
            last CLIENT;
        }

        last POLL if $all_died;
        sleep 1;
    }

    if ( !$all_died ) {
        kill 'KILL' => @clients;
        fail "too long a wait for success $alive_pid, waitpid = " . waitpid( $alive_pid, WNOHANG );
    }
    return $all_died;
}

# Emit, as TAP notes, the non-fatal errors held in the array reference returned
# by the global $connection, preceded by a header line when there are any.
sub note_nonfatals {
    my @errors = @{ $connection->nonfatal_errors() };
    return unless @errors;

    note 'Non-fatal errors:';
    note "\t$_" for @errors;
}

# Fork one test client working on $partition.
# Parent: returns the child's pid, or 0 (after reporting a failed test) when
# fork() fails.
# Child: creates its own connection and, for $client_type 'producer' or
# 'consumer', a producer or consumer. It then waits for the parent's start
# signal (wait_parent_ready) and calls sending()/fetching() until $CLIENT_MSGS
# messages have been handled. Finally it notes any non-fatal errors and exits;
# the child never returns to the caller.
sub create_client {
    my ( $client_type, $partition ) = @_;

    $pid = fork;
    if ( !defined $pid ) {
        # Report why fork failed and return an explicit 0 rather than whatever
        # fail() happens to return.
        fail "An unexpected error (fork 1): $!";
        return 0;
    }
    return $pid if $pid;                # herein the parent

    #-- herein the child process
    $is_ready = 0;
    $ppid = getppid();
    note "Started '$client_type', pid = $$, partition = $partition";

    $connection = Kafka::Connection->new(
        host                    => $HOST,
        port                    => $port,
        AutoCreateTopicsEnable  => 1,
    );

    # One unit of work for this client type; it returns the number of messages
    # sent or requested. Unknown client types get no work.
    my $work;
    if ( $client_type eq 'producer' ) {
        $producer = Kafka::Producer->new(
            Connection  => $connection,
        );
        $work = \&sending;
    }
    elsif ( $client_type eq 'consumer' ) {
        $consumer = Kafka::Consumer->new(
            Connection  => $connection,
        );
        $work = \&fetching;
    }

    # Always wait for the parent's go signal, even when there is nothing to do.
    if ( wait_parent_ready() && $work ) {
        my $count = 0;
        while ( $count < $CLIENT_MSGS ) {
            $count += $work->( $partition );
        }
    }

    note "Finished '$client_type', pid = $$, partition = $partition";
    note_nonfatals();

    exit 0;
}

#-- Global data ----------------------------------------------------------------

# INSTRUCTIONS -----------------------------------------------------------------

create_msg_pool();

setup();

#-- FORK competition: every producer and consumer runs in its own process

$cluster = Kafka::Cluster->new(
    kafka_dir           => $KAFKA_BASE_DIR,
    replication_factor  => $REPLICATION_FACTOR,
    partition           => $PAIRS_CLIENTS,
);
isa_ok( $cluster, 'Kafka::Cluster' );

#-- Connecting to the Kafka server port (for example for node_id = 0)
( $port )   =  $cluster->servers;

foreach my $i ( 1 .. $PAIRS_CLIENTS ) {
    foreach my $client_type ( 'producer', 'consumer' ) {
        my $client_pid = create_client( $client_type, $i - 1 );   # partitions are 0 based
        ok $client_pid, 'client started: ' . ( $client_pid || 'fork failed' );
        # Never record a false pid: kill() with pid 0 signals the whole process
        # group (this test and its harness included), and waitpid() with pid 0
        # waits for any child in it.
        push @clients, $client_pid if $client_pid;
    }
}

if ( wait_child_ready() ) {
    sleep 1;    # NOTE(review): presumably a grace period before the start signal
    kill 'USR1' => @clients;
}
wait_child_success();

$cluster->close;

#-- Inside a process competition: all clients share one process and connection

$cluster = Kafka::Cluster->new(
    kafka_dir           => $KAFKA_BASE_DIR,
    replication_factor  => $REPLICATION_FACTOR,
    partition           => $PAIRS_CLIENTS,
);
isa_ok( $cluster, 'Kafka::Cluster' );
#-- Connecting to the Kafka server port (for example for node_id = 0)
( $port )   =  $cluster->servers;

$connection = Kafka::Connection->new(
    host                    => $HOST,
    port                    => $port,
    AutoCreateTopicsEnable  => 1,
);
$producer = Kafka::Producer->new(
    Connection      => $connection,
    # For sure the next verifying the number of messages sent and recorded
    RequiredAcks    => $BLOCK_UNTIL_IS_COMMITTED,
);
$consumer = Kafka::Consumer->new(
    Connection      => $connection,
);

@clients = ();

foreach my $i ( 1 .. $PAIRS_CLIENTS ) {
    foreach my $client_type ( 'producer', 'consumer' ) {
        push @clients, {
            client_type => $client_type,
            partition   => $i - 1,          # partitions are 0 based
            count       => 0,
        };
    }
}

@clients = shuffle @clients;

my $tm = time;
my $nonfatals = 0;      # number of non-fatal errors already reported
while ( scalar @clients ) {
    # Let a randomly chosen unfinished client do one unit of work.
    my $i           = int( rand scalar @clients );
    my $client      = $clients[ $i ];
    my $client_type = $client->{client_type};
    my $partition   = $client->{partition};

    if ( $client->{count} == 0 ) {
        note "Started '$client_type', partition = $partition";
    }

    if ( $client_type eq 'producer' ) {
        $client->{count} += sending( $partition );
    } else {    # consumer
        $client->{count} += fetching( $partition );
    }
    my $count = $client->{count};

    # nonfatal_errors() returns an array reference (see note_nonfatals):
    # compare the number of errors, not the reference's numeric address.
    my $new_nonfatals = scalar @{ $connection->nonfatal_errors() };
    if ( $new_nonfatals != $nonfatals ) {
        note "New non-fatal errors fixed: '$client_type', partition = $partition";
        note_nonfatals();
        $nonfatals = $new_nonfatals;
    }

    # '>=' matches the forked clients, which stop once $CLIENT_MSGS is reached.
    if ( $count >= $CLIENT_MSGS ) {
        note "Finished '$client_type', partition = $partition";

        #-- total
        if ( $client_type eq 'producer' ) {
            is $count, next_offset( $partition ), "total number of recorded messages coincides with the number of messages sent ($count)";
        }

        splice( @clients, $i, 1 );
    }

    if ( $tm < time ) {
        pass 'There are clients working';
        $tm = time;
    }
}
note_nonfatals();

$cluster->close;

# POSTCONDITIONS ---------------------------------------------------------------

restore_properties();