package Kafka::Message;

use 5.010;
use strict;
use warnings;

our $VERSION = '0.12';

use Carp;
use Params::Util qw( _HASH );

use Kafka qw(
    ERROR_MISMATCH_ARGUMENT
    );

sub new {
    my $class   = shift;
    my $self    = _HASH( shift ) or confess $Kafka::ERROR[ERROR_MISMATCH_ARGUMENT];

    # Every message property must be present and defined.
    foreach my $field ( qw( payload valid error offset next_offset ) ) {
        confess $Kafka::ERROR[ERROR_MISMATCH_ARGUMENT]
            unless defined $self->{ $field };
    }

    bless $self, $class;

    return $self;
}

sub payload {
    my $self = shift;

    return $self->{payload};
}

sub valid {
    my $self = shift;

    return $self->{valid};
}

sub error {
    my $self = shift;

    return $self->{error};
}

sub offset {
    my $self = shift;

    return $self->{offset};
}

sub next_offset {
    my $self = shift;

    return $self->{next_offset};
}

1;

__END__

=head1 NAME

Kafka::Message - object interface to the Kafka message properties

=head1 VERSION

This documentation refers to C<Kafka::Message> version 0.12.

=head1 SYNOPSIS

The C<fetch> method of the Kafka L<Consumer|Kafka::Consumer> returns a
reference to an array whose elements are C<Kafka::Message> objects.

    # Consuming messages
    my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # Maximum size of MESSAGE(s) to receive
        );
    if ( $messages )
    {
        foreach my $message ( @$messages )
        {
            if( $message->valid )
            {
                print "payload    : ", $message->payload,       "\n";
                print "offset     : ", $message->offset,        "\n";
                print "next_offset: ", $message->next_offset,   "\n";
            }
            else
            {
                print "error      : ", $message->error,         "\n";
            }
        }
    }

=head1 DESCRIPTION

The L<Kafka|Kafka> message API is implemented by the
L<Kafka::Message|Kafka::Message> class.

The C<Kafka::Message> module in the L<Kafka|Kafka> package provides
object-oriented access to the message properties.
The C<fetch> method of the L<Consumer|Kafka::Consumer> client returns a
reference to an array of C<Kafka::Message> objects.
The L<Kafka|Kafka> package does not use the C<Kafka::Message> class anywhere
else.

The main features of the C<Kafka::Message> class are:

=over 3

=item *

Represents the Apache Kafka Wire Format MESSAGE structure (the compression
codec attribute is not currently supported). A description of the structure is
available at L<http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/>

=item *

Supports 64-bit elements on 32-bit systems.
The C<offset> and C<next_offset> methods return
L<Math::BigInt|Math::BigInt> integers on 32-bit systems.

=back
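
As a minimal sketch of what the second point means for calling code, the
example below uses a hand-made L<Math::BigInt|Math::BigInt> value in place of
an offset returned on a 32-bit system (the value itself is only illustrative).
Because C<Math::BigInt> overloads arithmetic, comparison and stringification,
such an offset can usually be handled like a plain integer:

    use strict;
    use warnings;
    use Math::BigInt;

    # Stand-in for an offset too large for a 32-bit integer (assumed value)
    my $offset = Math::BigInt->new( '4294967296' );    # 2**32

    # Overloaded operators keep the result a Math::BigInt object
    my $later = $offset + 14;

    print "offset is beyond 32 bits\n" if $offset > 4294967295;
    print "later offset: $later\n";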

=head2 CONSTRUCTOR

=head3 C<new ( \%arg )>

Creates a new C<Kafka::Message> object.
C<new()> takes a single argument: a HASH reference whose keys are the names of
the L<methods|/METHODS> (C<payload>, C<valid>, C<error>, C<offset> and
C<next_offset>). Every key must be present and its value must be defined.

Returns the created C<Kafka::Message> object. If the argument is not a valid
HASH reference, or a required key is missing or undefined, the constructor
halts the program (C<confess>).
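
As an illustration only, a message object could be built by hand as shown
here. The property values are made up for the example; in normal use the
objects are returned by the C<fetch> method of the
L<Consumer|Kafka::Consumer>.

    use strict;
    use warnings;
    use Kafka::Message;

    # All five properties must be present and defined (values are assumed)
    my $message = Kafka::Message->new( {
        payload     => 'hello',
        valid       => 1,
        error       => '',
        offset      => 0,
        next_offset => 14,
    } );

    print "payload    : ", $message->payload,     "\n";
    print "next_offset: ", $message->next_offset, "\n";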

=head2 METHODS

The following methods are available on every C<Kafka::Message> object:

=head3 C<payload>

The payload of the message received from the Apache Kafka server.

=head3 C<valid>

A message entry is valid if the CRC32 of the message C<payload> matches
the CRC stored with the message.

=head3 C<error>

A description of the message inconsistency (currently only set when the
message is not valid or is compressed).

=head3 C<offset>

The offset of the beginning of the message in the Apache Kafka server.

=head3 C<next_offset>

The offset of the beginning of the next message in the Apache Kafka server.
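
One plausible use of C<next_offset> is to remember where a later fetch should
start. The sketch below assumes that usage and works on hand-built messages
with made-up values rather than on a real C<fetch> result; C<$resume_from> is
a name chosen for the example.

    use strict;
    use warnings;
    use Kafka::Message;

    # Hand-built stand-ins for a fetch result (all values are illustrative)
    my $messages = [
        Kafka::Message->new( {
            payload => 'first',  valid => 1, error => '',
            offset  => 0,        next_offset => 14,
        } ),
        Kafka::Message->new( {
            payload => 'second', valid => 1, error => '',
            offset  => 14,       next_offset => 29,
        } ),
    ];

    my $resume_from = 0;
    foreach my $message ( @$messages ) {
        last unless $message->valid;            # stop at the first bad message
        $resume_from = $message->next_offset;   # position after this message
    }

    print "a later fetch could start at offset $resume_from\n";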

=head1 DIAGNOSTICS

C<Kafka::Message> is not intended to be used directly by user code, and any
L<constructor|/CONSTRUCTOR> error is FATAL.
FATAL errors cause the program to halt (C<confess>), since the
problem is so severe that it would be dangerous to continue. (This
can always be trapped with C<eval>. Under the circumstances, dying is the best
thing to do.)

=over 3

=item C<Mismatch argument>

This means that you didn't give the right argument to the C<new>
L<constructor|/CONSTRUCTOR>: the argument is not a raw, unblessed HASH
reference, a key named after one of the L<methods|/METHODS> is missing, or its
value is not C<defined>.

=back
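
A minimal sketch of trapping such an error with C<eval>; the incomplete
argument is deliberate and only there to trigger the failure:

    use strict;
    use warnings;
    use Kafka::Message;

    # Only one of the five required keys is supplied, so new() confesses
    my $message = eval { Kafka::Message->new( { payload => 'hello' } ) };

    unless ( $message ) {
        # $@ holds the error text followed by a stack trace; keep line one
        my ( $first_line ) = split /\n/, $@;
        warn "Could not create message: $first_line\n";
    }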

=head1 SEE ALSO

The basic operation of the Kafka package modules:

L<Kafka|Kafka> - constants and messages used by the Kafka package modules

L<Kafka::IO|Kafka::IO> - object interface to socket communications with
the Apache Kafka server

L<Kafka::Producer|Kafka::Producer> - object interface to the producer client

L<Kafka::Consumer|Kafka::Consumer> - object interface to the consumer client

L<Kafka::Message|Kafka::Message> - object interface to the Kafka message
properties

L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka's wire format

L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
protocol on 32 bit systems

L<Kafka::Mock|Kafka::Mock> - object interface to the TCP mock server for testing

A wealth of detail about Apache Kafka and the Wire Format:

Main page at L<http://incubator.apache.org/kafka/>

Wire Format at L<http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/>

Writing a Driver for Kafka at
L<http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka>

=head1 AUTHOR

Sergey Gladkov, E<lt>sgladkov@trackingsoft.comE<gt>

=head1 CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC.
All rights reserved.

This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at
L<http://dev.perl.org/licenses/artistic.html>.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.

=cut