The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

use 5.010;
use strict;
use warnings;

# NAME: Performance test

use lib 'lib';
use bytes;
use Params::Util qw( _ARRAY0 );
use Time::HiRes     qw( gettimeofday );

use Test::More;
plan "no_plan";

BEGIN {
    eval "use Test::Deep";
    plan skip_all => "because Test::Deep required for testing" if $@;
}

# PRECONDITIONS ----------------------------------------------------------------

#-- verify that the modules load
BEGIN { use_ok 'Kafka', qw(
    KAFKA_SERVER_PORT
    DEFAULT_TIMEOUT
    TIMESTAMP_LATEST
    TIMESTAMP_EARLIEST
    DEFAULT_MAX_OFFSETS
    DEFAULT_MAX_SIZE
    ERROR_CANNOT_BIND
    ) }
BEGIN { use_ok 'Kafka::IO' }
BEGIN { use_ok 'Kafka::Producer' }
BEGIN { use_ok 'Kafka::Consumer' }

#-- declaration of variables to test
# File-level state shared by the benchmark sections and by report().
my (
    $io,
    $producer,
    $consumer,
    $first_offset,
    $messages,
    $total,
    $fetch,
    $request_size,
    $delta,
    @copy,
    $in_single,
    $in_package,
    $number_of_package_mix,
    $number_of_package_ser,
    );

# The topic name is the first line of the __DATA__ section; "test" is used
# when that line is missing or empty.
my $topic = <DATA>;
$topic = "" unless defined $topic;      # an empty DATA section must not make chomp warn
chomp $topic;
$topic ||= "test";

my $partition           = 0;
my $timeout             = $ENV{KAFKA_BENCHMARK_TIMEOUT} || DEFAULT_TIMEOUT;

# Alphabet for the randomly generated payloads.
my @chars = ( " ", "A" .. "Z", "a" .. "z", 0 .. 9, qw(! @ $ % ^ & *) );
my $min_len             = $ENV{KAFKA_BENCHMARK_LEN_MIN} || 200;
my $max_len             = $ENV{KAFKA_BENCHMARK_LEN_MAX} || 200;
# Raising only the minimum above the maximum would otherwise produce an
# inverted length range; treat it as a fixed length instead.
$max_len = $min_len if $max_len < $min_len;
my $number_of_package   = $ENV{KAFKA_BENCHMARK_PACKAGE} || 5_000;
my $number_of_single    = $ENV{KAFKA_BENCHMARK_SINGLE}  || 5;
my $max_size            = DEFAULT_MAX_SIZE * 512;   # ~512 MB

# Accumulated timings in seconds, keyed by benchmark name (send_package, ...).
my %bench = ();

#-- setting up facilities
# Each object is created first and checked afterwards; a failed constructor
# is reported through the class-level error accessors and aborts the script.

# One IO object is shared by the producer and the consumer.
$io = Kafka::IO->new(
    host    => "localhost",
    timeout => $timeout,
    );
if ( !$io )
{
    fail "(".Kafka::IO::last_errorcode().") ".Kafka::IO::last_error();
    exit 1;
}
isa_ok( $io, 'Kafka::IO');

$producer = Kafka::Producer->new( IO => $io );
if ( !$producer )
{
    fail "(".Kafka::Producer::last_errorcode().") ".Kafka::Producer::last_error();
    exit 1;
}
isa_ok( $producer, 'Kafka::Producer');

$consumer = Kafka::Consumer->new( IO => $io );
if ( !$consumer )
{
    fail "(".Kafka::Consumer::last_errorcode().") ".Kafka::Consumer::last_error();
    exit 1;
}
isa_ok( $consumer, 'Kafka::Consumer');

#-- definition of the functions

# Queries the consumer for the offsets of $topic/$partition at
# TIMESTAMP_LATEST and returns the first one.
#
# Arguments: consumer object, topic, partition, and a flag ($is_package)
# that additionally records an "offsets are obtained" test when true.
# Returns the first element of the offsets answer, or nothing after
# recording a failed test when the consumer returns a false value.
sub next_offset {
    my ( $consumer, $topic, $partition, $is_package ) = @_;

    my $offsets = $consumer->offsets(
        $topic,
        $partition,
        TIMESTAMP_LATEST,                           # time
        DEFAULT_MAX_OFFSETS                         # max_number
        );

    unless ( $offsets )
    {
        fail "(".$consumer->last_errorcode.") ".$consumer->last_error;
        return;
    }

    ok( defined( _ARRAY0( $offsets ) ), "offsets are obtained" ) if $is_package;
    return $offsets->[0];
}

