The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

# NAME: Consuming messages

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    ../lib
);

# ENVIRONMENT ------------------------------------------------------------------

#-- load the modules -----------------------------------------------------------

use Getopt::Long;
use Scalar::Util qw(
    blessed
);
use Time::HiRes qw(
    gettimeofday
);
use Try::Tiny;

use Kafka qw(
    $MESSAGE_SIZE_OVERHEAD
    $RECEIVE_EARLIEST_OFFSETS
    $RECEIVE_LATEST_OFFSET
);
use Kafka::Connection;
use Kafka::Consumer;

#-- setting up facilities ------------------------------------------------------

# Defaults for every option; each can be overridden on the command line.
my $host                = 'localhost';
my $port                = undef;        # no default: --port must be given
my $topic               = 'mytopic';
my $partition           = 0;
my $msg_len             = 200;          # used to size fetches and compute MB figures
my $number_of_messages  = 20_000;       # messages a single fetch request is sized for
my $re_read             = 0;            # restart every pass at offset 0
my $no_infinite         = 0;            # run a single pass only

my ( $ret, $help );

# $ret is false when the command line could not be parsed.
$ret = GetOptions(
    'host=s'        => \$host,
    'port=i'        => \$port,
    'topic=s'       => \$topic,
    'partition=i'   => \$partition,
    'package=i'     => \$number_of_messages,
    'length=i'      => \$msg_len,
    're_read'       => \$re_read,
    'no_infinite'   => \$no_infinite,
    'help|?'        => \$help,
);

# Print usage to STDOUT and exit with status 1 when option parsing failed,
# --help was requested, or a required value is missing/zero (notably --port,
# which has no default).
if ( !$ret || $help || !$host || !$port || !$topic || !$msg_len || !$number_of_messages ) {
    print <<HELP;
Usage: $0 [--host="..."] --port=... [--topic="..."] [--partition=...] [--package=...] [--length=...] [--re_read] [--no_infinite]

Consume messages from a partition of a given topic

Options:
    --help
        Display this help and exit

    --host="..."
        Apache Kafka host to connect to
    --port=...
        Apache Kafka port to connect to
    --topic="..."
        topic name
    --partition=...
        partition to use
    --package=...
        number of messages in a single fetch request
    --length=...
        length of messages
    --re_read
        re-read all the available data
    --no_infinite
        stop after a single pass instead of looping forever
HELP

    exit 1;
}

# Ctrl-C handling: the handler only raises a flag, which the main loop polls
# so it can stop and still print the final statistics.
my $ctrl_c = 0;

sub tsk {
    $ctrl_c   = 1;
    $SIG{INT} = \&tsk;    # re-install the handler for the next Ctrl-C
}

$SIG{INT} = \&tsk;

#-- declarations ---------------------------------------------------------------

# File-level state shared by the subs and the main program below.
my $connect;          # Kafka::Connection to the broker
my $consumer;         # Kafka::Consumer created on top of $connect
my $desired_size;     # size limit passed to every fetch request
my $first_offset;     # offset a consumption pass starts from
my $fetch;            # array ref of fetched messages
my $dispatch_time;    # accumulated seconds spent in fetch calls
my $messages_recv;    # total number of messages received
my $mbs;              # received volume: messages * --length / (1024 * 1024)

# Report an error on STDERR and terminate the script with exit status 1.
#
# $error is whatever a Try::Tiny catch block received: a Kafka::Exception
# object is reported through its message method; anything else (plain die
# string, foreign object) is printed as-is.
#
# Only the argument is inspected, never $_, so the sub works no matter what
# $_ holds at the call site.
sub exit_on_error {
    my ( $error ) = @_;

    my $message = ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) )
        ? $error->message
        : $error;

    say STDERR $message;
    exit 1;
}

# Fetch one batch from $topic / $partition (file-level settings) starting at
# $offset, passing $max_size as the fetch size limit.
#
# Any exception raised by the fetch ends the script via exit_on_error.
# Messages whose valid method is false are reported on STDERR (index within
# the batch, error, payload preview of at most 100 characters, offsets) but
# are still returned to the caller.
#
# Returns ( $messages_array_ref, $seconds_spent_in_the_fetch_call ).
sub fetch_messages {
    my ( $offset, $max_size ) = @_;

    my $batch;
    my $started_at = gettimeofday();
    try {
        $batch = $consumer->fetch( $topic, $partition, $offset, $max_size );
    } catch {
        exit_on_error( $_ );
    };
    my $finished_at = gettimeofday();

    my $index = -1;
    foreach my $msg ( @$batch ) {
        ++$index;
        next if $msg->valid;

        my $payload = $msg->payload;
        my $preview = length( $payload ) > 100
            ? substr( $payload, 0, 100 ).'...'
            : $payload;

        say STDERR "Message No $index, Error: ", $msg->error;
        say STDERR 'Payload    : ', $preview;
        say STDERR 'offset     : ', $msg->offset;
        say STDERR 'next_offset: ', $msg->next_offset;
    }

    return $batch, $finished_at - $started_at;
}

