The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

use Test::More;

# Skip the whole file unless the environment names a Kafka installation.
# Done in BEGIN so the decision is made at compile time, before the
# statements further down are compiled.
BEGIN {
    plan skip_all => 'Unknown base directory of Kafka server'
        unless $ENV{KAFKA_BASE_DIR};
}

# Try to load Test::NoWarnings; when the string eval leaves an error in $@
# the module is unavailable and the file is skipped.
BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    plan skip_all => 'because Test::NoWarnings required for testing' if $@;
}

# No fixed test count: the wait/competition loops below emit a 'pass' roughly
# once per second, so the number of assertions depends on run time.
plan 'no_plan';

use Const::Fast;
use Data::Dumper;
use List::Util qw(
    min
    shuffle
);
use POSIX ':sys_wait_h';
use Time::HiRes ();
use Try::Tiny;

use Kafka qw(
    $BLOCK_UNTIL_IS_COMMITTED
    $MESSAGE_SIZE_OVERHEAD
    $RECEIVE_LATEST_OFFSETS
    $RETRY_BACKOFF
);
use Kafka::Cluster;
use Kafka::Connection;
use Kafka::Consumer;
use Kafka::Internals qw(
    $MAX_CORRELATIONID
);
use Kafka::MockIO;
use Kafka::Producer;

STDOUT->autoflush;

# Restrictions:
# All client pairs work with a common topic
# Each pair of clients (Producer + Consumer) works with a separate partition
# All messages have the same length

const my $PAIRS_CLIENTS         => 10;
const my $MAX_DATA              => 20 * 1024 * 1024;    # total payload size, bytes (20 MB)
const my $MAX_DATA_RECORS       => 1_000_000;           # total number of messages
const my $MSG_LEN               => int( $MAX_DATA / $MAX_DATA_RECORS );
# Upper bounds for the random batch sizes used by sending() and fetching().
# NOTE: every declaration is its own statement (terminated by ';'); chaining
# them with ',' would pass the following 'const' call as an extra argument.
const my $MAX_MSGS_SENT         => min( 10_000, int( $MAX_CORRELATIONID / ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD ) ) );
const my $MAX_MSGS_RECV         => min( 10_000, int( $MAX_CORRELATIONID / ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD ) ) );
const my $CLIENT_MSGS           => int( $MAX_DATA_RECORS / $PAIRS_CLIENTS );    # messages per client
const my $SECS_TO_WAIT          => 600;                 # timeout for the wait loops, seconds

const my $AUTO_CREATE_TOPICS    => 'true';
const my $REPLICATION_FACTOR    => 3;

const my $KAFKA_BASE_DIR        => $ENV{KAFKA_BASE_DIR};    # WARNING: must match the settings of your system
const my $TOPIC                 => $Kafka::MockIO::TOPIC;
const my $HOST                  => 'localhost';

# State shared between the helper subs below (and inherited by forked children).
my ( @msg_pool, $cluster, $port, @clients, $pid, $ppid, $connection, $producer, $consumer, $is_ready );

# The parent releases its children with SIGUSR1; the handler only bumps the
# flag that wait_parent_ready() polls.
$SIG{USR1} = sub { ++$is_ready };

# Fills @msg_pool with $MAX_MSGS_SENT random strings, each $MSG_LEN
# characters long, drawn from a fixed set of printable characters.
sub create_msg_pool {
    my @alphabet      = ( " ", "A" .. "Z", "a" .. "z", 0 .. 9, qw(! @ $ % ^ & *) );
    my $alphabet_size = scalar @alphabet;

    note 'generation of messages can take a while';

    # Touch the last slot first so the array is extended to its final size once.
    $msg_pool[ $MAX_MSGS_SENT - 1 ] = undef;

    for my $idx ( 0 .. $MAX_MSGS_SENT - 1 ) {
        my $text = q{};
        # A fractional index is truncated, so rand() picks a uniform element.
        $text .= $alphabet[ rand $alphabet_size ] for 1 .. $MSG_LEN;
        $msg_pool[ $idx ] = $text;
    }

    note 'generation of messages complited';
}

