The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

use 5.010;
use strict;
use warnings;

# NAME: Consuming messages

use lib 'lib';
use bytes;
use Time::HiRes     qw( gettimeofday );
use Getopt::Long;

# PRECONDITIONS ----------------------------------------------------------------

#-- verify that the modules load
use Kafka qw(
    DEFAULT_TIMEOUT
    DEFAULT_MAX_SIZE
    TIMESTAMP_LATEST
    TIMESTAMP_EARLIEST
    DEFAULT_MAX_OFFSETS
    );
use Kafka::IO;
use Kafka::Consumer;

#-- declaration of variables to test
# Defaults for the command-line options; each declaration is its own
# statement (the first one used to end with a comma, which silently chained
# it to the next declaration through the comma operator).
my $host                = "localhost";
my $topic               = undef;            # mandatory, set by --topic
my $partition           = undef;            # mandatory, set by --partition
my $timeout             = DEFAULT_TIMEOUT;  # passed to Kafka::IO->new
my $number_of_package   = 1_000;            # messages per fetch request (--package)
my $msg_len             = 200;              # expected message length (--length)

my $re_read             = 0;                # --re_read: start every pass at offset 0
my $no_infinite         = 0;                # --no_infinite: do a single pass only
my $ctrl_c              = 0;                # set to 1 by the SIGINT handler

my $help;

#-- setting up facilities

# Option specifications paired with the variables that receive their values.
my @option_spec = (
    'host=s'      => \$host,
    'topic=s'     => \$topic,
    'partition=i' => \$partition,
    'length=i'    => \$msg_len,
    'package=i'   => \$number_of_package,
    're_read'     => \$re_read,
    'no_infinite' => \$no_infinite,
    'help|?'      => \$help,
);

# $ret is false when the command line could not be parsed.
my $ret = GetOptions( @option_spec );

# Print the usage text and exit with status 1 when option parsing failed,
# help was requested, or a mandatory option is missing.
# The topic is tested with defined/length rather than plain truth so that a
# topic literally named "0" is accepted.
if ( !$ret or $help or !defined( $topic ) or !length( $topic ) or !defined( $partition ) )
{
    print <<HELP;
Usage: $0 --topic="..." --partition=... [--host="..."] [--package=...] [--length=...] [--re_read] [--no_infinite]

Consume messages from partition of a given topic

Options:
    --help
        Display this help and exit

    --topic="..."
        topic name
    --partition=...
        partition to use
    --host="..."
        Apache Kafka host to connect to
    --package=...
        number of messages in single fetch request
    --length=...
        length of messages
    --re_read
        re-read all the available data
    --no_infinite
        do not run in an infinite loop
HELP
    exit 1;
}
# File-scoped working variables shared by the rest of the script.
my (
    $io,
    $consumer,
    $first_offset,
    $fetch,
    @tmp_bench,
    );
my %bench = ();

# Create the Kafka::IO object for the requested host. On failure report the
# module's last error code and text (newline-terminated so the message does
# not run into the shell prompt) and exit with status 1.
$io = Kafka::IO->new(
    host    => $host,
    timeout => $timeout,
    );
unless ( $io )
{
    print STDERR "Kafka::IO->new: (".Kafka::IO::last_errorcode().") ".Kafka::IO::last_error()."\n";
    exit 1;
}

# Create the consumer on top of the IO object, with the same error handling.
$consumer = Kafka::Consumer->new( IO => $io );
unless ( $consumer )
{
    print STDERR "Kafka::Consumer->new: (".Kafka::Consumer::last_errorcode().") ".Kafka::Consumer::last_error()."\n";
    exit 1;
}

#-- definition of the functions

# SIGINT handler: note the interrupt in $ctrl_c and keep itself installed,
# so the main loop can stop at its next check instead of being killed.
sub tsk {
    $ctrl_c = 1;
    $SIG{INT} = \&tsk;
}

$SIG{INT} = \&tsk;

# Fetch one batch of messages and time the request.
#
# Arguments (positional):
#   $consumer  - object providing fetch(), last_error() and last_errorcode()
#   $topic, $partition, $offset, $max_size
#              - passed through unchanged to $consumer->fetch()
#
# Returns the list ( $messages, $elapsed ): $messages is the value returned by
# fetch() (iterated here as an array reference) and $elapsed is the difference
# between the gettimeofday readings taken around the fetch() call.
#
# Messages for which ->valid is false are reported on STDERR but are still
# part of the returned batch.
#
# When fetch() returns a false value the consumer's last error is printed to
# STDERR and the script exits with status 1; the sub therefore never returns
# without a batch.
sub fetch_messages {
    my ( $consumer, $topic, $partition, $offset, $max_size ) = @_;

    my $time_before = gettimeofday;
    my $messages    = $consumer->fetch( $topic, $partition, $offset, $max_size );
    my $time_after  = gettimeofday;

    # Failure is fatal for this script. The message is newline-terminated so
    # that it does not run into whatever is printed next on the terminal.
    unless ( $messages )
    {
        print STDERR "fetch: (".$consumer->last_errorcode.") ".$consumer->last_error."\n";
        exit 1;
    }

    my $cnt = 0;
    foreach my $m ( @$messages )
    {
        unless ( $m->valid )
        {
            print STDERR "Message No $cnt, Error: ", $m->error, "\n";
            # Show at most the first 100 characters of a long payload.
            print STDERR "Payload    : ", bytes::length( $m->payload ) > 100 ? substr( $m->payload, 0, 100 )."..." : $m->payload, "\n";
            print STDERR "offset     : ", $m->offset, "\n";
            print STDERR "next_offset: ", $m->next_offset, "\n";
        }
        ++$cnt;
    }

    return ( $messages, $time_after - $time_before );
}

