The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Kafka::Protocol;

use 5.010;
use strict;
use warnings;

use Exporter qw( import );
our @EXPORT_OK  = qw(
    REQUESTTYPE_PRODUCE
    REQUESTTYPE_FETCH
    REQUESTTYPE_MULTIFETCH
    REQUESTTYPE_MULTIPRODUCE
    REQUESTTYPE_OFFSETS
    produce_request
    fetch_request
    offsets_request
    fetch_response
    offsets_response
    );

our $VERSION = '0.12';

use bytes;
use Carp;
use String::CRC32;
use Params::Util    qw( _STRING _NONNEGINT _POSINT _NUMBER _ARRAY0 _SCALAR );

use Kafka qw(
    ERROR_INVALID_MESSAGE_CODE
    ERROR_MISMATCH_ARGUMENT
    ERROR_CHECKSUM_ERROR
    ERROR_COMPRESSED_PAYLOAD
    ERROR_NUMBER_OF_OFFSETS
    BITS64
    );

# Without native 64-bit integers, the 64-bit wire fields (OFFSET, TIME) are
# packed/unpacked through Kafka::Int64. Its functions are always called fully
# qualified below, so a plain require (no import) is enough; a block eval
# replaces the former string eval while keeping the same failure message.
if ( !BITS64 ) { eval { require Kafka::Int64; 1; } or die "Cannot load Kafka::Int64 : $@"; }

use constant {
    DEBUG                               => 0,   # 1 = dump encoded/decoded structures to STDERR

    REQUESTTYPE_PRODUCE                 => 0,
    REQUESTTYPE_FETCH                   => 1,
    REQUESTTYPE_MULTIFETCH              => 2,   # Not used now
    REQUESTTYPE_MULTIPRODUCE            => 3,   # Not used now
    REQUESTTYPE_OFFSETS                 => 4,

    MAGICVALUE_NOCOMPRESSION            => 0,
    MAGICVALUE_COMPRESSION              => 1,   # Not used now
    COMPRESSION_NO_COMPRESSION          => 0,
    COMPRESSION_GZIP                    => 1,   # Not used now
    COMPRESSION_SNAPPY                  => 2,   # Not used now
};

# Message text and code (index into @Kafka::ERROR) of the most recent failure,
# recorded by _error() and exposed through last_error()/last_errorcode().
our $_last_error;
our $_last_errorcode;

# Read cursor (byte offset) into the response buffer currently being decoded.
# _response_header_decode() sets it to 0 and advances it past the header; the
# body decoders continue from there and advance it as they consume fields.
# Because it is shared file-level state, decodes must not be interleaved.
my $position;

# Read-only accessors for the state recorded by _error(): the error message
# text and the error code (an index into @Kafka::ERROR). Both stay undef
# until _error() has recorded a failure.
sub last_error     { return $_last_error }

sub last_errorcode { return $_last_errorcode }

# Records $error_code (an index into @Kafka::ERROR) together with its message
# text as the module's last error, then dies with a full stack trace via
# confess. It never returns normally; the "return _error(...)" idiom at the
# call sites only documents that control does not continue.
sub _error {
    my ( $error_code ) = @_;

    $_last_errorcode = $error_code;
    $_last_error     = $Kafka::ERROR[ $error_code ];

    confess $_last_error;
}

# Wire Format: https://cwiki.apache.org/confluence/display/KAFKA/Wire+Format
################################################################################
# Requests Wire Format

# Request Header ---------------------------------------------------------------

