The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Kafka::MockProtocol;

=head1 NAME

Kafka::MockProtocol - functions to simulate the messages formation
by the kafka server (in the Apache Kafka's Protocol).

=head1 VERSION

This documentation refers to C<Kafka::MockProtocol> version 0.8007 .


#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

# ENVIRONMENT ------------------------------------------------------------------

our $VERSION = '0.8007';

use Exporter qw(
our @EXPORT_OK = qw(

#-- load the modules -----------------------------------------------------------

use Params::Util qw(
use Scalar::Util qw(
use Scalar::Util::Numeric qw(

use Kafka qw(
use Kafka::Internals qw(
use Kafka::Protocol qw(
use Kafka::TestInternals qw(

#-- declarations ---------------------------------------------------------------


This module is not a user module.

The main features of the C<Kafka::MockProtocol> module are:

=over 3

=item *

Supports parsing the Apache Kafka protocol to emulate the operations performed
by the kafka server.

=item *

Supports Apache Kafka Requests and Responses (PRODUCE, FETCH, OFFSET, METADATA).

=item *

Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.



my ( $_Response_header_template,                            $_Response_header_length ) = (
    q{l>l>l>},      # Size
                    # 4 CorrelationId
                    # 4 topics array size (Broker array size)
    8               # 'Size' is not included in the calculation of length
my ( $_ProduceResponse_body_template,                       $_ProduceResponse_body_length ) = (
    q{l>s>a[8]},    # 4 Partition
                    # 2 ErrorCode
                    # 8 Offset
my ( $_FetchResponse_body_template,                         $_FetchResponse_body_length ) = (
    q{l>s>a[8]},    # 4 Partition
                    # 2 ErrorCode
                    # 8 HighwaterMarkOffset
my ( $_OffsetResponse_body_template,                        $_OffsetResponse_body_length ) = (
    q{l>s>l>},      # 4 Partition
                    # 2 ErrorCode
                    # 4 Offset array size
my ( $_MetadataResponse_Broker_body_template,               $_MetadataResponse_Broker_body_length ) = (
    q{l>s>a*l>},    # 4 NodeId
                    # 2 Host length
                    # Host
                    # 4 Port
    10          # without real Host length
my ( $_MetadataResponse_PartitionMetadata_body_template,    $_MetadataResponse_PartitionMetadata_body_length ) = (
    q{s>l>l>l>},    # 2 ErrorCode
                    # 4 Partition
                    # 4 Leader
                    # 4 Replicas array size
my ( $_ProduceRequest_header_template,                      $_ProduceRequest_header_length ) = (
                    # 4 Size (skip)
                    # 2 ApiKey
                    # 2 ApiVersion
                    # 4 CorrelationId
                    # ClientId
                    # RequiredAcks
                    # Timeout
                    # topics array size
    12          # bytes before ClientId length
my $_ProduceRequest_topic_body_template = q{s>/al>l>};
                    # TopicName
                    # partitions array size
                    # Partition

my $_package_error;

#-- public functions -----------------------------------------------------------


The following functions are available for C<Kafka::MockProtocol> module.


# PRODUCE Request --------------------------------------------------------------

=head3 C<decode_produce_request( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the PRODUCE Request (examples see C<t/*_decode_encode.t>).

This function take argument. The following argument is currently recognized:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Request buffer. The buffer
must be a non-empty binary string.


sub decode_produce_request {
    my ( $bin_stream_ref ) = @_;

    _is_bin_stream_correct( $bin_stream_ref )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    my @data;
    my $request = {
                                                # template      => '...',
                                                # stream_offset => ...,
        bin_stream  => $bin_stream_ref,
        data        => \@data,

    _decode_produce_request_template( $request );
    @data = unpack( $request->{template}, $$bin_stream_ref );

    my ( $i, $Produce_Request ) = ( 0, {} );

    $APIKEY_PRODUCE                                          == $data[ $i++ ]       # ApiKey
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiKey' );
    $APIVERSION                                              == $data[ $i++ ]       # ApiVersion
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiVersion' );
    $Produce_Request = {
        CorrelationId                                        => $data[ $i++ ],      # CorrelationId
        ClientId                                             => $data[ $i++ ],      # ClientId
        RequiredAcks                                         => $data[ $i++ ],      # RequiredAcks
        Timeout                                              => $data[ $i++ ],      # Timeout

    my $topics_array = $Produce_Request->{topics}            =  [];
    my $topics_array_size                                    =  $data[ $i++ ];      # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName                                        => $data[ $i++ ],      # TopicName length

        my $partitions_array = $topic->{partitions}          =  [];
        my $partitions_array_size                            =  $data[ $i++ ];      # partitions array size
        while ( $partitions_array_size-- ) {
            my $partition = {
                Partition                                    => $data[ $i++ ],      # Partition

            my $MessageSetSize                               =  $data[ $i++ ];      # MessageSetSize
            my $MessageSet_array = $partition->{MessageSet}  =  [];

            _decode_MessageSet_array( $request, $MessageSetSize, \$i, $MessageSet_array );

            push( @$partitions_array, $partition );

        push( @$topics_array, $topic );

    return $Produce_Request;

# PRODUCE Response -------------------------------------------------------------

=head3 C<encode_produce_response( $Produce_Response )>

Encodes the argument and returns a reference to the encoded binary string
representing a Response buffer.

This function take argument. The following argument is currently recognized:

=over 3

=item C<$Produce_Response>

C<$Produce_Response> is a reference to the hash representing
the structure of the PRODUCE Response (examples see C<t/*_decode_encode.t>).


sub encode_produce_response {
    my ( $Produce_Response ) = @_;

    _HASH( $Produce_Response )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    my @data = ();
    my $response = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    ( defined( $Produce_Response->{CorrelationId} ) && isint( $Produce_Response->{CorrelationId} ) )    # CorrelationId
        ? push( @data, $Produce_Response->{CorrelationId} )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'CorrelationId' );
    $response->{template}   = $_Response_header_template;
    $response->{len}        = $_Response_header_length;

    my $topics_array = $Produce_Response->{topics};
    _ARRAY0( $topics_array )                                                # topics array size
        ? push( @data, scalar( @$topics_array ) )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'topics' );

    foreach my $topic ( @$topics_array ) {
        $response->{template}   .= q{s>};                                   # string length
        $response->{len}        += 2;
        _encode_string( $response, $topic->{TopicName} );                   # TopicName

        my $partitions_array = $topic->{partitions};
        _ARRAY0( $partitions_array )
            ? push( @data, scalar( @$partitions_array ) )
            : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'partitions' );
        $response->{template}   .= q{l>};                                   # partitions array size
        $response->{len}        += 4;   # [l] partitions array size

        foreach my $partition ( @$partitions_array ) {
            ( defined( $partition->{Partition} ) && isint( $partition->{Partition} ) )  # Partition
                ? push( @data, $partition->{Partition} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Partition' );
            ( defined( $partition->{ErrorCode} ) && isint( $partition->{ErrorCode} ) )  # ErrorCode
                ? push( @data, $partition->{ErrorCode} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' );
            _is_suitable_int( $partition->{Offset} )
                ? push( @data, _pack64( $partition->{Offset} ) )            # Offset
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Offset' );
            $response->{template}   .= $_ProduceResponse_body_template;
            $response->{len}        += $_ProduceResponse_body_length;

    return pack( $response->{template}, $response->{len}, @data );

# FETCH Request ----------------------------------------------------------------

my $_decode_fetch_request_template = qq{x[l]s>s>l>s>/al>l>l>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}l>))};
                                        # x[l]                    # Size (skip)
                                        # s>                      # ApiKey
                                        # s>                      # ApiVersion
                                        # l>                      # CorrelationId
                                        # s>/a                    # ClientId
                                        # l>                      # ReplicaId
                                        # l>                      # MaxWaitTime
                                        # l>                      # MinBytes

                                        # l>                      # topics array size
                                        # X[l]
                                        # l>/(                    # topics array
                                        #     s>/a                    # TopicName

                                        #     l>                      # partitions array size
                                        #     X[l]
                                        #     l>/(                    # partitions array
                                        #         l>                      # Partition
                                        #         $_int64_template        # FetchOffset
                                        #         l>                      # MaxBytes
                                        #     )
                                        # )

=head3 C<decode_fetch_request( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the FETCH Request (examples see C<t/*_decode_encode.t>).

This function take argument. The following argument is currently recognized:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Request buffer. The buffer
must be a non-empty binary string.


sub decode_fetch_request {
    my ( $bin_stream_ref ) = @_;

    _is_bin_stream_correct( $bin_stream_ref )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    my @data = unpack( $_decode_fetch_request_template, $$bin_stream_ref );

    my ( $i, $Fetch_Request ) = ( 0, {} );

    $APIKEY_FETCH                                            == $data[ $i++ ]       # ApiKey
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiKey' );
    $APIVERSION                                              == $data[ $i++ ]       # ApiVersion
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiVersion' );
    $Fetch_Request->{CorrelationId}                          =  $data[ $i++ ];      # CorrelationId
    $Fetch_Request->{ClientId}                               =  $data[ $i++ ];      # ClientId

    $CONSUMERS_REPLICAID                                     == $data[ $i++ ]       # ReplicaId
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ReplicaId' );
    $Fetch_Request->{MaxWaitTime}                            =  $data[ $i++ ];      # MaxWaitTime
    $Fetch_Request->{MinBytes}                               =  $data[ $i++ ];      # MinBytes

    my $topics_array = $Fetch_Request->{topics}              =  [];
    my $topics_array_size                                    =  $data[ $i++ ];      # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName                                        => $data[ $i++ ],      # TopicName

        my $partitions_array = $topic->{partitions}          =  [];
        my $partitions_array_size                            =  $data[ $i++ ];      # partitions array size
        while ( $partitions_array_size-- ) {
            my $partition = {
                Partition                                    => $data[ $i++ ],      # Partition
                FetchOffset                       => _unpack64( $data[ $i++ ] ),    # FetchOffset
                MaxBytes                                     => $data[ $i++ ],      # MaxBytes

            push( @$partitions_array, $partition );

        push( @$topics_array, $topic );

    return $Fetch_Request;

# FETCH Response ---------------------------------------------------------------

=head3 C<encode_fetch_response( $Fetch_Response )>

Encodes the argument and returns a reference to the encoded binary string
representing a Response buffer.

This function take argument. The following argument is currently recognized:

=over 3

=item C<$Fetch_Response>

C<$Fetch_Response> is a reference to the hash representing
the structure of the FETCH Response (examples see C<t/*_decode_encode.t>).


sub encode_fetch_response {
    my ( $Fetch_Response ) = @_;

    _HASH( $Fetch_Response )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    my @data = ();
    my $response = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    ( defined( $Fetch_Response->{CorrelationId} ) && isint( $Fetch_Response->{CorrelationId} ) )    # CorrelationId
        ? push( @data, $Fetch_Response->{CorrelationId} )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'CorrelationId' );
    $response->{template}   = $_Response_header_template;
    $response->{len}        = $_Response_header_length;

    my $topics_array = $Fetch_Response->{topics} // [];
    _ARRAY0( $topics_array )
        ? push( @data, scalar( @$topics_array ) )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'topics' );

    foreach my $topic ( @$topics_array ) {
        $response->{template}   .= q{s>};                                   # string length
        $response->{len}        += 2;
        _encode_string( $response, $topic->{TopicName} );                   # TopicName

        my $partitions_array = $topic->{partitions} // [];
        _ARRAY0( $partitions_array )
            ? push( @data, scalar( @$partitions_array ) )
            : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'partitions' );
        $response->{template}   .= q{l>};                                   # partitions array size
        $response->{len}        += 4;       # [l] partitions array size

        foreach my $partition ( @$partitions_array ) {
            ( defined( $partition->{Partition} ) && isint( $partition->{Partition} ) )  # Partition
                ? push( @data, $partition->{Partition} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Partition' );
            ( defined( $partition->{ErrorCode} ) && isint( $partition->{ErrorCode} ) )  # ErrorCode
                ? push( @data, $partition->{ErrorCode} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' );
            _is_suitable_int( $partition->{HighwaterMarkOffset} )           # HighwaterMarkOffset
                ? push( @data, _pack64( $partition->{HighwaterMarkOffset} ) )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'HighwaterMarkOffset' );
            $response->{template}   .= $_FetchResponse_body_template;
            $response->{len}        += $_FetchResponse_body_length;

            _encode_MessageSet_array( $response, $partition->{MessageSet} );

    return pack( $response->{template}, $response->{len}, @data );

# OFFSET Request ---------------------------------------------------------------

my $_decode_offset_request_template = qq{x[l]s>s>l>s>/al>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}l>))};
                                        # x[l]                    # Size (skip)
                                        # s>                      # ApiKey
                                        # s>                      # ApiVersion
                                        # l>                      # CorrelationId
                                        # s>/a                    # ClientId
                                        # l>                      # ReplicaId

                                        # l>                      # topics array size
                                        # X[l]
                                        # l>/(                    # topics array
                                        #     s>/a                    # TopicName

                                        #     l>                      # partitions array size
                                        #     X[l]
                                        #     l>/(                    # partitions array
                                        #         l>                      # Partition
                                        #         $_int64_template        # Time
                                        #         l>                      # MaxNumberOfOffsets
                                        #     )
                                        # )

=head3 C<decode_offset_request( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the OFFSET Request (examples see C<t/*_decode_encode.t>).

This function take argument. The following argument is currently recognized:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Request buffer. The buffer
must be a non-empty binary string.


sub decode_offset_request {
    my ( $bin_stream_ref ) = @_;

    _is_bin_stream_correct( $bin_stream_ref )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    my @data = unpack( $_decode_offset_request_template, $$bin_stream_ref );

    my ( $i, $Offset_Request ) = ( 0, {} );

    $APIKEY_OFFSET                                           == $data[ $i++ ]       # ApiKey
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiKey' );
    $APIVERSION                                              == $data[ $i++ ]       # ApiVersion
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiVersion' );
    $Offset_Request->{CorrelationId}                         =  $data[ $i++ ];      # CorrelationId
    $Offset_Request->{ClientId}                              =  $data[ $i++ ];      # ClientId

    $CONSUMERS_REPLICAID                                     == $data[ $i++ ]       # ReplicaId
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ReplicaId' );

    my $topics_array = $Offset_Request->{topics}             =  [];
    my $topics_array_size                                    =  $data[ $i++ ];      # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName                                        => $data[ $i++ ],      # TopicName

        my $partitions_array = $topic->{partitions}          =  [];
        my $partitions_array_size                            =  $data[ $i++ ];      # partitions array size
        while ( $partitions_array_size-- ) {
            my $partition = {
                Partition                                    => $data[ $i++ ],      # Partition
                Time                              => _unpack64( $data[ $i++ ] ),    # Time
                MaxNumberOfOffsets                           => $data[ $i++ ],      # MaxNumberOfOffsets

            push @$partitions_array, $partition;

        push @$topics_array, $topic;

    return $Offset_Request;

# OFFSET Response --------------------------------------------------------------

=head3 C<encode_offset_response( $Produce_Response )>

Encodes the argument and returns a reference to the encoded binary string
representing a Response buffer.

This function take argument. The following argument is currently recognized:

=over 3

=item C<$Offset_Response>

C<$Offset_Response> is a reference to the hash representing
the structure of the OFFSET Response (examples see C<t/*_decode_encode.t>).


sub encode_offset_response {
    my ( $Offset_Response ) = @_;

    _HASH( $Offset_Response )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    my @data;
    my $response = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    ( defined( $Offset_Response->{CorrelationId} ) && isint( $Offset_Response->{CorrelationId} ) )  # CorrelationId
        ? push( @data, $Offset_Response->{CorrelationId} )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'CorrelationId' );
    $response->{template}   = $_Response_header_template;
    $response->{len}        = $_Response_header_length;

    my $topics_array = $Offset_Response->{topics} // [];
    _ARRAY0( $topics_array )                                                # topics array size
        ? push( @data, scalar( @$topics_array ) )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'topics' );

    foreach my $topic ( @$topics_array ) {
        $response->{template}   .= q{s>};                                   # string length
        $response->{len}        += 2;
        _encode_string( $response, $topic->{TopicName} );                   # TopicName

        my $PartitionOffsets_array = $topic->{PartitionOffsets} // [];
        _ARRAY0( $PartitionOffsets_array )
            ? push( @data, scalar( @$PartitionOffsets_array ) )
            : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'PartitionOffsets' );
        $response->{template}   .= q{l>};                                   # PartitionOffsets array size
        $response->{len}        += 4;   # [l] PartitionOffsets array size

        foreach my $PartitionOffsets ( @$PartitionOffsets_array ) {
            ( defined( $PartitionOffsets->{Partition} ) && isint( $PartitionOffsets->{Partition} ) )    # Partition
                ? push( @data, $PartitionOffsets->{Partition} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Partition' );
            ( defined( $PartitionOffsets->{ErrorCode} ) && isint( $PartitionOffsets->{ErrorCode} ) )    # ErrorCode
                ? push( @data, $PartitionOffsets->{ErrorCode} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' );
            my $Offset_array = $PartitionOffsets->{Offset} // [];
            _ARRAY0( $Offset_array )                                        # Offset array size
                ? push( @data, scalar( @$Offset_array ) )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Offset array' );
            $response->{template}   .= $_OffsetResponse_body_template;
            $response->{len}        += $_OffsetResponse_body_length;

            foreach my $Offset ( @$Offset_array ) {
                _is_suitable_int( $Offset )                                 # Offset
                    ? push( @data, _pack64( $Offset ) )
                    : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Offset' );
                $response->{template}   .= q{a[8]};
                $response->{len}        += 8;

    return pack( $response->{template}, $response->{len}, @data );

# METADATA Request -------------------------------------------------------------

my $_decode_metadata_request_template = q{x[l]s>s>l>s>/al>X[l]l>/(s>/a)};
                                        # x[l]                    # Size (skip)
                                        # s>                      # ApiKey
                                        # s>                      # ApiVersion
                                        # l>                      # CorrelationId
                                        # s>/a                    # ClientId

                                        # l>                      # topics array size
                                        # X[l]
                                        # l>/(                    # topics array
                                        #     s>/a                    # TopicName
                                        # )

=head3 C<decode_metadata_request( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the METADATA Request (examples see C<t/*_decode_encode.t>).

This function take argument. The following argument is currently recognized:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Request buffer. The buffer
must be a non-empty binary string.


sub decode_metadata_request {
    my ( $bin_stream_ref ) = @_;

    _is_bin_stream_correct( $bin_stream_ref )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    my @data = unpack( $_decode_metadata_request_template, $$bin_stream_ref );

    my ( $i, $Metadata_Request ) = ( 0, {} );

    $APIKEY_METADATA                                     == $data[ $i++ ]   # ApiKey
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiKey' );
    $APIVERSION                                          == $data[ $i++ ]   # ApiVersion
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiVersion' );
    $Metadata_Request->{CorrelationId}                   =  $data[ $i++ ];  # CorrelationId
    $Metadata_Request->{ClientId}                        =  $data[ $i++ ];  # ClientId

    my $topics_array = $Metadata_Request->{topics}       =  [];
    my $topics_array_size                                =  $data[ $i++ ];  # topics array size
    while ( $topics_array_size-- ) {
        push( @$topics_array,                               $data[ $i++ ] );    # TopicName

    return $Metadata_Request;

# METADATA Response ------------------------------------------------------------

=head3 C<encode_metadata_response( $Metadata_Response )>

Encodes the argument and returns a reference to the encoded binary string
representing a Response buffer.

This function take argument. The following argument is currently recognized:

=over 3

=item C<$Metadata_Response>

C<$Metadata_Response> is a reference to the hash representing
the structure of the METADATA Response (examples see C<t/*_decode_encode.t>).


sub encode_metadata_response {
    my ( $Metadata_Response ) = @_;

    _HASH( $Metadata_Response )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    my @data;
    my $response = {
                                                # template    => '...',
                                                # len         => ...,
        data        => \@data,

    ( defined( $Metadata_Response->{CorrelationId} ) && isint( $Metadata_Response->{CorrelationId} ) )  # CorrelationId
        ? push( @data, $Metadata_Response->{CorrelationId} )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'CorrelationId' );
    $response->{template}   = $_Response_header_template;
    $response->{len}        = $_Response_header_length;

    my $Broker_array = $Metadata_Response->{Broker} // [];
    _ARRAY0( $Broker_array )                                                # Broker array size
        ? push( @data, scalar( @$Broker_array ) )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Broker' );

    foreach my $Broker ( @$Broker_array ) {
        ( defined( $Broker->{NodeId} ) && isint( $Broker->{NodeId} ) )      # NodeId
            ? push( @data, $Broker->{NodeId} )
            : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'NodeId' );
        defined( my $Host = $Broker->{Host} )
            or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Host' );
        _verify_string( $Host, 'Host' )
            or return;
        my $Host_length = length $Broker->{Host};
        _STRING( $Broker->{Host} )                                          # Host
            ? push( @data, $Host_length, $Broker->{Host} )
            : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Host' );
        $response->{len} += $Host_length;
        _POSINT( $Broker->{Port} )                                          # Port
            ? push( @data, $Broker->{Port} )
            : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Port' );
        $response->{template}   .= $_MetadataResponse_Broker_body_template;
        $response->{len}        += $_MetadataResponse_Broker_body_length;

    my $TopicMetadata_array = $Metadata_Response->{TopicMetadata} // [];

    _ARRAY0( $TopicMetadata_array )                                         # TopicMetadata array size
        ? push( @data, scalar( @$TopicMetadata_array ) )
        : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'TopicMetadata' );
    $response->{template}   .= q{l>};
    $response->{len}        += 4;

    foreach my $TopicMetadata ( @$TopicMetadata_array ) {
        ( defined( $TopicMetadata->{ErrorCode} ) && isint( $TopicMetadata->{ErrorCode} ) )  # ErrorCode
            ? push( @data, $TopicMetadata->{ErrorCode} )
            : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' );
        $response->{template} .= q{s>s>};
                    # 2 ErrorCode
                    # 2 string length
        $response->{len} += 4;
        _encode_string( $response, $TopicMetadata->{TopicName} );           # TopicName

        my $PartitionMetadata_array = $TopicMetadata->{PartitionMetadata} // [];
        _ARRAY0( $PartitionMetadata_array )
            ? push( @data, scalar( @$PartitionMetadata_array ) )
            : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'PartitionMetadata' );
        $response->{template}   .= q{l>};                                   # PartitionMetadata array size
        $response->{len}        += 4;

        foreach my $PartitionMetadata ( @$PartitionMetadata_array ) {
            ( defined( $PartitionMetadata->{ErrorCode} ) && isint( $PartitionMetadata->{ErrorCode} ) )  # ErrorCode
                ? push( @data, $PartitionMetadata->{ErrorCode} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' );
            ( defined( $PartitionMetadata->{Partition} ) && isint( $PartitionMetadata->{Partition} ) )  # Partition
                ? push( @data, $PartitionMetadata->{Partition} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Partition' );
            ( defined( $PartitionMetadata->{Leader} ) && isint( $PartitionMetadata->{Leader} ) )    # Leader
                ? push( @data, $PartitionMetadata->{Leader} )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Leader' );
            my $Replicas_array = $PartitionMetadata->{Replicas} // [];
            _ARRAY0( $Replicas_array )                                      # Replicas array size
                ? push( @data, scalar( @$Replicas_array ) )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Replicas' );
            $response->{template}   .= $_MetadataResponse_PartitionMetadata_body_template;
            $response->{len}        += $_MetadataResponse_PartitionMetadata_body_length;

            foreach my $Replica ( @$Replicas_array ) {
                ( defined( $Replica ) && isint( $Replica ) )                 # ReplicaId
                    ? push( @data, $Replica )
                    : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Replicas ReplicaId' );
                $response->{template}   .= q{l>};
                $response->{len}        += 4;

            my $Isr_array = $PartitionMetadata->{Isr} // [];
            _ARRAY0( $Isr_array )                                           # Isr array size
                ? push( @data, scalar( @$Isr_array ) )
                : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Isr' );
            $response->{template}   .= q{l>};
            $response->{len}        += 4;

            foreach my $Isr ( @$Isr_array ) {
                ( defined( $Isr ) && isint( $Isr ) )                        # ReplicaId
                    ? push( @data, $Isr )
                    : return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Isr ReplicaId' );
                $response->{template}   .= q{l>};
                $response->{len}        += 4;

    return pack( $response->{template}, $response->{len}, @data );

=head3 C<Kafka::MockProtocol::last_errorcode()>

This method returns an error code that specifies the
description in the C<%Kafka::ERROR> hash. Analysing this information
can be done to determine the cause of the error.

sub last_errorcode {
    return ( $_package_error // 0 ) + 0;

=head3 C<Kafka::MockProtocol::last_error()>

This method returns an error message that contains information about the
encountered failure. Messages returned from this method may contain
additional details and do not coincide completely with the C<%Kafka::ERROR> hash.

sub last_error {
    return ( $_package_error // q{} ).q{};

#-- private functions ----------------------------------------------------------

# Generates a template to decrypt the request body
sub _decode_produce_request_template {
    my ( $request ) = @_;

    my (

    $request->{template}        = $_ProduceRequest_header_template;
    $request->{stream_offset}   = $_ProduceRequest_header_length;   # bytes before ClientId length
                                                                                # [l] Size
                                                                                # [s] ApiKey
                                                                                # [s] ApiVersion
                                                                                # [l] CorrelationId

    $ClientId_length = unpack(
        .q{]s>},                        # ClientId length
        ${ $request->{bin_stream} }

    $request->{stream_offset} += 8      # bytes before RequiredAcks
                                                                                # [s] ClientId length
        + $ClientId_length                                                      # ClientId
                                        # bytes before topics array size
                                                                                # [s] RequiredAcks
                                                                                # [l] Timeout
    $topics_array_size = unpack(
        .q{]l>},                        # topics array size
        ${ $request->{bin_stream} }

    $request->{stream_offset} += 4;     # bytes before TopicName length
                                                                                # [l] topics array size

    while ( $topics_array_size-- ) {
        $request->{template} .= $_ProduceRequest_topic_body_template;
                                        # TopicName
                                        # partitions array size
                                        # Partition

        $TopicName_length = unpack(
            .q{]s>},                    # TopicName length
            ${ $request->{bin_stream} }
        $request->{stream_offset} +=    # bytes before partitions array size
              2                                                                 # [s] TopicName length
            + $TopicName_length                                                 # TopicName

        $partitions_array_size = unpack(
            .q{]l>},                    # partitions array size
            ${ $request->{bin_stream} }

        $request->{stream_offset} += 8; # bytes before Partition
                                                                                # [l] partitions array size
                                        # bytes before MessageSetSize
                                                                                # [l] Partition

        _decode_MessageSet_template( $request );

# Verifies that the argument is a reference to a string
sub _is_bin_stream_correct {
    my ( $bin_stream_ref ) = @_;

    return _SCALAR( $bin_stream_ref ) && _STRING( $$bin_stream_ref );

# Verifies that the first argument is the string does not contain Unicode data
sub _verify_string {
    my ( $string, $description ) = @_;

    return 1
        if $string eq q{};
    _STRING( $string )
        // return _protocol_error( $ERROR_MISMATCH_ARGUMENT, $description );
    utf8::is_utf8( $string )
        and return _protocol_error( $ERROR_NOT_BINARY_STRING, $description );

    return 1;

# Sets the value of an internal attribute describing the error:
#   $error_code     - error code
#   $description    - additional description of the error
sub _protocol_error {
    my ( $error_code, $description ) = @_;

    $_package_error = dualvar $error_code, $ERROR{ $error_code }.( $description ? ': '.$description : q{} );




Error will set the non-zero value returned by L</"Kafka::MockProtocol::last_errorcode()"> function.

Error messages are presented in the hash L<%Kafka::ERROR|%Kafka::ERROR>.
You may experience the following errors:

=over 3


This means that you didn't give the right argument to some of


This means that one of the fields of the request or response contains invalid data.


This means that one of the string fields of response does not contain a string
or contains a string with Unicode data.


For more error description, always look at the message
from the C<Kafka::MockProtocol::last_error()> function.

=head1 SEE ALSO

The basic operation of the Kafka package modules:

L<Kafka|Kafka> - constants and messages used by the Kafka package modules.

L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.

L<Kafka::Producer|Kafka::Producer> - interface for producing client.

L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.

L<Kafka::Message|Kafka::Message> - interface to access Kafka message

L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
protocol on 32 bit systems.

L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka's Protocol.

L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.

L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.

L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at L<>

Kafka Protocol at L<>

=head1 AUTHOR

Sergey Gladkov, E<lt>sgladkov@trackingsoft.comE<gt>


Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko


Copyright (C) 2012-2013 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