# INSTRUCTIONS -----------------------------------------------------------------

#-- Mix
# No messages fetched yet, so the first pass starts at offset 0.
$fetch = [];
# Size requested from each fetch: ( 9 + message length ) per expected message.
# NOTE(review): the 9 is presumably the per-message framing overhead of the
# wire format - confirm against the Kafka protocol version in use.
my $wanted_size = ( 9 + $msg_len ) * $number_of_package;
# Running totals for the final statistics. 'bytes' is initialised here as
# well, so the totals can be reported even if no message is ever received.
my %total = (
    messages    => 0,
    bytes       => 0,
    seconds     => 0,
    );

# Outer pass loop: a bare block repeated with "redo" (see the end of the
# block) unless --no_infinite was given. Each pass starts at offset 0 when
# --re_read is set or nothing has been fetched so far; otherwise it continues
# from the next_offset of the last message of the previous pass.
INFINITE: {
    $first_offset = ( $re_read or !@$fetch ) ? 0 : $fetch->[ @$fetch - 1 ]->next_offset;
    $fetch = [];
    $bench{fetch_mix} = 0;      # fetch time accumulated during this pass
    my $cnt = 0;                # messages received since the last "\n" status line
    my $all_bytes = 0;          # payload bytes received during this pass

# Keep polling until this pass has accumulated a non-zero fetch time
# (see the "last if" after the inner block).
    MIX: while (1)
    {
# Inner bare block, repeated with "redo" for as long as fetches return
# messages; a plain "last" leaves only this block.
        {
            # $ctrl_c is set by the SIGINT handler: stop everything and go on
            # to the final statistics.
            last INFINITE if $ctrl_c;

# Useful work: fetch the next batch, continuing after the last message
# received in this pass (or at $first_offset for the first batch).
            my ( $fetched, $to_bench ) = fetch_messages(
                $consumer,
                $topic,
                $partition,
                @$fetch ? $fetch->[ @$fetch - 1 ]->next_offset : $first_offset,
                $wanted_size,
                );
            # An empty batch ends the inner block.
            last unless @$fetched;

# Bookkeeping and progress output
            $bench{fetch_mix} += $to_bench;
            $total{seconds}     += $to_bench;
            $total{messages}    += scalar @$fetched;
            foreach my $m ( @$fetched )
            {
                my $len = bytes::length( $m->payload );
                $all_bytes      += $len;
                $total{bytes}   += $len;
            }

            # NOTE(review): every message of the pass is kept in @$fetch, although
            # only the element count and the last element are used in this loop.
            push @$fetch, @$fetched;
            my $already = scalar @$fetch;
            my $mbs = $all_bytes / ( 1024 * 1024 );
            my $tmp_bench = $bench{fetch_mix};

            $cnt += scalar @$fetched;

            my $secs = $bench{fetch_mix};

            # Average fetch time per message; its reciprocal is messages/sec.
            $tmp_bench /= ( scalar $already );

            # Status line; the trailing spaces presumably blank out leftovers of a
            # longer line previously written at the same position.
            print STDERR "[", scalar localtime, "] ",
                "Received $already messages ",
                "(".sprintf( "%.3f", $mbs )." MB), ",
                $tmp_bench ? sprintf( "%d", int( 1 / $tmp_bench ) ) : "N/A",
                " messages/sec ",
                "(".( $secs ? ( sprintf( "%.3f", $mbs / $secs ) ) : "N/A" )." MB/sec)",
                " " x 10;
            # End the status line with "\r" until 10_000 or more messages have been
            # received since the last "\n"; then end it with "\n" and restart the count.
            if ( $cnt < 10_000 )
            {
                print STDERR "\r";
            }
            else
            {
                print STDERR "\n";
                $cnt = 0;
            }

            # Fetch the next batch.
            redo;
        }

        # Leave MIX once fetch time has been recorded in this pass; otherwise
        # the while loop fetches again.
        last if $bench{fetch_mix};
    }

    # Start another pass unless --no_infinite was given.
    redo unless $no_infinite;
}

# POSTCONDITIONS ---------------------------------------------------------------

# Closes and cleans up
$consumer->close;

# Statistics
# The byte total may never have been assigned (e.g. the run was interrupted
# before any message arrived); treat that as 0 instead of dividing an
# undefined value.
my $total_bytes = $total{bytes} // 0;
my $mbs = $total_bytes / ( 1024 * 1024 );
# Rates are reported as "N/A" when no fetch time was accumulated, which also
# avoids a division by zero.
print STDERR "\n[", scalar localtime, "] Total: ",
    "Received $total{messages} messages ",
    "(".sprintf( "%.3f", $mbs )." MB), ",
    ( $total{seconds} ) ? sprintf( "%d", int( $total{messages} / $total{seconds} ) ) : "N/A",
    " messages/sec ",
    "(".( $total{seconds} ? ( sprintf( "%.3f", $mbs / $total{seconds} ) ) : "N/A" )." MB/sec)\n";