
Kafka - constants and messages used by the Kafka package modules

This documentation refers to Kafka package version 0.12

An example of Kafka usage:
    use Kafka qw(
        BITS64
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
        );

    # common information
    print "This is Kafka package $Kafka::VERSION\n";
    print "You have a ", BITS64 ? "64" : "32", " bit system\n";

    use Kafka::IO;

    # connect to local server with the defaults
    my $io = Kafka::IO->new( host => "localhost" );

    # decoding of the error code
    unless ( $io )
    {
        print STDERR "last error: ",
            $Kafka::ERROR[Kafka::IO::last_errorcode], "\n";
    }
For a brief but working example of Kafka package usage, see the "An Example" section.

The Kafka package is a set of Perl modules that provides a simple and consistent application programming interface (API) to Apache Kafka 0.7, a high-throughput distributed messaging system. This is a low-level API implementation which DOES NOT interact with Apache ZooKeeper for consumer coordination and/or load balancing.

The user modules in this package provide an object-oriented API. The IO agents, the requests sent, and the responses received from the Apache Kafka or mock servers are all represented by objects, which gives a simple and powerful interface to these services.
The main features of the package are:

The Kafka package is based on the Kafka 0.7 Wire Format specification document at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/
Communication with Kafka always follows these steps. First, the IO and client objects are created and configured.
A Kafka client is an object of class Kafka::Producer or Kafka::Consumer.
Messages are the fundamental unit of communication. They are published to a topic by a producer, which means they are physically sent to a server acting as a broker. Some number of consumers subscribe to a topic, and each published message is delivered to all the consumers. The message stream is partitioned on the brokers as a set of distinct partitions. The semantic meaning of these partitions is left up to the producer, and the producer specifies which partition a message belongs to. Within a partition the messages are stored in the order in which they arrive at the broker, and are given out to consumers in that same order. In Apache Kafka, the consumers are responsible for maintaining state information (the offset) on what has been consumed. A consumer can deliberately rewind back to an old offset and re-consume data. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. Reads are done by giving the 64-bit logical offset of a message and a maximum chunk size.
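Because an offset is a byte position, the offset of the next message follows from the size of the current one on the wire. The pure-Perl sketch below illustrates that arithmetic. It assumes the 0.7 message layout from the Wire Format document (4-byte LENGTH, 1-byte MAGIC, a 1-byte COMPRESSION attribute when MAGIC is 1, a 4-byte CRC32 of the payload, then the payload). The helpers crc32, encode_message and next_offset are hypothetical and are not part of the Kafka package API (the package itself depends on String::CRC32 for checksums).

```perl
use strict;
use warnings;

# Bitwise CRC-32 (reflected polynomial 0xEDB88320), written out here so
# the sketch needs no CPAN modules.
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte ( unpack 'C*', $data ) {
        $crc ^= $byte;
        for ( 1 .. 8 ) {
            $crc = ( $crc & 1 ) ? ( ( $crc >> 1 ) ^ 0xEDB88320 ) : ( $crc >> 1 );
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Hypothetical helper: encode one uncompressed message (MAGIC = 1,
# COMPRESSION = 0). LENGTH counts every byte that follows it.
sub encode_message {
    my ($payload) = @_;
    my $body = pack( 'C C N', 1, 0, crc32($payload) ) . $payload;
    return pack( 'N', length $body ) . $body;
}

# Hypothetical helper: the next message starts right after the 4-byte
# LENGTH field and the LENGTH bytes it announces.
sub next_offset {
    my ( $offset, $encoded ) = @_;
    return $offset + 4 + unpack( 'N', $encoded );
}

my $offset = 0;
for my $payload ( "The first message", "The second message" ) {
    my $next = next_offset( $offset, encode_message($payload) );
    print "offset $offset, next_offset $next\n";
    $offset = $next;
}
# prints:
# offset 0, next_offset 27
# offset 27, next_offset 55
```

The 17-byte first payload plus 6 header bytes and the 4-byte LENGTH field give 27, which is exactly where the second message begins.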
Next, the request is passed through the client to a server, and for a consumer request we get back a response that we can examine. A request is always independent of any previous requests: the API is completely stateless, with the topic and partition passed in on every request.
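Since every request carries its own topic and partition, a request can be built from its arguments alone. A minimal sketch of a FETCH request, assuming the 0.7 request header (REQUEST_LENGTH int32, REQUEST_TYPE int16, TOPIC_LENGTH int16, TOPIC, PARTITION int32) followed by OFFSET int64 and MAX_SIZE int32, with FETCH assumed to be request type 1. The names fetch_request, pack_int64 and REQUEST_TYPE_FETCH are hypothetical, not Kafka package functions.

```perl
use strict;
use warnings;

use constant REQUEST_TYPE_FETCH => 1;    # assumed FETCH id from the 0.7 Wire Format

# Pack a non-negative 64-bit value as two big-endian 32-bit halves, so the
# sketch does not need 64-bit integer support in pack().
sub pack_int64 {
    my ($n) = @_;
    return pack 'N N', int( $n / 4294967296 ), $n % 4294967296;
}

# Hypothetical helper: build a FETCH request. Nothing is remembered between
# calls; topic and partition are encoded into every request.
sub fetch_request {
    my ( $topic, $partition, $offset, $max_size ) = @_;
    my $body = pack( 'n n', REQUEST_TYPE_FETCH, length $topic )
        . $topic
        . pack( 'N', $partition )
        . pack_int64($offset)
        . pack( 'N', $max_size );
    return pack( 'N', length $body ) . $body;    # prefix REQUEST_LENGTH
}

my $request = fetch_request( "test", 0, 0, 1_048_576 );
print length($request), " bytes: ", unpack( 'H*', $request ), "\n";    # 28 bytes
```

Splitting the int64 into two `N` halves keeps the sketch usable on 32-bit Perl, at the cost of float precision above 2**53.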
The clients use the IO object to communicate with the Apache Kafka server. The IO object is an interface layer between your application code and the network.
An IO object is required to create objects of the Kafka::Producer and Kafka::Consumer classes.
The Kafka IO API is implemented by the Kafka::IO class.
    use Kafka::IO;

    # connect to local server with the defaults
    my $io = Kafka::IO->new( host => "localhost" );
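Conceptually, the IO layer wraps a TCP connection to the broker. The sketch below is not the Kafka::IO implementation; it is a hypothetical connect_broker helper built on the core IO::Socket::INET module, assuming the KAFKA_SERVER_PORT (9092) and DEFAULT_TIMEOUT (0.5 sec) defaults listed among the importable constants.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Hypothetical helper: open a TCP connection to a broker. The defaults mirror
# KAFKA_SERVER_PORT and DEFAULT_TIMEOUT; returns the socket or undef.
sub connect_broker {
    my (%args) = @_;
    my $host = $args{host}
        or die "You must configure a host to connect to!\n";
    return IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $args{port} // 9092,
        Proto    => 'tcp',
        Timeout  => $args{timeout} // 0.5,    # seconds, fractions allowed
    );
}

my $socket = connect_broker( host => "localhost" )
    or warn "cannot connect to localhost:9092\n";
```

A missing host is treated as a hard error, echoing the "You must configure a host to connect to!" description of ERROR_WRONG_CONNECT, while a refused or timed-out connection simply yields undef.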
The main attributes of the IO object are:
The Kafka producer API is implemented by the Kafka::Producer class.
    use Kafka::Producer;

    #-- Producer
    my $producer = Kafka::Producer->new( IO => $io );

    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
        );

    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
        );
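The send() call above takes either one message or an array reference of messages. A sketch of how such a call could map onto a single 0.7 PRODUCE request, assuming request type 0 and a body of MESSAGES_LENGTH (int32) followed by the encoded messages. The helpers produce_request, encode_message and crc32 are hypothetical, not the Kafka::Producer implementation.

```perl
use strict;
use warnings;

use constant REQUEST_TYPE_PRODUCE => 0;    # assumed PRODUCE id from the 0.7 Wire Format

# Bitwise CRC-32 of a byte string (reflected polynomial 0xEDB88320).
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte ( unpack 'C*', $data ) {
        $crc ^= $byte;
        for ( 1 .. 8 ) {
            $crc = ( $crc & 1 ) ? ( ( $crc >> 1 ) ^ 0xEDB88320 ) : ( $crc >> 1 );
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# One uncompressed message: LENGTH, MAGIC = 1, COMPRESSION = 0, CRC32, payload.
sub encode_message {
    my ($payload) = @_;
    my $body = pack( 'C C N', 1, 0, crc32($payload) ) . $payload;
    return pack( 'N', length $body ) . $body;
}

# Hypothetical helper: accept a single payload or an array ref of payloads,
# the same two shapes passed to send() above.
sub produce_request {
    my ( $topic, $partition, $messages ) = @_;
    my @payloads    = ref($messages) eq 'ARRAY' ? @$messages : ($messages);
    my $message_set = join '', map { encode_message($_) } @payloads;
    my $body = pack( 'n n', REQUEST_TYPE_PRODUCE, length $topic )
        . $topic
        . pack( 'N N', $partition, length $message_set )    # PARTITION, MESSAGES_LENGTH
        . $message_set;
    return pack( 'N', length $body ) . $body;
}

my $single = produce_request( "test", 0, "Single message" );
my $series = produce_request( "test", 0,
    [ "The first message", "The second message", "The third message" ] );
print length($single), " and ", length($series), " bytes\n";    # 44 and 102 bytes
```

Batching the three messages into one request sends a single header instead of three, which is one reason to prefer the array-reference form when producing several messages to the same partition.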
The main attributes of the producer request are:
The request method of the producer object is send().
The Kafka consumer API is implemented by the Kafka::Consumer class.
    use Kafka::Consumer;

    my $consumer = Kafka::Consumer->new( IO => $io );
The request methods of the consumer object are offsets() and fetch().
The offsets method returns a reference to a list of offsets of received messages.
The fetch method returns a reference to a list of received Kafka::Message objects.
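The list returned by offsets() is decoded from an OFFSETS response. A minimal sketch of that decoding, assuming the 0.7 response layout RESPONSE_LENGTH (int32), ERROR_CODE (int16), NUMBER_OF_OFFSETS (int32) and then one int64 per offset. decode_offsets_response is a hypothetical helper; its count check corresponds to the ERROR_NUMBER_OF_OFFSETS case among the error codes.

```perl
use strict;
use warnings;

# Hypothetical decoder: returns an array ref of offsets or dies with a reason.
sub decode_offsets_response {
    my ($response) = @_;
    die "response too short\n" if length($response) < 10;
    my ( $length, $error_code, $count ) = unpack 'N n! N', $response;
    die "RESPONSE_LENGTH mismatch\n" if $length != length($response) - 4;
    die "ERROR_CODE $error_code in response\n" if $error_code != 0;

    # Each offset takes 8 bytes after the 10-byte header.
    my $available = int( ( length($response) - 10 ) / 8 );
    die "NUMBER_OF_OFFSETS is $count, received $available\n"
        if $available != $count;

    # Read every int64 as two 32-bit halves (exact up to 2**53 even
    # without 64-bit integers).
    my @halves = unpack "x10 (N N)$count", $response;
    my @offsets;
    while ( my ( $hi, $lo ) = splice @halves, 0, 2 ) {
        push @offsets, $hi * 4294967296 + $lo;
    }
    return \@offsets;
}

# A hand-built response carrying the offsets 0, 27 and 55.
my @sent     = ( 0, 27, 55 );
my $body     = pack( 'n N', 0, scalar @sent ) . join '', map { pack 'N N', 0, $_ } @sent;
my $response = pack( 'N', length $body ) . $body;

foreach my $offset ( @{ decode_offsets_response($response) } ) {
    print "Received offset: $offset\n";
}
```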
    # Get a list of valid offsets up to max_number before the given time
    if ( my $offsets = $consumer->offsets(
        "test",             # topic
        0,                  # partition
        TIMESTAMP_EARLIEST, # time
        DEFAULT_MAX_OFFSETS # max_number
        ) )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }

    # Consuming messages
    if ( my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # max_size
        ) )
    {
        foreach my $message ( @$messages )
        {
            if( $message->valid )
            {
                print "payload    : ", $message->payload,       "\n";
                print "offset     : ", $message->offset,        "\n";
                print "next_offset: ", $message->next_offset,   "\n";
            }
        }
    }
The arguments:
The Kafka message API is implemented by the Kafka::Message class.
    if( $message->valid )
    {
        print "payload    : ", $message->payload,       "\n";
        print "offset     : ", $message->offset,        "\n";
        print "next_offset: ", $message->next_offset,   "\n";
    }
    else
    {
        print "error      : ", $message->error,         "\n";
    }
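Each Kafka::Message accessor corresponds to something that can be derived while walking the message set returned by a fetch. The sketch below is a hypothetical decode_message_set helper, not the Kafka::Message implementation. It assumes the 0.7 layout (LENGTH, MAGIC, a COMPRESSION byte only when MAGIC is 1, CRC32, payload), reports compressed payloads as errors because the package does not handle compression, and skips a trailing message that is incomplete (for example when the chunk was cut at max_size).

```perl
use strict;
use warnings;

# Bitwise CRC-32 of a byte string (reflected polynomial 0xEDB88320).
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte ( unpack 'C*', $data ) {
        $crc ^= $byte;
        for ( 1 .. 8 ) {
            $crc = ( $crc & 1 ) ? ( ( $crc >> 1 ) ^ 0xEDB88320 ) : ( $crc >> 1 );
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Test helper: one uncompressed message (MAGIC = 1, COMPRESSION = 0).
sub encode_message {
    my ($payload) = @_;
    my $body = pack( 'C C N', 1, 0, crc32($payload) ) . $payload;
    return pack( 'N', length $body ) . $body;
}

# Hypothetical decoder: one hash per complete message, keyed like the
# Kafka::Message accessors.
sub decode_message_set {
    my ( $data, $start_offset ) = @_;
    my @messages;
    my $pos = 0;
    while ( $pos + 5 <= length $data ) {
        my ( $length, $magic ) = unpack 'N C', substr( $data, $pos, 5 );
        my $header = $magic == 1 ? 6 : 5;             # MAGIC [COMPRESSION] CHECKSUM
        last if $length < $header;                    # malformed entry
        last if $pos + 4 + $length > length $data;    # incomplete trailing message
        my $compression = $magic == 1 ? unpack( 'C', substr( $data, $pos + 5, 1 ) ) : 0;
        my $checksum    = unpack 'N', substr( $data, $pos + $header, 4 );
        my $payload     = substr( $data, $pos + 4 + $header, $length - $header );
        my $valid       = !$compression && $checksum == crc32($payload);
        push @messages, {
            payload     => $payload,
            valid       => $valid ? 1 : 0,
            error       => $compression ? 'Compressed payload'
                         : $valid       ? ''
                         :                'Checksum error',
            offset      => $start_offset + $pos,
            next_offset => $start_offset + $pos + 4 + $length,
        };
        $pos += 4 + $length;
    }
    return \@messages;
}

my $set = encode_message("The first message") . encode_message("The second message");
$set .= substr( encode_message("cut off"), 0, 8 );    # simulate a chunk cut at max_size

foreach my $message ( @{ decode_message_set( $set, 0 ) } ) {
    if ( $message->{valid} ) {
        print "payload    : $message->{payload}\n";
        print "offset     : $message->{offset}\n";
        print "next_offset: $message->{next_offset}\n";
    }
    else {
        print "error      : $message->{error}\n";
    }
}
```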
The available methods of a Kafka::Message object are:
payload - A simple message received from the Apache Kafka server.
valid - A message entry is valid if the CRC32 of the message payload matches the CRC stored with the message.
error - A description of the message inconsistency (currently only for a message that is not valid or is compressed).
offset - The offset of the beginning of the message in the Apache Kafka server.
next_offset - The offset of the beginning of the next message in the Apache Kafka server.
Both Kafka::Producer and Kafka::Consumer objects described above also have the following common methods:
RaiseError - a method which causes Kafka to die if an error is detected.
last_errorcode and last_error - diagnostic methods. Use them to get a detailed error message if the server or the resource is not available, access to the resource is denied, or something else has failed.
close - terminates the connection with Kafka and cleans up.
    my $producer = Kafka::Producer->new(
        IO          => $io,
        RaiseError  => 0    # report errors via return values; 1 would make send() die instead
        );

    unless ( $producer->send( "test", 0, "Single message" ) )
    {
        print
            "error code       : ", $producer->last_errorcode, "\n",
            "error description: ", $producer->last_error,     "\n";
    }

    $producer->close;
Nothing is exported by default.
Additional constants are available for import; they can be used to define some types of parameters and to identify various error cases.
These are the defaults:
KAFKA_SERVER_PORT - default Apache Kafka server port (9092).
DEFAULT_TIMEOUT - timeout in seconds for gethostbyname, connect, and blocking receive and send calls; any integer or floating-point value is accepted (default 0.5 sec).
TIMESTAMP_LATEST - special time value -1 ("latest") for requesting the offsets before a given time (ms).
TIMESTAMP_EARLIEST - special time value -2 ("earliest") for requesting the offsets before a given time (ms).
DEFAULT_MAX_SIZE - maximum size of message(s) to receive (1 MB).
DEFAULT_MAX_OFFSETS - maximum number of offsets to retrieve (100).
MAX_SOCKET_REQUEST_BYTES - the maximum size of a request that the socket server will accept (protection against OOM). The default limit, as configured in server.properties, is 104857600 bytes.
Possible error codes returned by the last_errorcode method (they are indexes into the array of descriptions @Kafka::ERROR):
ERROR_INVALID_MESSAGE_CODE (0) - Invalid message
ERROR_MISMATCH_ARGUMENT (1) - Mismatch argument
ERROR_WRONG_CONNECT (2) - You must configure a host to connect to!
ERROR_CANNOT_SEND (3) - Can't send
ERROR_CANNOT_RECV (4) - Can't receive
ERROR_CANNOT_BIND (5) - Can't bind
ERROR_CHECKSUM_ERROR (6) - Checksum error
ERROR_COMPRESSED_PAYLOAD (7) - Compressed payload
ERROR_NUMBER_OF_OFFSETS (7) - Amount received offsets does not match 'NUMBER of OFFSETS'
ERROR_NOTHING_RECEIVE (8) - Nothing to receive
ERROR_IN_ERRORCODE (9) - Response contains an error in 'ERROR_CODE'
Support for working with 64-bit elements of the Kafka Wire Format protocol on 32-bit systems:
BITS64 - tells whether you are working on a 64-bit or a 32-bit system.
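On a Perl without 64-bit integers, an int64 field such as an offset cannot be unpacked directly. A minimal sketch of one way to handle both cases, assuming that checking $Config{ivsize} is an acceptable stand-in for BITS64 (how BITS64 and Kafka::Int64 actually work is documented in those modules). decode_int64 is a hypothetical helper that falls back to the core Math::BigInt module; its optional second argument forces the 32-bit path so it can be tried on any machine.

```perl
use strict;
use warnings;
use Config;
use Math::BigInt;

# Stand-in for BITS64: true when Perl integers are at least 64 bits wide.
my $has_64bit_ints = $Config{ivsize} >= 8;

# Hypothetical helper: decode a big-endian signed int64 (8 bytes).
sub decode_int64 {
    my ( $bytes, $force_bigint ) = @_;
    return unpack( 'q>', $bytes ) if $has_64bit_ints && !$force_bigint;

    # 32-bit path: combine the two unsigned halves with Math::BigInt,
    # then apply two's complement for negative values.
    my ( $hi, $lo ) = unpack 'N N', $bytes;
    my $n = Math::BigInt->new($hi)->blsft(32)->badd($lo);
    $n->bsub( Math::BigInt->new(2)->bpow(64) ) if $hi & 0x80000000;
    return $n;
}

print "64-bit integers: ", ( $has_64bit_ints ? "yes" : "no" ), "\n";
print decode_int64( pack 'N N', 1, 5 ), "\n";    # 4294967301
print decode_int64( "\xff" x 8, 1 ), "\n";       # -1 (TIMESTAMP_LATEST)
```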
@Kafka::ERROR - contains the descriptions of the possible error codes returned by the last_errorcode methods and functions of the package modules.
%Kafka::ERROR_CODE - contains the descriptions of the possible error codes in the ERROR_CODE field of Apache Kafka Wire Format protocol responses.
    use Kafka qw(
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
        );
    use Kafka::IO;
    use Kafka::Producer;
    use Kafka::Consumer;

    #-- IO
    my $io = Kafka::IO->new( host => "localhost" );

    #-- Producer
    my $producer = Kafka::Producer->new( IO => $io );

    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
        );

    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
        );

    $producer->close;

    #-- Consumer
    my $consumer = Kafka::Consumer->new( IO => $io );

    # Get a list of valid offsets up to max_number before the given time
    my $offsets;
    if ( $offsets = $consumer->offsets(
        "test",             # topic
        0,                  # partition
        TIMESTAMP_EARLIEST, # time
        DEFAULT_MAX_OFFSETS # max_number
        ) )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }
    if ( !$offsets or $consumer->last_error )
    {
        print
            "(", $consumer->last_errorcode, ") ",
            $consumer->last_error, "\n";
    }

    # Consuming messages
    if ( my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # Maximum size of MESSAGE(s) to receive
        ) )
    {
        foreach my $message ( @$messages )
        {
            if( $message->valid )
            {
                print "payload    : ", $message->payload,       "\n";
                print "offset     : ", $message->offset,        "\n";
                print "next_offset: ", $message->next_offset,   "\n";
            }
            else
            {
                print "error      : ", $message->error,         "\n";
            }
        }
    }

    $consumer->close;
$io, $producer, and $consumer are created once when the application starts up.

In order to install and use this package you will need Perl version 5.10 or later. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install Kafka:
Params::Util
String::CRC32
The Kafka package has the following optional dependencies:
Test::Pod
Test::Pod::Coverage
Test::Exception
CPAN::Meta
Test::Deep
Test::Distribution
Test::Kwalitee
If the optional modules are missing, some "prereq" tests are skipped.

Currently, the package does not implement sending or receiving compressed messages. It also does not implement the MULTIFETCH and MULTIPRODUCE requests.
Use only one Kafka::Mock object at a time (it uses class variables for data exchange between its TCP server processes).
The Kafka package was written, tested, and found working on recent Linux distributions.
There are no known bugs in this package.
Please report problems to the "AUTHOR".
Patches are welcome.

All modules contain detailed information on the interfaces they provide.

The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules
Kafka::IO - object interface to socket communications with the Apache Kafka server
Kafka::Producer - object interface to the producer client
Kafka::Consumer - object interface to the consumer client
Kafka::Message - object interface to the Kafka message properties
Kafka::Protocol - functions to process messages in the Apache Kafka's wire format
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems
Kafka::Mock - object interface to the TCP mock server for testing
A wealth of detail about Apache Kafka and the Wire Format:
Main page at http://incubator.apache.org/kafka/
Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/
Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka

Sergey Gladkov, <sgladkov@trackingsoft.com>

Alexander Solovey
Jeremy Jordan
Vlad Marchenko

Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.