#!/usr/bin/perl -w
#-- Pragmas --------------------------------------------------------------------
use 5.010;
use strict;
use warnings;
use lib qw(
lib
t/lib
../lib
);
# ENVIRONMENT ------------------------------------------------------------------
use Test::More;
BEGIN {
plan skip_all => 'Unknown base directory of Kafka server'
unless defined $ENV{KAFKA_BASE_DIR};
}
#-- verify load the module
BEGIN {
eval 'use Test::NoWarnings'; ## no critic
plan skip_all => 'because Test::NoWarnings required for testing' if $@;
}
plan 'no_plan';
#-- load the modules -----------------------------------------------------------
use Config::IniFiles;
use Const::Fast;
use Data::Dumper;
use File::Copy;
use FindBin qw(
$Bin
);
use File::Spec::Functions qw(
catdir
catfile
);
use List::Util qw(
min
shuffle
);
use POSIX ':sys_wait_h';
use Kafka qw(
$BLOCK_UNTIL_IS_COMMITTED
$MESSAGE_SIZE_OVERHEAD
$RECEIVE_LATEST_OFFSET
);
use Kafka::Cluster;
use Kafka::Connection;
use Kafka::Consumer;
use Kafka::Internals qw(
$MAX_CORRELATIONID
);
use Kafka::MockIO;
use Kafka::Producer;
#-- setting up facilities ------------------------------------------------------
STDOUT->autoflush;
const my $KAFKA_PROPERTIES_FILE => 'server.properties';
my ( $properties_file, $bak_properties_file );
{
my $start_dir = ( substr( $Bin, -1 ) eq 't' ) ? $Bin : catdir( $Bin, 't' );
my $kafka_config_dir = catdir( $start_dir, 'config' );
$properties_file = catfile( $kafka_config_dir, $KAFKA_PROPERTIES_FILE );
}
$bak_properties_file = $properties_file.'.bak';
unlink $bak_properties_file;
copy( $properties_file, $bak_properties_file );
#-- declarations ---------------------------------------------------------------
# Restrictions:
# All pairs clients work with a common topic
# Each pair of clients (Producer + Consumer) works with a separate partition
# All messages have the same length
const my $PAIRS_CLIENTS => 10;
const my $MAX_DATA => 20 * 1024 *1024; # MBs
const my $MAX_DATA_RECORS => 1_000_000;
const my $MSG_LEN => int( $MAX_DATA / $MAX_DATA_RECORS );
const my $MAX_MSGS_SENT => min( 10_000, int( $MAX_CORRELATIONID / ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD ) ) ),
const my $MAX_MSGS_RECV => min( 10_000, int( $MAX_CORRELATIONID / ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD ) ) ),
const my $CLIENT_MSGS => int( $MAX_DATA_RECORS / $PAIRS_CLIENTS );
const my $SECS_TO_WAIT => 300;
const my $AUTO_CREATE_TOPICS => 'true';
const my $REPLICATION_FACTOR => 3;
const my $KAFKA_BASE_DIR => $ENV{KAFKA_BASE_DIR}; # WARNING: must match the settings of your system
const my $TOPIC => $Kafka::MockIO::TOPIC;
const my $HOST => $Kafka::Cluster::HOST;
const my $INI_SECTION => 'GENERAL';
my ( @msg_pool, $cluster, $port, @clients, $pid, $ppid, $connection, $producer, $consumer, $is_ready );
$SIG{USR1} = sub { ++$is_ready };
sub create_msg_pool {
my @chars = ( " ", "A" .. "Z", "a" .. "z", 0 .. 9, qw(! @ $ % ^ & *) );
note 'generation of messages can take a while';
$msg_pool[ $MAX_MSGS_SENT - 1 ] = undef;
foreach my $i ( 0 .. ( $MAX_MSGS_SENT - 1 ) ) {
$msg_pool[ $i ] = join( q{}, @chars[ map { rand( scalar( @chars ) ) } ( 1 .. $MSG_LEN ) ] );
}
note 'generation of messages complited';
}
sub setup {
if ( !( my $cfg = Config::IniFiles->new(
-file => $properties_file,
-fallback => $INI_SECTION,
) ) ) {
restore_properties();
my $error = q{};
map { $error .= "\n$_" } @Config::IniFiles::errors;
BAIL_OUT "$properties_file error: $error";
}
else {
$cfg->setval( $INI_SECTION, 'auto.create.topics.enable' => $AUTO_CREATE_TOPICS );
$cfg->setval( $INI_SECTION, 'default.replication.factor' => $REPLICATION_FACTOR );
$cfg->RewriteConfig( $properties_file );
}
}
sub restore_properties {
unlink $properties_file;
rename $bak_properties_file, $properties_file;
}
sub sending {
my ( $partition ) = @_;
my $msgs_to_send = int( rand( $MAX_MSGS_SENT ) + 1 );
my $first_msg_idx = int( rand( $MAX_MSGS_SENT - $msgs_to_send ) );
my $last_msg_idx = $first_msg_idx + $msgs_to_send - 1;
my $messages = [ @msg_pool[ $first_msg_idx .. $last_msg_idx ] ];
eval {
$producer->send(
$TOPIC,
$partition,
$messages,
);
};
if ( $@ ) {
fail "'send' FATAL error: $@";
return 0;
} else {
return $msgs_to_send;
}
}
sub next_offset {
my ( $partition ) = @_;
my $offsets;
eval {
$offsets = $consumer->offsets(
$TOPIC,
$partition,
$RECEIVE_LATEST_OFFSET, # time
);
};
if ( $@ ) {
fail "'offsets' FATAL error: $@";
return;
} else {
if ( $offsets ) {
return $offsets->[0];
}
if ( !$offsets ) {
fail 'offsets are not received';
return;
}
}
}
sub fetching {
my ( $partition ) = @_;
my $next_offset = next_offset( $partition );
return 0 unless $next_offset;
my $msgs_to_receive = min( $next_offset, int( rand( $MAX_MSGS_RECV ) + 1 ) );
my $start_offset = int( rand( $next_offset - $msgs_to_receive ) );
my $bytes_to_receive = $msgs_to_receive * ( $MSG_LEN + $MESSAGE_SIZE_OVERHEAD );
eval {
$consumer->fetch(
$TOPIC,
$partition,
$start_offset,
$bytes_to_receive, # Maximum size of MESSAGE(s) to receive
);
};
if ( $@ ) {
fail "'fetch' FATAL error: $@";
return;
} else {
return $msgs_to_receive;
}
}
sub is_died {
my ( $pid ) = @_;
my $rc = waitpid( $pid, &WNOHANG );
return( ( $rc == 0 ) ? 0 : 1 );
}
sub wait_child_ready {
my $secs_to_wait = $SECS_TO_WAIT;
my $all_alive;
my $alive_pid;
while ( $secs_to_wait-- ) {
$all_alive = 1;
foreach my $pid ( @clients ) {
if ( is_died( $pid ) ) {
$all_alive = 0;
$alive_pid = $pid;
last;
}
}
last if $all_alive;
sleep 1;
}
unless ( $all_alive ) {
kill 'KILL' => @clients;
fail "too long a wait for ready $alive_pid, waitpid = ".waitpid( $alive_pid, &WNOHANG );
}
return $all_alive;
}
sub wait_parent_ready {
my $secs_to_wait = $SECS_TO_WAIT;
my $ok = 1;
until ( $is_ready ) {
unless ( $secs_to_wait-- ) {
fail 'too long a wait for a parent';
$ok = 0;
last;
}
sleep 1;
}
return $ok;
}
sub wait_child_success {
my $secs_to_wait = $SECS_TO_WAIT;
my $all_died;
my $alive_pid;
while ( $secs_to_wait-- ) {
$all_died = 1;
foreach my $pid ( @clients ) {
unless ( is_died( $pid ) ) {
$all_died = 0;
$alive_pid = $pid;
pass 'There are clients working';
last;
}
}
last if $all_died;
sleep 1;
}
unless ( $all_died ) {
kill 'KILL' => @clients;
fail "too long a wait for success $alive_pid, waitpid = ".waitpid( $alive_pid, &WNOHANG );
}
return $all_died;
}
sub note_nonfatals {
my $arr_ref = $connection->nonfatal_errors();
if ( @$arr_ref ) {
note "Non-fatal errors:";
}
foreach my $txt ( @$arr_ref ) {
note "\t$txt";
}
}
sub create_client {
my ( $client_type, $partition ) = @_;
if ( $pid = fork ) { # herein the parent
return $pid;
} elsif ( defined $pid ) { # herein the child process
$is_ready = 0;
$ppid = getppid();
note "Started '$client_type', pid = $$, partition = $partition";
$connection = Kafka::Connection->new(
host => $HOST,
port => $port,
AutoCreateTopicsEnable => 1,
);
if ( $client_type eq 'producer' ) {
$producer = Kafka::Producer->new(
Connection => $connection,
# Require verification the number of messages sent and recorded
RequiredAcks => $BLOCK_UNTIL_IS_COMMITTED,
);
} elsif ( $client_type eq 'consumer' ) {
$consumer = Kafka::Consumer->new(
Connection => $connection,
);
} else {
# nothing to do now
}
my $count = 0;
if ( wait_parent_ready() ) {
if ( $client_type eq 'producer' ) {
while ( $count < $CLIENT_MSGS ) {
$count += sending( $partition );
}
} elsif ( $client_type eq 'consumer' ) {
while ( $count < $CLIENT_MSGS ) {
$count += fetching( $partition );
}
} else {
# nothing to do now
}
}
note "Finished '$client_type', pid = $$, partition = $partition";
note_nonfatals();
exit;
} else {
fail 'An unexpected error (fork 1)';
}
}
#-- Global data ----------------------------------------------------------------
# INSTRUCTIONS -----------------------------------------------------------------
create_msg_pool();
setup();
#-- FORK competition
$cluster = Kafka::Cluster->new(
kafka_dir => $KAFKA_BASE_DIR,
replication_factor => $REPLICATION_FACTOR,
partition => $PAIRS_CLIENTS,
);
isa_ok( $cluster, 'Kafka::Cluster' );
#-- Connecting to the Kafka server port (for example for node_id = 0)
( $port ) = $cluster->servers;
foreach my $i ( 1 .. $PAIRS_CLIENTS ) {
foreach my $client_type ( 'producer', 'consumer' ) {
my $pid = create_client( $client_type, $i - 1 );
ok $pid, "client started: $pid";
push @clients, $pid; # partitions are 0 based
}
}
if ( wait_child_ready() ) {
sleep 1;
kill 'USR1' => @clients;
}
wait_child_success();
$cluster->close;
#-- Inside a process competition
$cluster = Kafka::Cluster->new(
kafka_dir => $KAFKA_BASE_DIR,
replication_factor => $REPLICATION_FACTOR,
partition => $PAIRS_CLIENTS,
);
isa_ok( $cluster, 'Kafka::Cluster' );
#-- Connecting to the Kafka server port (for example for node_id = 0)
( $port ) = $cluster->servers;
$connection = Kafka::Connection->new(
host => $HOST,
port => $port,
AutoCreateTopicsEnable => 1,
);
$producer = Kafka::Producer->new(
Connection => $connection,
# Require verification the number of messages sent and recorded
RequiredAcks => $BLOCK_UNTIL_IS_COMMITTED,
);
$consumer = Kafka::Consumer->new(
Connection => $connection,
);
@clients = ();
foreach my $i ( 1 .. $PAIRS_CLIENTS ) {
foreach my $client_type ( 'producer', 'consumer' ) {
push @clients,
{
client_type => $client_type,
partition => $i - 1, # partitions are 0 based
count => 0,
}
;
}
}
@clients = shuffle @clients;
my $tm = time;
my $nonfatals = 0;
while ( scalar @clients ) {
my $i = int( rand scalar @clients );
my $client_type = $clients[ $i ]->{client_type};
my $partition = $clients[ $i ]->{partition};
my $count = $clients[ $i ]->{count};
if ( $count == 0 ) {
note "Started '$client_type', partition = $partition";
}
if ( $client_type eq 'producer' ) {
$clients[ $i ]->{count} += sending( $partition );
} else { # consumer
$clients[ $i ]->{count} += fetching( $partition );
}
$count = $clients[ $i ]->{count};
if ( ( my $new_nonfatals = $connection->nonfatal_errors() ) != $nonfatals ) {
note "New non-fatal errors fixed: '$client_type', partition = $partition";
note_nonfatals();
$nonfatals = $new_nonfatals;
}
if ( $count > $CLIENT_MSGS ) {
note "Finished '$client_type', partition = $partition";
#-- total
if ( $client_type eq 'producer' ) {
is $count, next_offset( $partition ), "total number of recorded messages coincides with the number of messages sent ($count)";
}
splice( @clients, $i, 1 );
}
if ( $tm < time ) {
pass 'There are clients working';
$tm = time;
}
}
note_nonfatals();
$cluster->close;
# POSTCONDITIONS ---------------------------------------------------------------
restore_properties();