




Kafka::Producer - object interface to the producer client


This documentation refers to Kafka::Producer version 0.12


Setting up:

    use Kafka qw( KAFKA_SERVER_PORT DEFAULT_TIMEOUT );  # constants used below

    #-- IO
    use Kafka::IO;

    my $io = Kafka::IO->new(
        host        => "localhost",
        port        => KAFKA_SERVER_PORT,
        timeout     => DEFAULT_TIMEOUT, # Optional,
                                        # default = DEFAULT_TIMEOUT
        RaiseError  => 0                # Optional, default = 0
    );

    #-- Producer
    use Kafka::Producer;

    my $producer = Kafka::Producer->new(
        IO          => $io,
        RaiseError  => 0    # Optional, default = 0
    );

    # With RaiseError => 0 the constructor returns undef on failure
    unless ( $producer )
    {
        die "(",
            Kafka::Producer::last_errorcode(), ") ",
            Kafka::Producer::last_error(), "\n";
    }

    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
    );

    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
    );

    # Closes the producer and cleans up
    $producer->close;

Use only one Kafka::Producer object at a time.


The Kafka producer API is implemented by the Kafka::Producer class.

The main features of the Kafka::Producer class are:



new() creates a new producer client object and returns the created Kafka::Producer object.

An error will cause the program to halt or the constructor will return the undefined value, depending on the value of the RaiseError attribute.

Use the "last_errorcode" and "last_error" methods of the Kafka::Producer class to get information about the error.

new() takes arguments in key-value pairs. The following arguments are currently recognized:

IO => $io

$io is the Kafka::IO object that allows you to communicate with the Apache Kafka server without using the Apache ZooKeeper service.

RaiseError => $mode

Optional, default = 0 .

If RaiseError is true, an error will cause the program to halt: confess if an argument is not valid, or die in any other error case (either can always be trapped with eval).

It must be a non-negative integer. That is, a positive integer, or zero.

Always check for errors when RaiseError is not set to true.
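Since an error raised under a true RaiseError can be trapped with eval, the trapping step can be sketched as below. This is only an illustration: trap_error is a hypothetical helper, not part of the Kafka package, and it assumes nothing beyond standard Perl eval semantics.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the Kafka package): runs a code reference
# and traps any exception, as eval would for a die or confess raised when
# RaiseError is true.
sub trap_error {
    my ($code) = @_;

    my $result;
    my $ok = eval { $result = $code->(); 1 };

    # On failure $@ holds the exception; fall back to a generic text if it is empty
    return $ok ? ( $result, undef ) : ( undef, $@ || 'unknown error' );
}
```

A call might look like my ( $producer, $error ) = trap_error( sub { Kafka::Producer->new( IO => $io, RaiseError => 1 ) } ); where $io is an existing Kafka::IO object.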


The following methods are defined for the Kafka::Producer class:

send( $topic, $partition, $messages )

Sends messages (encoded according to the Apache Kafka Wire Format protocol) on the socket of the Kafka::IO object.

Returns 1 if the message is sent successfully. If an error occurs and RaiseError is not true, returns the undefined value.

send() takes the following arguments:


The $topic must be a normal non-false string of non-zero length.


The $partition must be a non-negative integer (of any length). That is, a positive integer, or zero.


The $messages is an arbitrary amount of data (a simple data string or a reference to an array of the data strings).
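The argument rules above can be checked before calling send(). The following is a minimal sketch: valid_send_args is a hypothetical helper that is not part of Kafka::Producer, and it only mirrors the rules as they are worded here, so the module's own validation may differ in detail.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Kafka::Producer: mirrors the documented
# argument rules so that bad input can be rejected before calling send().
sub valid_send_args {
    my ( $topic, $partition, $messages ) = @_;

    # $topic: a plain, non-false string of non-zero length
    return 0 unless defined $topic && !ref($topic) && length($topic) && $topic;

    # $partition: a non-negative integer of any length
    return 0 unless defined $partition && !ref($partition)
        && $partition =~ /\A[0-9]+\z/;

    # $messages: a reference to an array of data strings ...
    if ( ref($messages) eq 'ARRAY' ) {
        # Assumption: every element must itself be a plain defined string
        return 0 if grep { !defined $_ || ref($_) } @$messages;
        return 1;
    }

    # ... or a single data string
    return ( defined $messages && !ref($messages) ) ? 1 : 0;
}
```

For example, valid_send_args( "test", 0, "Single message" ) is accepted, while a negative partition or a hash reference passed as $messages is rejected.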


close

Closes the Kafka::Producer object and cleans up.


last_errorcode

This method returns an error code that gives the position of the error description in the @Kafka::ERROR array. Analyse this information to determine the cause of the error.

The server or the resource might not be available, access to the resource might be denied or other things might have failed for some reason.


last_error

This method returns an error message that contains information about the encountered failure. Messages returned from this method may contain additional details and need not match the descriptions in the @Kafka::ERROR array.
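When RaiseError is not true, the return value of send() and the two error methods can be combined into a single check. The sketch below assumes only the documented send(), last_errorcode() and last_error() methods; send_or_describe is a hypothetical helper, and the format of its result string is an illustration, not something the module defines.

```perl
use strict;
use warnings;

# Hypothetical helper: sends through any object that offers the documented
# send(), last_errorcode() and last_error() methods, and returns a
# descriptive string on failure instead of the undefined value.
sub send_or_describe {
    my ( $producer, $topic, $partition, $messages ) = @_;

    # send() returns 1 on success and undef on failure when RaiseError is not true
    return 'ok' if $producer->send( $topic, $partition, $messages );

    # Combine the numeric code with the more descriptive message
    return sprintf '(%s) %s', $producer->last_errorcode, $producer->last_error;
}
```

The caller can then log or inspect the returned string rather than testing for the undefined value after every send.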


Look at the RaiseError description for additional information on error handling.

The methods for analysing a possible error are "last_errorcode" and the more descriptive "last_error".

Mismatch argument

This means that you didn't give the right argument to the new constructor or to another method.

IO errors

Look at Kafka::IO DIAGNOSTICS section to obtain information about IO errors.

For a fuller description of an error, always look at the message returned by the "last_error" method or by the Kafka::Producer::last_error class method.


The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules

Kafka::IO - object interface to socket communications with the Apache Kafka server

Kafka::Producer - object interface to the producer client

Kafka::Consumer - object interface to the consumer client

Kafka::Message - object interface to the Kafka message properties

Kafka::Protocol - functions to process messages in the Apache Kafka's wire format

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems

Kafka::Mock - object interface to the TCP mock server for testing

A wealth of detail about Apache Kafka and the Wire Format:

Main page at

Wire Format at

Writing a Driver for Kafka at


Sergey Gladkov


Alexander Solovey

Jeremy Jordan

Vlad Marchenko


Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