# Sends a random, contiguous run of pre-generated messages to $partition of
# $TOPIC through the shared $producer.
# Returns the number of messages sent, or 0 (after a diag) when 'send' died.
sub sending {
    my ( $partition ) = @_;

    # Random batch size in 1 .. $MAX_MSGS_SENT, then a random start position
    # that leaves room for the whole batch inside the pool.
    my $batch_size = int( rand( $MAX_MSGS_SENT ) + 1 );
    my $from       = int( rand( $MAX_MSGS_SENT - $batch_size ) );
    my $to         = $from + $batch_size - 1;
    my @batch      = @msg_pool[ $from .. $to ];

    eval { $producer->send( $TOPIC, $partition, \@batch ); };

    return $batch_size unless $@;

    diag "'send' FATAL error: $@";
    return 0;
}

# Asks the shared $consumer for the latest offsets of $partition in $TOPIC.
# Returns the first element of the answer; on an exception or an empty answer
# records a test failure and returns nothing.
sub next_offset {
    my ( $partition ) = @_;

    my $latest = eval {
        $consumer->offsets(
            $TOPIC,
            $partition,
            $RECEIVE_LATEST_OFFSETS,    # time
        );
    };

    if ( $@ ) {
        fail "'offsets' FATAL error: $@";
        return;
    }

    return $latest->[0] if $latest;

    fail 'offsets are not received';
    return;
}

# Fetches a random slice of already written messages from $partition of
# $TOPIC through the shared $consumer.
#
# Returns the number of messages requested, or 0 when nothing could be
# fetched (no offset available yet, or 'fetch' threw). Callers accumulate the
# result with '+=', so a numeric 0 is returned on failure - as sending() does -
# instead of undef, which would raise an "uninitialized value" warning.
sub fetching {
    my ( $partition ) = @_;

    my $next_offset = next_offset( $partition );
    return 0 unless $next_offset;

    # Never ask for more messages than the partition currently holds.
    my $msgs_to_receive     = min( $next_offset, int( rand( $MAX_MSGS_RECV ) + 1 ) );
    my $start_offset        = int( rand( $next_offset - $msgs_to_receive ) );
    my $bytes_to_receive    = $msgs_to_receive * ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD );

    # NOTE(review): fixed pause before every fetch; presumably throttles the
    # consumer relative to the producers - confirm the intent.
    Time::HiRes::sleep( 1.5 );

    my $error;
    try {
        $consumer->fetch(
            $TOPIC,
            $partition,
            $start_offset,
            $bytes_to_receive,  # Maximum size of MESSAGE(s) to receive
        );
    } catch {
        $error = $_;
    };

    if ( $error ) {
        fail "'fetch' FATAL error: $error";
        return 0;
    }

    return $msgs_to_receive;
}

# Reports whether the child process $pid is no longer running.
# Returns 0 while the child is alive and 1 otherwise.
sub is_died {
    my ( $pid ) = @_;

    # With WNOHANG waitpid does not block: it yields 0 while the child is
    # still running. Any other result (the reaped pid, or -1 when there is no
    # such child any more) is treated as "dead".
    my $rc = waitpid( $pid, WNOHANG );

    return $rc == 0 ? 0 : 1;
}

# Verifies that every forked client in @clients is still running before the
# parent releases them.
#
# Returns 1 when all clients are alive. If any client has already terminated,
# kills all clients, records a test failure and returns 0.
#
# A child that is_died() reports as dead stays dead on every later check, so
# there is nothing to wait for: the failure is reported immediately instead of
# re-polling until a timeout expires.
sub wait_child_ready {
    foreach my $client_pid ( @clients ) {
        next unless is_died( $client_pid );

        kill 'KILL' => @clients;
        fail "client $client_pid died before it was ready, waitpid = "
            . waitpid( $client_pid, WNOHANG );
        return 0;
    }

    return 1;
}

