# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings;
use Test::More;

# Declare the test plan at compile time, ahead of the modules loaded below.
BEGIN {
    # skip_all ends the script right away, so the fixed plan underneath is
    # only reached on platforms other than Windows.
    plan skip_all => 'Tests hang on Windows :(' if $^O eq 'MSWin32';
    plan tests => 126;
}

use File::Path;
use POE;
use POE::Session;
use YAML; # for Dump!
use lib 't/lib';

# The callback-driven style of these tests is expected to trigger deep
# recursion warnings, so any warning mentioning "recursion" is dropped and
# everything else is re-emitted. If you suspect a genuine runaway recursion
# bug, comment this handler out.
$SIG{__WARN__} = sub {
	my ($warning) = @_;
	warn $warning unless $warning =~ m/recursion/i;
};

# Compile-time load checks. Each use_ok/require_ok counts as one test.
# engine_names(), engine_package() and DATA_DIR are not defined in this file;
# presumably they are exported by the EngineMaker helper loaded first - confirm.
BEGIN {
	my $prefix = 'POE::Component::MessageQueue';

	use_ok("POE::Component::MessageQueue::Test::EngineMaker");
	use_ok("POE::Component::MessageQueue::Test::ForkRun");

	require_ok("${prefix}::Message");
	require_ok("${prefix}::Logger");

	# Resolve every engine name to its package first, then require each one.
	my @engine_packages = map { engine_package($_) } engine_names();
	for my $package (@engine_packages) {
		require_ok($package);
	}

	require_ok("${prefix}::Storage::Default");
}

# Remove the scratch data directory when the script exits.
END {
	rmtree(DATA_DIR);
}

# Code handed to start_fork (not defined in this file; presumably exported by
# the ForkRun test helper - confirm). It constructs the remote storage server
# on port 9321.
# Note that `use` takes effect at compile time, so the server module is
# loaded when this file is compiled, not only when the sub is run.
my $launch_remote_server = sub {
	use POE::Component::MessageQueue::Storage::Remote::Server;
	POE::Component::MessageQueue::Storage::Remote::Server->new(port => 9321);
};

my $remote = start_fork($launch_remote_server);
ok($remote, "Remote storage engine started.");

# Fixture data shared by every engine run: 50 persistent messages for each of
# the four destinations, keyed by message id (ids run from 1 upwards).
my $next_id = 0;
my $when = time();
my @destinations = map { "/queue/$_" } qw(foo bar baz grapefruit);

my %messages;
foreach my $destination (@destinations) {
	foreach (1..50) {
		++$next_id;
		++$when; # We'll fake it so there's a clear time order

		$messages{$next_id} = POE::Component::MessageQueue::Message->new(
			id          => $next_id,
			timestamp   => $when,
			destination => $destination,
			persistent  => 1,
			body        => "I am the body of $next_id.\n".
			               "I was created at $when.\n".
			               "I am being sent to $destination.\n",
		);
	}
}

# message_is($one, $two, $name)
#
# Records one test named $name that passes when $one->equals($two) is true.
#   $one  - the message under test (callers pass what the storage returned)
#   $two  - the reference message it is compared against
#   $name - test description
#
# If $one is not a POE::Component::MessageQueue::Message, the test is
# recorded as failed and the offending value is dumped. On a mismatch both
# messages are dumped as diagnostics.
#
# Returns the test result (true on pass, false otherwise).
sub message_is {
	my ($one, $two, $name) = @_;

	if (ref $one ne 'POE::Component::MessageQueue::Message') {
		# Record a failure so the problem shows up under the test's own name
		# rather than only as a plan-count mismatch at the end of the run.
		fail($name);
		diag "message_is called with non-message argument: ".Dump($one);
		return 0;
	}

	# Do not write "return ok(...) or diag(...)": return binds tighter than
	# the low-precedence "or", so the diag would never be reached.
	my $passed = ok($one->equals($two), $name);
	diag("got: ", Dump($one), "\nexpected:", Dump($two), "\n") unless $passed;
	return $passed;
}

# run_in_order($tests, $done)
#
# Runs the test descriptors in @$tests one after another, consuming the
# array from the front. Each descriptor is a hash with:
#   sub      - code called with the entries of 'args' plus a continuation
#   args     - array ref of leading arguments for 'sub'
#   callback - code called with whatever 'sub' passed to its continuation,
#              plus a final continuation that moves on to the next descriptor
# Once the array is exhausted control is transferred to $done.
sub run_in_order
{
	my ($tests, $done) = @_;

	my $test = shift @$tests;
	goto $done unless $test;

	# Tail-call back into ourselves for the remaining descriptors.
	my $next_test = sub {
		@_ = ($tests, $done);
		goto &run_in_order;
	};

	# Forward the operation's results to the descriptor's checker.
	my $check_result = sub {
		$test->{callback}->(@_, $next_test);
	};

	$test->{'sub'}->(@{$test->{args}}, $check_result);
}

