The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings;
use Test::More;
use AnyEvent;
use AnyMQ;
use AnyMQ::Queue;

my $tests = 2;

my $sequence = 0;

sub do_test {
    my ( $channel, $client ) = @_;
    my $seq = ++$sequence;
    my @send_events = ( { data1 => $seq }, { data2 => $seq }, { data3 => $seq } );

    my $cv = AE::cv;
    my $t  = AE::timer 5, 0, sub { $cv->croak( "timeout" ); };

    my $pub = AnyMQ->topic( $channel );
    my $sub = AnyMQ->new_listener( $pub );
    $sub->on_error(sub {
                       my ($queue, $error, @msg) = @_;
                       $queue->persistent(0);
                       $queue->append(@msg);
                   });
    $sub->poll(sub {
                   if ($_[0]{data2}) {
                       die "poll fail";
                   }
                   is $_[0]{data1}, $seq
               });
    $sub->timeout(1);
    $sub->on_timeout(sub {
                       isa_ok($_[0], 'AnyMQ::Queue');
                       ok(1, 'timeout triggered after downgrade');
                       like($_[1], qr'timeout');
                       $cv->send(1);
                   },
               );
    # Publish events before the client has connected.
    $pub->publish( $_ ) for @send_events;
    $cv->recv;
    ok( !$sub->destroyed );
    $cv = AE::cv;
    $sub->on_timeout(undef);
    $sub->poll_once(sub {
                        my @events = @_;
                        is_deeply \@events, [@send_events[1,2]], "got events";
                        $cv->send;
                    });
    $cv->recv;

}

plan tests => 6 * $tests;
do_test( 'comet', 'client_id' ) for 1 .. $tests;