#-- Global data ----------------------------------------------------------------

# Open the broker connection and create the consumer on it; any failure at
# this point is fatal and reported through exit_on_error.
try {
    $connect  = Kafka::Connection->new( host => $host, port => $port );
    $consumer = Kafka::Consumer->new( Connection => $connect );
}
catch {
    exit_on_error( $_ );
};

# Fetch size limit: room for $number_of_messages messages, each counted as
# $msg_len bytes of payload plus $MESSAGE_SIZE_OVERHEAD.
$desired_size = $number_of_messages * ( $msg_len + $MESSAGE_SIZE_OVERHEAD );

# INSTRUCTIONS -----------------------------------------------------------------

$messages_recv  = 0;
$dispatch_time  = 0;
my $cnt         = 0;    # messages reported since the last progress newline
my $next_offset = 0;    # offset the next fetch request starts from

INFINITE:                                       # an infinite loop (unless --no_infinite)
{
    # --re_read restarts every pass at offset 0.  Otherwise a pass resumes
    # right after the last message received so far, even when the previous
    # pass found nothing new, instead of falling back to offset 0.
    $first_offset = $re_read ? 0 : $next_offset;
    $next_offset  = $first_offset;

    CONSUMPTION:
    while (1) {
        # until all messages
        FETCH:
        {
            last INFINITE if $ctrl_c;       # Ctrl-C: stop and print the totals

            # useful work
            my ( $fetched, $to_bench ) = fetch_messages( $next_offset, $desired_size );
            last FETCH unless @$fetched;    # all messages fetched

            $dispatch_time += $to_bench;
            $messages_recv += scalar @$fetched;
            # Only the resume point is needed, so received batches are not
            # kept around (memory stays bounded by a single batch).
            $next_offset    = $fetched->[-1]->next_offset;

            # decoration
            $mbs = ( $messages_recv * $msg_len ) / ( 1024 * 1024 );
            print( STDERR
                sprintf( '[%s] Received %d messages (%.3f MB) %s messages/sec (%s  MB/sec)',
                    scalar localtime,
                    $messages_recv,
                    $mbs,
                    $dispatch_time ? sprintf( '%d', int( $messages_recv / $dispatch_time ) ) : 'N/A',
                    $dispatch_time ? sprintf( '%.3f', $mbs / $dispatch_time ) : 'N/A',
                ),
                ' ' x 10,
            );
            # Overwrite the progress line in place, but move to a new line
            # after about every 200_000 messages.
            if ( ( $cnt += scalar @$fetched ) < 200_000 ) {
                print STDERR "\r";
            }
            else {
                print STDERR "\n";
                $cnt = 0;
            }

            redo FETCH;                     # could still remain unread messages
        }
        # Keep polling until some fetch time has been measured (e.g. while
        # the partition is still empty); after that an empty fetch ends the pass.
        last CONSUMPTION if $dispatch_time;
    }
    redo INFINITE unless $no_infinite;
}

# POSTCONDITIONS ---------------------------------------------------------------

# Closes and cleans up

# Release the consumer first, then the connection it was created with.
undef $consumer;
undef $connect;

# Statistics

# Overall totals; the rates read 'N/A' when no fetch time was measured.
$mbs = $messages_recv * $msg_len / ( 1024 * 1024 );
my $total_msg_rate = $dispatch_time ? sprintf( '%d', int( $messages_recv / $dispatch_time ) ) : 'N/A';
my $total_mb_rate  = $dispatch_time ? sprintf( '%.3f', $mbs / $dispatch_time ) : 'N/A';

say STDERR sprintf(
    '[%s] Total: Received %d messages (%.3f MB), %s messages/sec (%s MB/sec)',
    scalar localtime,
    $messages_recv,
    $mbs,
    $total_msg_rate,
    $total_mb_rate,
);