The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

use 5.010;
use strict;
use warnings;

use lib qw(
    lib
    t/lib
    ../lib
);

use Test::More;

BEGIN {
    eval 'use Test::NoWarnings';    ## no critic
    plan skip_all => 'because Test::NoWarnings required for testing' if $@;
}

plan 'no_plan';

use Const::Fast;
#use Data::Dumper;
use Params::Util qw(
    _STRING
);

use Kafka qw(
    $BLOCK_UNTIL_IS_COMMITTED
    $COMPRESSION_NONE
    $DEFAULT_MAX_BYTES
    $DEFAULT_MAX_WAIT_TIME
    $KAFKA_SERVER_PORT
    $DEFAULT_MAX_NUMBER_OF_OFFSETS
    $MIN_BYTES_RESPOND_HAS_DATA
    $NOT_SEND_ANY_RESPONSE
    $RECEIVE_EARLIEST_OFFSET
    $RECEIVE_LATEST_OFFSETS
    $REQUEST_TIMEOUT
    $WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Internals qw(
    $APIKEY_PRODUCE
    $APIKEY_FETCH
    $APIKEY_OFFSET
    $APIKEY_METADATA
    $PRODUCER_ANY_OFFSET
);
use Kafka::MockIO;
use Kafka::MockProtocol qw(
    encode_fetch_response
);
use Kafka::Protocol qw(
    decode_fetch_response
    decode_metadata_response
    decode_offset_response
    decode_produce_response
    encode_fetch_request
    encode_metadata_request
    encode_offset_request
    encode_produce_request
);

const my $TOPIC             => 'mytopic';
# Use Kafka::MockIO only with the following information:
const my $PARTITION         => $Kafka::MockIO::PARTITION;

my ( $io, $decoded_request, $encoded_request, $decoded_response, $encoded_response, $len );

sub fulfill_request {
    $len = $io->send( $encoded_request );
    is $len, length $encoded_request, 'request sent correctly';
    $len = 4;
    undef $encoded_response;
    $encoded_response = $io->receive( $len );
    $len = unpack( 'l>', $$encoded_response );
    $$encoded_response .= ${ $io->receive( $len ) };
    ok _STRING( $$encoded_response ), 'response received';
}

Kafka::MockIO::override();

$io = Kafka::IO->new(
    host        => 'localhost',
    port        => $KAFKA_SERVER_PORT,
    timeout     => $REQUEST_TIMEOUT,
    );
isa_ok( $io, 'Kafka::IO' );

#Kafka::IO->debug_level( 1 );

#-- Special cases --------------------------------------------------------------

# a decoded fetch request
$decoded_request = {
    ApiKey                              => $APIKEY_FETCH,
    CorrelationId                       => 0,
    ClientId                            => 'console-consumer-25555',
    MaxWaitTime                         => 100,
    MinBytes                            => $MIN_BYTES_RESPOND_HAS_DATA,
    topics                              => [
        {
            TopicName                   => 'mytopic',
            partitions                  => [
                {
                    Partition           => 0,
                    FetchOffset         => 0,
                    MaxBytes            => 1_048_576,
                },
            ],
        },
    ],
};
$encoded_request = encode_fetch_request( $decoded_request );

# a decoded fetch response
$decoded_response = {
    CorrelationId                           => 0,
    topics                                  => [
        {
            TopicName                       => 'mytopic',
            partitions                      => [
                {
                    Partition               => 0,
                    ErrorCode               => 0,
                    HighwaterMarkOffset     => 2,
                    MessageSet              => [
                        {
                            Offset          => 0,
                            MagicByte       => 0,
                            Attributes      => 0,
                            Key             => q{},
                            Value           => 'Hello!',
                        },
                        {
                            Offset          => 1,
                            MagicByte       => 0,
                            Attributes      => 0,
                            Key             => q{},
                            Value           => 'Hello, World!',
                        },
                    ],
                },
            ],
        },
    ],
};
$encoded_response = encode_fetch_response( $decoded_response );

is scalar( keys %{ Kafka::MockIO::special_cases() } ), 0, 'special case not present';
Kafka::MockIO::add_special_case( { $encoded_request => $encoded_response } );
is scalar( keys %{ Kafka::MockIO::special_cases() } ), 1, 'special case present';

fulfill_request();
is_deeply( decode_fetch_response( $encoded_response ), $decoded_response, 'decoded correctly' );

Kafka::MockIO::del_special_case( $encoded_request );
is scalar( keys %{ Kafka::MockIO::special_cases() } ), 0, 'special case deleted';

#-- MetadataRequest ------------------------------------------------------------

# a decoded metadata request
$decoded_request = {
    ApiKey                              => $APIKEY_METADATA,
    CorrelationId                       => 0,
    ClientId                            => 'console-consumer-25555',
    topics                              => [
                                        $TOPIC,
    ],
};

$encoded_request = encode_metadata_request( $decoded_request );
fulfill_request();
$decoded_response = decode_metadata_response( $encoded_response );

#-- ProduceRequest --------------------------------------------------------------

# a decoded produce request
$decoded_request = {
    ApiKey                              => $APIKEY_PRODUCE,
    CorrelationId                       => 4,
    ClientId                            => q{},
    RequiredAcks                        => $NOT_SEND_ANY_RESPONSE,
    Timeout                             => int( $REQUEST_TIMEOUT * 1000 ),
    topics                              => [
        {
            TopicName                   => $TOPIC,
            partitions                  => [
                {
                    Partition           => $PARTITION,
                    MessageSet              => [
                        {
                            Offset          => $PRODUCER_ANY_OFFSET,
                            MagicByte       => 0,
                            Attributes      => $COMPRESSION_NONE,
                            Key             => q{},
                            Value           => 'Hello!',
                        },
                    ],
                },
            ],
        },
    ],
};

$encoded_request = encode_produce_request( $decoded_request );
fulfill_request();
$decoded_response = decode_produce_response( $encoded_response );

#-- FetchRequest ---------------------------------------------------------------

# a decoded fetch request
$decoded_request = {
    ApiKey                              => $APIKEY_FETCH,
    CorrelationId                       => 0,
    ClientId                            => 'console-consumer-25555',
    MaxWaitTime                         => int( $DEFAULT_MAX_WAIT_TIME * 1000 ),
    MinBytes                            => $MIN_BYTES_RESPOND_HAS_DATA,
    topics                              => [
        {
            TopicName                   => $TOPIC,
            partitions                  => [
                {
                    Partition           => $PARTITION,
                    FetchOffset         => 0,
                    MaxBytes            => $DEFAULT_MAX_BYTES,
                },
            ],
        },
    ],
};

$encoded_request = encode_fetch_request( $decoded_request );
fulfill_request();
$decoded_response = decode_fetch_response( $encoded_response );

#-- OffsetRequest --------------------------------------------------------------

# a decoded offset request
$decoded_request = {
    ApiKey                              => $APIKEY_OFFSET,
    CorrelationId                       => 0,
    ClientId                            => 'console-consumer-25555',
    topics                              => [
        {
            TopicName                   => $TOPIC,
            partitions                  => [
                {
                    Partition           => $PARTITION,
#                    Time                => $RECEIVE_EARLIEST_OFFSET,
                    Time                => $RECEIVE_LATEST_OFFSETS,
                    MaxNumberOfOffsets  => $DEFAULT_MAX_NUMBER_OF_OFFSETS,
                },
            ],
        },
    ],
};

$encoded_request = encode_offset_request( $decoded_request );
fulfill_request();
$decoded_response = decode_offset_response( $encoded_response );

$io->close;

Kafka::MockIO::restore();