# Encodes the common request header and returns it as a byte string:
#   REQUEST_LENGTH (N) | REQUEST_TYPE (n) | TOPIC_LENGTH (n) | TOPIC | PARTITION (N)
# $request_length is the byte length of the request-specific body the caller
# appends after this header; REQUEST_LENGTH counts every byte after itself
# (type, topic length, topic, partition and that body).
# Confesses with ERROR_MISMATCH_ARGUMENT when the topic is longer than the
# 16-bit TOPIC_LENGTH field can express (pack "n" would silently wrap it and
# produce a malformed request).
sub _request_header_encode {
    my $request_length  = shift;
    my $request_type    = shift;
    my $topic           = shift;
    my $partition       = shift;

    my $topic_length    = bytes::length( $topic );
    return _error( ERROR_MISMATCH_ARGUMENT ) if $topic_length > 0xFFFF;

    # REQUEST_TYPE (2) + TOPIC_LENGTH (2) + TOPIC + PARTITION (4) + body
    my $total_length    = $request_length + 2 + 2 + $topic_length + 4;

    my $encoded =
        pack( "
            N                                   # REQUEST_LENGTH
            n                                   # REQUEST_TYPE
            n                                   # TOPIC_LENGTH
            ",
            $total_length,
            $request_type,
            $topic_length,
            )
        .$topic
        .pack( "
            N                                   # PARTITION
            ",
            $partition,
            );

    if ( DEBUG )
    {
        print STDERR "Request header:\n"
            ."REQUEST_LENGTH    = $total_length\n"
            ."REQUEST_TYPE      = $request_type\n"
            ."TOPIC_LENGTH      = $topic_length\n"
            ."TOPIC             = $topic\n"
            ."PARTITION         = $partition\n";
    }

    return $encoded;
}

# PRODUCE Request --------------------------------------------------------------

# LIMITATION: For all messages use the same properties:
#   magic:          Magic Value
#   compression:    (only for magic = MAGICVALUE_COMPRESSION)
# $messages may be one of:
#   simple SCALAR
#   reference to an array of scalars

# Builds a complete PRODUCE request: the common request header followed by
# MESSAGES_LENGTH (N) and the encoded message set.
#   $topic     - non-empty string that is true in Perl (checked with _STRING)
#   $partition - non-negative integer
#   $messages  - a single message string, or a reference to an array of
#                message strings; all are encoded with the same properties
# Confesses with ERROR_MISMATCH_ARGUMENT on invalid arguments (and with
# ERROR_INVALID_MESSAGE_CODE from _messages_encode for reference elements).
sub produce_request {
    my $topic           = _STRING( shift ) or return _error( ERROR_MISMATCH_ARGUMENT );
    my $partition       = shift;
    my $messages        = shift;

    return _error( ERROR_MISMATCH_ARGUMENT ) unless defined( _NONNEGINT( $partition ) );

    # _STRING returns its argument, so a single message "0" comes back as a
    # false value; test definedness so that payload is accepted just as it is
    # inside an array reference.
    (
        defined( _STRING( $messages ) ) or
        _ARRAY0( $messages )
    ) or return _error( ERROR_MISMATCH_ARGUMENT );

    $messages = [ $messages ] if ( !ref( $messages ) );

    my $message_set     = _messages_encode( $messages );
    my $messages_length = bytes::length( $message_set );

    my $encoded = pack( "
        N                                       # MESSAGES_LENGTH
        ",
        $messages_length,
        ).$message_set;

    if ( DEBUG )
    {
        # Report the MESSAGES_LENGTH field value, not the prefixed buffer size
        print STDERR "Produce request:\n"
            ."MESSAGES_LENGTH    = $messages_length\n";
    }

    $encoded = _request_header_encode(
            bytes::length( $encoded ),
            REQUESTTYPE_PRODUCE,
            $topic,
            $partition
            ).$encoded;

    return $encoded;
}

# MESSAGE ----------------------------------------------------------------------