# disown_loop($storage, $destination, $client, $done)
#
# Calls $storage->disown_destination for $destination with client ids from
# $client up to and including 50, one asynchronous call at a time, then
# transfers control to $done.
sub disown_loop {
	my ($storage, $destination, $client, $done) = @_;

	# All clients handled: hand over to the continuation.
	goto $done if $client > 50;

	$storage->disown_destination($destination, $client, sub {
		# Tail-call ourselves for the next client id.
		@_ = ($storage, $destination, $client + 1, $done);
		goto &disown_loop;
	});
}

# claim_test($storage, $name, $destination, $count, $done)
#
# Repeatedly calls $storage->claim_and_retrieve on $destination, passing
# $count as the second argument and incrementing it after every successful
# claim. When a claim yields no message, records one test asserting that
# exactly 50 claims succeeded, then transfers control to $done.
sub claim_test {
	my ($storage, $name, $destination, $count, $done) = @_;

	my $on_claim = sub {
		my ($claimed) = @_;

		unless ($claimed) {
			# Nothing left to claim; $count is one past the last success.
			is($count-1, 50, "$name: $destination");
			goto $done;
		}

		@_ = ($storage, $name, $destination, $count+1, $done);
		goto &claim_test;
	};

	$storage->claim_and_retrieve($destination, $count, $on_claim);
}

# destination_tests($destinations, $storage, $name, $done)
#
# Consumes @$destinations from the end. For each destination it runs, in
# order: a claim test, a disown loop, a second claim test (checking the
# messages can be claimed again after being disowned) and a final disown
# loop. Transfers control to $done when no destinations remain.
sub destination_tests {
	my ($destinations, $storage, $name, $done) = @_;

	my $destination = pop @$destinations;
	goto $done unless $destination;

	# The steps are declared last-to-first so each can refer to its successor.
	my $next_destination = sub {
		@_ = ($destinations, $storage, $name, $done);
		goto &destination_tests;
	};
	my $disown_again = sub {
		disown_loop($storage, $destination, 1, $next_destination);
	};
	my $claim_again = sub {
		claim_test($storage, "$name: disown_destination", $destination, 1, $disown_again);
	};
	my $disown_first = sub {
		disown_loop($storage, $destination, 1, $claim_again);
	};

	claim_test($storage, "$name: claim_and_retrieve", $destination, 1, $disown_first);
}

# delay_tests($storage, $name, $done)
#
# Stores one message on /queue/delay whose deliver_at lies $delay seconds
# after its timestamp, then polls claim_and_retrieve (sleeping one second
# between attempts) until a message comes back. Records one test asserting
# that at least $delay seconds elapsed, then transfers control to $done.
sub delay_tests {
	my ($storage, $name, $done) = @_;

	my $destination = '/queue/delay';
	my $client_id = 9876;
	my $delay = 2;
	my $time = time();

	++$next_id;
	my $message = POE::Component::MessageQueue::Message->new(
		id          => $next_id,
		timestamp   => $time,
		destination => $destination,
		persistent  => 1,
		deliver_at  => $time + $delay,
		body        => "I am the body of $next_id.\n".
		               "I was created at $time.\n".
		               "I am being sent to $destination.\n",
	);

	# Keep trying to claim until the storage hands a message back, then jump
	# to the supplied continuation.
	my $poll;
	$poll = sub {
		my ($on_claimed) = @_;
		$storage->claim_and_retrieve($destination, $client_id, sub {
			if (defined $_[0]) {
				goto $on_claimed;
			}

			sleep 1;

			# and repeat..
			@_ = ($on_claimed);
			goto $poll;
		});
	};

	my $check_elapsed = sub {
		my $received = time();
		ok($received >= ($time + $delay), "$name: message delayed $delay seconds");
		goto $done;
	};

	$storage->store($message->clone, sub {
		$poll->($check_elapsed);
	});
}

