The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Kafka::Protocol;

=head1 NAME

Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.

=head1 VERSION

This documentation refers to C<Kafka::Protocol> version 1.07 .


use 5.010;
use strict;
use warnings;

our $VERSION = '1.07';

use Exporter qw(
our @EXPORT_OK = qw(

use Compress::Snappy ();
use Compress::LZ4Frame ();
use Const::Fast;
use Gzip::Faster qw( gzip gunzip );
use Params::Util qw(
use Scalar::Util qw(
use String::CRC32;
use Try::Tiny;

use Kafka qw(
use Kafka::Exceptions;
use Kafka::Internals qw(


    use 5.010;
    use strict;
    use warnings;

    use Data::Compare;
    use Kafka qw(
    use Kafka::Internals qw(
    use Kafka::Protocol qw(

    # a encoded produce request hex stream
    my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );

    # a decoded produce request
    my $decoded = {
        CorrelationId                       => 4,
        ClientId                            => q{},
        RequiredAcks                        => $WAIT_WRITTEN_TO_LOCAL_LOG,
        Timeout                             => $REQUEST_TIMEOUT * 100,  # ms
        topics                              => [
                TopicName                   => 'mytopic',
                partitions                  => [
                        Partition           => 0,
                        MessageSet              => [
                                Offset          => $PRODUCER_ANY_OFFSET,
                                MagicByte       => 0,
                                Attributes      => $COMPRESSION_NONE,
                                Key             => q{},
                                Value           => 'Hello!',

    my $encoded_request = encode_produce_request( $decoded );
    say 'encoded correctly' if $encoded_request eq $encoded;

    # a encoded produce response hex stream
    $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );

    # a decoded produce response
    $decoded = {
        CorrelationId                           => 4,
        topics                                  => [
                TopicName                       => 'mytopic',
                partitions                      => [
                        Partition               => 0,
                        ErrorCode               => $ERROR_NO_ERROR,
                        Offset                  => 0,

    my $decoded_response = decode_produce_response( \$encoded );
    say 'decoded correctly' if Compare( $decoded_response, $decoded );

    # more examples, see t/*_decode_encode.t


This module is not a user module.

In order to achieve better performance,
functions of this module do not perform arguments validation.

The main features of the C<Kafka::Protocol> module are:

=over 3

=item *

Supports parsing the Apache Kafka protocol.

=item *

Supports Apache Kafka Requests and Responses (PRODUCE and FETCH).
Within this package we currently support
access to PRODUCE, FETCH, OFFSET, METADATA Requests and Responses.

=item *

Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.



# A Guide To The Kafka Protocol 0.8:
# -- Protocol Primitive Types
# int8, int16, int32, int64
#     Signed integers
#     stored in big endian order.
# bytes, string
#     consist of a signed integer
#     giving a length N
#     followed by N bytes of content.
#     A length of -1 indicates null.
#     string uses an int16 for its size,
#     and bytes uses an int32.
# Arrays
#     These will always be encoded as an int32 size containing the length N
#     followed by N repetitions of the structure
#     which can itself be made up of other primitive types.
# -- N.B.
# - The response will always match the paired request
# - One structure common to both the produce and fetch requests is the message set format.
# - MessageSets are not preceded by an int32 like other array elements in the protocol.
# - A message set is also the unit of compression in Kafka,
#     and we allow messages to recursively contain compressed message sets.
# -- Protocol Fields
# ApiKey => int16                 That identifies the API being invoked
# ApiVersion => int16             This is a numeric version number for this api.
#                                Currently the supported version for all APIs is 0.
# Attributes => int8              Metadata attributes about the message.
#                                 The lowest 2 bits contain the compression codec used for the message.
# ClientId => string              This is a user supplied identifier for the client application.
# CorrelationId => int32          This is a user-supplied integer.
#                                 It will be passed back in the response by the server, unmodified.
#                                 It is useful for matching request and response between the client and server.
# Crc => int32                    The CRC32 of the remainder of the message bytes.
# ErrorCode => int16              The error from this partition, if any.
#                                 Errors are given on a per-partition basis
#                                     because a given partition may be unavailable or maintained on a different host,
#                                     while others may have successfully accepted the produce request.
# FetchOffset => int64            The offset to begin this fetch from.
# HighwaterMarkOffset => int64    The offset at the end of the log for this partition.
#                                 This can be used by the client to determine how many messages behind the end of the log they are.
#                                 - 0.8 documents: Replication design
#                                 The high watermark is the offset of the last committed message.
#                                 Each log is periodically synced to disks.
#                                 Data before the flushed offset is guaranteed to be persisted on disks.
#                                 As we will see, the flush offset can be before or after high watermark.
#                                 - 0.7 documents: Wire protocol
#                                 If the last segment file for the partition is not empty and was modified earlier than TIME,
#                                         it will return both the first offset for that segment and the high water mark.
#                                 The high water mark is not the offset of the last message,
#                                         but rather the offset that the next message sent to the partition will be written to.
# Host => string                  The brokers hostname
# Isr => [ReplicaId]              The set subset of the replicas that are "caught up" to the leader - a set of in-sync replicas (ISR)
# Key => bytes                    An optional message key
#                                 The key can be null.
# Leader => int32                 The node id for the kafka broker currently acting as leader for this partition.
#                                 If no leader exists because we are in the middle of a leader election this id will be -1.
# MagicByte => int8               A version id used to allow backwards compatible evolution of the message binary format.
# MaxBytes => int32               The maximum bytes to include in the message set for this partition.
# MaxNumberOfOffsets => int32     Kafka here is return up to 'MaxNumberOfOffsets' of offsets
# MaxWaitTime => int32            The maximum amount of time (ms)
#                                     to block waiting
#                                     if insufficient data is available at the time the request is issued.
# MessageSetSize => int32         The size in bytes of the message set for this partition
# MessageSize => int32            The size of the subsequent request or response message in bytes
# MinBytes => int32               The minimum number of bytes of messages that must be available to give a response.
#                                 If the client sets this to 0 the server will always respond immediately.
#                                 If this is set to 1,
#                                     the server will respond as soon
#                                     as at least one partition
#                                     has at least 1 byte of data
#                                     or the specified timeout occurs.
#                                 By setting higher values
#                                     in combination with the timeout
#                                     for reading only large chunks of data
# NodeId => int32                 The id of the broker.
#                                 This must be set to a unique integer for each broker.
# Offset => int64                 The offset used in kafka as the log sequence number.
#                                 When the producer is sending messages it doesn't actually know the offset
#                                     and can fill in any value here it likes.
# Partition => int32              The id of the partition the fetch is for
#                                     or the partition that data is being published to
#                                     or the partition this response entry corresponds to.
# Port => int32                   The brokers port
# ReplicaId => int32              Indicates the node id of the replica initiating this request.
#                                 Normal client consumers should always specify this as -1 as they have no node id.
# Replicas => [ReplicaId]         The set of alive nodes that currently acts as slaves for the leader for this partition.
# RequiredAcks => int16           Indicates how many acknowledgements the servers should receive
#                                     before responding to the request.
#                                 If it is 0 the server does not send any response.
#                                 If it is 1, the server will wait the data is written to the local log before sending a response.
#                                 If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.
# Size => int32                   The size of the subsequent request or response message in bytes
# Time => int64                   Used to ask for all messages before a certain time (ms).
#                                 There are two special values.
#                                 Specify -1 to receive the latest offset (this will only ever return one offset).
#                                 Specify -2 to receive the earliest available offsets.
# Timeout => int32                This provides a maximum time (ms) the server can await the receipt
#                                     of the number of acknowledgements in RequiredAcks.
#                                 The timeout is not an exact limit on the request time for a few reasons:
#                                 (1) it does not include network latency,
#                                 (2) the timer begins at the beginning of the processing of this request
#                                     so if many requests are queued due to server overload
#                                     that wait time will not be included,
#                                 (3) we will not terminate a local write
#                                     so if the local write time exceeds this timeout it will not be respected.
#                                 To get a hard timeout of this type the client should use the socket timeout.
# TopicName => string             The name of the topic.
# Value => bytes                  The actual message contents
#                                 Kafka supports recursive messages in which case this may itself contain a message set.
#                                 The message can be null.

our $_int64_template;                           # Used to unpack a 64 bit value
if ( $BITS64 ) {
    $_int64_template    = q{q>};
    # unpack a big-endian signed quad (64-bit) value on 64 bit systems.
    *_unpack64          = sub { $_[0] };
    # pack a big-endian signed quad (64-bit) value on 64 bit systems.
    *_pack64            = sub { pack( q{q>}, $_[0] ) };
} else {
    eval q{ require Kafka::Int64; }                 ## no critic
        or die "Cannot load Kafka::Int64 : $@";

    $_int64_template    = q{a[8]};
    # unpack a big-endian signed quad (64-bit) value on 32 bit systems.
    *_unpack64          = \&Kafka::Int64::unpackq;
    # pack a big-endian signed quad (64-bit) value on 32 bit systems.
    *_pack64            = \&Kafka::Int64::packq;

=head2 EXPORT

The following constants are available for export



The default API version that will be used as fallback, if it's not possible to
detect what the Kafka server supports. Only Kafka servers > can be
queried to get which API version they implements. On Kafka servers 0.8.x and
0.9.x, the protocol will default to use $DEFAULT_APIVERSION. Currently its
value is '0'


const our $DEFAULT_APIVERSION                   => 0;

# HashRef, keys are APIKEYs, values are the api version that this protocol
# implements. If not populated, the connection will use the $DEFAULT_APIVERSION

# Attributes

# According to Apache Kafka documentation:
# Attributes - Metadata attributes about the message.
# The lowest 2 bits contain the compression codec used for the message.
const our $COMPRESSION_CODEC_MASK       => 0b11;


According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'

const our $CONSUMERS_REPLICAID          => -1;


According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'

const our $NULL_BYTES_LENGTH            => -1;

=head3 C<$BAD_OFFSET>

According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset
and can fill in any value here it likes.'

const our $BAD_OFFSET                   => -1;

my ( $_Request_header_template,             $_Request_header_length ) = (
    q{l>s>s>l>s>},          # Size
                            # 2 ApiKey
                            # 2 ApiVersion
                            # 4 CorrelationId
                            # 2 ClientId length
    10                      # 'Size' is not included in the calculation of length
my ( $_ProduceRequest_header_template,      $_ProduceRequest_header_length ) = (
    q{s>l>l>},              # 2 RequiredAcks
                            # 4 Timeout
                            # 4 topics array size
my ( $_MessageSet_template,                 $_MessageSet_length ) = (
    # qq{${_int64_template}l>},
                            # 8 Offset
                            # 4 MessageSize
my ( $_FetchRequest_header_template,        $_FetchRequest_header_length ) = (
                            # 4 ReplicaId
                            # 4 MaxWaitTime
                            # 4 MinBytes
                            # 4 topics array size
my ( $_FetchRequest_header_template_v3,     $_FetchRequest_header_length_v3 ) = (
                            # 4 ReplicaId
                            # 4 MaxWaitTime
                            # 4 MinBytes
                            # 4 MaxBytes
                            # 4 topics array size
my ( $_FetchRequest_body_template,          $_FetchRequest_body_length ) = (
#    qq{l>${_int64_template}l>},
                            # 4 Partition
                            # 8 FetchOffset
                            # 4 MaxBytes
my ( $_OffsetRequest_header_template,       $_OffsetRequest_header_length ) = (
    q{l>l>},                # 4 ReplicaId
                            # 4 topics array size
my ( $_OffsetRequest_body_template,         $_OffsetRequest_body_length ) = (
#    qq{l>${_int64_template}l>},
                            # 4 Partition
                            # 8 Time
                            # 4 MaxNumberOfOffsets
my ( $_OffsetRequest_body_template_v1,      $_OffsetRequest_body_length_v1 ) = (
#    qq{l>${_int64_template}},
                          # 4 Partition
                          # 8 Time
my ( $_FetchResponse_header_template,       $_FetchResponse_header_length ) = (
    q{x[l]l>l>},            # Size (skip)
                            # 4 CorrelationId
                            # 4 topics array size
my ( $_FetchResponse_header_template_v1,       $_FetchResponse_header_length_v1 ) = (
    q{x[l]l>l>l>},          # Size (skip)
                            # 4 CorrelationId
                            # 4 throttle_time_ms
                            # 4 topics array size
my ( $_Message_template,                    $_Message_length ) = (
                            # 8 Offset
                            # MessageSize
                            # Crc
                            # MagicByte
                            # Attributes
                            # Key length
    8                       # Only Offset length
my ( $_Message_template_with_timestamp,     $_Message_length_with_timestamp ) = (
                            # 8 Offset
                            # MessageSize
                            # Crc
                            # MagicByte
                            # Attributes
                            # Timestamp
                            # Key length
    8                       # Only Offset length

my ( $_FetchResponse_topic_body_template,   $_FetchResponse_topic_body_length )= (
                            # TopicName
                            # partitions array size
                            # 4 Partition
                            # 2 ErrorCode
                            # 8 HighwaterMarkOffset
    14                      # without TopicName and partitions array size
my $_Key_or_Value_template = q{X[l]l>/a};   # Key or Value

#-- public functions -----------------------------------------------------------


The following functions are available for C<Kafka::MockProtocol> module.


=head3 C<encode_api_versions_request( $ApiVersions_Request )>

Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$ApiVersions_Request>

C<$ApiVersions_Request> is a reference to the hash representing the structure
of the APIVERSIONS Request. it contains CorrelationId, ClientId (can be empty
string), and ApiVersion (must be 0)



sub encode_api_versions_request {
    my ( $ApiVersions_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    _encode_request_header( $request, $APIKEY_APIVERSIONS, $ApiVersions_Request );
                                                                            # Size
                                                                            # ApiKey
                                                                            # ApiVersion
                                                                            # CorrelationId
                                                                            # ClientId

    return pack( $request->{template}, $request->{len}, @data );

my $_decode_api_version_response_template = q{x[l]l>s>l>X[l]l>/(s>s>s>)};
                                           # x[l]    # Size (skip)
                                           # l>      # CorrelationId
                                           # s>      # ErrorCode
                                           # l>      # ApiVersions array size
                                           # X[l]
                                           # l>/(    # ApiVersions array
                                           #     s>     # ApiKey
                                           #     s>     # MinVersion
                                           #     s>     # MaxVersion
                                           # )

=head3 C<decode_api_versions_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the APIVERSIONS Response.

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.



sub decode_api_versions_response {
    my ( $bin_stream_ref ) = @_;

    my @data = unpack( $_decode_api_version_response_template, $$bin_stream_ref );

    my $i = 0;
    my $ApiVersions_Response = {};

    $ApiVersions_Response->{CorrelationId}                        =  $data[ $i++ ];   # CorrelationId
    $ApiVersions_Response->{ErrorCode}                            =  $data[ $i++ ];   # ErrorCode

    my $ApiVersions_array = $ApiVersions_Response->{ApiVersions}  =  [];
    my $ApiVersions_array_size                                    =  $data[ $i++ ];   # ApiVersions array size
    while ( $ApiVersions_array_size-- ) {
        push( @$ApiVersions_array, {
            ApiKey                                                => $data[ $i++ ],   # ApiKey
            MinVersion                                            => $data[ $i++ ],   # MinVersion
            MaxVersion                                            => $data[ $i++ ],   # MaxVersion

    return $ApiVersions_Response;

=head3 C<encode_find_coordinator_request( $FindCoordinator_Request )>

Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$FindCoordinator_Request>

C<$FindCoordinator_Request> is a reference to the hash representing the structure
of the FINDCOORDINATOR Request. it contains CorrelationId, ClientId (can be empty
string), CoordinatorKey and CoordinatorType (for version 1 of protocol)



sub encode_find_coordinator_request {
    my ( $FindCoordinator_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    my $api_version =
      _encode_request_header( $request, $APIKEY_FINDCOORDINATOR, $FindCoordinator_Request );
                                                                            # Size
                                                                            # ApiKey
                                                                            # ApiVersion
                                                                            # CorrelationId
                                                                            # ClientId

    my $coordinator_key = $FindCoordinator_Request->{CoordinatorKey};
    my $coordinator_type = $FindCoordinator_Request->{CoordinatorType};

    $request->{template}    .= q{s>};              # string length
    $request->{len}         += 2;
    _encode_string( $request, $coordinator_key );  # CoordinatorKey (GroupId for version 0)
    if ($api_version >= 1) {
        # CoordinatorType (0 for groups)
        $request->{template}    .= q{c>};
        $request->{len}         += 1;
        push( @{ $request->{data} }, $coordinator_type );

    return pack( $request->{template}, $request->{len}, @data );

my $_decode_find_protocol_response_template = q{x[l]l>s>l>s>/al>};
                                           # x[l]    # Size (skip)
                                           # l>      # CorrelationId
                                           # s>      # ErrorCode
                                           #     l>                      # NodeId
                                           #     s>/a                    # Host
                                           #     l>                      # Port

my $_decode_find_protocol_response_template_v1 = q{x[l]l>l>s>l>s>/al>};
                                           # x[l]    # Size (skip)
                                           # l>      # CorrelationId
                                           # l>      # throttle_time_ms
                                           # s>      # ErrorCode
                                           # s>/a    # ErrorMessage
                                           #     l>                      # NodeId
                                           #     s>/a                    # Host
                                           #     l>                      # Port

=head3 C<decode_find_coordinator_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the FINDCOORDINATOR Response.

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.



sub decode_find_coordinator_response {
    my ( $bin_stream_ref, $api_version ) = @_;

   $api_version //= $DEFAULT_APIVERSION;
    my $is_v1 = $api_version == 1;

    my @data = unpack(   $is_v1 ? $_decode_find_protocol_response_template_v1
                       :          $_decode_find_protocol_response_template, $$bin_stream_ref );

    my $i = 0;
    my $FindCoordinator_Response = {};

    $FindCoordinator_Response->{CorrelationId}  =  $data[ $i++ ];
    if ($is_v1) {
        $FindCoordinator_Response->{ThrottleTimeMs}       =  $data[ $i++ ]; # only v1
   $FindCoordinator_Response->{ErrorCode}       =  $data[ $i++ ];
    if ($is_v1) {
        $FindCoordinator_Response->{ErrorMessage}         =  $data[ $i++ ]; # only v1
    $FindCoordinator_Response->{NodeId}         =  $data[ $i++ ];
    $FindCoordinator_Response->{Host}           =  $data[ $i++ ];
    $FindCoordinator_Response->{Port}           =  $data[ $i++ ];

    return $FindCoordinator_Response;

# PRODUCE Request --------------------------------------------------------------

=head3 C<encode_produce_request( $Produce_Request, $compression_codec )>

Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Produce_Request>

C<$Produce_Request> is a reference to the hash representing
the structure of the PRODUCE Request (examples see C<t/*_decode_encode.t>).

=item C<$compression_codec>


C<$compression_codec> sets the required type of C<$messages> compression,
if the compression is desirable.

Supported codecs:

NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.



sub encode_produce_request {
    my ( $Produce_Request, $compression_codec ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    _encode_request_header( $request, $APIKEY_PRODUCE, $Produce_Request );
                                                                            # Size
                                                                            # ApiKey
                                                                            # ApiVersion
                                                                            # CorrelationId
                                                                            # ClientId

    my $topics_array = $Produce_Request->{topics};
    push( @data,
        $Produce_Request->{RequiredAcks},                                   # RequiredAcks
        $Produce_Request->{Timeout},                                        # Timeout
        scalar( @$topics_array ),                                           # topics array size
    $request->{template}    .= $_ProduceRequest_header_template;
    $request->{len}         += $_ProduceRequest_header_length;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};                                   # string length
        $request->{len}         += 2;
        _encode_string( $request, $topic->{TopicName} );                    # TopicName

        my $partitions_array = $topic->{partitions};
        push( @data, scalar( @$partitions_array ) );
        $request->{template}    .= q{l>};                                   # partitions array size
        $request->{len}         += 4;
        foreach my $partition ( @$partitions_array ) {
            push( @data, $partition->{Partition} );
            $request->{template}    .= q{l>};                               # Partition
            $request->{len}         += 4;

            _encode_MessageSet_array( $request, $partition->{MessageSet}, $compression_codec );

    return pack( $request->{template}, $request->{len}, @data );

# PRODUCE Response -------------------------------------------------------------

my $_decode_produce_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))};
                                        # x[l]                    # Size (skip)
                                        # l>                      # CorrelationId

                                        # l>                      # topics array size
                                        # X[l]
                                        # l>/(                    # topics array
                                        #     s>/a                    # TopicName

                                        #     l>                      # partitions array size
                                        #     X[l]
                                        #     l>/(                    # partitions array
                                        #         l>                      # Partition
                                        #         s>                      # ErrorCode
                                        #         $_int64_template        # Offset
                                        #     )
                                        # )

my $_decode_produce_response_template_v1 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))l>};
                                        # x[l]                    # Size (skip)
                                        # l>                      # CorrelationId

                                        # l>                      # topics array size
                                        # X[l]
                                        # l>/(                    # topics array
                                        #     s>/a                    # TopicName

                                        #     l>                      # partitions array size
                                        #     X[l]
                                        #     l>/(                    # partitions array
                                        #         l>                      # Partition
                                        #         s>                      # ErrorCode
                                        #         $_int64_template        # Offset
                                        #     )
                                        # )
                                        # l>                      # Throttle_Time_Ms

my $_decode_produce_response_template_v2 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}${_int64_template}))l>};
                                        # x[l]                    # Size (skip)
                                        # l>                      # CorrelationId

                                        # l>                      # topics array size
                                        # X[l]
                                        # l>/(                    # topics array
                                        #     s>/a                    # TopicName

                                        #     l>                      # partitions array size
                                        #     X[l]
                                        #     l>/(                    # partitions array
                                        #         l>                      # Partition
                                        #         s>                      # ErrorCode
                                        #         $_int64_template        # Offset
                                        #         $_int64_template        # Log_Append_Time
                                        #     )
                                        # )
                                        # l>                      # Throttle_Time_Ms

=head3 C<decode_produce_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the PRODUCE Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.


sub decode_produce_response {
    my ( $bin_stream_ref, $api_version ) = @_;

    $api_version //= $DEFAULT_APIVERSION;
    my $is_v1 = $api_version == 1;
    my $is_v2 = $api_version == 2;

    my @data = unpack(   $is_v1 ? $_decode_produce_response_template_v1
                       : $is_v2 ? $_decode_produce_response_template_v2
                       :          $_decode_produce_response_template, $$bin_stream_ref );

    my $i = 0;
    my $Produce_Response = {};

    $Produce_Response->{CorrelationId}              =  $data[ $i++ ];   # CorrelationId

    my $topics_array = $Produce_Response->{topics}  =  [];
    my $topics_array_size                           =  $data[ $i++ ];   # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName                               => $data[ $i++ ],

        my $partitions_array = $topic->{partitions} =  [];
        my $partitions_array_size                   =  $data[ $i++ ];   # partitions array size
        while ( $partitions_array_size-- ) {
            my $partition = {
                Partition                           => $data[ $i++ ],          # Partition
                ErrorCode                           => $data[ $i++ ],          # ErrorCode
                Offset                   => _unpack64( $data[ $i++ ] ),        # Offset
     ( $is_v2 ? (Log_Append_Time         => _unpack64( $data[ $i++ ] )) : () ), # Log_Append_Time

            push( @$partitions_array, $partition );

        push( @$topics_array, $topic );
    defined $data[ $i ]
      and $Produce_Response->{Throttle_Time_Ms} = $data[ $i++ ];

    return $Produce_Response;

# FETCH Request ----------------------------------------------------------------

=head3 C<encode_fetch_request( $Fetch_Request )>

Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Fetch_Request>

C<$Fetch_Request> is a reference to the hash representing
the structure of the FETCH Request (examples see C<t/*_decode_encode.t>).



# version 0, 1, 2 have the same request protocol.
sub encode_fetch_request {
    my ( $Fetch_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,
    my $api_version = _encode_request_header( $request, $APIKEY_FETCH, $Fetch_Request );
                                                                            # Size
                                                                            # ApiKey
                                                                            # ApiVersion
                                                                            # CorrelationId
                                                                            # ClientId
    my $is_v3 = $api_version == 3;

    push( @data, $CONSUMERS_REPLICAID );                                    # ReplicaId
    my $topics_array = $Fetch_Request->{topics};
    push( @data,
        $Fetch_Request->{MaxWaitTime},                                      # MaxWaitTime
        $Fetch_Request->{MinBytes},                                         # MinBytes
        ( $is_v3 ? $Fetch_Request->{MaxBytes} : () ),                       # MaxBytes (version 3 only)
        scalar( @$topics_array ),                                           # topics array size
    if ($is_v3) {
        $request->{template}    .= $_FetchRequest_header_template_v3;
        $request->{len}         += $_FetchRequest_header_length_v3;
    } else {
        $request->{template}    .= $_FetchRequest_header_template;
        $request->{len}         += $_FetchRequest_header_length;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};                                   # string length
        $request->{len}         += 2;
        _encode_string( $request, $topic->{TopicName} );                    # TopicName

        my $partitions_array = $topic->{partitions};
        push( @data, scalar( @$partitions_array ) );
        $request->{template}    .= q{l>};                                   # partitions array size
        $request->{len}         += 4;
        foreach my $partition ( @$partitions_array ) {
            push( @data,
                $partition->{Partition},                                    # Partition
                _pack64( $partition->{FetchOffset} ),                       # FetchOffset
                $partition->{MaxBytes},                                     # MaxBytes
            $request->{template}    .= $_FetchRequest_body_template;
            $request->{len}         += $_FetchRequest_body_length;

    return pack( $request->{template}, $request->{len}, @data );

# FETCH Response ---------------------------------------------------------------

=head3 C<decode_fetch_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the FETCH Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.


sub decode_fetch_response {
    my ( $bin_stream_ref, $api_version ) = @_;

    $api_version //= $DEFAULT_APIVERSION;
    my $is_v1 = $api_version == 1;
    my $is_v2 = $api_version == 2;
    my $is_v3 = $api_version == 3;

# According to Apache Kafka documentation:
# As an optimization the server is allowed to return a partial message at the end of the message set.
# Clients should handle this case.
# NOTE: look inside _decode_MessageSet_template and _decode_MessageSet_array

    my @data;
    my $response = {
                                                # template      => '...',
                                                # stream_offset => ...,
        data        => \@data,
        bin_stream  => $bin_stream_ref,

    _decode_fetch_response_template( $response, $api_version );
    @data = unpack( $response->{template}, $$bin_stream_ref );

    my $i = 0;
    my $Fetch_Response = {};

    $Fetch_Response->{CorrelationId}                        =  $data[ $i++ ];   # CorrelationId
    if ($is_v1 || $is_v2 || $is_v3) {
        $Fetch_Response->{ThrottleTimeMs}                   =  $data[ $i++ ];   # ( ThrottleTimeMs ) only v1 and above

    my $topics_array = $Fetch_Response->{topics}            =  [];
    my $topics_array_size                                   =  $data[ $i++ ];   # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName                                       => $ data[ $i++ ],  # TopicName

        my $partitions_array = $topic->{partitions}         =  [];
        my $partitions_array_size                           =  $data[ $i++ ];   # partitions array size
        my ( $MessageSetSize, $MessageSet_array );
        while ( $partitions_array_size-- ) {
            my $partition = {
                Partition                                   => $data[ $i++ ],   # Partition
                ErrorCode                                   => $data[ $i++ ],   # ErrorCode
                HighwaterMarkOffset              => _unpack64( $data[ $i++ ] ), # HighwaterMarkOffset

            $MessageSetSize                                 =  $data[ $i++ ];   # MessageSetSize
            $MessageSet_array = $partition->{MessageSet}    =  [];

            _decode_MessageSet_array( $response, $MessageSetSize, \$i, $MessageSet_array );

            push( @$partitions_array, $partition );

        push( @$topics_array, $topic );

    return $Fetch_Response;

# OFFSET Request ---------------------------------------------------------------

=head3 C<encode_offset_request( $Offset_Request )>

Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Offset_Request>

C<$Offset_Request> is a reference to the hash representing
the structure of the OFFSET Request (examples see C<t/*_decode_encode.t>).


sub encode_offset_request {
    my ( $Offset_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,
    my $api_version = _encode_request_header( $request, $APIKEY_OFFSET, $Offset_Request );
                                                                            # Size
                                                                            # ApiKey
                                                                            # ApiVersion
                                                                            # CorrelationId
                                                                            # ClientId

    my $is_v1 = $api_version == 1;

    my $topics_array = $Offset_Request->{topics};
    push( @data,
        $CONSUMERS_REPLICAID,                                               # ReplicaId
        scalar( @$topics_array ),                                           # topics array size
    $request->{template}    .= $_OffsetRequest_header_template;
    $request->{len}         += $_OffsetRequest_header_length;

    my $body_template = $is_v1 ? $_OffsetRequest_body_template_v1 : $_OffsetRequest_body_template;
    my $body_length   = $is_v1 ? $_OffsetRequest_body_length_v1   : $_OffsetRequest_body_length;
    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};                                   # string length
        $request->{len}          += 2;
        _encode_string( $request, $topic->{TopicName} );                    # TopicName

        my $partitions_array = $topic->{partitions};
        push( @data, scalar( @$partitions_array ) );
        $request->{template}    .= q{l>};                                   # partitions array size
        $request->{len}         += 4;   # [l] partitions array size
        foreach my $partition ( @$partitions_array ) {
            push( @data,
                $partition->{Partition},                                    # Partition
                _pack64( $partition->{Time} ),                              # Time
                $is_v1 ? () : $partition->{MaxNumberOfOffsets},             # MaxNumberOfOffsets
            $request->{template}    .= $body_template;
            $request->{len}         += $body_length;

    # say STDERR $request->{template};
    # say STDERR HexDump($_) foreach @data; use Data::HexDump;
    return pack( $request->{template}, $request->{len}, @data );

# OFFSET Response --------------------------------------------------------------

my $_decode_offset_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>l>X[l]l>/(${_int64_template})))};
                                        # x[l]                    # Size (skip)
                                        # l>                      # CorrelationId

                                        # l>                      # topics array size
                                        # X[l]
                                        # l>/(                    # topics array
                                        #     s>/a                    # TopicName

                                        #     l>                      # PartitionOffsets array size
                                        #     X[l]
                                        #     l>/(                    # PartitionOffsets array
                                        #         l>                      # Partition
                                        #         s>                      # ErrorCode

                                        #         l>                      # Offset array size
                                        #         X[l]
                                        #         l>/(                    # Offset array
                                        #             $_int64_template        # Offset
                                        #         )
                                        #     )
                                        # )

my $_decode_offset_response_template_v1 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}${_int64_template}))};
                                        # x[l]                    # Size (skip)
                                        # l>                      # CorrelationId

                                        # l>                      # topics array size
                                        # X[l]
                                        # l>/(                    # topics array
                                        #     s>/a                    # TopicName

                                        #     l>                      # PartitionOffsets array size
                                        #     X[l]
                                        #     l>/(                    # PartitionOffsets array
                                        #         l>                      # Partition
                                        #         s>                      # ErrorCode
                                        #         $_int64_template        # Timestamp
                                        #         $_int64_template        # Offset
                                        #     )
                                        # )

=head3 C<decode_offset_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the OFFSET Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.


sub decode_offset_response {
    my ( $bin_stream_ref, $api_version ) = @_;

    $api_version //= $DEFAULT_APIVERSION;
    my $is_v1 = $api_version == 1;
    my $template = $is_v1 ? $_decode_offset_response_template_v1 : $_decode_offset_response_template;

    my @data = unpack( $template, $$bin_stream_ref );

    my $i = 0;
    my $Offset_Response = { };

    $Offset_Response->{CorrelationId}                           =  $data[ $i++ ];   # CorrelationId

    my $topics_array = $Offset_Response->{topics}               =  [];
    my $topics_array_size                                       =  $data[ $i++ ];   # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName                                           => $data[ $i++ ],   # TopicName

        my $PartitionOffsets_array = $topic->{PartitionOffsets} =  [];
        my $PartitionOffsets_array_size                         =  $data[ $i++ ];   # PartitionOffsets array size
        my ( $PartitionOffset, $Offset_array, $Offset_array_size );
        while ( $PartitionOffsets_array_size-- ) {
            $PartitionOffset = {
                Partition                                       => $data[ $i++ ],   # Partition
                ErrorCode                                       => $data[ $i++ ],   # ErrorCode
            if ($is_v1) {
                $PartitionOffset->{Timestamp}           = _unpack64($data[ $i++ ]); # Timestamp (v1 only)
                $PartitionOffset->{Offset}              = _unpack64($data[ $i++ ]); # Offset (v1 only)

            } else {
                $Offset_array = $PartitionOffset->{Offset}      =  [];
                $Offset_array_size                              =  $data[ $i++ ];   # Offset array size
                while ( $Offset_array_size-- ) {
                    push( @$Offset_array,                   _unpack64( $data[ $i++ ] ) );   # Offset

            push( @$PartitionOffsets_array, $PartitionOffset );

        push( @$topics_array, $topic );

    return $Offset_Response;

# METADATA Request -------------------------------------------------------------

=head3 C<encode_metadata_request( $Metadata_Request )>

Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Metadata_Request>

C<$Metadata_Request> is a reference to the hash representing
the structure of the METADATA Request (examples see C<t/*_decode_encode.t>).


sub encode_metadata_request {
    my ( $Metadata_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    _encode_request_header( $request, $APIKEY_METADATA, $Metadata_Request );
                                                                            # Size
                                                                            # ApiKey
                                                                            # ApiVersion
                                                                            # CorrelationId
                                                                            # ClientId

    my $topics_array = $Metadata_Request->{topics};
    push( @data, scalar( @$topics_array ) );                                # topics array size
    $request->{template}    .= q{l>};
    $request->{len}         += 4;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};                                   # string length
        $request->{len}         += 2;
        _encode_string( $request, $topic );                                 # TopicName

    return pack( $request->{template}, $request->{len}, @data );

# METADATA Response ------------------------------------------------------------

my $_decode_metadata_response_template = q{x[l]l>l>X[l]l>/(l>s>/al>)l>X[l]l>/(s>s>/al>X[l]l>/(s>l>l>l>X[l]l>/(l>)l>X[l]l>/(l>)))};
                                        # x[l]                    # Size (skip)
                                        # l>                      # CorrelationId

                                        # l>                      # Broker array size
                                        # X[l]
                                        # l>/(                    # Broker array
                                        #     l>                      # NodeId
                                        #     s>/a                    # Host
                                        #     l>                      # Port
                                        # )

                                        # l>                      # TopicMetadata array size
                                        # X[l]
                                        # l>/(                    # TopicMetadata array
                                        #     s>                      # ErrorCode
                                        #     s>/a                    # TopicName

                                        #     l>                      # PartitionMetadata array size
                                        #     X[l]
                                        #     l>/(                    # PartitionMetadata array
                                        #         s>                      # ErrorCode
                                        #         l>                      # Partition
                                        #         l>                      # Leader

                                        #         l>                      # Replicas array size
                                        #         X[l]
                                        #         l>/(                    # Replicas array
                                        #             l>                      # ReplicaId
                                        #         )

                                        #         l>                      # Isr array size
                                        #         X[l]
                                        #         l>/(                    # Isr array
                                        #             l>                      # ReplicaId
                                        #         )
                                        #     )
                                        # )

=head3 C<decode_metadata_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the METADATA Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.


sub decode_metadata_response {
    my ( $bin_stream_ref ) = @_;

    my @data = unpack( $_decode_metadata_response_template, $$bin_stream_ref );

    my $i = 0;
    my $Metadata_Response = {};

    $Metadata_Response->{CorrelationId}                           =  $data[ $i++ ];   # CorrelationId

    my $Broker_array = $Metadata_Response->{Broker}               =  [];
    my $Broker_array_size                                         =  $data[ $i++ ];   # Broker array size
    while ( $Broker_array_size-- ) {
        push( @$Broker_array, {
            NodeId                                                => $data[ $i++ ],   # NodeId
            Host                                                  => $data[ $i++ ],   # Host
            Port                                                  => $data[ $i++ ],   # Port

    my $TopicMetadata_array = $Metadata_Response->{TopicMetadata} =  [];
    my $TopicMetadata_array_size                                  =  $data[ $i++ ];   # TopicMetadata array size
    while ( $TopicMetadata_array_size-- ) {
        my $TopicMetadata = {
            ErrorCode                                             => $data[ $i++ ],   # ErrorCode
            TopicName                                             => $data[ $i++ ],   # TopicName

        my $PartitionMetadata_array = $TopicMetadata->{PartitionMetadata} =  [];
        my $PartitionMetadata_array_size                          =  $data[ $i++ ];   # PartitionMetadata array size
        while ( $PartitionMetadata_array_size-- ) {
            my $PartitionMetadata = {
                ErrorCode                                         => $data[ $i++ ],   # ErrorCode
                Partition                                         => $data[ $i++ ],   # Partition
                Leader                                            => $data[ $i++ ],   # Leader

            my $Replicas_array = $PartitionMetadata->{Replicas}   =  [];
            my $Replicas_array_size                               =  $data[ $i++ ];   # Replicas array size
            while ( $Replicas_array_size-- ) {
                push( @$Replicas_array,                              $data[ $i++ ] ); # ReplicaId

            my $Isr_array = $PartitionMetadata->{Isr}             =  [];
            my $Isr_array_size                                    =  $data[ $i++ ];   # Isr array size
            while ( $Isr_array_size-- ) {
                push( @$Isr_array,                                   $data[ $i++ ] ); # ReplicaId

            push( @$PartitionMetadata_array, $PartitionMetadata );

        push( @$TopicMetadata_array, $TopicMetadata );

    return $Metadata_Response;

# OffsetCommit Request -------------------------------------------------------------

=head3 C<encode_offsetcommit_request( $OffsetCommit_Request )>

Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$OffsetCommit_Request>

C<$OffsetCommit_Request> is a reference to the hash representing
the structure of the OffsetCommit Request (examples see C<t/*_decode_encode.t>).


sub encode_offsetcommit_request {
    my ( $OffsetCommit_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    my $api_version = _encode_request_header( $request, $APIKEY_OFFSETCOMMIT, $OffsetCommit_Request );
                                                                            # Size
                                                                            # ApiKey
                                                                            # ApiVersion
                                                                            # CorrelationId
                                                                            # ClientId

    my $is_v1 = $api_version == 1;

    $request->{template}    .= q{s>};
    $request->{len}         += 2;
    _encode_string( $request, $OffsetCommit_Request->{GroupId} );           # GroupId

    if ($is_v1) {
        $request->{template}    .= q{l>};                                   # GroupGenerationId
        $request->{len}         += 4;
        ### WARNING TODO: we don't support consumer groups properly, so we set
        ### generation id to -1 and member id null (see
        ### )
        push( @data, -1 );

        $request->{template}    .= q{s>};                                   # string length
        $request->{len}         += 2;
        _encode_string( $request, '' );                                     # MemberId

    my $topics_array = $OffsetCommit_Request->{topics};
    push( @data, scalar( @$topics_array ) );                                # topics array size
    $request->{template}    .= q{l>};
    $request->{len}         += 4;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};
        $request->{len}         += 2;
        _encode_string( $request, $topic->{TopicName} );                    # TopicName

        my $partitions_array = $topic->{partitions};
        push( @data, scalar( @$partitions_array ) );                        # partitions array size
        $request->{template}    .= q{l>};
        $request->{len}         += 4;

        foreach my $partition ( @$partitions_array ){
            push( @data,
                $partition->{Partition},                                    # Partition
                $partition->{Offset},                                       # Offset
            $request->{template}    .= qq{l>${_int64_template}};
            $request->{len}         += 12;

            if ($is_v1) {                                                   # timestamp
                ### WARNING TODO: currently we hardcode "now"
                push( @data, time() * 1000);
                $request->{template}    .= qq{${_int64_template}};
                $request->{len}         += 8;

            $request->{template}    .= q{s>};
            $request->{len}         += 2;
            _encode_string( $request, $partition->{Metadata} ),             # Metadata
    return pack( $request->{template}, $request->{len}, @data );

# OFFSETCOMMIT Response --------------------------------------------------------------

my $_decode_offsetcommit_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>))};
                                             # x[l]                          # Size (skip)
                                             # l>                            # CorrelationId
                                             # l>                            # topics array size
                                             # X[l]
                                             # l>/(                          # topics array
                                             #    s>/a                       # TopicName
                                             #    l>                         # partitions array size
                                             #    X[l]
                                             #    l>/(                       # partitions array
                                             #       l>                      # Partition
                                             #       s>                      # ErrorCode
                                             #    )
                                             # )

=head3 C<decode_offsetcommit_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the OFFSETCOMMIT Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.


sub decode_offsetcommit_response {
    my ( $bin_stream_ref ) = @_;

    my @data = unpack( $_decode_offsetcommit_response_template, $$bin_stream_ref );

    my $i = 0;
    my $OffsetCommit_Response = {};

    $OffsetCommit_Response->{CorrelationId}                     =  $data[ $i++ ];   #CorrelationId

    my $topics_array = $OffsetCommit_Response->{topics}         =  [];
    my $topics_array_size                                       =  $data[ $i++ ];   # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName                                           => $data[ $i++ ],   # TopicName

        my $Partition_array = $topic->{partitions} =  [];
        my $Partition_array_size                                =  $data[ $i++ ];   # Partitions array size
        while ( $Partition_array_size-- ) {
            my $Partition = {
                Partition                                       => $data[ $i++ ],   # Partition
                ErrorCode                                       => $data[ $i++ ],   # ErrorCode
            push( @$Partition_array, $Partition);
        push( @$topics_array, $topic);
    return $OffsetCommit_Response;

# OffsetFetch Request -------------------------------------------------------------

=head3 C<encode_offsetfetch_request( $OffsetFetch_Request )>

Encodes the argument and returns a reference to the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$OffsetFetch_Request>

C<$OffsetFetch_Request> is a reference to the hash representing
the structure of the OffsetFetch Request (examples see C<t/*_decode_encode.t>).


sub encode_offsetfetch_request {
    my ( $OffsetFetch_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    _encode_request_header( $request, $APIKEY_OFFSETFETCH, $OffsetFetch_Request );
                                                                            # Size
                                                                            # ApiKey
                                                                            # ApiVersion
                                                                            # CorrelationId
                                                                            # ClientId

    $request->{template}    .= q{s>};
    $request->{len}         += 2;
    _encode_string( $request, $OffsetFetch_Request->{GroupId} );            # GroupId

    my $topics_array = $OffsetFetch_Request->{topics};
    push( @data, scalar( @$topics_array ) );                                # topics array size
    $request->{template}    .= q{l>};
    $request->{len}         += 4;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};
        $request->{len}         += 2;
        _encode_string( $request, $topic->{TopicName} );                    # TopicName

        my $partitions_array = $topic->{partitions};
        push( @data, scalar( @$partitions_array ) );                        # partitions array size
        $request->{template}    .= q{l>};
        $request->{len}         += 4;

        foreach my $partition ( @$partitions_array ){
            push( @data,
                $partition->{Partition},                                    # Partition
            $request->{template}    .= qq{l>};
            $request->{len}         += 4;
    return pack( $request->{template}, $request->{len}, @data );

# OFFSETFETCH Response --------------------------------------------------------------

my $_decode_offsetfetch_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}s>/as>))};
                                             # x[l]                          # Size (skip)
                                             # l>                            # CorrelationId
                                             # l>                            # topics array size
                                             # X[l]
                                             # l>/(                          # topics array
                                             #    s>/a                       # TopicName
                                             #    l>                         # partitions array size
                                             #    X[l]
                                             #    l>/(                       # partitions array
                                             #       l>                      # Partition
                                             #       $_int64_template        # Offset
                                             #       s>/a                    # Metadata
                                             #       s>                      # ErrorCode
                                             #    )
                                             # )

=head3 C<decode_offsetfetch_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the OFFSETFETCH Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.


sub decode_offsetfetch_response {
    my ( $bin_stream_ref ) = @_;

    my @data = unpack( $_decode_offsetfetch_response_template, $$bin_stream_ref );

    my $i = 0;
    my $OffsetFetch_Response = {};

    $OffsetFetch_Response->{CorrelationId}                      =  $data[ $i++ ];   #CorrelationId

    my $topics_array = $OffsetFetch_Response->{topics}          =  [];
    my $topics_array_size                                       =  $data[ $i++ ];   # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName                                           => $data[ $i++ ],   # TopicName

        my $Partition_array = $topic->{partitions} =  [];
        my $Partition_array_size                                =  $data[ $i++ ];   # Partitions array size
        while ( $Partition_array_size-- ) {
            my $Partition = {
                Partition                                       => $data[ $i++ ],   # Partition
                Offset                                          => $data[ $i++ ],   # Partition
                Metadata                                        => $data[ $i++ ],   # Partition
                ErrorCode                                       => $data[ $i++ ],   # ErrorCode
            push( @$Partition_array, $Partition);
        push( @$topics_array, $topic);
    return $OffsetFetch_Response;

#-- private functions ----------------------------------------------------------

# Generates a template to encrypt the request header
sub _encode_request_header {
    my ( $request, $api_key, $request_ref ) = @_;

    # we need to find out which API version to use for the request (and the
    # response). $request_ref->{ApiVersion} can be specified by the end user,
    # or providede by by Kafka::Connection ( see
    # Kafka::Connection::_get_api_versions() ). If not provided, we
    # default to $DEFAULT_APIVERSION
    my $api_version = $request_ref->{ApiVersion} // $DEFAULT_APIVERSION;
    @{ $request->{data} } = (
                                                                            # Size
        $api_key,                                                           # ApiKey
        $api_version,                                                       # ApiVersion
        $request_ref->{CorrelationId},                                      # CorrelationId
    $request->{template}    = $_Request_header_template;
    $request->{len}         = $_Request_header_length;
    _encode_string( $request, $request_ref->{ClientId} );                   # ClientId

    return $api_version;

# Generates a template to decrypt the fetch response body
sub _decode_fetch_response_template {
    my ( $response, $api_version ) = @_;

    my $is_v1 = $api_version == 1;
    my $is_v2 = $api_version == 2;
    my $is_v3 = $api_version == 3;

    if ($is_v1 || $is_v2 || $is_v3) {
        $response->{template}       = $_FetchResponse_header_template_v1;
        $response->{stream_offset}  = $_FetchResponse_header_length_v1;    # bytes before topics array size
                                                                                # [l] Size
                                                                                # [l] CorrelationId
                                                                                # [l] throttle_time_ms
    } else {
        $response->{template}       = $_FetchResponse_header_template;
        $response->{stream_offset}  = $_FetchResponse_header_length;    # bytes before topics array size
                                                                                # [l] Size
                                                                                # [l] CorrelationId
    my $topics_array_size = unpack(
         q{x[}.$response->{stream_offset} . q{]}
        .q{l>},                            # topics array size
        ${ $response->{bin_stream} }
    $response->{stream_offset} += 4;        # bytes before TopicName length
                                                                                # [l] topics array size

    my ( $TopicName_length, $partitions_array_size );
    while ( $topics_array_size-- ) {
        $TopicName_length = unpack(
            .q{]s>},                        # TopicName length
            ${ $response->{bin_stream} }
        $response->{stream_offset} +=       # bytes before partitions array size
              2                                                                 # [s] TopicName length
            + $TopicName_length                                                 # TopicName
        $partitions_array_size = unpack(
            .q{]l>},                        # partitions array size
            ${ $response->{bin_stream} }
        $response->{stream_offset} += 4;    # bytes before Partition
                                                                                # [l] partitions array size

        $response->{template}       .= $_FetchResponse_topic_body_template;
        $response->{stream_offset}  += $_FetchResponse_topic_body_length;   # (without TopicName and partitions array size)
                                            # bytes before MessageSetSize
                                                                                # TopicName
                                                                                # [l] # partitions array size
                                                                                # [l] Partition
                                                                                # [s] ErrorCode
                                                                                # [q] HighwaterMarkOffset

        _decode_MessageSet_template( $response );


# kafka uses a snappy-java compressor, with it's own headers and frame formats
my $XERIAL_SNAPPY_BLOCK_SIZE   = 0x8000; # 32Kb

sub _snappy_xerial_decompress {
    my ( $data ) = @_;
    my $uncompressed;
    my $raw_format_suspected = 1;
    if ( length($data) > 16 ) {
        my ( $header, $x_version, $x_compatversion) = unpack( q{a[8]L>L>}, $data );
        if ( $header eq $XERIAL_SNAPPY_MAGIC_HEADER ) {
            $raw_format_suspected = 0;
            _error( $ERROR_COMPRESSION, 'snappy: bad compatversion in snappy xerial header' ) unless $x_compatversion == $XERIAL_SNAPPY_FILE_VERSION;
            _error( $ERROR_COMPRESSION, 'snappy: bad version in snappy xerial header' ) unless $x_version == $XERIAL_SNAPPY_FILE_VERSION;
            $data = substr( $data, 16 );
            while ( length($data) > 0 ) {
                $uncompressed //= '';
                _error( $ERROR_COMPRESSION, 'snappy: bad frame length header' ) if length($data) < 4;
                my $compressed_frame_length = unpack( q{L>}, $data );
                $data = substr( $data, 4 );
                _error( $ERROR_COMPRESSION, 'snappy: partial frame ' ) if length($data) < $compressed_frame_length;
                my $compressed_frame = substr( $data, 0, $compressed_frame_length,  '');
                my $uncompressed_frame = Compress::Snappy::decompress( $compressed_frame );
                _error( $ERROR_COMPRESSION, 'snappy: can\'t uncompress frame ' ) if not defined $uncompressed_frame;
                $uncompressed .=  $uncompressed_frame;
    if ( $raw_format_suspected ) {
        $uncompressed = Compress::Snappy::decompress( $data );
    return $uncompressed;

sub _snappy_xerial_compress {
    my ( $data ) = @_;
    my $compressed_data;
    while ( length($data) ) {
         my $block = substr $data, 0, $XERIAL_SNAPPY_BLOCK_SIZE, '';
         my $compressed_block = Compress::Snappy::compress( $block );
         _error( $ERROR_COMPRESSION, 'snappy: can\'t compress frame!' ) if not defined $compressed_block;
         $compressed_data .= pack( q{L>}, length($compressed_block) ) . $compressed_block;
    return $compressed_data;

sub _decompress_data {
    my ( $data, $compression_codec ) = @_;
    say STDERR format_message( '[%s] decompress_data request, compression_codec: %s, data: %s',
        scalar( localtime ),
        unpack( 'H*', $data ),
    ) if Kafka::Protocol->debug_level // 0 >= 3;

    my $decompressed;
    if ( $compression_codec == $COMPRESSION_GZIP ) {
        try {
            $decompressed = gunzip( $data );
        } catch {
            _error( $ERROR_COMPRESSION, format_message( 'gunzip failed: %s', $_ ) );
    } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
        $decompressed = _snappy_xerial_decompress( $data )
            // _error( $ERROR_COMPRESSION, 'Unable to decompress snappy compressed data' );
    } elsif ( $compression_codec == $COMPRESSION_LZ4 ) {
        # New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
        if ( Compress::LZ4Frame::looks_like_lz4frame( $data ) ) {
            $decompressed = Compress::LZ4Frame::decompress( $data ) // _error( $ERROR_COMPRESSION, 'Unable to decompress LZ4 compressed data' );
        } else {
            _error( $ERROR_COMPRESSION, 'Unable to decompress LZ4 compressed data. Frame is not valid' );

    } else {
        _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
    _error( $ERROR_COMPRESSION, 'Decompression produced empty result' ) unless defined $decompressed;
    return $decompressed;

sub _compress_data {
    my ( $data, $compression_codec ) = @_;
    my $compressed;

    # Compression
    if ( $compression_codec == $COMPRESSION_GZIP ) {
        $compressed = gzip( $data )
            // _error( $ERROR_COMPRESSION, 'Unable to compress gzip data' );
    } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
        $compressed = _snappy_xerial_compress( $data )
            // _error( $ERROR_COMPRESSION, 'Unable to compress snappy data' );
    } elsif ( $compression_codec == $COMPRESSION_LZ4 ) {
        # New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
        $compressed = Compress::LZ4Frame::compress_checksum( $data )
            // _error( $ERROR_COMPRESSION, 'Unable to compress LZ4 data' );
    } else {
         _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
    return $compressed;

# Decrypts MessageSet
sub _decode_MessageSet_array {
    my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref, $_override_offset) = @_;
    # $_override_offset should not be set manually, it's used when recursing,
    # to decode internal compressed message, which have offsets starting at 0,
    # 1, etc, and instead we need to set the external wrapping message offset.

    my $data = $response->{data};
    my $data_array_size = scalar @{ $data };

# NOTE: not all messages can be returned
    my ( $Message, $MessageSize, $Crc, $Key_length, $Value_length );
    while ( $MessageSetSize && $$i_ref < $data_array_size ) {

        my $message_offset = _unpack64( $data->[ $$i_ref++ ] );
        if (defined $_override_offset) {
            $message_offset = $_override_offset;
        $Message = {
            Offset  => $message_offset, # Offset

        $MessageSize                                                 =  $data->[ $$i_ref++ ];   # MessageSize
# NOTE: The CRC is the CRC32 of the remainder of the message bytes.
# This is used to check the integrity of the message on the broker and consumer:
# MagicByte + Attributes + Key length + Key + Value length + Value
        $Crc                                                         =  $data->[ $$i_ref++ ];   # Crc
# WARNING: The current version of the module does not support the following:
# A message set is also the unit of compression in Kafka,
# and we allow messages to recursively contain compressed message sets to allow batch compression.
        $Message->{MagicByte}                                        =  $data->[ $$i_ref++ ];   # MagicByte
        my $is_v1_msg_format = $Message->{MagicByte} == 1;
        $Message->{Attributes}                                       =  $data->[ $$i_ref++ ];   # Attributes
        if ($is_v1_msg_format) {
            $Message->{Timestamp}                                    =  $data->[ $$i_ref++ ];   # Timestamp

        $Key_length                                                  =  $data->[ $$i_ref++ ];   # Key length
        $Message->{Key}   = $Key_length   == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ];   # Key
        $Value_length                                                =  $data->[ $$i_ref++ ];   # Value length
        $Message->{Value} = $Value_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ];   # Value

        if ( my $compression_codec = $Message->{Attributes} & $COMPRESSION_CODEC_MASK ) {
            my $decompressed = _decompress_data($Message->{Value}, $compression_codec);
            my @data;
            my $Value_length = length $decompressed;
            my $resp = {
                data            => \@data,
                bin_stream      => \$decompressed,
                stream_offset   => 0,
            _decode_MessageSet_sized_template( $Value_length, $resp );
            @data = unpack( $resp->{template}, ${ $resp->{bin_stream} } );
            my $i = 0;  # i_ref
            my $size = length( $decompressed );
                $size,  # message set size
                \$i,    # i_ref
        } else {
            push( @$MessageSet_array_ref, $Message );

        $MessageSetSize -= 12
                                    # [q] Offset
                                    # [l] MessageSize
            + $MessageSize          # Message


# Generates a template to encode single message within MessageSet
sub _encode_Message {
    my ($request, $Key, $Value, $Attributes, $Offset, $v1_format, $Timestamp) = @_;

    # v1 format - supported since 0.10.0, introduces Timestamps
    my $MagicByte = $v1_format ? 1 : 0;
    $Timestamp  //= -1;
    $Attributes //= $COMPRESSION_NONE; # According to Apache Kafka documentation:
                                       # The lowest 2 bits contain the compression codec used for the message.
                                       # The other bits should be set to 0.

    my $key_length   = length( $Key   );
    my $value_length = length( $Value );

    my $message_body = pack(
            q{cc}                                           # MagicByte
                                                            # Attributes
            .( $v1_format    ? q{a[8]}              : q{} ) # Timestamp - v1 only (when MagicByte > 0) (#$_int64_template)
            .q{l>}                                          # Key length
            .( $key_length   ? qq{a[$key_length]}   : q{} ) # Key
            .q{l>}                                          # Value length
            .( $value_length ? qq{a[$value_length]} : q{} ) # Value
        $v1_format    ? ( _pack64( $Timestamp )  ) : (),
        $key_length   ? ( $key_length,    $Key   ) : ( $NULL_BYTES_LENGTH ),
        $value_length ? ( $value_length,  $Value ) : ( $NULL_BYTES_LENGTH ),

    my $MessageBodySize = length($message_body);
    my $MessageSize     = $MessageBodySize + 4;

    my $data = $request->{data};
    $request->{template}    .= $_MessageSet_template . qq{l>a[$MessageBodySize]}; # crc + message_body
    $request->{len}         += $_MessageSet_length + $MessageSize;

    push @$data, (_pack64( $Offset ), $MessageSize, crc32( $message_body ), $message_body );

# Generates a template to encode MessageSet
sub _encode_MessageSet_array {
    my ( $request, $MessageSet_array_ref, $compression_codec ) = @_;

    # not sure if it would be good to mix different formats in different messages in the same set
    # so detect if we should use v1 format
    my $use_v1_format;
    foreach my $MessageSet ( @$MessageSet_array_ref ) {
        if ( $MessageSet->{Timestamp} ) {
            $use_v1_format = 1;

    if ( $compression_codec ) {
        my $subrequest = {
            data     => [],
            template => '',
            len      => 0
        my ( $Key, $Value );

        foreach my $MessageSet ( @$MessageSet_array_ref ) {
            _encode_Message( $subrequest,
                             $Key = $MessageSet->{Key},

        $Value = pack($subrequest->{template}, @{$subrequest->{data}});

        $MessageSet_array_ref = [
                Offset  => $PRODUCER_ANY_OFFSET,
                Key     => $Key,
                Value   => _compress_data($Value, $compression_codec),


    my $subrequest = {
        data     => [],
        template => '',
        len      => 0

    foreach my $MessageSet ( @$MessageSet_array_ref ) {
        _encode_Message( $subrequest,
                         $compression_codec // $COMPRESSION_NONE,

    my $data = $request->{data};
    $request->{template}     .= q{l>}   . $subrequest->{template};
    $request->{len}          +=     4   + $subrequest->{len};
    push @$data, ( $subrequest->{len}, @{ $subrequest->{data}     } );


# Generates a template to decrypt MessageSet
sub _decode_MessageSet_template {
    my ( $response ) = @_;

    my $MessageSetSize = unpack(
        .q{]l>},                            # MessageSetSize
        ${ $response->{bin_stream} }
    $response->{template} .= q{l>};         # MessageSetSize
    $response->{stream_offset} += 4;        # bytes before Offset

    return _decode_MessageSet_sized_template($MessageSetSize, $response);

sub _decode_MessageSet_sized_template {
    my ( $MessageSetSize, $response ) = @_;

    my $bin_stream_length = length ${ $response->{bin_stream} };

    my ( $local_template, $MessageSize, $MagicByte, $Key_length, $Value_length );
    while ( $MessageSetSize ) {
# Not the full MessageSet
# 22 is the minimal size of a v0 message format until (and including) the Key
# Length. If the message version is v1, then it's 22 + 8. We'll check later
        last CREATE_TEMPLATE if $MessageSetSize < 22;

                # [q] Offset
                # [l] MessageSize
                # [l] Crc
                # [c] MagicByte
                # [c] Attributes
                # ( [q] Timestamp ) message format v1 only
                # [l] Key length
                # [l] Value length

        $local_template = q{};
            $response->{stream_offset} += 8; # the size of the offset data value
            ($MessageSize, $MagicByte) = unpack(
                .q{l>}                        # MessageSize
                .q{x[l]}                      # Crc
                .q{c},                        # MagicByte

                ${ $response->{bin_stream} }

            my $is_v1_msg_format = $MagicByte == 1;
            if ($is_v1_msg_format && $MessageSetSize < (22+8)) {
                # Not the full MessageSet
                $local_template = q{};
                last MESSAGE_SET;
            $local_template .= ( $is_v1_msg_format ? $_Message_template_with_timestamp : $_Message_template );

            $response->{stream_offset} += ($is_v1_msg_format ? 18: 10);

            $Key_length = unpack(
                .q{]l>},                        # Key length
                ${ $response->{bin_stream} }

            $response->{stream_offset} += 4;    # bytes before Key or Value length
                                                                                # [l] Key length
            $response->{stream_offset} += $Key_length   # bytes before Key
                if $Key_length != $NULL_BYTES_LENGTH;                           # Key
            if ( $bin_stream_length >= $response->{stream_offset} + 4 ) {   # + [l] Value length
                $local_template .= $_Key_or_Value_template
                    if $Key_length != $NULL_BYTES_LENGTH;
            } else {
# Not the full MessageSet
                $local_template = q{};
                last MESSAGE_SET;

            $local_template .= q{l>};           # Value length
            $Value_length = unpack(
                .q{]l>},                        # Value length
                ${ $response->{bin_stream} }
            $response->{stream_offset} +=       # bytes before Value or next Message
                  4                                                             # [l] Value length
            $response->{stream_offset} += $Value_length # bytes before next Message
                if $Value_length != $NULL_BYTES_LENGTH;                         # Value
            if ( $bin_stream_length >= $response->{stream_offset} ) {
                $local_template .= $_Key_or_Value_template
                    if $Value_length != $NULL_BYTES_LENGTH;
            } else {
# Not the full MessageSet
                $local_template = q{};
                last MESSAGE_SET;

        if ( $local_template ) {
            $response->{template} .= $local_template;
            $MessageSetSize -= 12
                                        # [q] Offset
                                        # [l] MessageSize
                + $MessageSize          # Message
        } else {
            last CREATE_TEMPLATE;


# Generates a template to encrypt string
sub _encode_string {
    my ( $request, $string ) = @_;

    if ( $string eq q{} ) {
        push( @{ $request->{data} }, 0 );
    } else {
        my $string_length = length $string;
        push( @{ $request->{data} }, $string_length, $string );
        $request->{template}    .= q{a*};   # string;
        $request->{len}         += $string_length;


# Handler for errors
sub _error {
    Kafka::Exception::Protocol->throw( throw_args( @_ ) );





In order to achieve better performance, functions of this module do not perform
arguments validation.

=head1 SEE ALSO

The basic operation of the Kafka package modules:

L<Kafka|Kafka> - constants and messages used by the Kafka package modules.

L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.

L<Kafka::Producer|Kafka::Producer> - interface for producing client.

L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.

L<Kafka::Message|Kafka::Message> - interface to access Kafka message

L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
protocol on 32 bit systems.

L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka's Protocol.

L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.

L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.

L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at L<>

Kafka Protocol at L<>


Kafka package is hosted on GitHub:

=head1 AUTHOR

Sergey Gladkov

Please use GitHub project link above to report problems or contact authors.


Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

Damien Krotkine

Greg Franklin


Copyright (C) 2012-2017 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