# Encodes an array reference of message strings into a message set. Each
# message is laid out as
#   LENGTH (N) | MAGIC (C) | [COMPRESSION (C) when MAGIC != 0] | CHECKSUM (N) | PAYLOAD
# where CHECKSUM is the CRC32 of the payload and LENGTH counts every byte
# after itself. $magic and $compression are reserved for future versions and
# default to no compression.
# Confesses with ERROR_INVALID_MESSAGE_CODE if an element is a reference.
sub _messages_encode {
    my $messages    = shift;
# for future versions
    my $magic       = shift || MAGICVALUE_NOCOMPRESSION;
    my $compression = shift || COMPRESSION_NO_COMPRESSION;

    # MAGIC (1) + CHECKSUM (4) + optional COMPRESSION (1)
    my $overhead    = 5 + ( $magic ? 1 : 0 );

    my $encoded = "";
    foreach my $message ( @$messages )
    {
        return _error( ERROR_INVALID_MESSAGE_CODE ) if ref( $message );

        $encoded .=
            pack( "
                N                               # LENGTH
                C                               # MAGIC
                ",
                bytes::length( $message ) + $overhead,
                $magic,
                )
            .( $magic ?
                pack( "
                    C                           # COMPRESSION
                    ",
                    $compression,
                    )
                : "" )
            .pack( "
                N                               # CHECKSUM
                ",
                crc32( $message ),
                )
            .$message;
    }

    if ( DEBUG )
    {
        # _messages_decode() reads from the shared $position cursor: start it
        # at the beginning of the copy and restore it afterwards, so the debug
        # dump neither decodes from a stale offset nor disturbs other decoders.
        my $saved_position = $position;
        $position = 0;
        my $tmp = $encoded;
        _messages_decode( \$tmp );
        $position = $saved_position;
    }

    return $encoded;
}