# Sends $messages (an array reference) through the producer and measures
# the wall-clock time spent inside the send call.
#
# Returns the elapsed time in seconds on success.  When send returns a
# false value a failed test is recorded and nothing is returned.
sub send_messages {
    my ( $producer, $topic, $partition, $messages ) = @_;

    my $started  = gettimeofday;
    my $sent     = $producer->send( $topic, $partition, $messages );
    my $finished = gettimeofday;

    unless ( $sent )
    {
        fail "(".$producer->last_errorcode.") ".$producer->last_error;
        return;
    }

    # The extra test is skipped when exactly one message was sent
    # (last index 0), which keeps the one-by-one loops quiet.
    is( $sent, 1, "sent a series of messages" ) if $#{$messages};
    return $finished - $started;
}

# Fetches messages from $topic/$partition starting at $offset, limited by
# $max_size, and measures the wall-clock time spent inside the fetch call.
#
# When $is_package is true an additional "messages are received" test is
# recorded.  Every message that reports itself as not valid is dumped with
# diag; its payload is still collected.
#
# Returns ( \@payloads, $elapsed_seconds ) on success.  When fetch returns
# a false value a failed test is recorded and nothing is returned.
sub fetch_messages {
    my ( $consumer, $topic, $partition, $offset, $max_size, $is_package ) = @_;

    my $started  = gettimeofday;
    my $messages = $consumer->fetch( $topic, $partition, $offset, $max_size );
    my $finished = gettimeofday;

    unless ( $messages )
    {
        fail "(".$consumer->last_errorcode.") ".$consumer->last_error;
        return;
    }

    ok( defined( _ARRAY0( $messages ) ), "messages are received" ) if $is_package;

    my @payloads;
    foreach my $cnt ( 0 .. $#{$messages} )
    {
        my $message = $$messages[ $cnt ];
        push @payloads, $message->payload;
        next if $message->valid;

        # Long payloads are cut to their first 100 characters in the dump.
        diag "Message No $cnt, Error: ", $message->error;
        diag "Payload    : ", bytes::length( $message->payload ) > 100 ? substr( $message->payload, 0, 100 )."..." : $message->payload;
        diag "offset     : ", $message->offset;
        diag "next_offset: ", $message->next_offset;
    }

    return ( \@payloads, $finished - $started );
}

# Generates $number_of random strings built from the characters in the
# array referenced by $chars.  Each string is between $min_len and
# $max_len characters long (exactly $min_len when the range is empty or
# inverted).
#
# Returns ( \@strings, $size ) where $size is the sum of the string
# lengths (0 when no strings were requested).
sub random_strings {
    my $chars       = shift;
    my $min_len     = shift;
    my $max_len     = shift;
    my $number_of   = shift;

    note "generation of messages can take a while";
    my @strings;
    my $size = 0;

    # Pre-extend the array in one step; skipped for an empty request, where
    # assigning to index -1 would die.
    $#strings = $number_of - 1 if $number_of > 0;

    my $alphabet_size = scalar @$chars;
    my $delta = $max_len - $min_len + 1;
    foreach my $i ( 0 .. ( $number_of - 1 ) )
    {
        my $len = $delta > 0 ? int( rand( $delta ) ) + $min_len : $min_len;
        # Pick from the alphabet that was passed in, not from a file-level array.
        $strings[ $i ] = join( "", @{$chars}[ map { int( rand( $alphabet_size ) ) } ( 1 .. $len ) ] );
        $size += $len;
    }
    note "generation of messages completed";
    return \@strings, $size;
}

# Prints the benchmark summary with diag: the run parameters, the total
# time per benchmark, the time per message and the message rate.
#
# Reads the file-level %bench totals and message counters and leaves them
# untouched, so it may be called more than once.  A benchmark whose
# counter is zero or undefined is reported as "N/A" instead of aborting
# with a division by zero.
sub report {
    my @names = qw( send_package send_single send_mix fetch_package fetch_single fetch_mix fetch_inseries );

    # Number of messages each total has to be divided by.
    my %count_for = (
        send_package    => $number_of_package,
        fetch_package   => $number_of_package,
        send_single     => $number_of_single,
        fetch_single    => $number_of_single,
        send_mix        => $number_of_package_mix,
        fetch_mix       => $number_of_package_mix,
        fetch_inseries  => $number_of_package_ser,
        );

    diag "Legend:";
    diag "Message length: $min_len .. $max_len";
    diag "Messages      : package - $number_of_package, single - $number_of_single";
    diag "IO timeout    : $timeout";

    diag "Total:";
    foreach my $k ( @names )
    {
        diag sprintf( "%-14s ", $k ), sprintf( "%.4f", $bench{ $k } // 0 );
    }

    # Per-message times go into a local hash rather than back into %bench.
    my %per_message;
    foreach my $k ( @names )
    {
        my $count = $count_for{ $k };
        $per_message{ $k } = ( defined( $bench{ $k } ) && $count )
            ? $bench{ $k } / $count
            : undef;
    }

    diag "Seconds per message:";
    foreach my $k ( @names )
    {
        diag sprintf( "%-14s ", $k ), defined( $per_message{ $k } ) ? sprintf( "%.4f", $per_message{ $k } ) : "N/A";
    }

    diag "Messages per second:";
    foreach my $k ( @names )
    {
        diag sprintf( "%-14s ", $k ), $per_message{ $k } ? sprintf( "%4d", int( 1 / $per_message{ $k } ) ) : "N/A";
    }
}

# INSTRUCTIONS -----------------------------------------------------------------

$in_package = $number_of_package;

#-- Package
# The whole batch is sent in one call and fetched back in one call.
( $messages, $total ) = random_strings( \@chars, $min_len, $max_len, $in_package );
@copy = (); push @copy, @$messages;
# Amount by which the latest offset is expected to grow per batch: payload
# bytes plus 9 bytes per message (presumably the per-message framing of the
# wire format in use - confirm against the Kafka module).
$request_size = $in_package * 9 + $total;

$fetch = [];
$bench{fetch_package} = $bench{send_package} = 0;

# to wait for forcing a flush of previous data to disk:
# poll once a second until two consecutive readings of the latest offset agree
$first_offset = next_offset( $consumer, $topic, $partition, 1 );
while (1)
{
    sleep 1;
    if ( $first_offset != next_offset( $consumer, $topic, $partition, 1 ) )
    {
        diag 'to wait for forcing a flush of previous data to disk';
        $first_offset = next_offset( $consumer, $topic, $partition, 1 );
    }
    else
    {
        last;
    }
}

# The pass is repeated until both accumulated timings are non-zero.
while (1)
{
    note "PRODUCE Request transfer size $request_size bytes, please wait...";
    $first_offset = next_offset( $consumer, $topic, $partition, 1 );
    # Every pass sends exactly one batch (@copy).  $messages only records
    # what has been sent so far; sending it on a retry would transmit the
    # earlier batches again and no longer match $request_size or
    # $number_of_package.
    $bench{send_package} = $bench{send_package}
        + send_messages( $producer, $topic, $partition, \@copy );
    note "PRODUCE Request transmitted";

    note "waiting for messages to get ready...";
    1 while next_offset( $consumer, $topic, $partition ) < $first_offset + $request_size;

    my ( $fetched, $to_bench );

    note "trying to get FETCH Response to all messages, please wait...";
    ( $fetched, $to_bench ) = fetch_messages( $consumer, $topic, $partition, $first_offset, $max_size, 1 );
    $bench{fetch_package} += $to_bench;
    push @$fetch, @$fetched;

    last if $bench{send_package} and $bench{fetch_package};

    # Another pass is needed: account for the batch about to be sent again.
    $number_of_package += $in_package;
    push @$messages, @copy;
}

cmp_deeply $fetch, $messages, "all messages are received correctly";

#-- Single message
# Every message is sent on its own and fetched back before the next one
# is sent.
( $messages, $total ) = random_strings( \@chars, $min_len, $max_len, $number_of_single );
@copy = @$messages;

$fetch = [];
$bench{send_single}  = 0;
$bench{fetch_single} = 0;

$in_single = $number_of_single;

# The pass is repeated until both accumulated timings are non-zero.
SINGLE_PASS:
while (1)
{
    note "message processing one by one, please wait...";
    foreach my $position ( 0 .. $in_single - 1 )
    {
        $first_offset = next_offset( $consumer, $topic, $partition );

        $bench{send_single} += send_messages( $producer, $topic, $partition, [ $copy[ $position ] ] );

        # Poll until the latest offset has moved past the one seen before sending.
        until ( next_offset( $consumer, $topic, $partition ) != $first_offset ) { }

        my ( $fetched, $to_bench ) = fetch_messages( $consumer, $topic, $partition, $first_offset, $max_size );
        push @$fetch, $fetched->[0];
        $bench{fetch_single} += $to_bench;
    }

    last SINGLE_PASS if $bench{send_single} and $bench{fetch_single};

    # Another pass follows: extend the counter and the expected list.
    $number_of_single += $in_single;
    push @$messages, @copy;
}
cmp_deeply $fetch, $messages, "all messages are processed correctly";

#-- Mix
# Messages are sent one at a time, then the whole batch is fetched back
# with a single request.
$number_of_package_mix = $in_package;
( $messages, $total ) = random_strings( \@chars, $min_len, $max_len, $in_package );
@copy = @$messages;
$request_size = $total + 9 * $in_package;

$fetch = [];
$bench{send_mix}  = 0;
$bench{fetch_mix} = 0;

# The pass is repeated until both accumulated timings are non-zero.
MIX_PASS:
while (1)
{
    note "message sending one by one, please wait...";
    $first_offset = next_offset( $consumer, $topic, $partition, 1 );

    foreach my $position ( 0 .. $in_package - 1 )
    {
        $bench{send_mix} += send_messages( $producer, $topic, $partition, [ $copy[ $position ] ] );
    }

    note "waiting for messages to get ready...";
    # Poll until the latest offset has grown by the size of one batch.
    until ( next_offset( $consumer, $topic, $partition ) >= $first_offset + $request_size ) { }

    note "trying to get FETCH Response to all messages, please wait...";
    my ( $fetched, $to_bench ) = fetch_messages( $consumer, $topic, $partition, $first_offset, $max_size, 1 );
    $bench{fetch_mix} += $to_bench;
    push @$fetch, @$fetched;

    last MIX_PASS if $bench{send_mix} and $bench{fetch_mix};

    # Another pass follows: extend the counter and the expected list.
    $number_of_package_mix += $in_package;
    push @$messages, @copy;
}

cmp_deeply $fetch, $messages, "all messages are received correctly";

#-- Consuming messages one by one
# Uses Mix section data: $first_offset is where the last Mix batch starts
# and @copy holds the messages of that batch.
$number_of_package_ser = $in_package;

# Only the last Mix batch is read back here, so the expected list must start
# as exactly one batch.  If the Mix section needed several passes, $messages
# would still hold all of them and the final comparison could not succeed.
$messages = [ @copy ];

$fetch = [];
$bench{fetch_inseries} = 0;

# The pass is repeated until the accumulated timing is non-zero.
while (1)
{
    note "trying to get FETCH Response to all messages one by one, please wait...";
    # Distance from $first_offset to the message fetched next.
    $delta = 0;
    foreach my $idx ( 0 .. ( $in_package - 1 ) )
    {
        # Payload bytes plus the 9 bytes per message the script budgets
        # (same figure as in $request_size).
        my $message_size = 9 + bytes::length( $copy[ $idx ] );

        my ( $fetched, $to_bench ) = fetch_messages(
            $consumer,
            $topic,
            $partition,
            $first_offset + $delta,
            $message_size
            );
        $delta += $message_size;

        push @$fetch, $$fetched[0];
        $bench{fetch_inseries} += $to_bench;
    }

    last if $bench{fetch_inseries};

    # Another pass re-reads the same batch: extend the counter and the expected list.
    $number_of_package_ser += $in_package;
    push @$messages, @copy;
}

cmp_deeply $fetch, $messages, "all messages are received correctly";

# POSTCONDITIONS ---------------------------------------------------------------

# Closes and cleans up
# Both clients are closed in turn (producer first); each object is then
# expected to have no keys left.
foreach my $client (
    [ $producer, "the producer object is an empty" ],
    [ $consumer, "the consumer object is an empty" ],
    )
{
    my ( $object, $description ) = @$client;
    $object->close;
    ok( scalar( keys %$object ) == 0, $description );
}

# Statistics
report();

# DO NOT REMOVE THE FOLLOWING LINES
__DATA__
test