package Kafka::IO;

=head1 NAME

Kafka::IO - Interface to network communication with the Apache Kafka server.

=head1 VERSION

This documentation refers to C<Kafka::IO> version 1.03 .


use 5.010;
use strict;
use warnings;

our $DEBUG = 0;

our $VERSION = '1.03';

use Carp;
use Config;
use Const::Fast;
use Data::Validate::Domain qw(
use Data::Validate::IP qw(
use Errno qw(
use Fcntl;
use IO::Select;
use Params::Util qw(
use POSIX qw(
use Scalar::Util qw(
use Socket qw(
use Sys::SigAction qw(
use Time::HiRes ();
use Try::Tiny;

use Kafka qw(
use Kafka::Exceptions;
use Kafka::Internals qw(


    use 5.010;
    use strict;
    use warnings;

    use Scalar::Util qw(
    use Try::Tiny;

    use Kafka::IO;

    my $io;
    try {
        $io = Kafka::IO->new( host => 'localhost' );
    } catch {
        my $error = $_;
        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
            warn 'Error: (', $error->code, ') ',  $error->message, "\n";
        } else {
            die $error;

    # Closes and cleans up
    undef $io;


This module is private and should not be used directly.

In order to achieve better performance, methods of this module do not
perform arguments validation.

The main features of the C<Kafka::IO> class are:

=over 3

=item *

Provides an object oriented API for communication with Kafka.

=item *

This class allows you to create Kafka 0.9+ clients.



# Hard limit of IO operation retry attempts, to prevent high CPU usage in IO retry loop
const my $MAX_RETRIES => 30;

our $_hdr;

#-- constructor ----------------------------------------------------------------


=head3 C<new>

Establishes TCP connection to given host and port, creates and returns C<Kafka::IO> IO object.

C<new()> takes arguments in key-value pairs. The following arguments are currently recognized:

=over 3

=item C<host =E<gt> $host>

C<$host> is Kafka host to connect to. It can be a host name or an IP-address in
IPv4 or IPv6 form (for example '', '0:0:0:0:0:0:0:1' or '::1').

=item C<port =E<gt> $port>

Optional, default = C<$KAFKA_SERVER_PORT>.

C<$port> is integer attribute denoting the port number of to access Apache Kafka.

C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port that can be imported
from the L<Kafka|Kafka> module.

=item C<timeout =E<gt> $timeout>

C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the L<Kafka|Kafka> module.

Special behavior when C<timeout> is set to C<undef>:


=over 3

=item *

Alarms are not used internally (namely when performing C<gethostbyname>).

=item *

Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.


=over 3

=item C<ip_version =E<gt> $ip_version>

Force version of IP protocol for resolving host name (or interpretation of passed address).

Optional, undefined by default, which works in the following way: version of IP address
is detected automatically, host name is resolved into IPv4 address.

See description of L<$IP_V4|Kafka::IO/$IP_V4>, L<$IP_V6|Kafka::IO/$IP_V6>
in C<Kafka> L<EXPORT|Kafka/EXPORT>.


sub new {
    my ( $class, @args ) = @_;

    my $self = bless {
        host        => '',
        timeout     => $REQUEST_TIMEOUT,
        port        => $KAFKA_SERVER_PORT,
        ip_version  => undef,
        af          => '',  # Address family constant
        pf          => '',  # Protocol family constant
        ip          => '',  # Human-readable textual representation of the ip address
    }, $class;

    while ( @args ) {
        my $k = shift @args;
        $self->{ $k } = shift @args if exists $self->{ $k };

    # we trust it: make it untainted
    ( $self->{host} ) = $self->{host} =~ /\A(.+)\z/;
    ( $self->{port} ) = $self->{port} =~ /\A(.+)\z/;

    $self->{socket}     = undef;
    $self->{_io_select} = undef;
    my $error;
    try {
    } catch {
        $error = $_;

    $self->_error( $ERROR_CANNOT_BIND, format_message("Kafka::IO(%s:%s)->new: %s", $self->{host}, $self->{port}, $error ) )
        if defined $error
    return $self;

#-- public attributes ----------------------------------------------------------

=head2 METHODS

The following methods are provided by C<Kafka::IO> class:


=head3 C<< send( $message <, $timeout> ) >>

Sends a C<$message> to Kafka.

The argument must be a bytes string.

Use optional C<$timeout> argument to override default timeout for this request only.

Returns the number of characters sent.

sub send {
    my ( $self, $message, $timeout ) = @_;
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
        unless defined( _STRING( $message ) )
    my $length = length( $message );
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
        unless $length <= $MAX_SOCKET_REQUEST_BYTES
    $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
        unless $timeout > 0
    my $select = $self->{_io_select};
    $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select;

    $self->_debug_msg( $message, 'Request to', 'green' )
        if $self->debug_level >= 2
    my $sent = 0;

    my $started = Time::HiRes::time();
    my $until = $started + $timeout;

    my $error_code;
    my $errno;
    my $retries = 0;
    my $interrupts = 0;
    ATTEMPT: while ( $sent < $length && $retries++ < $MAX_RETRIES ) {
        my $remaining_time = $until - Time::HiRes::time();
        last ATTEMPT if $remaining_time <= 0; # timeout expired

        undef $!;
        my $can_write = $select->can_write( $remaining_time );
        $errno = $!;
        if ( $errno ) {
            if ( $errno == EINTR ) {
                undef $errno;
                --$retries; # this attempt does not count
                next ATTEMPT;


            last ATTEMPT;

        if ( $can_write ) {
            # check for EOF on the first attempt only
            if ( $retries == 1 && $self->_is_close_wait ) {
                $error_code = $ERROR_NO_CONNECTION;
                last ATTEMPT;

            undef $!;
            my $wrote = CORE::send( $self->{socket}, $message, MSG_DONTWAIT );
            $errno = $!;

            if( defined $wrote && $wrote > 0 ) {
                $sent += $wrote;
                if ( $sent < $length ) {
                    # remove written data from message
                    $message = substr( $message, $wrote );

            if( $errno ) {
                if( $errno == EINTR ) {
                    undef $errno;
                    --$retries; # this attempt does not count
                    next ATTEMPT;
                } elsif (
                           $errno != EAGAIN
                        && $errno != EWOULDBLOCK
                        ## on freebsd, if we got ECONNRESET, it's a timeout from the other side
                        && !( $errno == ECONNRESET && $^O eq 'freebsd' )
                    ) {
                    last ATTEMPT;

            last ATTEMPT unless defined $wrote;

    unless( !$errno && defined( $sent ) && $sent == $length )
            $error_code // $ERROR_CANNOT_SEND,
            format_message( "Kafka::IO(%s)->send: ERRNO=%s ERROR='%s' (length=%s, sent=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)",
                ( $errno // 0 ) + 0,
                ( $errno // '<none>' ) . '',
                Time::HiRes::time() - $started,

    return $sent;

=head3 C<< receive( $length <, $timeout> ) >>

Receives a message up to C<$length> size from Kafka.

C<$length> argument must be a positive number.

Use optional C<$timeout> argument to override default timeout for this call only.

Returns a reference to the received message.

sub receive {
    my ( $self, $length, $timeout ) = @_;
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
        unless $length > 0
    $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
    $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
        unless $timeout > 0
    my $select = $self->{_io_select};
    $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select;

    my $message = '';
    my $len_to_read = $length;

    my $started = Time::HiRes::time();
    my $until = $started + $timeout;

    my $error_code;
    my $errno;
    my $retries = 0;
    my $interrupts = 0;
    ATTEMPT: while ( $len_to_read > 0 && $retries++ < $MAX_RETRIES ) {
        my $remaining_time = $until - Time::HiRes::time();
        last if $remaining_time <= 0; # timeout expired

        undef $!;
        my $can_read = $select->can_read( $remaining_time );
        $errno = $!;
        if ( $errno ) {
            if ( $errno == EINTR ) {
                undef $errno;
                --$retries; # this attempt does not count
                next ATTEMPT;


            last ATTEMPT;

        if ( $can_read ) {
            my $buf = '';
            undef $!;
            my $from_recv = CORE::recv( $self->{socket}, $buf, $len_to_read, MSG_DONTWAIT );
            $errno = $!;

            if ( defined( $from_recv ) && length( $buf ) ) {
                $message .= $buf;
                $len_to_read = $length - length( $message );
                --$retries; # this attempt was successful, don't count as a retry
            if ( $errno ) {
                if ( $errno == EINTR ) {
                    undef $errno;
                    --$retries; # this attempt does not count
                    next ATTEMPT;
                } elsif (
                           $errno != EAGAIN
                        && $errno != EWOULDBLOCK
                        ## on freebsd, if we got ECONNRESET, it's a timeout from the other side
                        && !( $errno == ECONNRESET && $^O eq 'freebsd' )
                    ) {
                    last ATTEMPT;

            if ( length( $buf ) == 0 ) {
                if( defined( $from_recv ) && ! $errno ) {
                    # no error and nothing received with select returning "can read" means EOF: other side closed socket
                    $self->_debug_msg( 'EOF on receive attempt, closing socket' )
                        if $self->debug_level;

                    if( length( $message ) == 0 ) {
                        # we did not receive anything yet, so we may (in some cases) reconnect and try again
                        $error_code = $ERROR_NO_CONNECTION;

                    last ATTEMPT;
                # we did not read anything on this attempt: wait a bit before the next one; should not happen, but just in case...
                if ( my $remaining_attempts = $MAX_RETRIES - $retries ) {
                    $remaining_time = $until - Time::HiRes::time();
                    my $micro_seconds = int( $remaining_time * 1e6 / $remaining_attempts );
                    if ( $micro_seconds > 0 ) {
                        $micro_seconds = 250_000 if $micro_seconds > 250_000; # prevent long sleeps if total remaining time is big
                        $self->_debug_msg( format_message( 'sleeping (remaining attempts %d, time %.6f): %d microseconds', $remaining_attempts, $remaining_time, $micro_seconds ) )
                            if $self->debug_level;
                        Time::HiRes::usleep( $micro_seconds );

    unless( !$errno && length( $message ) >= $length )
            $error_code // $ERROR_CANNOT_RECV,
            format_message( "Kafka::IO(%s)->receive: ERRNO=%s ERROR='%s' (length=%s, received=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)",
                ( $errno // 0 ) + 0,
                ( $errno // '<none>' ) . '',
                length( $message ),
                Time::HiRes::time() - $started,
    $self->_debug_msg( $message, 'Response from', 'yellow' )
        if $self->debug_level >= 2;

    # returns tainted data
    return \$message;

=head3 C<close>

Closes connection to Kafka server.
Returns true if those operations succeed and if no error was reported by any PerlIO layer.

sub close {
    my ( $self ) = @_;

    my $ret = 1;
    if ( $self->{socket} ) {
        $ret = CORE::close( $self->{socket} );
        $self->{socket}     = undef;
        $self->{_io_select} = undef;

    return $ret;

sub _is_close_wait {
    my ( $self ) = @_;
    return 1 unless $self->{socket} && $self->{_io_select}; # closed already
    # socket is open; check if we can read, and if we can but recv() cannot peek, it means we got EOF
    return unless $self->{_io_select}->can_read( 0 ); # we cannot read, but may be able to write
    my $buf = '';
    undef $!;
    my $status = CORE::recv( $self->{socket}, $buf, 1, MSG_DONTWAIT | MSG_PEEK ); # peek, do not remove data from queue
    # EOF when there is no error, status is defined, but result is empty
    return ! $! && defined $status && length( $buf ) == 0;

# The method verifies if we can connect to a Kafka broker.
# This is evil: opens and immediately closes a NEW connection so do not use unless there is a strong reason for it.
sub _is_alive {
    my ( $self ) = @_;

    my $socket = $self->{socket};
    return unless $socket;

    socket( my $tmp_socket, $self->{pf}, SOCK_STREAM, IPPROTO_TCP );
    my $is_alive = connect( $tmp_socket, getpeername( $socket ) );
    CORE::close( $tmp_socket );

    return $is_alive;

#-- private attributes ---------------------------------------------------------

#-- private methods ------------------------------------------------------------

# You need to have access to Kafka instance and be able to connect through TCP.
# uses
sub _connect {
    my ( $self ) = @_;

    $self->{socket}     = undef;
    $self->{_io_select} = undef;

    my $name    = $self->{host};
    my $port    = $self->{port};
    my $timeout = $self->{timeout};

    my $ip = '';
    if ( $self->_get_family( $name ) ) {
        $ip = $self->{ip} = $name;
    } else {
        if ( defined $timeout ) {
            my $remaining;
            my $start = time();

            $self->_debug_msg( format_message( "name = '%s', number of wallclock seconds = %s", $name, ceil( $timeout ) ) )
                if $self->debug_level;

            # DNS lookup.
            local $@;
            my $h = set_sig_handler( 'ALRM', sub { die 'alarm clock restarted' },
                    mask    => [ 'ALRM' ],
                    safe    => 0,   # perl 5.8+ uses safe signal delivery so we need unsafe signal for timeout to work
            eval {
                $remaining = alarm( ceil( $timeout ) );
                $ip = $self->_gethostbyname( $name );
                alarm 0;
            alarm 0;                                # race condition protection
            my $error = $@;
            undef $h;

            $self->_debug_msg( format_message( "_connect: ip = '%s', error = '%s', \$? = %s, \$! = '%s'", $ip, $error, $?, $! ) )
                if $self->debug_level;

            die $error if $error;
            die( format_message( "gethostbyname %s: \$? = '%s', \$! = '%s'\n", $name, $?, $! ) ) unless $ip;

            my $elapsed = time() - $start;
            # $SIG{ALRM} restored automatically, but we need to restart previous alarm manually

            $self->_debug_msg( format_message( '_connect: %s (remaining) - %s (elapsed) = %s', $remaining, $elapsed, $remaining - $elapsed ) )
                if $self->debug_level;
            if ( $remaining ) {
                if ( $remaining - $elapsed > 0 ) {
                    $self->_debug_msg( '_connect: remaining - elapsed > 0 (to alarm restart)' )
                        if $self->debug_level;
                    alarm( ceil( $remaining - $elapsed ) );
                } else {
                    $self->_debug_msg( '_connect: remaining - elapsed < 0 (to alarm function call)' )
                        if $self->debug_level;
                    # $SIG{ALRM}->();
                    kill ALRM => $$;
                $self->_debug_msg( "_connect: after alarm 'recalled'" )
                    if $self->debug_level;
        } else {
            $ip = $self->_gethostbyname( $name );
            die( format_message( "could not resolve host name to IP address: %s\n", $name ) ) unless $ip;

    # Create socket.
    socket( my $connection, $self->{pf}, SOCK_STREAM, scalar getprotobyname( 'tcp' ) ) or die( "socket: $!\n" );

    # Set autoflushing.
    $_ = select( $connection ); $| = 1; select $_;

    # Set FD_CLOEXEC.
    my $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl: $!\n";
    fcntl( $connection, F_SETFL, $flags | FD_CLOEXEC ) or die "fnctl: $!\n";

    $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl F_GETFL: $!\n"; # 0 for error, 0e0 for 0.
    fcntl( $connection, F_SETFL, $flags | O_NONBLOCK ) or die "fcntl F_SETFL O_NONBLOCK: $!\n"; # 0 for error, 0e0 for 0.

    # Connect returns immediately because of O_NONBLOCK.
    my $sockaddr = $self->{af} eq AF_INET
        ? pack_sockaddr_in(  $port, inet_aton( $ip ) )
        : pack_sockaddr_in6( $port, inet_pton( $self->{af}, $ip ) )
    connect( $connection, $sockaddr ) || $!{EINPROGRESS} || die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) );

    # Reset O_NONBLOCK.
    $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl F_GETFL: $!\n";  # 0 for error, 0e0 for 0.
    fcntl( $connection, F_SETFL, $flags & ~ O_NONBLOCK ) or die "fcntl F_SETFL not O_NONBLOCK: $!\n";  # 0 for error, 0e0 for 0.

    # Use select() to poll for completion or error. When connect succeeds we can write.
    my $vec = '';
    vec( $vec, fileno( $connection ), 1 ) = 1;
    select( undef, $vec, undef, $timeout // $REQUEST_TIMEOUT );
    unless ( vec( $vec, fileno( $connection ), 1 ) ) {
        # If no response yet, impose our own timeout.
        $! = ETIMEDOUT;
        die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) );

    # This is how we see whether it connected or there was an error. Document Unix, are you kidding?!
    $! = unpack( 'L', getsockopt( $connection, SOL_SOCKET, SO_ERROR ) );
    die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) ) if $!;

    # Set timeout on all reads and writes.
    # Note the difference between Perl's sysread() and read() calls: sysread()
    # queries the kernel exactly once, with max delay specified here. read()
    # queries the kernel repeatedly until there's a read error (such as this
    # timeout), EOF, or a full buffer. So when using read() with a timeout of one
    # second, if the remote server sends 1 byte repeatedly at 1 second intervals,
    # read() will read the whole buffer very slowly and sysread() will return only
    # the first byte. The print() and syswrite() calls are similarly different.
    # <> is of course similar to read() but delimited by newlines instead of buffer
    # sizes.
    my $timeval = _get_timeval( $timeout // $REQUEST_TIMEOUT );
    setsockopt( $connection, SOL_SOCKET, SO_SNDTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_SNDTIMEO: $!\n";
    setsockopt( $connection, SOL_SOCKET, SO_RCVTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_RCVTIMEO: $!\n";

    $self->{socket} = $connection;
    my $s = $self->{_io_select} = IO::Select->new;
    $s->add( $self->{socket} );

    return $connection;

# Packing timeval
# uses
sub _get_timeval {
    my $timeout = shift;

    my $intval = int( $timeout );                               # sec
    my $fraction = int( ( $timeout - $intval ) * 1_000_000 );   # ms

    if ( $Config{osname} eq 'netbsd' && _major_osvers() >= 6 && $Config{longsize} == 4 ) {
        if ( defined $Config{use64bitint} ) {
            $timeout = pack( 'QL', int( $timeout ), $fraction );
        } else {
            $timeout = pack(
                    $Config{byteorder} eq '1234'
                        ? ( $timeout, 0, $fraction )
                        : ( 0, $timeout, $fraction )
    } else {
        $timeout = pack( 'L!L!', $timeout, $fraction );

    return $timeout;

sub _major_osvers {
    my $osvers = $Config{osvers};
    my ( $major_osvers ) = $osvers =~ /^(\d+)/;
    $major_osvers += 0;

    return $major_osvers;

sub _gethostbyname {
    my ( $self, $name ) = @_;

    my $is_v4_fqdn = 1;
    $self->{ip} = '';

    my $ip_version = $self->{ip_version};
    if ( defined( $ip_version ) && $ip_version == $IP_V6 ) {
        my ( $err, @addrs ) = getaddrinfo(
            '',     # not interested in the service name
                family      => AF_INET6,
                socktype    => SOCK_STREAM,
                protocol    => IPPROTO_TCP,
        return( $self->{ip} ) if $err;

        $is_v4_fqdn = 0;
        for my $addr ( @addrs ) {
            my ( $err, $ipaddr ) = getnameinfo( $addr->{addr}, NI_NUMERICHOST, NIx_NOSERV );
            next if $err;

            $self->{af} = AF_INET6;
            $self->{pf} = PF_INET6;
            $self->{ip} = $ipaddr;

    if ( $is_v4_fqdn && ( !defined( $ip_version ) || $ip_version == $IP_V4 ) ) {
        if ( my $ipaddr = gethostbyname( $name ) ) {
            $self->{ip} = inet_ntop( $self->{af}, $ipaddr );

    return $self->{ip};

sub _get_family {
    my ( $self, $name ) = @_;

    my $is_ip;
    my $ip_version = $self->{ip_version} // 0;
    if ( ( ( $is_ip = is_ipv6( $name ) ) && !$ip_version ) || $ip_version == $IP_V6 ) {
        $self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) )
                && (
                        ( !$is_ip && is_ipv4( $name ) )
                    || ( $is_ip && $ip_version == $IP_V4 )

        $self->{af} = AF_INET6;
        $self->{pf} = PF_INET6;
    } elsif ( ( ( $is_ip = is_ipv4( $name ) ) && !$ip_version ) || $ip_version == $IP_V4 ) {
        $self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) )
                && (
                        ( !$is_ip && is_ipv6( $name ) )
                    || ( $is_ip && $ip_version == $IP_V6 )

        $self->{af} = AF_INET;
        $self->{pf} = PF_INET;
    } elsif ( !$ip_version ) {
        $self->{af} = AF_INET;
        $self->{pf} = PF_INET;

    return $is_ip;

# Show additional debugging information
sub _debug_msg {
    my ( $self, $message, $header, $colour ) = @_;

    if ( $header ) {
        unless ( $_hdr ) {
            require Data::HexDump::Range;
            $_hdr = Data::HexDump::Range->new(
                FORMAT                          => 'ANSI',  # 'ANSI'|'ASCII'|'HTML'
                COLOR                           => 'bw',    # 'bw' | 'cycle'
                OFFSET_FORMAT                   => 'hex',   # 'hex' | 'dec'
                DATA_WIDTH                      => 16,      # 16 | 20 | ...
                DISPLAY_RANGE_NAME              => 0,
#                MAXIMUM_RANGE_NAME_SIZE         => 16,
                DISPLAY_COLUMN_NAMES            => 1,
                DISPLAY_RULER                   => 1,
                DISPLAY_OFFSET                  => 1,
#                DISPLAY_CUMULATIVE_OFFSET       => 1,
                DISPLAY_ZERO_SIZE_RANGE         => 1,
                DISPLAY_RANGE_NAME              => 0,
#                DISPLAY_RANGE_SIZE              => 1,
                DISPLAY_ASCII_DUMP              => 1,
                DISPLAY_HEX_DUMP                => 1,
#                DISPLAY_DEC_DUMP                => 1,
#                COLOR_NAMES                     => {},
                ORIENTATION                     => 'horizontal',

        say STDERR
            "# $header ", $self->{host}, ':', $self->{port}, "\n",
            '# Hex Stream: ', unpack( 'H*', $message ), "\n",
                    [ 'data', length( $message ), $colour ],
    } else {
        say STDERR format_message( '[%s] %s', scalar( localtime ), $message );


# Handler for errors
sub _error {
    my $self = shift;
    my %args = throw_args( @_ );
    $self->_debug_msg( format_message( 'throwing IO error %s: %s', $args{code}, $args{message} ) )
        if $self->debug_level;
    Kafka::Exception::IO->throw( %args );




When error is detected, an exception, represented by object of C<Kafka::Exception::IO> class,
is thrown (see L<Kafka::Exceptions|Kafka::Exceptions>).

L<code|Kafka::Exceptions/code> and a more descriptive L<message|Kafka::Exceptions/message> provide
information about thrown exception. Consult documentation of the L<Kafka::Exceptions|Kafka::Exceptions>
for the list of all available methods.

Authors suggest using of L<Try::Tiny|Try::Tiny>'s C<try> and C<catch> to handle exceptions while
working with L<Kafka|Kafka> package.

Here is the list of possible error messages that C<Kafka::IO> may produce:

=over 3

=item C<Invalid argument>

Invalid arguments were passed to a method.

=item C<Cannot send>

Message cannot be sent on a C<Kafka::IO> object socket.

=item C<Cannot receive>

Message cannot be received.

=item C<Cannot bind>

TCP connection cannot be established on given host and port.


=head2 Debug mode

Debug output can be enabled by passing desired level via environment variable
using one of the following ways:

C<PERL_KAFKA_DEBUG=1>     - debug is enabled for the whole L<Kafka|Kafka> package.

C<PERL_KAFKA_DEBUG=IO:1>  - enable debug for C<Kafka::IO> only.

C<Kafka::IO> supports two debug levels (level 2 includes debug output of 1):

=over 3

=item 1

Additional information about processing events/alarms.

=item 2

Dump of binary messages exchange with Kafka server.


=head1 SEE ALSO

The basic operation of the Kafka package modules:

L<Kafka|Kafka> - constants and messages used by the Kafka package modules.

L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.

L<Kafka::Producer|Kafka::Producer> - interface for producing client.

L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.

L<Kafka::Message|Kafka::Message> - interface to access Kafka message

L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
protocol on 32 bit systems.

L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka's Protocol.

L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.

L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.

L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at L<>

Kafka Protocol at L<>


Kafka package is hosted on GitHub:

=head1 AUTHOR

Sergey Gladkov

Please use GitHub project link above to report problems or contact authors.


Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

Damien Krotkine


Copyright (C) 2012-2017 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
