The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use Test::More;
use Time::HiRes 'time';
use strict;
use AnyEvent;
use AnyMQ;
use AnyEvent::RabbitMQ;

my $bus = AnyMQ->new_with_traits(traits => ['AMQP'],
                                 host   => 'localhost',
                                 port   => 5672,
                                 user   => 'guest',
                                 pass   => 'guest',
                                 vhost  => '/',
                                 exchange => 'foo',
                             );

my $bus2 = AnyMQ->new_with_traits(traits => ['AMQP'],
                                 host   => 'localhost',
                                 port   => 5672,
                                 user   => 'guest',
                                 pass   => 'guest',
                                 vhost  => '/',
                                 exchange => 'foo',
                             );

my $ext_channel = $bus2->_rf_channel;

is($bus->host, 'localhost');

#$cv->recv;
$bus->cv(AE::cv);
my $test = $bus->topic({ name => 'test_q', publisher_only => 0});
$bus->cv->recv;

my $q = AE::cv;
my $q2 = AE::cv;
my $c2 = AE::cv;
$c2->begin(sub { $q2->send([1])});
$c2->begin;

my $client = $bus->new_listener; $client->subscribe($test);
my $cnt = 0;
$client->poll( sub {
                   my $timestamp = shift->{time};
                   ++$cnt;
                   diag "client latency($cnt): ".(time() - $timestamp);
                   if ($cnt == 10) {
                       $q->send([1]);
                   }
                   elsif ($cnt > 10) {
                       $c2->end;
                   }
                   elsif ($cnt > 11) {
                       diag "something is wrong: ".$cnt;
                       $q2->send([0, 'extra message received.']);
                   }
               });

# messages coming to the exchange should be bound to our queue
$ext_channel->publish( routing_key => 'test_q',
                       exchange => 'foo',
                       body => JSON::to_json({ time => time() }),
                   ) for 1..10;

my $w; $w = AnyEvent->timer( after => 5,
                             cb => sub {
                                 $q->send([0, 'test timeout']);
                             } );
my ($ok, $msg) = @{$q->recv};
ok($ok, $msg);

$bus2->cv(AE::cv);
my $test2 = $bus2->topic({name => 'test_q', publisher_only => 0});
$bus2->cv->recv;

my $client2 = $bus2->new_listener; $client2->subscribe($test2);

$client2->poll( sub {
                    my $timestamp = shift->{time};
                    diag "client2 latency: ".(time() - $timestamp);
                    $c2->end;
                });

$test->publish({ time => time() });

$w = AnyEvent->timer( after => 5,
                      cb => sub {
                          $q2->send([0, 'test timeout']);
                      } );

($ok, $msg) = @{$q2->recv};
ok($ok, $msg);

undef $bus; undef $bus2;

done_testing;