The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

# NAME: Producer test

use 5.010;
use strict;
use warnings;

use lib qw(

use Getopt::Long;
use Scalar::Util qw(
use Time::HiRes qw(
use Try::Tiny;

use Kafka::Connection;
use Kafka::Producer;

my $host                = 'localhost',
my $port                = undef;
my $topic               = 'mytopic';
my $partitions          = 1;
my $msg_len             = 200;
my $number_of_messages  = 10_000;

my ( $ret, $help );

$ret = GetOptions(
    'host=s'        => \$host,
    'port=i'        => \$port,
    'topic=s'       => \$topic,
    'partitions=i'  => \$partitions,
    'messages=i'    => \$number_of_messages,
    'length=i'      => \$msg_len,
    'help|?'        => \$help,

if ( !$ret || $help || !$host || !$port || !$topic || !$partitions || !$number_of_messages || !$msg_len ) {
    print <<HELP;
Usage: $0 [--host="..."] --port=... [--topic="..."] [--partitions=...] [--messages=...] [--length=...]

Send messages to random paritions of a given topic

        Display this help and exit

        Apache Kafka host to connect to
        Apache Kafka port to connect to
        topic name
        number of partitions to use
        number of messages to send
        length of messages

    exit 1;

my ( $connect, $producer, $messages, $messages_sent, $dispatch_time, $mbs );

sub exit_on_error {
    my ( $error ) = @_;

    my $message;
    if ( !blessed( $error ) || !$_->isa( 'Kafka::Exception' ) ) {
        $message = $error;
    } else {
        $message = $_->message;
    say STDERR $message;
    exit 1;

sub random_strings {
    my @chars = ( ' ', 'A'..'Z', 'a'..'z', 0..9, qw( ! @ $ % ^ & * ) );

    print STDERR 'generation of messages can take a while';
    my @strings;
    $strings[ $number_of_messages - 1 ] = undef;
    foreach my $i ( 0..( $number_of_messages - 1 ) ) {
        $strings[ $i ] = join( q{}, @chars[ map { rand @chars } ( 1..$msg_len ) ] );

    return \@strings;

sub send_message {
    my ( $partition, $message ) = @_;

    my ( $ret, $time_before, $time_after );
    $time_before = gettimeofday();
    try {
        $ret = $producer->send( $topic, $partition, $message );
    } catch {
        exit_on_error( $_ );
    $time_after = gettimeofday();

    return $time_after - $time_before;

try {
    $connect  = Kafka::Connection->new( host => $host, port => $port );
    $producer = Kafka::Producer->new( Connection => $connect );
} catch {
    exit_on_error( $_ );

$messages       = random_strings();
$messages_sent  = 0;
$dispatch_time  = 0;

while (1) {
    print STDERR "\rmessage sending one by one, please wait...\r";
    foreach my $idx ( 0..( $number_of_messages - 1 ) ) {
        $dispatch_time += send_message( int( rand( $partitions ) ), $messages->[ $idx ] );

        # decoration
        unless ( ( my $num = $idx + 1 ) % 1000 ) {
            $mbs = ( $num * $msg_len ) / ( 1024 * 1024 );
            print( STDERR
                sprintf( '[%s] Sent %d messages (%.3f MB) %s messages/sec (%s MB/sec)',
                    scalar localtime,
                    $dispatch_time ? sprintf( '%d',   int( $num / $dispatch_time ) ) : 'N/A',
                    $dispatch_time ? sprintf( '%.3f', $mbs / $dispatch_time )        : 'N/A'
                ' ' x 10,
    $messages_sent += $number_of_messages;

    last if $dispatch_time; # achieved significant time

# Closes and cleans up

undef $producer;
undef $connect;

# Statistics

$mbs = ( $messages_sent * $msg_len ) / ( 1024 * 1024 );
say( STDERR sprintf( '[%s] Total: Sent %d messages (%.3f MB), %d messages/sec (%.3f MB/sec)',
        scalar localtime,
        int( $messages_sent / $dispatch_time ),
        $mbs / $dispatch_time,