The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use Test::More;

# Number of messages pushed through the queue. Assigned at compile time inside
# BEGIN so that the test plan can be declared before anything else runs.
my $messages;

# Parsed list of memcached "host:port" strings. This is kept in a lexical
# rather than written back into %ENV: as of Perl 5.18 every value stored in
# %ENV is stringified, so an array reference placed there would come back as
# the useless string "ARRAY(0x...)".
my @memcached_servers;

BEGIN
{
    # Skip the whole file unless the optional prerequisites are available.
    eval { require Cache::Memcached };
    if ($@) {
        plan(skip_all => "Cache::Memcached not installed");
    } elsif (! $ENV{MVALVE_Q4M_DSN} ) {
        plan(skip_all => "Define MVALVE_Q4M_DSN to run this test");
    } else {
        $messages = $ENV{MVALVE_MESSAGE_COUNT} || 32;
        # 2 use_ok + 1 final "consumed all" check, plus three groups of
        # $messages assertions each:
        #   - $messages inserts
        #   - 1 first read + ($messages - 1) throttled reads
        #   - ($messages - 1) delayed reads + 1 "count matches" check
        plan(tests => 3 + 3 * $messages);
    }

    $ENV{MEMCACHED_SERVERS} ||= '127.0.0.1:11211';
    # The anonymous hash stringifies to "HASH(0x...)", which together with the
    # pid and rand() makes the namespace unlikely to collide between runs.
    $ENV{MEMCACHED_NAMESPACE} ||= join('_', __FILE__, $$, {}, rand());

    # Comma separated list -> array; drop empty entries (e.g. trailing comma).
    @memcached_servers =
        grep { length } split(/\s*,\s*/, $ENV{MEMCACHED_SERVERS});

    use_ok("Mvalve::Reader");
    use_ok("Mvalve::Writer");
}

{
    # Throttle window in seconds; also used below to bound the receive loop.
    my $interval = 1.4;

    # Queue settings shared by the writer and the reader.
    my %q_config = (
        args => {
            connect_info => [ 
                $ENV{MVALVE_Q4M_DSN},
                $ENV{MVALVE_Q4M_USERNAME},
                $ENV{MVALVE_Q4M_PASSWORD},
                { RaiseError => 1, AutoCommit => 1 },
            ]
        }
    );

    my $writer = Mvalve::Writer->new( queue => \%q_config );
    my $reader = Mvalve::Reader->new(
        timeout   => 1,
        throttler => {
            module    => 'Data::Valve',
            args => {
                max_items => 1,
                interval  => $interval,
            }
        },
        state => {
            module => 'Memcached',
            args   => {
                memcached => {
                    servers => \@memcached_servers,
                    namespace => $ENV{MEMCACHED_NAMESPACE},
                }
            }
        },
        queue => \%q_config
    );
    $reader->clear_all;

    my $count = $messages;
    diag( "Generating $count messages...." );

    # Every inserted message keyed by id; entries are removed as the reader
    # hands the messages back, so the hash must be empty at the end.
    # NOTE(review): Mvalve::Message is never loaded explicitly here; presumably
    # Mvalve::Reader / Mvalve::Writer pull it in -- confirm.
    my %messages;
    for my $i (1..$count) {
        my $message = Mvalve::Message->new(
            headers => {
                'X-Mvalve-Destination' => 'test'
            },
            content => $i,
        );
        ok( $writer->insert( message => $message ), "insert data $i");
        $messages{ $message->id } = $message;
    }

    {
        # The first read is expected to succeed; the reads that immediately
        # follow are expected to return nothing.
        my $message = $reader->next;
        ok( $message, 'first message should not be throttled' );
        if ($message) {
            delete $messages{ $message->id };
        }

        $count--;
        for my $i (1..$count) {
            my $message = $reader->next;
            ok( ! $message, "subsequent messages should be throttled" );
        }
    }

    diag("Going to receive messages as they are being throttled. This may take a few moments...");

    {
        # Poll until the remaining messages have been received. A generous
        # wall-clock deadline keeps a broken queue or throttler from hanging
        # the test forever; if it is hit, the assertions below (and the plan)
        # fail instead.
        my $deadline = time + 60 + 3 * $interval * $count;
        my $i = 0;
        while ($i < $count) {
            if (time > $deadline) {
                diag("Gave up waiting for messages after receiving $i of $count");
                last;
            }

            my $message = $reader->next;
            next unless $message;

            ok( delete $messages{ $message->id }, "Deleting a proper (unhandled) message");
            $i++;
        }
        is( $i, $count, "count matches" );
        is( scalar(keys %messages), 0, "consumed all messages" );
    }
}