# Runs in a child: polls $is_ready (set by the USR1 handler) once per second,
# for at most $SECS_TO_WAIT sleeps.
# Returns 1 as soon as the flag is set; otherwise records a failure and
# returns 0.
sub wait_parent_ready {
    for my $second ( 1 .. $SECS_TO_WAIT ) {
        return 1 if $is_ready;
        sleep 1;
    }

    # One last look after the final sleep.
    return 1 if $is_ready;

    fail 'too long a wait for a parent';
    return 0;
}

# Waits, checking once per second for up to $SECS_TO_WAIT rounds, until every
# client in @clients has terminated. Each round that still finds a running
# client emits a 'pass'.
# Returns true when all clients are gone; otherwise kills them, records a
# failure naming the client last seen running, and returns false.
sub wait_child_success {
    my $finished;
    my $running_pid;

    ROUND:
    for my $round ( 1 .. $SECS_TO_WAIT ) {
        $finished = 1;

        foreach my $client_pid ( @clients ) {
            next if is_died( $client_pid );

            # First client found running ends this round's scan.
            ( $finished, $running_pid ) = ( 0, $client_pid );
            pass 'There are clients working';
            last;
        }

        last ROUND if $finished;
        sleep 1;
    }

    return $finished if $finished;

    kill 'KILL' => @clients;
    fail "too long a wait for success $running_pid, waitpid = ".waitpid( $running_pid, &WNOHANG );

    return $finished;
}

# Prints, as test notes, the non-fatal errors collected by the shared
# $connection: a header line followed by one tab-indented line per entry.
# Prints nothing when the list is empty.
sub note_nonfatals {
    my $errors = $connection->nonfatal_errors();

    return unless @$errors;

    note( "Non-fatal errors:" );
    note( "\t$_" ) for @$errors;
}

# Forks one client process of the given type ('producer' or 'consumer') that
# works on $partition.
#
# In the parent: returns the child's pid, or the (false) result of 'fail'
# when fork did not succeed.
# In the child: opens its own connection, waits for the parent's go-ahead,
# sends or fetches until $CLIENT_MSGS messages are counted, then exits.
sub create_client {
    my ( $client_type, $partition ) = @_;

    $pid = fork;

    return fail( 'An unexpected error (fork 1)' ) unless defined $pid;
    return $pid if $pid;                # herein the parent

    # Everything below runs in the child process only.
    $is_ready = 0;
    $ppid     = getppid();
    note "Started '$client_type', pid = $$, partition = $partition";

    $connection = Kafka::Connection->new(
        host                    => $HOST,
        port                    => $port,
        timeout                 => 30,
        AutoCreateTopicsEnable  => 1,
        RETRY_BACKOFF           => $RETRY_BACKOFF * 2,
        dont_load_supported_api_versions => 1,
    );

    if ( $client_type eq 'producer' ) {
        $producer = Kafka::Producer->new(
            Connection  => $connection,
            # Require verification the number of messages sent and recorded
            RequiredAcks    => $BLOCK_UNTIL_IS_COMMITTED,
            Timeout         => 30,
        );
    }
    elsif ( $client_type eq 'consumer' ) {
        $consumer = Kafka::Consumer->new(
            Connection  => $connection,
            MaxWaitTime => 30,
        );
    }

    # The per-iteration work for each known client type; any other type
    # just waits for the parent and finishes.
    my %work_for = (
        producer => \&sending,
        consumer => \&fetching,
    );
    my $work = $work_for{ $client_type };

    if ( wait_parent_ready() && $work ) {
        my $count = 0;
        $count += $work->( $partition ) while $count < $CLIENT_MSGS;
    }

    note "Finished '$client_type', pid = $$, partition = $partition";
    note_nonfatals();

    exit;
}

#-- Competition between separate processes

create_msg_pool();

ok defined( Kafka::Cluster::data_cleanup() ), 'data directory cleaned';

$cluster = Kafka::Cluster->new(
    replication_factor  => $REPLICATION_FACTOR,
    partition           => $PAIRS_CLIENTS,
    properties          => {
        'auto.create.topics.enable' => $AUTO_CREATE_TOPICS,
    },
);
isa_ok( $cluster, 'Kafka::Cluster' );

#-- Connecting to the Kafka server port (for example for node_id = 0)
( $port ) = $cluster->servers;