# api_test($storage, $name, $done)
#
# Runs the full set of storage API checks against $storage, labelling every
# test with $name, then transfers control to $done. The sequence is:
#   1. destination_tests over a copy of @destinations
#   2. the ordered checks below: get_oldest, get, get_all, claim, remove, empty
#   3. delay_tests
#
# Each ordered check is a descriptor consumed by run_in_order: 'sub' performs
# the storage call, 'callback' receives its results followed by a
# continuation that must be invoked to move on.
sub api_test {
	my ($storage, $name, $done) = @_;

	# Ids deleted by the remove check, and the number of fixture messages
	# that should be left afterwards (derived rather than hard-coded so the
	# check keeps working if the fixture size changes).
	my @removed_ids = qw(20 25 30);
	my $expected_after_remove = scalar(keys %messages) - scalar(@removed_ids);

	my $ordered_tests = [
		{
			'sub'      => sub { $storage->get_oldest(@_) },
			'args'     => [],
			'callback' => sub {
				my $cb = pop;
				my $msg = shift;
				message_is($msg, $messages{'1'}, "$name: get_oldest");
				goto $cb;
			},
		},
		{
			'sub'      => sub { $storage->get(@_) },
			'args'     => ['20'],
			'callback' => sub {
				my $cb = pop;
				my $msg = shift;
				message_is($msg, $messages{'20'}, "$name: get");
				goto $cb;
			},
		},
		{
			'sub'      => sub { $storage->get_all(@_) },
			'args'     => [],
			'callback' => sub {
				my $cb = pop;
				my $aref = shift;
				my $all_equal = 1;
				foreach my $msg (@$aref)
				{
					my $compare = $messages{$msg->id};

					# A returned message whose id has no fixture counterpart
					# is treated as a mismatch as well.
					next if defined $compare && $msg->equals($compare);

					# Report through diag, like message_is does, so the
					# output is formatted as test diagnostics.
					diag("Unexpected mismatch: got", Dump($msg),
					     "expected", Dump($compare));
					$all_equal = 0;
				}
				ok($all_equal && @$aref == scalar keys %messages, "$name: get_all");
				goto $cb;
			},
		},
		{
			'sub'      => sub { $storage->claim(@_) },
			'args'     => [1 => 14],
			'callback' => sub {
				my $cb = pop;
				$storage->get(1 => sub {
					my $msg = $_[0];
					is($msg && $msg->claimant, 14, "$name: claim");
					# Release the claim again before the next check runs.
					$storage->disown_all(14, $cb);
				});
			},
		},
		{
			'sub'      => sub { $storage->remove(@_) },
			'args'     => [[@removed_ids]],
			'callback' => sub {
				my $cb = pop;
				$storage->get_all(sub {
					my $messages = $_[0];
					my %hash = map {$_->id => $_} @$messages;
					my @still_present = grep { exists $hash{$_} } @removed_ids;
					ok(!@still_present &&
					   (keys %hash == $expected_after_remove),
					   "$name: remove");
					goto $cb;
				});
			},
		},
		{
			'sub'      => sub { $storage->empty(@_) },
			'args'     => [],
			'callback' => sub {
				my $cb = pop;
				$storage->get_all(sub {
					is(scalar @{$_[0]}, 0, "$name: empty");
					goto $cb;
				});
			},
		},
	];

	# destination_tests consumes its list, so give it a copy.
	my @dclone = @destinations;
	destination_tests(\@dclone, $storage, $name, sub {
		run_in_order($ordered_tests, sub {
			delay_tests($storage, $name, $done);
		});
	});
}

# store_loop($storage, $messages, $done)
#
# Stores a clone of every message in @$messages, consuming the array from
# the end and issuing one asynchronous store at a time. Transfers control to
# $done once the array is empty.
sub store_loop {
	my ($storage, $messages, $done) = @_;

	my $message = pop @$messages;
	goto $done unless $message;

	$storage->store($message->clone, sub {
		# Tail-call ourselves for the remaining messages.
		@_ = ($storage, $messages, $done);
		goto &store_loop;
	});
}

# engine_loop($names)
#
# Consumes the engine names in @$names from the end. For each one it
# recreates the data directory, builds the engine, stores every fixture
# message, runs api_test and shuts the storage down before moving on to the
# next name. When no names remain, it stops the forked remote storage
# process (recording that as a test) and returns.
sub engine_loop {
	my ($names) = @_;

	my $name = pop @$names;
	if (!$name) {
		ok(stop_fork($remote), "remote storage engine stopped.");
		return;
	}

	# Start every engine from an empty data directory. make_db() and
	# make_engine() are defined outside this file.
	rmtree(DATA_DIR);
	mkpath(DATA_DIR);
	make_db();

	my $storage = make_engine($name);
	my $clones = [values %messages];

	# Continuations, declared last-to-first so each can name its successor.
	my $next_engine = sub {
		ok(1, "$name: storage_shutdown");
		@_ = ($names);
		goto &engine_loop;
	};
	my $shut_down = sub {
		$storage->storage_shutdown($next_engine);
	};
	my $run_api_tests = sub {
		api_test($storage, $name, $shut_down);
	};

	store_loop($storage, $clones, $run_api_tests);
}


# Kick everything off from inside a POE session. If an engine name is given
# as the first command-line argument only that engine is exercised;
# otherwise every name returned by engine_names() is.
POE::Session->create(
	inline_states => {
		_start => sub {
			my @engines = $ARGV[0] || engine_names();
			engine_loop(\@engines);
		},
	},
);

$poe_kernel->run();