The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings;
use Test::More;

use AnyEvent;
use Message::Passing::Input::Redis;
use Message::Passing::Output::Redis;
use Message::Passing::Output::Test;

my $cv = AnyEvent->condvar;
my $input = Message::Passing::Input::Redis->new(
    hostname => "127.0.0.1",
    topics => "log_stash.test",
    output_to => Message::Passing::Output::Test->new(
        cb => sub { $cv->send }
    ),
);

my $output = Message::Passing::Output::Redis->new(
    topic => "log_stash.test",
    hostname => "127.0.0.1",
);

my $this_cv = AnyEvent->condvar;
my $timer; $timer = AnyEvent->timer(after => 2, cb => sub {
    undef $timer;
    $this_cv->send;
});
$this_cv->recv;
$output->consume('bar');
$cv->recv;

is $input->output_to->message_count, 1;
is_deeply([$input->output_to->messages], ['bar']);

my $other_output = Message::Passing::Output::Redis->new(
    topic => "log_stash.foo",
    hostname => "127.0.0.1",
);

$cv = AnyEvent->condvar;
my $other_input = Message::Passing::Input::Redis->new(
    ptopics => "log_stash.*",
    output_to => Message::Passing::Output::Test->new(
        cb => sub { $cv->send }
    ),
    hostname => "127.0.0.1",
);

$this_cv = AnyEvent->condvar;
$timer = AnyEvent->timer(after => 2, cb => sub {
    undef $timer;
    $this_cv->send;
});
$this_cv->recv;
$timer = AnyEvent->timer(after => 10, cb => sub {
    undef $timer;
    fail "Timed out";
    $cv->throw;
});
$output->consume('quux');
$other_output->consume('fnord');
$cv->recv;

is $input->output_to->message_count, 2;
is_deeply([$input->output_to->messages], ['bar', 'quux']);

is $other_input->output_to->message_count, 2;
is_deeply([$other_input->output_to->messages], ['quux', 'fnord']);

done_testing;