NAME

Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.

VERSION

This documentation refers to Kafka::Protocol version 1.001013.

SYNOPSIS

    use 5.010;
    use strict;
    use warnings;

    use Data::Compare;
    use Kafka qw(
        $COMPRESSION_NONE
        $ERROR_NO_ERROR
        $REQUEST_TIMEOUT
        $WAIT_WRITTEN_TO_LOCAL_LOG
    );
    use Kafka::Internals qw(
        $PRODUCER_ANY_OFFSET
    );
    use Kafka::Protocol qw(
        decode_produce_response
        encode_produce_request
    );

    # an encoded produce request hex stream
    my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );

    # a decoded produce request
    my $decoded = {
        CorrelationId                       => 4,
        ClientId                            => q{},
        RequiredAcks                        => $WAIT_WRITTEN_TO_LOCAL_LOG,
        Timeout                             => $REQUEST_TIMEOUT * 100,  # ms
        topics                              => [
            {
                TopicName                   => 'mytopic',
                partitions                  => [
                    {
                        Partition           => 0,
                        MessageSet          => [
                            {
                                Offset          => $PRODUCER_ANY_OFFSET,
                                MagicByte       => 0,
                                Attributes      => $COMPRESSION_NONE,
                                Key             => q{},
                                Value           => 'Hello!',
                            },
                        ],
                    },
                ],
            },
        ],
    };

    my $encoded_request = encode_produce_request( $decoded );
    say 'encoded correctly' if $encoded_request eq $encoded;

    # an encoded produce response hex stream
    $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );

    # a decoded produce response
    $decoded = {
        CorrelationId                           => 4,
        topics                                  => [
            {
                TopicName                       => 'mytopic',
                partitions                      => [
                    {
                        Partition               => 0,
                        ErrorCode               => $ERROR_NO_ERROR,
                        Offset                  => 0,
                    },
                ],
            },
        ],
    };

    my $decoded_response = decode_produce_response( \$encoded );
    say 'decoded correctly' if Compare( $decoded_response, $decoded );

    # for more examples, see t/*_decode_encode.t

DESCRIPTION

This module is used internally by the Kafka package and is not intended to be called directly from user code.

To achieve better performance, the functions of this module do not validate their arguments.

The main features of the Kafka::Protocol module are:

  • Supports parsing the Apache Kafka protocol.

  • Supports the following Apache Kafka Requests and Responses: PRODUCE, FETCH, OFFSET and METADATA.

  • Supports working with 64-bit elements of the Kafka protocol on 32-bit systems.
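The last point can be illustrated with a short sketch. It is not the code used by Kafka::Int64; it only shows one possible way to pack and unpack a signed 64-bit protocol field as two 32-bit words with the core Math::BigInt module, so that it also works on a perl without native 64-bit integers. The helper names pack_int64 and unpack_int64 are hypothetical.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical helpers for illustration; not part of the Kafka package.

# Encode a signed 64-bit integer (number or decimal string) as 8 big-endian bytes.
sub pack_int64 {
    my ($value) = @_;
    my $n = Math::BigInt->new($value);
    # Map negative values to their two's complement representation.
    $n->badd( Math::BigInt->new(2)->bpow(64) ) if $n->is_neg;
    # Split into the high and low 32-bit words (quotient and remainder).
    my ( $hi, $lo ) = $n->bdiv( Math::BigInt->new(2)->bpow(32) );
    return pack 'N N', $hi->numify, $lo->numify;
}

# Decode 8 big-endian bytes into a Math::BigInt holding the signed value.
sub unpack_int64 {
    my ($bytes) = @_;
    my ( $hi, $lo ) = unpack 'N N', $bytes;
    my $n = Math::BigInt->new($hi)->blsft(32)->badd($lo);
    # A set top bit means the value is negative.
    $n->bsub( Math::BigInt->new(2)->bpow(64) ) if $hi >= 2**31;
    return $n;
}

print unpack( 'H*', pack_int64(-1) ), "\n";                     # ffffffffffffffff
print unpack_int64( pack_int64('1234567890123') )->bstr, "\n";  # 1234567890123
```

Returning a Math::BigInt object from unpack_int64 keeps the full value intact even where a native Perl integer could not hold it.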

EXPORT

The following constants are available for export:

$APIVERSION

According to Apache Kafka documentation: 'This is a numeric version number for this api. Currently the supported version for all APIs is 0 .'

$CONSUMERS_REPLICAID

According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'

$NULL_BYTES_LENGTH

According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'

$BAD_OFFSET

According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.'
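The -1 length convention quoted for $NULL_BYTES_LENGTH can be sketched with plain pack and unpack. This is an illustration only, not the module's implementation: encode_bytes and decode_bytes are hypothetical helpers, the literal -1 stands in for $NULL_BYTES_LENGTH, and the sketch assumes the signed 32-bit length prefix that the Kafka protocol guide specifies for the 'bytes' type.

```perl
use 5.010;
use strict;
use warnings;

my $NULL_LENGTH = -1;    # stands in for $NULL_BYTES_LENGTH

# Encode a value as protocol 'bytes': int32 length, then the content.
# An undefined value is encoded as a bare length of -1 (null).
sub encode_bytes {
    my ($value) = @_;
    return pack( 'l>', $NULL_LENGTH ) unless defined $value;
    return pack( 'l> a*', length $value, $value );
}

# Decode protocol 'bytes' from the start of a buffer; null becomes undef.
sub decode_bytes {
    my ($buffer) = @_;
    my $length = unpack 'l>', $buffer;
    return if $length == $NULL_LENGTH;
    return substr $buffer, 4, $length;
}

say unpack 'H*', encode_bytes(undef);       # ffffffff
say unpack 'H*', encode_bytes('Hello!');    # 0000000648656c6c6f21
```

The same two byte sequences close the request stream in the SYNOPSIS, where the message Key is encoded as null and the Value is 'Hello!'.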

FUNCTIONS

The following functions are provided by the Kafka::Protocol module.

encode_produce_request( $Produce_Request, $compression_codec )

Encodes the argument and returns the encoded binary string representing a Request buffer.

This function takes the following arguments:

$Produce_Request

$Produce_Request is a reference to the hash representing the structure of the PRODUCE Request (for examples, see t/*_decode_encode.t).

$compression_codec

Optional.

$compression_codec sets the compression type to apply to the messages, if compression is desired.

Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY.
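For orientation, the start of a Request buffer can be reproduced by hand. The sketch below is an illustration based on the layout described in the Kafka protocol guide (a 4-byte size prefix followed by ApiKey, ApiVersion, CorrelationId and ClientId); it is not the code of encode_produce_request. frame_request is a hypothetical helper, and the request body is passed in as an opaque string.

```perl
use 5.010;
use strict;
use warnings;

# Hypothetical helper: prepend the common request header and the size prefix.
sub frame_request {
    my ( $api_key, $correlation_id, $client_id, $body ) = @_;
    my $payload = pack( 's> s> l> s> a*',
        $api_key,               # ApiKey (0 is PRODUCE in the protocol guide)
        0,                      # ApiVersion
        $correlation_id,        # CorrelationId
        length $client_id,      # ClientId length (int16)
        $client_id,             # ClientId content
    ) . $body;
    # The size prefix counts every byte that follows it.
    return pack( 'l> a*', length $payload, $payload );
}

# Header of the SYNOPSIS request (CorrelationId 4, empty ClientId), empty body.
say unpack 'H*', frame_request( 0, 4, q{}, q{} );
# 0000000a00000000000000040000
```

With the full PRODUCE body appended, the size prefix grows accordingly; in the SYNOPSIS stream it is 0x49 (73 bytes).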

decode_produce_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the PRODUCE Response (for examples, see t/*_decode_encode.t).

This function takes one argument:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.
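To make the shape of such a buffer concrete, the sketch below takes apart a PRODUCE Response like the one in the SYNOPSIS using only unpack. It is a hand-written illustration, not the implementation of decode_produce_response, and it assumes exactly one topic with one partition; the field names in the comments follow the decoded hash shown in the SYNOPSIS.

```perl
use 5.010;
use strict;
use warnings;

# Field-by-field hex of a PRODUCE Response with one topic and one partition.
my $hex = join q{},
    '00000023',                 # Size: 35 bytes follow
    '00000004',                 # CorrelationId
    '00000001',                 # number of topics
    '0007', '6d79746f706963',   # TopicName: int16 length, then 'mytopic'
    '00000001',                 # number of partitions
    '00000000',                 # Partition
    '0000',                     # ErrorCode
    '0000000000000000';         # Offset (int64)
my $bin = pack 'H*', $hex;

# 'n/a' reads an int16 length and then that many bytes as a string.
# The 64-bit Offset is read as two 32-bit words to stay portable.
my ( $size, $correlation_id, $topics, $topic_name,
     $partitions, $partition, $error_code, $offset_hi, $offset_lo )
    = unpack 'N N N n/a N N s> N N', $bin;

printf "size=%d correlation=%d topic=%s partition=%d error=%d\n",
    $size, $correlation_id, $topic_name, $partition, $error_code;
# size=35 correlation=4 topic=mytopic partition=0 error=0
```

A real decoder has to loop over the topic and partition counts instead of assuming a single entry, which is why the module builds a nested hash.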

encode_fetch_request( $Fetch_Request )

Encodes the argument and returns the encoded binary string representing a Request buffer.

This function takes one argument:

$Fetch_Request

$Fetch_Request is a reference to the hash representing the structure of the FETCH Request (for examples, see t/*_decode_encode.t).

decode_fetch_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the FETCH Response (for examples, see t/*_decode_encode.t).

This function takes one argument:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_offset_request( $Offset_Request )

Encodes the argument and returns the encoded binary string representing a Request buffer.

This function takes one argument:

$Offset_Request

$Offset_Request is a reference to the hash representing the structure of the OFFSET Request (for examples, see t/*_decode_encode.t).

decode_offset_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the OFFSET Response (for examples, see t/*_decode_encode.t).

This function takes one argument:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_metadata_request( $Metadata_Request )

Encodes the argument and returns the encoded binary string representing a Request buffer.

This function takes one argument:

$Metadata_Request

$Metadata_Request is a reference to the hash representing the structure of the METADATA Request (for examples, see t/*_decode_encode.t).

decode_metadata_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the METADATA Response (for examples, see t/*_decode_encode.t).

This function takes one argument:

$bin_stream_ref

$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

DIAGNOSTICS

To achieve better performance, the functions of this module do not validate their arguments.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for a producing client.

Kafka::Consumer - interface for a consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64-bit elements of the protocol on 32-bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka protocol.

Kafka::IO - low-level interface for communication with a Kafka server.

Kafka::Exceptions - module designated to handle Kafka exceptions.

Kafka::Internals - internal constants and functions used by several package modules.

A wealth of detail about Apache Kafka and the Kafka protocol:

Main page at http://kafka.apache.org/

Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

SOURCE CODE

The Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

Please use the GitHub project link above to report problems or contact the authors.

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2016 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.