The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

# NAME: Producer test



use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    ../lib
);





use Getopt::Long;
use Scalar::Util qw(
    blessed
);
use Time::HiRes qw(
    gettimeofday
);
use Try::Tiny;

use Kafka::Connection;
use Kafka::Producer;



my $host                = 'localhost',
my $port                = undef;
my $topic               = 'mytopic';
my $partitions          = 1;
my $msg_len             = 200;
my $number_of_messages  = 10_000;

my ( $ret, $help );

$ret = GetOptions(
    'host=s'        => \$host,
    'port=i'        => \$port,
    'topic=s'       => \$topic,
    'partitions=i'  => \$partitions,
    'messages=i'    => \$number_of_messages,
    'length=i'      => \$msg_len,
    'help|?'        => \$help,
);

if ( !$ret || $help || !$host || !$port || !$topic || !$partitions || !$number_of_messages || !$msg_len ) {
    print <<HELP;
Usage: $0 [--host="..."] --port=... [--topic="..."] [--partitions=...] [--messages=...] [--length=...]

Send messages to random paritions of a given topic

Options:
    --help
        Display this help and exit

    --host="..."
        Apache Kafka host to connect to
    --port=...
        Apache Kafka port to connect to
    --topic="..."
        topic name
    --partitions=...
        number of partitions to use
    --messages=...
        number of messages to send
    --length=...
        length of messages
HELP

    exit 1;
}



my ( $connect, $producer, $messages, $messages_sent, $dispatch_time, $mbs );

sub exit_on_error {
    my ( $error ) = @_;

    my $message;
    if ( !blessed( $error ) || !$_->isa( 'Kafka::Exception' ) ) {
        $message = $error;
    } else {
        $message = $_->message;
    }
    say STDERR $message;
    exit 1;
}

sub random_strings {
    my @chars = ( ' ', 'A'..'Z', 'a'..'z', 0..9, qw( ! @ $ % ^ & * ) );

    print STDERR 'generation of messages can take a while';
    my @strings;
    $strings[ $number_of_messages - 1 ] = undef;
    foreach my $i ( 0..( $number_of_messages - 1 ) ) {
        $strings[ $i ] = join( q{}, @chars[ map { rand @chars } ( 1..$msg_len ) ] );
    }

    return \@strings;
}

sub send_message {
    my ( $partition, $message ) = @_;

    my ( $ret, $time_before, $time_after );
    $time_before = gettimeofday();
    try {
        $ret = $producer->send( $topic, $partition, $message );
    } catch {
        exit_on_error( $_ );
    };
    $time_after = gettimeofday();

    return $time_after - $time_before;
}



try {
    $connect  = Kafka::Connection->new( host => $host, port => $port );
    $producer = Kafka::Producer->new( Connection => $connect );
} catch {
    exit_on_error( $_ );
};



$messages       = random_strings();
$messages_sent  = 0;
$dispatch_time  = 0;

while (1) {
    print STDERR "\rmessage sending one by one, please wait...\r";
    foreach my $idx ( 0..( $number_of_messages - 1 ) ) {
        $dispatch_time += send_message( int( rand( $partitions ) ), $messages->[ $idx ] );

        # decoration
        unless ( ( my $num = $idx + 1 ) % 1000 ) {
            $mbs = ( $num * $msg_len ) / ( 1024 * 1024 );
            print( STDERR
                sprintf( '[%s] Sent %d messages (%.3f MB) %s messages/sec (%s MB/sec)',
                    scalar localtime,
                    $num,
                    $mbs,
                    $dispatch_time ? sprintf( '%d',   int( $num / $dispatch_time ) ) : 'N/A',
                    $dispatch_time ? sprintf( '%.3f', $mbs / $dispatch_time )        : 'N/A'
                ),
                ' ' x 10,
                "\r",
            );
        }
    }
    $messages_sent += $number_of_messages;

    last if $dispatch_time; # achieved significant time
}



# Closes and cleans up

undef $producer;
$connect->close;
undef $connect;

# Statistics

$mbs = ( $messages_sent * $msg_len ) / ( 1024 * 1024 );
say( STDERR sprintf( '[%s] Total: Sent %d messages (%.3f MB), %d messages/sec (%.3f MB/sec)',
        scalar localtime,
        $messages_sent,
        $mbs,
        int( $messages_sent / $dispatch_time ),
        $mbs / $dispatch_time,
    ),
);