# One producer and one consumer process per partition.
foreach my $i ( 1 .. $PAIRS_CLIENTS ) {
    foreach my $client_type ( 'producer', 'consumer' ) {
        my $pid = create_client( $client_type, $i - 1 );    # partitions are 0 based
        ok $pid, "client started: $pid";

        # Only remember real children: a false pid (failed fork) must never
        # reach the 'kill ... => @clients' calls, where pid 0 would address
        # the whole process group, this test included.
        push @clients, $pid if $pid;
    }
}

# Release the children only when all of them are up, then wait for them to
# finish their work.
if ( wait_child_ready() ) {
    sleep 1;
    kill 'USR1' => @clients;
}
wait_child_success();

$cluster->close;

ok defined( Kafka::Cluster::data_cleanup() ), 'data directory cleaned';

#-- Inside a process competition
# All producers and consumers share one connection and are driven in random
# order from this single process.

$cluster = Kafka::Cluster->new(
    replication_factor  => $REPLICATION_FACTOR,
    partition           => $PAIRS_CLIENTS,
    properties          => {
        'auto.create.topics.enable' => $AUTO_CREATE_TOPICS,
    },
);
isa_ok( $cluster, 'Kafka::Cluster' );
#-- Connecting to the Kafka server port (for example for node_id = 0)
( $port ) = $cluster->servers;

$connection = Kafka::Connection->new(
    host                    => $HOST,
    port                    => $port,
    AutoCreateTopicsEnable  => 1,
    RETRY_BACKOFF           => $RETRY_BACKOFF * 2,
    dont_load_supported_api_versions => 1,
);
$producer = Kafka::Producer->new(
    Connection      => $connection,
    # Require verification the number of messages sent and recorded
    RequiredAcks    => $BLOCK_UNTIL_IS_COMMITTED,
);
$consumer = Kafka::Consumer->new(
    Connection      => $connection,
);

# @clients is reused: it now holds one descriptor per logical client instead
# of the pids of the previous phase.
@clients = ();

foreach my $i ( 1 .. $PAIRS_CLIENTS ) {
    foreach my $client_type ( 'producer', 'consumer' ) {
        push @clients,
            {
                client_type => $client_type,
                partition   => $i - 1,          # partitions are 0 based
                count       => 0,
            }
        ;
    }
}

@clients = shuffle @clients;

my $tm = time;
my $nonfatals = 0;      # number of non-fatal errors already reported
while ( scalar @clients ) {
    # Each round lets one randomly chosen client do a single batch.
    my $i           = int( rand scalar @clients );
    my $client_type = $clients[ $i ]->{client_type};
    my $partition   = $clients[ $i ]->{partition};
    my $count       = $clients[ $i ]->{count};

    if ( $count == 0 ) {
        note "Started '$client_type', partition = $partition";
    }

    if ( $client_type eq 'producer' ) {
        $clients[ $i ]->{count} += sending( $partition );
    } else {    # consumer
        $clients[ $i ]->{count} += fetching( $partition );
    }
    $count = $clients[ $i ]->{count};

    # nonfatal_errors() hands back an array reference (see note_nonfatals),
    # so compare the number of entries - not the reference itself - with the
    # count reported so far.
    my $new_nonfatals = scalar @{ $connection->nonfatal_errors() };
    if ( $new_nonfatals != $nonfatals ) {
        note "New non-fatal errors fixed: '$client_type', partition = $partition";
        note_nonfatals();
        $nonfatals = $new_nonfatals;
    }

    if ( $count > $CLIENT_MSGS ) {
        note "Finished '$client_type', partition = $partition";

        #-- total
        if ( $client_type eq 'producer' ) {
            # 'scalar' keeps the argument list intact when next_offset()
            # returns nothing on failure.
            is $count, scalar( next_offset( $partition ) ), "total number of recorded messages matches the number of messages sent ($count)";
        }

        splice( @clients, $i, 1 );
    }

    # Emit a heartbeat 'pass' at most once per second.
    if ( $tm < time ) {
        pass 'There are clients working';
        $tm = time;
    }
}
note_nonfatals();

$cluster->close;