# Decodes a message set from the buffer referenced by $encoded_messages,
# starting at the file-level $position cursor and advancing it past every
# field consumed. Returns a reference to an array of hashes with the keys
# length, magic, compression, checksum, payload, error and valid.
# Decoding stops when fewer than 10 bytes remain, or when the remaining bytes
# are fewer than the next message's LENGTH (that partial message is not
# returned and the cursor is left just past its LENGTH field).
# A CRC32 mismatch or a non-zero MAGIC (compressed payloads are not supported)
# is reported through {error}/{valid} rather than by dying.
# NOTE(review): a LENGTH smaller than the fixed overhead makes $p_len negative
# and the payload template invalid (e.g. "a-1"); expect unpack to die - confirm.
sub _messages_decode {
    my $encoded_messages = shift;               # requires a reference

    my $decoded_messages = [];
    my $len = bytes::length( $$encoded_messages );

    # 10 = length( LENGTH + MAGIC + CHECKSUM + ( COMPRESSION or 1 byte of PAYLOAD ) )
    while ( $len - $position >= 10 )
    {
# unpack will throw if the message structure is corrupted
        my $message = {};

        (
            $message->{length},
        ) = unpack( "x${position}
            N                                    # LENGTH
            ", $$encoded_messages );
        $position += 4;

        # stop at a partial trailing message
        last if ( ( $len - $position ) < $message->{length} );

        (
            $message->{magic},
        ) = unpack( "x${position}
            C                                   # MAGIC
            ", $$encoded_messages );
        $position += 1;

        # the COMPRESSION byte is present only when MAGIC is non-zero
        if ( $message->{magic} )
        {
            (
                $message->{compression},
            ) = unpack( "x${position}
                C                               # COMPRESSION
                ", $$encoded_messages );
            $position += 1;
        }
        else
        {
            $message->{compression} = 0;
        }

        # payload size = LENGTH - MAGIC (1) - CHECKSUM (4) - optional COMPRESSION (1)
        my $p_len = $message->{length} - 5 - ( $message->{magic} ? 1 : 0 );
        (
            $message->{checksum},
            $message->{payload},
        ) = unpack( "x${position}
            N                                   # CHECKSUM
            a${p_len}                           # PAYLOAD
            ", $$encoded_messages );
        $position += 4 + $p_len;

        $message->{error} = "";
        $message->{error} = $Kafka::ERROR[ERROR_CHECKSUM_ERROR] if $message->{checksum} != crc32( $message->{payload} );
# compressed payloads are not supported by this version: flag them as errors
        $message->{error} .= ( $message->{error} ? "\n" : "" ).$Kafka::ERROR[ERROR_COMPRESSED_PAYLOAD] if $message->{magic};
        $message->{valid} = !$message->{error};

        push @$decoded_messages, $message;
    }

    if ( DEBUG )
    {
        print STDERR "Messages:\n";
        for ( my $idx = 0; $idx <= $#{$decoded_messages} ; $idx++ )
        {
            my $message = $decoded_messages->[ $idx ];
            print STDERR
                 "index              = $idx\n"
                ."LENGTH             = $message->{length}\n"
                ."MAGIC              = $message->{magic}\n"
                .( $message->{magic} ? "COMPRESSION        = $message->{compression}\n" : "" )
                ."CHECKSUM           = $message->{checksum}\n"
                ."PAYLOAD            = $message->{payload}\n"
                ."valid              = $message->{valid}\n"
                ."error              = $message->{error}\n";
        }
    }

    return $decoded_messages;
}

# FETCH Request ----------------------------------------------------------------

# Builds a complete FETCH request: the common request header followed by
# OFFSET (64-bit, big-endian) and MAX_SIZE (N).
#   $topic     - non-empty string that is true in Perl
#   $partition - non-negative integer
#   $offset    - non-negative integer, or a non-negative Math::BigInt
#   $max_size  - positive integer: maximum size of the message set to return
# Confesses with ERROR_MISMATCH_ARGUMENT if any argument is invalid.
sub fetch_request {
    my ( $topic, $partition, $offset, $max_size ) = @_;

    $topic    = _STRING( $topic )    or return _error( ERROR_MISMATCH_ARGUMENT );
    $max_size = _POSINT( $max_size ) or return _error( ERROR_MISMATCH_ARGUMENT );

    return _error( ERROR_MISMATCH_ARGUMENT )
        unless defined( _NONNEGINT( $partition ) );

    # A Math::BigInt offset is checked for sign through its overloaded >=;
    # anything else must be a plain non-negative integer.
    my $offset_ok = ( ref( $offset ) eq "Math::BigInt" )
        ? ( $offset >= 0 )
        : defined( _NONNEGINT( $offset ) );
    return _error( ERROR_MISMATCH_ARGUMENT ) unless $offset_ok;

    # OFFSET: native 64-bit pack where available, Kafka::Int64 otherwise
    my $packed_offset = BITS64
        ? pack( "q>", $offset + 0 )
        : Kafka::Int64::packq( $offset + 0 );
    my $body = $packed_offset.pack( "N", $max_size );  # MAX_SIZE

    if ( DEBUG )
    {
        print STDERR "Fetch request:\n"
            ."OFFSET             = $offset\n"
            ."MAX_SIZE           = $max_size\n";
    }

    my $header = _request_header_encode(
        bytes::length( $body ),
        REQUESTTYPE_FETCH,
        $topic,
        $partition,
    );

    return $header.$body;
}

# OFFSETS Request --------------------------------------------------------------

# Builds a complete OFFSETS request: the common request header followed by
# TIME (64-bit, big-endian) and MAX_NUMBER (N).
#   $topic      - non-empty string that is true in Perl
#   $partition  - non-negative integer
#   $time       - number (or Math::BigInt) truncated to an integer; values
#                 below -2 are rejected (-1 / -2 are the special
#                 latest / earliest markers)
#   $max_number - positive integer: maximum number of offsets to retrieve
# Confesses with ERROR_MISMATCH_ARGUMENT if any argument is invalid.
sub offsets_request {
    my $topic           = _STRING( shift ) or return _error( ERROR_MISMATCH_ARGUMENT );
    my $partition       = shift;
    my $time            = shift;
    my $max_number      = _POSINT( shift ) or return _error( ERROR_MISMATCH_ARGUMENT );

    return _error( ERROR_MISMATCH_ARGUMENT ) unless defined( _NONNEGINT( $partition ) );
    ( ref( $time ) eq "Math::BigInt" ) or defined( _NUMBER( $time ) ) or return _error( ERROR_MISMATCH_ARGUMENT );
    $time = int( $time );
    # _NUMBER also accepts strings such as "Inf" and "NaN", which int() keeps
    # and which cannot be represented in the 64-bit TIME field. For any finite
    # number x - x == 0, whereas Inf - Inf and NaN - NaN are NaN.
    return _error( ERROR_MISMATCH_ARGUMENT ) if !ref( $time ) && $time - $time != 0;
    return _error( ERROR_MISMATCH_ARGUMENT ) if $time < -2;

    my $encoded = ( BITS64 ? pack( "q>", $time + 0 ) : Kafka::Int64::packq( $time + 0 ) )   # TIME
        .pack( "
            N                                   # MAX_NUMBER
            ",
            $max_number,
            );

    if ( DEBUG )
    {
        print STDERR "Offsets request:\n"
            ."TIME               = $time\n"
            ."MAX_NUMBER         = $max_number\n";
    }

    $encoded = _request_header_encode(
            bytes::length( $encoded ),
            REQUESTTYPE_OFFSETS,
            $topic,
            $partition
            ).$encoded;

    return $encoded;
}

################################################################################
# Responses Wire Format

# Response Header

# Decodes the common response header RESPONSE_LENGTH (N) | ERROR_CODE (n) from
# the buffer referenced by $response. On success the shared $position cursor
# is left just past the header (byte 6) and a hash reference with the keys
# response_length and error_code is returned. A buffer shorter than 6 bytes
# yields an empty hash reference and leaves the cursor untouched.
# NOTE(review): "n" decodes ERROR_CODE as unsigned 16-bit, so a negative code
# on the wire would appear as e.g. 65535 - confirm what callers expect.
sub _response_header_decode {
    my ( $response ) = @_;                      # requires a reference

    my %header;

    # 6 = length( RESPONSE_LENGTH + ERROR_CODE )
    return \%header if bytes::length( $$response ) < 6;

    @header{ qw( response_length error_code ) } = unpack( "N n", $$response );
    $position = 6;                              # cursor now just past the header

    if ( DEBUG )
    {
        print STDERR "Response Header:\n"
            ."RESPONSE_LENGTH    = $header{response_length}\n"
            ."ERROR_CODE         = $header{error_code}\n";
    }

    return \%header;
}

# PRODUCE Response

#   None

# FETCH Response

# Decodes a FETCH response from the buffer referenced by $response.
# Returns a hash reference with a {header} entry (from _response_header_decode)
# and, only when the header's ERROR_CODE is 0, a {messages} entry holding the
# decoded message set. Confesses with ERROR_MISMATCH_ARGUMENT unless
# $response references a non-empty string of at least 6 bytes.
sub fetch_response {
    my ( $response ) = @_;

    $response = _SCALAR( $response )
        or return _error( ERROR_MISMATCH_ARGUMENT );

    # The referenced buffer must be a plain string holding at least the
    # RESPONSE_LENGTH + ERROR_CODE header (6 bytes).
    return _error( ERROR_MISMATCH_ARGUMENT )
        unless _STRING( $$response ) && bytes::length( $$response ) >= 6;

    my %decoded;

    print STDERR "Fetch response:\n" if DEBUG;

    my $header = _response_header_decode( $response );
    $decoded{header} = $header;

    # Messages follow only a successfully decoded header with ERROR_CODE 0
    if ( %$header && !$header->{error_code} ) {
        $decoded{messages} = _messages_decode( $response );
    }

    return \%decoded;
}

# OFFSETS Response

# Decodes an OFFSETS response from the buffer referenced by $response.
# Returns a hash reference with:
#   header         - as returned by _response_header_decode()
# and, when the header's ERROR_CODE is 0:
#   number_offsets - the NUMBER_OFFSETS field (undef if the buffer ends first)
#   offsets        - array of the complete 64-bit OFFSET fields decoded
#                    (via Kafka::Int64 on perls without 64-bit integers)
#   error          - "" when NUMBER_OFFSETS was present, equals the number of
#                    offsets decoded and no partial bytes were left over;
#                    otherwise the ERROR_NUMBER_OF_OFFSETS message
# Confesses with ERROR_MISMATCH_ARGUMENT unless $response references a
# non-empty string of at least 6 bytes.
sub offsets_response {
    my $response = _SCALAR( shift ) or return _error( ERROR_MISMATCH_ARGUMENT );

    _STRING( $$response ) or return _error( ERROR_MISMATCH_ARGUMENT );
    # 6 = length( RESPONSE_LENGTH + ERROR_CODE )
    return _error( ERROR_MISMATCH_ARGUMENT ) if bytes::length( $$response ) < 6;

    my $decoded = {};
    my $len = bytes::length( $$response );

    $decoded->{header} = _response_header_decode( $response );

    unless ( $decoded->{header}->{error_code} )
    {
        # unpack yields undef here when fewer than 4 bytes follow the header
        (
            $decoded->{number_offsets},
        ) = unpack( "
            x".$position
            ."N                                   # NUMBER_OFFSETS
            ", $$response );
        $position += 4;

        $decoded->{offsets} = [];
        # Decode complete 8-byte OFFSET fields only. Reading a truncated tail
        # would push undef (or a bogus value) and could make a short response
        # look as if it carried NUMBER_OFFSETS offsets.
        while ( $len - $position >= 8 )
        {
            my $offset = BITS64 ?               # OFFSET
                unpack( "x${position}q>", $$response )
                : Kafka::Int64::unpackq( unpack( "x${position}a8", $$response ) );
            $position += 8;

            push @{$decoded->{offsets}}, $offset;
        }

        my $is_consistent =
            defined( $decoded->{number_offsets} )
            && $decoded->{number_offsets} == scalar( @{$decoded->{offsets}} )
            && $position == $len;
        $decoded->{error} = $is_consistent ? "" : $Kafka::ERROR[ERROR_NUMBER_OF_OFFSETS];

        if ( DEBUG )
        {
            print STDERR "Offsets response:\n"
                ."NUMBER_OFFSETS     = $decoded->{number_offsets}\n"
                ."error              = $decoded->{error}\n";
            for ( my $idx = 0; $idx <= $#{$decoded->{offsets}} ; $idx++ )
            {
                print STDERR "OFFSET             = $decoded->{offsets}->[ $idx ]\n";
            }
        }
    }

    return $decoded;
}

################################################################################

1;

__END__

=head1 NAME

Kafka::Protocol - functions to process messages in the
Apache Kafka's 0.7 Wire Format

=head1 VERSION

This documentation refers to C<Kafka::Protocol> version 0.12

=head1 SYNOPSIS

Setting up:

    #-- Export
    use Kafka qw( DEFAULT_MAX_SIZE );
    use Kafka::Protocol qw(
        REQUESTTYPE_PRODUCE
        REQUESTTYPE_FETCH
        REQUESTTYPE_MULTIFETCH
        REQUESTTYPE_MULTIPRODUCE
        REQUESTTYPE_OFFSETS
        produce_request
        fetch_request
        offsets_request
        fetch_response
        offsets_response
        );

    print "REQUEST_TYPE(s):\n";
    print
        REQUESTTYPE_PRODUCE,        " ",
        REQUESTTYPE_FETCH,          " ",
        REQUESTTYPE_MULTIFETCH,     " ",
        REQUESTTYPE_MULTIPRODUCE,   " ",
        REQUESTTYPE_OFFSETS,        "\n";

    #-- declaration of variables to test
    my $topic       = "test";
    my $partition   = 0;
    my $single_message = "The first message";
    my $series_of_messages = [
        "The first message",
        "The second message",
        "The third message",
        ];
    my $offset      = 0;
    my $max_size    = DEFAULT_MAX_SIZE;
    my $time        = -2;
    my $max_number  = 100;
    my ( $str, $hsh_ref, $arr_ref );

Requests:

    #-- Producer request:
    $str = unpack( "H*",
        produce_request( $topic, $partition, $single_message ) );
    $str = unpack( "H*",
        produce_request( $topic, $partition, $series_of_messages ) );

    #-- Offsets request:
    $str = unpack( "H*",
        offsets_request( $topic, $partition, $time, $max_number ) );

    #-- Fetch request:
    $str = unpack( "H*",
        fetch_request( $topic, $partition, $offset, $max_size ) );

Responses (look at the L<Sample Data|Kafka::Mock/"Sample Data"> section of
the L<Kafka::Mock|Kafka::Mock> module for a C<%responses> example):

    #-- Offsets response
    $hsh_ref = offsets_response( \$responses{4} );

    #-- Fetch response
    $hsh_ref = fetch_response( \$responses{1} );

An error:

    eval { fetch_response( [] ) };  # expecting to die
                                    # 'Mismatch argument'
    print STDERR
            "(", Kafka::Protocol::last_errorcode(), ") ",
            Kafka::Protocol::last_error(), "\n";

=head1 DESCRIPTION

When producing messages, the driver has to specify what topic and partition
to send the message to. When requesting messages, the driver has to specify
what topic, partition, and offset it wants them pulled from.

While you can request "old" messages if you know their topic, partition, and
offset, Kafka does not have a message index. You cannot efficiently query
Kafka for the N-1000th message, or ask for all messages written between
30 and 35 minutes ago.

The main features of the C<Kafka::Protocol> module are:

=over 3

=item *

Supports parsing the Apache Kafka Wire Format protocol.

=item *

Supports Apache Kafka Requests and Responses (PRODUCE and FETCH, currently
without a compression codec attribute). Within this package we currently
support access to PRODUCE Request, FETCH Request, OFFSETS Request, FETCH
Response and OFFSETS Response.

=item *

Support for working with 64 bit elements of the Kafka Wire Format protocol
on 32 bit systems.

=back

=head2 FUNCTIONS

The following functions are available for C<Kafka::Protocol> module.

=over 3

=item *

B<offset> and B<max_size> (FETCH request) or B<time> and B<max_number>
(OFFSETS request) are additional parameters encoded into the request to select
the messages we want to access.

=back

=head3 C<produce_request( $topic, $partition, $messages )>

Returns a binary PRODUCE request string coded according to
the Apache Kafka Wire Format protocol, or error will cause the program to
halt (C<confess>) if the argument is not valid.

C<produce_request()> takes arguments. The following arguments are currently
recognized:

=over 3

=item C<$topic>

The C<$topic> must be a normal non-false string of non-zero length.

=item C<$partition>

The C<$partition> must be a non-negative integer (of any length).
That is, a positive integer, or zero.

=item C<$messages>

The C<$messages> is an arbitrary amount of data (a simple data string or
a reference to an array of the data strings).

=back

=head3 C<fetch_request( $topic, $partition, $offset, $max_size )>

Returns a binary FETCH request string coded according to
the Apache Kafka Wire Format protocol, or error will cause the program to
halt (C<confess>) if the argument is not valid.

C<fetch_request()> takes arguments. The following arguments are currently
recognized:

=over 3

=item C<$topic>

The C<$topic> must be a normal non-false string of non-zero length.

=item C<$partition>

The C<$partition> must be a non-negative integer (of any length).
That is, a positive integer, or zero.

=item C<$offset>

Offset in topic and partition to start from (64 bits).

The argument must be a non-negative integer (of any length).
That is, a positive integer, or zero. The argument may be a
L<Math::BigInt|Math::BigInt> integer on 32 bit system.

=item C<$max_size>

C<$max_size> is the maximum size of the message set to return. The argument
must be a positive integer (of any length).

=back

=head3 C<offsets_request( $topic, $partition, $time, $max_number )>

Returns a binary OFFSETS request string coded according to
the Apache Kafka Wire Format protocol, or error will cause the program to
halt (C<confess>) if the argument is not valid.

C<offsets_request()> takes arguments. The following arguments are currently
recognized:

=over 3

=item C<$topic>

The C<$topic> must be a normal non-false string of non-zero length.

=item C<$partition>

The C<$partition> must be a non-negative integer (of any length).
That is, a positive integer, or zero.

=item C<$time>

C<$time> is the timestamp of the offsets before this time - milliseconds since
UNIX Epoch.

The argument must be a non-negative number. That is, it is defined and Perl
thinks it's a number. The argument may be a L<Math::BigInt|Math::BigInt>
integer on 32 bit system.

The special values -1 (latest), -2 (earliest) are allowed.

=item C<$max_number>

C<$max_number> is the maximum number of offsets to retrieve. The argument must
be a positive integer (of any length).

=back

=head3 C<offsets_response( $response )>

Decodes the argument and returns a reference to the hash representing
the structure of the OFFSETS Response. Offsets are L<Math::BigInt|Math::BigInt>
integers on 32 bit systems. The hash additionally contains an C<{error}> item
describing a possible error in the structure of the argument (currently only
"Amount received offsets does not match 'NUMBER of OFFSETS'" is possible). An
invalid argument will cause the program to halt (C<confess>).

C<offsets_response()> takes arguments. The following arguments are currently
recognized:

=over 3

=item C<$response>

C<$response> is a reference to the OFFSETS Response buffer. The buffer
must be a non-empty string 6+ bytes long.

=back

=head3 C<fetch_response( $response )>

Decodes the argument and returns a reference to the hash representing
the structure of the FETCH Response. Error will cause the program to halt
(C<confess>) if the argument is not valid.

C<fetch_response()> takes arguments. The following arguments are currently
recognized:

=over 3

=item C<$response>

C<$response> is a reference to the FETCH Response buffer.
The buffer must be a non-empty string 6+ bytes long.

=back

=head3 C<last_errorcode>

This method returns an error code that specifies the position of the
description in the C<@Kafka::ERROR> array.  Analysing this information
can be done to determine the cause of the error.

The server or the resource might not be available, access to the resource
might be denied or other things might have failed for some reason.

=head3 C<last_error>

This method returns an error message that contains information about the
encountered failure.  Messages returned from this method may contain
additional details and do not comply with the C<Kafka::ERROR> array.

=head2 EXPORT

None by default.

It has additional constants available for import, which can be used
to identify REQUEST types (look at L</"SEE ALSO"> section):

=over 3

=item

0 - C<REQUESTTYPE_PRODUCE>

=item

1 - C<REQUESTTYPE_FETCH>

=item

2 - C<REQUESTTYPE_MULTIFETCH>

=item

3 - C<REQUESTTYPE_MULTIPRODUCE>

=item

4 - C<REQUESTTYPE_OFFSETS>

=back

=head1 DIAGNOSTICS

C<Kafka::Protocol> is not a user module and any L<functions|/FUNCTIONS> error
is FATAL.
FATAL errors will cause the program to halt (C<confess>), since the
problem is so severe that it would be dangerous to continue. (This can
always be trapped with C<eval>. Under the circumstances, dying is the best
thing to do).

=over 3

=item C<Mismatch argument>

This means that you didn't give the right argument to some of
L<functions|/FUNCTIONS>.

=item C<Invalid message>

This means that the array of messages contains a reference instead of a simple
data string.

=back

For a more detailed error description, always look at the message returned by
the L</last_error> (C<Kafka::Protocol::last_error>) function.

=head1 SEE ALSO

The basic operation of the Kafka package modules:

L<Kafka|Kafka> - constants and messages used by the Kafka package modules

L<Kafka::IO|Kafka::IO> - object interface to socket communications with
the Apache Kafka server

L<Kafka::Producer|Kafka::Producer> - object interface to the producer client

L<Kafka::Consumer|Kafka::Consumer> - object interface to the consumer client

L<Kafka::Message|Kafka::Message> - object interface to the Kafka message
properties

L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka's wire format

L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
protocol on 32 bit systems

L<Kafka::Mock|Kafka::Mock> - object interface to the TCP mock server for testing

A wealth of detail about the Apache Kafka and Wire Format:

Main page at L<http://incubator.apache.org/kafka/>

Wire Format at L<http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/>

Writing a Driver for Kafka at
L<http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka>

=head1 AUTHOR

Sergey Gladkov, E<lt>sgladkov@trackingsoft.comE<gt>

=head1 CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC.
All rights reserved.

This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at
L<http://dev.perl.org/licenses/artistic.html>.

This program is
distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.

=cut