The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use Test::More tests => 38;

use ok 'POE::Component::ResourcePool::Resource::Semaphore';
use ok 'POE::Component::ResourcePool::Resource::Collection';
use ok 'POE::Component::ResourcePool';

use Scalar::Util qw(weaken);

use POE;

for my $refc_alloc ( 0, 1 ) {
	my $max = 5;
	my $delay = 0.0001;
	my $total = 80;
	my $running = 0;
	my $max_running = 0;
	my $dispatched = 0;

	my $children = $total;

	POE::Session->create(
		inline_states => {
			_start => sub { $poe_kernel->yield("begin") },
			begin => sub {
				$_[HEAP]{alarm_id} = $poe_kernel->delay_set("exit", ( $total / $max ) );

				my $sem1 = POE::Component::ResourcePool::Resource::Semaphore->new( initial_value => $max * 8 );
				my $sem2 = POE::Component::ResourcePool::Resource::Semaphore->new( initial_value => $max );

				my $pool = POE::Component::ResourcePool->new(
					refcount_allocated => $refc_alloc,
					resources => { sem1 => $sem1, sem2 => $sem2},
				);

				foreach my $thing ( 1 .. $total ) {
					POE::Session->create(
						inline_states => {
							_start => sub {
								my ( $kernel, $heap, $session ) = @_[KERNEL, HEAP, SESSION];

								$kernel->yield("begin");
							},
							begin => sub {
								my ( $kernel, $heap, $session ) = @_[KERNEL, HEAP, SESSION];

								$heap->{request} = $pool->request(
									params => { sem1 => 1 + int(rand($max / 5)), sem2 => 1 + int(rand($max / 2)) },
									event  => "foo",
								);
							},
							foo => sub {
								$running++;
								$max_running = $running if $running > $max_running;
								$poe_kernel->delay_set("bar", $delay);
							},
							bar => sub {
								my ( $kernel, $heap, $session ) = @_[KERNEL, HEAP, SESSION];

								$dispatched++;

								$running--;

								if ( $refc_alloc ) {
									$heap->{request}->dismiss;
								}
							},
						},
					);
				}
			},
			_child => sub {
				if ( --$children == 0 ) {
					$poe_kernel->alarm_remove($_[HEAP]{alarm_id});
				}
			},
			exit => sub {
				fail("timed out");
				$poe_kernel->stop;
			}
		},
	);

	$poe_kernel->run;

	is( $dispatched, $total, "actually dispatched $total times" );

	cmp_ok( $max_running, '<=', $max, "never ran more than $max session simultaneously" );

	is( $running, 0, "counters are consistent" );
}

{
	my $pool = POE::Component::ResourcePool->new(
		resources => {
			thingys => POE::Component::ResourcePool::Resource::Collection->new(
				values => [qw(1 2 3)],
			)
		},
	);

	my ( $two, $four, $got_two, $got_four );

	POE::Session->create(
		inline_states => {
			_start => sub {
				$four = $_[HEAP]{r} = $pool->request( params => { thingys => 4 }, event => "got" );
			},
			got => sub {
				$got_four = $four->results;
			}
		},
	);

	POE::Session->create(
		inline_states => {
			_start => sub {
				$two = $_[HEAP]{r} = $pool->request( params => { thingys => 2 }, event => "got" );
			},
			got => sub {
				$got_two = $two->results;
				$two->dismiss;
			}
		},
	);

	POE::Session->create(
		inline_states => {
			_start => sub {
				$poe_kernel->delay_set( cancel => 0.01 );
			},
			cancel => sub {
				$_->dismiss for $pool->pending_requests;
			},
		},
	);

	is_deeply( [ sort $pool->pending_requests ], [ sort $two, $four ], "pending requests" );
	is_deeply( [ sort $pool->all_requests ], [ sort $two, $four ], "all_requests" );
	is_deeply( [ sort $pool->allocated_requests ], [ ], "allocated_requests" );

	$poe_kernel->run;

	is( $got_four, undef, "didn't get 4" );
	is_deeply( $got_two, { thingys => [ qw(1 2) ] }, "got 2" );
	ok( $two->fulfilled, "resource is fulfilled" );
	ok( !$two->canceled, "resource is not canceled" );
	ok( $four->canceled, "resource is canceled" );
}

{
	my $pool = POE::Component::ResourcePool->spawn(
		resources => {
			one => my $one = POE::Component::ResourcePool::Resource::Collection->new(
				values => [qw(1 2 3)],
			),
			two => my $two = POE::Component::ResourcePool::Resource::Collection->new(
				values => [qw(foo bar gorch)],
			),
			three => POE::Component::ResourcePool::Resource::Semaphore->new( initial_value => 2 ),
		},
	);

	POE::Session->create(
		inline_states => {
			_start => sub {
				my @requests = (
					$pool->request( params => { one => 1 }, event => "got" ),
					$pool->request( params => { two => 5 }, event => "got" ),
				);

				eval { $pool->request( params => { two => 1 } ) };
				my $e = $@;
				ok( $@, "got an error" );
				like( $@, qr/event/, "the right error" );
				like( $@, qr/callback/, "the right error" );
				like( $@, qr/basic\.t/, "it's a croak, not a die" );

				eval { $pool->request( params => { doesnt_exist => 3 }, event => "got" ) };
				$e = $@;
				ok( $@, "got an error" );
				like( $@, qr/no resource/, "the right error" );
				like( $@, qr/doesnt_exist/, "the right resource in the error" );
				like( $@, qr/basic\.t/, "it's a croak, not a die" );

				eval { $pool->request( params => { three => 3 }, event => "got" ) };
				$e = $@;
				ok( $@, "got an error" );
				like( $@, qr/rejected/, "the right error" );
				like( $@, qr/three/, "the right resource in the error" );
				like( $@, qr/basic\.t/, "it's a croak, not a die" );

				is_deeply( [ sort $pool->pending_requests ], [ sort @requests ], "pending requests" );

				is_deeply( [ $pool->pending_requests($one) ], [ $requests[0] ], "pending requests for resource" );

				is_deeply( [ $pool->allocated_requests ], [ ], "allocated requests" );

				$_[HEAP]{requests} = \@requests;
			},
			got => sub {
				my @requests = @{ $_[HEAP]{requests} };

				is_deeply( [ $pool->pending_requests ], [ $requests[1] ], "pending requests" );
				is_deeply( [ $pool->pending_requests($one) ], [ ], "pending requests for resource" );
				is_deeply( [ $pool->pending_requests($two) ], [ $requests[1] ], "pending requests for resource" );

				is_deeply( [ $pool->allocated_requests ], [ $requests[0] ], "allocated requests" );
				is_deeply( [ $pool->allocated_requests($one) ], [ $requests[0] ], "allocated requests" );
				is_deeply( [ $pool->allocated_requests($two) ], [ ], "allocated requests" );

				$requests[1]->dismiss;

				$pool->shutdown;
			}
		},
	);

	$poe_kernel->run;
}