The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use Test::More;
use Test::Exception;

my %conf = (
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',
);

eval {
    use IO::Socket::INET;

    my $socket = IO::Socket::INET->new(
        Proto    => 'tcp',
        PeerAddr => $conf{host},
        PeerPort => $conf{port},
        Timeout  => 1,
    ) or die 'Error connecting to AMQP Server!';

    close $socket;
};

plan skip_all => 'Connection failure: '
               . $conf{host} . ':' . $conf{port} if $@;
plan tests => 1;

use AnyEvent::RabbitMQ;

my $ar = connect_ar();

my @queues = map {
    my $ch = open_channel($ar);
    my $queue = 'test_q' . $_;
    declare_queue($ch, $queue,);

    my $done = AnyEvent->condvar;
    consume($ch, $queue, sub {
        my $response = shift;
        return if 'stop' ne $response->{body}->payload;
        $done->send();
    });
    {name => $queue, cv => $done};
} (1..5);

my $ch = open_channel($ar);
for my $queue (@queues) {
    publish($ch, $queue->{name}, 'hello');
    publish($ch, $queue->{name}, 'stop');
}

my $count = 0;
for my $queue (@queues) {
    $queue->{cv}->recv;
    $count++;
}

is($count, 5, 'consume count');

for my $queue (@queues) {
    delete_queue($ch, $queue->{name});
}

close_ar($ar);

sub connect_ar {
    my $done = AnyEvent->condvar;
    my $ar = AnyEvent::RabbitMQ->new()->load_xml_spec()->connect(
        (map {$_ => $conf{$_}} qw(host port user pass vhost)),
        timeout    => 1,
        on_success => sub {$done->send(1)},
        on_failure => sub {$done->send()},
        on_close   => \&handle_close,
    );
    die 'Connection failure' if !$done->recv;
    return $ar;
}

sub close_ar {
    my ($ar,) = @_;

    my $done = AnyEvent->condvar;
    $ar->close(
        on_success => sub {$done->send(1)},
        on_failure => sub {$done->send()},
    );
    die 'Close failure' if !$done->recv;

    return;
}

sub open_channel {
    my ($ar,) = @_;
    
    my $done = AnyEvent->condvar;
    $ar->open_channel(
        on_success => sub {$done->send(shift)},
        on_failure => sub {$done->send()},
        on_return  => sub {die 'Receive return'},
        on_close   => \&handle_close,
    );
    my $ch = $done->recv;
    die 'Open channel failure' if !$ch;

    return $ch;
}

sub declare_queue {
    my ($ch, $queue,) = @_;

    my $done = AnyEvent->condvar;
    $ch->declare_queue(
        queue      => $queue,
        on_success => sub {$done->send(1)},
        on_failure => sub {$done->send()},
    );
    die 'Declare queue failure' if !$done->recv;

    return;
}

sub delete_queue {
    my ($ch, $queue,) = @_;

    my $done = AnyEvent->condvar;
    $ch->delete_queue(
        queue      => $queue,
        on_success => sub {$done->send(1)},
        on_failure => sub {$done->send()},
    );
    die 'Delete queue failure' if !$done->recv;

    return;
}

sub consume {
    my ($ch, $queue, $handle_consume,) = @_;

    my $done = AnyEvent->condvar;
    $ch->consume(
        queue      => $queue,
        on_success => sub {$done->send(1)},
        on_failure => sub {$done->send()},
        on_consume => $handle_consume,
    );
    die 'Consume failure' if !$done->recv;

    return;
}

sub publish {
    my ($ch, $queue, $message,) = @_;

    $ch->publish(
        routing_key => $queue,
        body        => $message,
        mandatory   => 1,
    );

    return;
}

sub handle_close {
    my $method_frame = shift->method_frame;
    die $method_frame->reply_code, $method_frame->reply_text
      if $method_frame->reply_code;
}