
NAME

Kafka - constants and messages used by the Kafka package modules

VERSION

This documentation refers to Kafka package version 0.12

SYNOPSIS

An example of Kafka usage:

    use Kafka qw(
        BITS64
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
        );

    # common information
    print "This is Kafka package $Kafka::VERSION\n";
    print "You have a ", BITS64 ? "64" : "32", " bit system\n";

    use Kafka::IO;

    # connect to local server with the defaults
    my $io = Kafka::IO->new( host => "localhost" );

    # decoding of the error code
    unless ( $io )
    {
        print STDERR "last error: ",
            $Kafka::ERROR[Kafka::IO::last_errorcode], "\n";
    }

For a brief but complete working example of Kafka package usage, see the "An Example" section.

ABSTRACT

The Kafka package is a set of Perl modules that provides a simple and consistent application programming interface (API) to Apache Kafka 0.7, a high-throughput distributed messaging system. This is a low-level API implementation that DOES NOT interact with Apache ZooKeeper for consumer coordination and/or load balancing.

DESCRIPTION

The user modules in this package provide an object-oriented API. The IO agents, the requests sent, and the responses received from the Apache Kafka or mock servers are all represented by objects, which makes for a simple and powerful interface to these services.

The main features of the package are:

APACHE KAFKA'S STYLE COMMUNICATION

The Kafka package is based on the Apache Kafka 0.7 Wire Format specification at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/

Communication with Kafka always follows these steps. First, the IO and client objects are created and configured.

The Kafka client is an object of either the Kafka::Producer or the Kafka::Consumer class.

Messages are the fundamental unit of communication. They are published to a topic by a producer, which means they are physically sent to a server acting as a broker. Some number of consumers subscribe to a topic, and each published message is delivered to all of those consumers. The message stream is divided on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer, and the producer specifies which partition a message belongs to. Within a partition, messages are stored in the order in which they arrive at the broker and are given out to consumers in that same order.

In Apache Kafka, the consumers are responsible for maintaining state information (the offset) about what has been consumed. A consumer can deliberately rewind to an old offset and re-consume data. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of the message in the stream of all messages ever sent to that topic on that partition. A read is performed by giving the 64-bit logical offset of a message and a maximum chunk size.

Next, the request is passed through the client to a server, and for a consumer request the response is returned for the application to examine. Each request is independent of any previous request, i.e. the service is stateless: the topic and partition are passed in on every request.
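Because every request carries its own topic and partition, a request can be framed without any session state. The following is a minimal sketch of that framing in plain Perl, assuming the Kafka 0.7 request header layout from the Wire Format document: REQUEST_LENGTH (int32, not counting itself), REQUEST_TYPE (int16), TOPIC_LENGTH (int16), TOPIC, PARTITION (int32), then a request-specific body. The helper frame_request and the %REQUEST_TYPE table are hypothetical illustrations, not part of the Kafka package API.

```perl
use strict;
use warnings;

# Request type codes as given for Kafka 0.7
# (hypothetical local table, not exported by the Kafka package)
my %REQUEST_TYPE = (
    PRODUCE      => 0,
    FETCH        => 1,
    MULTIFETCH   => 2,
    MULTIPRODUCE => 3,
    OFFSETS      => 4,
);

# Hypothetical helper: wraps a request-specific body in the common header.
# Topic and partition travel with every request, so the server needs no
# state from earlier requests to interpret this one.
sub frame_request {
    my ( $type, $topic, $partition, $body ) = @_;
    my $request = pack( 'n', $REQUEST_TYPE{$type} )    # REQUEST_TYPE  int16
                . pack( 'n/a*', $topic )                # TOPIC_LENGTH int16 + TOPIC
                . pack( 'N', $partition )               # PARTITION    int32
                . $body;
    # REQUEST_LENGTH (int32) counts only the bytes that follow it
    return pack( 'N', length $request ) . $request;
}
```

For example, frame_request( 'OFFSETS', 'test', 0, '' ) produces 16 bytes: a REQUEST_LENGTH of 12 followed by the 12-byte header.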

The IO Object

Clients use the IO object to communicate with the Apache Kafka server. The IO object is an interface layer between your application code and the network.

An IO object is required to create objects of the Kafka::Producer and Kafka::Consumer classes.

The Kafka IO API is implemented by the Kafka::IO class.

    use Kafka::IO;

    # connect to local server with the defaults
    my $io = Kafka::IO->new( host => "localhost" );
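To give a rough idea of what such an interface layer does, here is a hypothetical open_connection helper built on the core IO::Socket::INET module. It is a sketch that assumes a plain blocking TCP connection is all that is needed; it is not how Kafka::IO is actually implemented. The default port and timeout mirror the KAFKA_SERVER_PORT and DEFAULT_TIMEOUT values listed in the EXPORT section.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Local copies of the defaults from the EXPORT section
use constant {
    KAFKA_SERVER_PORT => 9092,
    DEFAULT_TIMEOUT   => 0.5,
};

# Hypothetical helper: opens a blocking TCP connection to a broker.
# Returns ( $socket, '' ) on success or ( undef, $message ) on failure,
# loosely mirroring the error reporting style of the package.
sub open_connection {
    my (%args) = @_;
    my $socket = IO::Socket::INET->new(
        PeerAddr => $args{host},
        PeerPort => $args{port} // KAFKA_SERVER_PORT,
        Proto    => 'tcp',
        Timeout  => $args{timeout} // DEFAULT_TIMEOUT,
    );
    return $socket ? ( $socket, '' ) : ( undef, "Can't connect: $@" );
}
```

Called as open_connection( host => "localhost" ), the sketch would try localhost:9092 with a 0.5 second connect timeout.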

The main attributes of the IO object are:

The Producer Object

The Kafka producer API is implemented by the Kafka::Producer class.

    use Kafka::Producer;

    #-- Producer
    my $producer = Kafka::Producer->new( IO => $io );

    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
        );

    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
        );
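On the wire, each payload passed to send() becomes one message. The following is a minimal sketch of that encoding, assuming the magic-0 message layout of the Kafka 0.7 Wire Format (LENGTH, MAGIC, CHECKSUM, PAYLOAD; the magic-1 variant adds an ATTRIBUTES byte for compression) and a PRODUCE request of type 0 whose body is MESSAGES_LENGTH followed by the messages. crc32, encode_message and produce_request are hypothetical names; the real package computes checksums with String::CRC32, and the pure-Perl CRC here only avoids a non-core dependency.

```perl
use strict;
use warnings;

# Bitwise CRC-32 (IEEE polynomial in reflected form, 0xEDB88320)
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte ( unpack 'C*', $data ) {
        $crc ^= $byte;
        for ( 1 .. 8 ) {
            $crc = ( $crc & 1 ) ? ( ( $crc >> 1 ) ^ 0xEDB88320 ) : ( $crc >> 1 );
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Hypothetical helper: one message in the magic-0 layout
# LENGTH (int32) | MAGIC (int8) | CHECKSUM (int32) | PAYLOAD
sub encode_message {
    my ($payload) = @_;
    my $body = pack( 'C N', 0, crc32($payload) ) . $payload;
    return pack( 'N', length $body ) . $body;
}

# Hypothetical helper: a PRODUCE request (type 0) for one topic/partition.
# Accepts a single payload or an array reference, like send() above.
sub produce_request {
    my ( $topic, $partition, $messages ) = @_;
    my @payloads    = ref $messages eq 'ARRAY' ? @$messages : ($messages);
    my $message_set = join '', map { encode_message($_) } @payloads;
    my $request     = pack( 'n n/a* N N', 0, $topic, $partition, length $message_set )
                    . $message_set;
    return pack( 'N', length $request ) . $request;
}
```

For example, produce_request( "test", 0, [ "a", "bc" ] ) yields a 41-byte request: 4 bytes of REQUEST_LENGTH, 16 bytes of header and MESSAGES_LENGTH, and 21 bytes of messages.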

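The main attributes of the producer request are the topic, the partition, and the message: either a single string or a reference to an array of strings.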

The Consumer Object

The Kafka consumer API is implemented by the Kafka::Consumer class.

    use Kafka::Consumer;

    my $consumer = Kafka::Consumer->new( IO => $io );

The request methods of the consumer object are offsets() and fetch().

The offsets method returns a reference to a list of valid offsets (up to max_number of them) before the given time.

The fetch method returns a reference to a list of received Kafka::Message objects.

    # Get a list of valid offsets up to max_number before the given time
    if ( my $offsets = $consumer->offsets(
        "test",             # topic
        0,                  # partition
        TIMESTAMP_EARLIEST, # time
        DEFAULT_MAX_OFFSETS # max_number
        ) )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }

    # Consuming messages
    if ( my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # max_size
        ) )
    {
        foreach my $message ( @$messages )
        {
            if( $message->valid )
            {
                print "payload    : ", $message->payload,       "\n";
                print "offset     : ", $message->offset,        "\n";
                print "next_offset: ", $message->next_offset,   "\n";
            }
        }
    }
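Both consumer requests end with a 64-bit value followed by a 32-bit limit. The following hedged sketch builds these request bodies, assuming the Kafka 0.7 layouts OFFSETS = TIME (int64) + MAX_NUMBER (int32) and FETCH = OFFSET (int64) + MAX_SIZE (int32), which follow the common header carrying the topic and partition. offsets_body and fetch_body are hypothetical helpers, and the constants are local copies of the values given in the EXPORT section.

```perl
use strict;
use warnings;

# Local copies of values from the EXPORT section
use constant {
    TIMESTAMP_LATEST    => -1,
    TIMESTAMP_EARLIEST  => -2,
    DEFAULT_MAX_OFFSETS => 100,
};

# OFFSETS body: TIME (int64, ms or a special value) | MAX_NUMBER (int32)
sub offsets_body {
    my ( $time, $max_number ) = @_;
    return pack( 'q> N', $time, $max_number );
}

# FETCH body: OFFSET (int64, byte position) | MAX_SIZE (int32)
sub fetch_body {
    my ( $offset, $max_size ) = @_;
    return pack( 'q> N', $offset, $max_size );
}

# 'q>' needs a perl built with 64-bit integers (BITS64 true)
print unpack( 'H*', offsets_body( TIMESTAMP_EARLIEST, DEFAULT_MAX_OFFSETS ) ), "\n";
# prints fffffffffffffffe00000064
```

The special TIME values are simply negative 64-bit integers, so -2 shows up as seven 0xff bytes followed by 0xfe.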

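The arguments are the topic and the partition, followed by the time and the maximum number of offsets (max_number) for offsets(), or by the starting offset and the maximum size of the data to receive (max_size) for fetch().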

The Message Object

The Kafka message API is implemented by the Kafka::Message class.

    if( $message->valid )
    {
        print "payload    : ", $message->payload,       "\n";
        print "offset     : ", $message->offset,        "\n";
        print "next_offset: ", $message->next_offset,   "\n";
    }
    else
    {
        print "error      : ", $message->error,         "\n";
    }
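The offset, next_offset, valid and error values can all be derived from the raw bytes of a fetch response. The sketch below decodes a message set under two assumptions: every message uses the magic-0 layout, and offsets are byte positions, so next_offset is the offset plus the 4-byte LENGTH field plus LENGTH. decode_message_set and its hash keys only mimic the Kafka::Message accessors; they are not the package's implementation.

```perl
use strict;
use warnings;

# Bitwise CRC-32 (reflected polynomial 0xEDB88320), pure Perl
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte ( unpack 'C*', $data ) {
        $crc ^= $byte;
        for ( 1 .. 8 ) {
            $crc = ( $crc & 1 ) ? ( ( $crc >> 1 ) ^ 0xEDB88320 ) : ( $crc >> 1 );
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Hypothetical decoder for the MESSAGES part of a FETCH response.
# $start_offset is the offset that was passed to fetch().
sub decode_message_set {
    my ( $buffer, $start_offset ) = @_;
    my @messages;
    my ( $pos, $offset ) = ( 0, $start_offset );
    while ( length($buffer) - $pos >= 4 ) {
        my $length = unpack( 'N', substr( $buffer, $pos, 4 ) );
        # Stop at a malformed LENGTH or at a trailing message cut off at max_size
        last if $length < 5 || length($buffer) - $pos - 4 < $length;
        my ( $magic, $checksum ) = unpack( 'C N', substr( $buffer, $pos + 4, 5 ) );
        my $payload = substr( $buffer, $pos + 9, $length - 5 );
        my $error = $magic != 0                  ? 'Unsupported magic byte'
                  : crc32($payload) != $checksum ? 'Checksum error'
                  :                                '';
        push @messages, {
            payload     => $payload,
            valid       => $error eq '' ? 1 : 0,
            error       => $error,
            offset      => $offset,
            next_offset => $offset + 4 + $length,
        };
        $pos    += 4 + $length;
        $offset += 4 + $length;
    }
    return \@messages;
}
```

Stopping at a short trailing message matters because a broker may return a partial last message when max_size cuts through it; the consumer can then resume from the last next_offset it decoded.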

The methods of the Kafka::Message object used above are valid, payload, offset, next_offset, and error.

Common

Both Kafka::Producer and Kafka::Consumer objects described above also have the following common methods:

EXPORT

None by default.

Additional constants are available for import; they can be used to define some types of parameters and to identify various error cases.

These are the defaults:

KAFKA_SERVER_PORT

Default Apache Kafka server port - 9092.

DEFAULT_TIMEOUT

Timeout in seconds for gethostbyname, connect, and blocking receive and send calls (may be any integer or floating-point value) - 0.5 sec.

TIMESTAMP_LATEST

Timestamp (in ms) before which offsets are requested; special value -1: latest.

TIMESTAMP_EARLIEST

Timestamp (in ms) before which offsets are requested; special value -2: earliest.

DEFAULT_MAX_SIZE

Maximum size of message(s) to receive - 1MB.

DEFAULT_MAX_OFFSETS

Maximum number of offsets to retrieve - 100.

MAX_SOCKET_REQUEST_BYTES

The maximum size of a request that the socket server will accept (protection against OOM). The default limit (as configured in server.properties) is 104857600 bytes (100 MB).
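A client that sends large batches has to stay under this limit. The following hypothetical helper groups payloads into batches whose framed PRODUCE request stays within MAX_SOCKET_REQUEST_BYTES, assuming the magic-0 message overhead of 9 bytes per message and the Kafka 0.7 request header. It only illustrates the size arithmetic; it is not a feature of the Kafka package.

```perl
use strict;
use warnings;

use constant MAX_SOCKET_REQUEST_BYTES => 104_857_600;    # default quoted above

# Hypothetical helper: splits payloads into PRODUCE batches whose framed
# request size stays within $limit bytes (MAX_SOCKET_REQUEST_BYTES by default)
sub split_into_batches {
    my ( $topic, $payloads, $limit ) = @_;
    $limit //= MAX_SOCKET_REQUEST_BYTES;

    # REQUEST_LENGTH + REQUEST_TYPE + TOPIC_LENGTH + TOPIC + PARTITION + MESSAGES_LENGTH
    my $overhead = 4 + 2 + 2 + length($topic) + 4 + 4;
    my ( @batches, @current );
    my $size = $overhead;

    for my $payload (@$payloads) {
        # magic-0 message: LENGTH (4) + MAGIC (1) + CHECKSUM (4) + PAYLOAD
        my $cost = 4 + 1 + 4 + length($payload);
        die "a single message exceeds the request limit\n" if $overhead + $cost > $limit;
        if ( @current && $size + $cost > $limit ) {
            push @batches, [@current];    # close the full batch
            @current = ();
            $size    = $overhead;
        }
        push @current, $payload;
        $size += $cost;
    }
    push @batches, [@current] if @current;
    return \@batches;
}
```

With a limit of 42 bytes and the topic "test", three 2-byte payloads end up in two batches, because each message costs 11 bytes on top of the 20-byte request overhead.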

Possible error codes returned by the last_errorcode method (each code is an index into the array of descriptions @Kafka::ERROR):

ERROR_INVALID_MESSAGE_CODE

0 - Invalid message

ERROR_MISMATCH_ARGUMENT

1 - Mismatch argument

ERROR_WRONG_CONNECT

2 - You must configure a host to connect to!

ERROR_CANNOT_SEND

3 - Can't send

ERROR_CANNOT_RECV

4 - Can't receive

ERROR_CANNOT_BIND

5 - Can't bind

ERROR_CHECKSUM_ERROR

6 - Checksum error

ERROR_COMPRESSED_PAYLOAD

7 - Compressed payload

ERROR_NUMBER_OF_OFFSETS

7 - The number of received offsets does not match 'NUMBER of OFFSETS'

ERROR_NOTHING_RECEIVE

8 - Nothing to receive

ERROR_IN_ERRORCODE

9 - Response contains an error in 'ERROR_CODE'

Support for working with 64-bit elements of the Kafka Wire Format protocol on 32-bit systems:

BITS64

Indicates whether you are working on a 64-bit system (true) or a 32-bit system (false).
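On a perl without 64-bit integers, pack cannot use the 'q' template, which is why the package provides Kafka::Int64. The following is a sketch of the idea, not Kafka::Int64's actual code: IS_64BIT is a hypothetical stand-in for BITS64 based on $Config{ivsize}, and pack_int64_halves splits a value into two big-endian 32-bit words, which is exact only for magnitudes below 2**53.

```perl
use strict;
use warnings;
use Config;
use POSIX qw(floor);

# Hypothetical stand-in for BITS64: true when perl's native integers
# have at least 8 bytes (an assumption, not the package's definition)
use constant IS_64BIT => $Config{ivsize} >= 8;

# Splits a signed integer into two big-endian 32-bit words (two's
# complement for negative values); exact only while abs($n) < 2**53
sub pack_int64_halves {
    my ($n) = @_;
    my $hi = floor( $n / 2**32 );    # upper word, negative for $n < 0
    my $lo = $n - $hi * 2**32;       # lower word, 0 .. 2**32 - 1
    return pack( 'N N', $hi & 0xFFFFFFFF, $lo );
}

# Uses the native 'q>' template when available, otherwise the split form
sub pack_int64 {
    my ($n) = @_;
    return IS_64BIT ? pack( 'q>', $n ) : pack_int64_halves($n);
}
```

Both paths give the same bytes for the special timestamps: -1 becomes ffffffffffffffff and -2 becomes fffffffffffffffe.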

GLOBAL VARIABLES

@Kafka::ERROR

Contains the descriptions of the error codes returned by the last_errorcode methods and functions of the package modules.

%Kafka::ERROR_CODE

Contains the descriptions of the error codes found in the ERROR_CODE field of Apache Kafka Wire Format protocol responses.
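Every response starts with RESPONSE_LENGTH and ERROR_CODE. The following minimal sketch decodes that header and looks up a description, assuming the Kafka 0.7 layout and error codes as we understand them (-1 unknown, 0 no error, 1 offset out of range, 2 invalid message, 3 wrong partition, 4 invalid fetch size). %ERROR_CODE and decode_response are local stand-ins for %Kafka::ERROR_CODE, and the descriptions are paraphrased rather than the package's exact wording.

```perl
use strict;
use warnings;

# Error codes of the ERROR_CODE field in Kafka 0.7 (local stand-in for
# %Kafka::ERROR_CODE; descriptions are paraphrased)
my %ERROR_CODE = (
    -1 => 'Unknown error',
    0  => 'No error',
    1  => 'Offset out of range',
    2  => 'Invalid message',
    3  => 'Wrong partition',
    4  => 'Invalid fetch size',
);

# Hypothetical helper: RESPONSE_LENGTH (int32, not counting itself) |
# ERROR_CODE (signed int16) | body. Returns the code, its description
# and the remaining body bytes.
sub decode_response {
    my ($response) = @_;
    my ( $length, $code ) = unpack( 'N s>', $response );
    my $body = substr( $response, 6, $length - 2 );
    return ( $code, $ERROR_CODE{$code} // 'Unrecognized error code', $body );
}
```

ERROR_CODE is decoded as a signed 16-bit value ('s>'), since the "unknown error" code is negative.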

An Example

    use strict;
    use warnings;

    use Kafka qw(
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
        );
    use Kafka::IO;
    use Kafka::Producer;
    use Kafka::Consumer;

    #-- IO
    my $io = Kafka::IO->new( host => "localhost" );

    #-- Producer
    my $producer = Kafka::Producer->new( IO => $io );

    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
        );

    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
        );

    $producer->close;

    #-- Consumer
    my $consumer = Kafka::Consumer->new( IO => $io );

    # Get a list of valid offsets up to max_number before the given time
    my $offsets;
    if ( $offsets = $consumer->offsets(
        "test",             # topic
        0,                  # partition
        TIMESTAMP_EARLIEST, # time
        DEFAULT_MAX_OFFSETS # max_number
        ) )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }
    if ( !$offsets or $consumer->last_error )
    {
        print
            "(", $consumer->last_errorcode, ") ",
            $consumer->last_error, "\n";
    }

    # Consuming messages
    if ( my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # Maximum size of MESSAGE(s) to receive
        ) )
    {
        foreach my $message ( @$messages )
        {
            if( $message->valid )
            {
                print "payload    : ", $message->payload,       "\n";
                print "offset     : ", $message->offset,        "\n";
                print "next_offset: ", $message->next_offset,   "\n";
            }
            else
            {
                print "error      : ", $message->error,         "\n";
            }
        }
    }

    $consumer->close;

$io, $producer, and $consumer are created once when the application starts up.

DEPENDENCIES

In order to install and use this package you will need Perl version 5.10 or later. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install Kafka:

    Params::Util
    String::CRC32

The Kafka package has the following optional dependencies:

    Test::Pod
    Test::Pod::Coverage
    Test::Exception
    CPAN::Meta
    Test::Deep
    Test::Distribution
    Test::Kwalitee

If the optional modules are missing, some "prereq" tests are skipped.

BUGS AND LIMITATIONS

Currently, the package does not implement sending or receiving compressed messages. It also does not implement the MULTIFETCH and MULTIPRODUCE requests.

Use only one Kafka::Mock object at a time (it uses class variables to exchange data with its TCP server processes).

The Kafka package was written, tested, and found working on recent Linux distributions.

There are no known bugs in this package.

Please report problems to the "AUTHOR".

Patches are welcome.

MORE DOCUMENTATION

All modules contain detailed information on the interfaces they provide.

SEE ALSO

The Kafka package consists of the following modules:

Kafka - constants and messages used by the Kafka package modules

Kafka::IO - object interface to socket communications with the Apache Kafka server

Kafka::Producer - object interface to the producer client

Kafka::Consumer - object interface to the consumer client

Kafka::Message - object interface to the Kafka message properties

Kafka::Protocol - functions to process messages in the Apache Kafka's wire format

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems

Kafka::Mock - object interface to the TCP mock server for testing

A wealth of detail about the Apache Kafka and Wire Format:

Main page at http://incubator.apache.org/kafka/

Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/

Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
