The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/env perl

# mt-aws-glacier - Amazon Glacier sync client
# Copyright (C) 2012-2014  Victor Efimov
# http://mt-aws.com (also http://vs-dev.com) vs@vs-dev.com
# License: GPLv3
#
# This file is part of "mt-aws-glacier"
#
#    mt-aws-glacier is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    mt-aws-glacier is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.

use strict;
use warnings;
use Test::More;
use Test::Deep;
use FindBin;
use lib map { "$FindBin::RealBin/../$_" } qw{../lib ../../lib};
use TestUtils 'w_fatal';
use List::Util qw/min max/;
use App::MtAws::QueueJobResult;
use App::MtAws::QueueJob::Iterator;
use App::MtAws::QueueJob::MultipartPart;
use QueueHelpers;
use LCGRandom;




use Data::Dumper;

{
	package SimpleJob;
	use Carp;
	use App::MtAws::QueueJobResult;use Data::Dumper;
	use base 'App::MtAws::QueueJob';
	sub init {  };

	sub on_default
	{
		state 'wait', task($_[0]->{action}, {x => $_[0]->{n}}, sub {
			confess unless $_[0] && $_[0] =~ /^somedata\d$/;
			state 'done'
		});
	};

}

{
	package QE;
	use MyQueueEngine;
	use base q{MyQueueEngine};

	our $AUTOLOAD;

	sub AUTOLOAD
	{
		my $action = $AUTOLOAD;
		$action =~ s/^.*::on_//;
		my $self = shift;
		push @{$self->{res}}, { action => $action, data => [@_] };
		"somedata0"
	}
};

sub action_str
{
	"abc".join('', (map { sprintf("%04d", $_)  } @_));
}

sub create_iterator
{
	my ($maxcnt, $cnt, $jobs_count, $cb, @actions) = @_;

	my @orig_parts  = do {
		if ($jobs_count == 1) {
			map {
				my $a = action_str(@actions, $_);
				my $x = "x$a";
				SimpleJob->new(action => $a, n => $x)
			} (1..$cnt);
		} else {
			map { create_iterator($jobs_count+1, $jobs_count, 1, undef, @actions, $_) } (1..$cnt);
		}

	};
	App::MtAws::QueueJob::Iterator->new(maxcnt => $maxcnt, iterator => sub {
		$cb->() if $cb;
		@orig_parts ? shift @orig_parts : ()
	});
}

sub test_case_early_finish
{
	my ($maxcnt, $cnt, $jobs_count) = @_;

	my $live_counter = 0;
	my @live_counter_log;
	my $itt = create_iterator($maxcnt, $cnt, $jobs_count, sub { ++$live_counter });
	my @actions;
	while (1) {
		my $r = $itt->next;
		push @live_counter_log, $live_counter;
		ok $r->{code} eq JOB_OK || $r->{code} eq JOB_DONE;
		last if $r->{code} eq JOB_DONE;
		push @actions, $r->{task}{action};
		$r->{task}{cb_task_proxy}->("somedata1");
	}

	cmp_deeply [@live_counter_log], [map { $_ } 1..$cnt+1], "should not call itterator for all jobs at once";
	cmp_deeply [sort @actions], [sort map { action_str($_) } 1..$cnt], "test it works when callback called immediately";

}

sub test_late_finish
{
	my ($maxcnt, $cnt, $jobs_count) = @_;

	my $live_counter = 0;
	my @live_counter_log;
	my $itt = create_iterator($maxcnt, $cnt, $jobs_count, sub { ++$live_counter });
	my @actions = ();
	my @passes;
	while (@actions < $cnt) {
		my @callbacks = ();
		my $r;
		while (1) {
			$r = $itt->next;
			push @live_counter_log, $live_counter;
			ok $r->{code} eq JOB_OK || $r->{code} eq JOB_WAIT;
			last if $r->{code} eq JOB_WAIT;
			push @actions, $r->{task}{action};
			push @callbacks, $r->{task}{cb_task_proxy};
		}
		if ($r->{code} eq JOB_WAIT) {
			push @passes, scalar @callbacks;
			$_->("somedata2") for @callbacks;
			next;
		}
	}
	cmp_deeply [sort @actions], [sort map { action_str($_) } 1..$cnt];
	#print Dumper $maxcnt, $cnt, \@live_counter_log;

	if ($cnt % $maxcnt) {
		cmp_deeply {map { $_ => 1 } @live_counter_log}, {map { $_ => 1 } 1..$cnt+1}, "should not call itterator for all jobs at once";
	} else {
		cmp_deeply {map { $_ => 1 } @live_counter_log}, {map { $_ => 1 } 1..$cnt  }, "should not call itterator for all jobs at once";
	}

	is pop @passes, $cnt % $maxcnt, "last pass should contain cnt mod maxcnt items" if ($cnt % $maxcnt);
	is $_, $maxcnt, "all passes excapt last should contain maxcnt items (if more than one pass)" for (@passes);
	is $itt->next->{code}, JOB_DONE, "test it works when callback called later";

	is $live_counter, $cnt+1;
}

sub test_random_finish
{
	my ($maxcnt, $cnt, $jobs_count, $nworkers) = @_;
	my $itt = create_iterator($maxcnt, $cnt, $jobs_count);
	my $q = QE->new(n => $nworkers);
	$q->process($itt);

	for (@{ $q->{res} }) {
		ok $_->{action} =~ /^abc(\d{4})/;
		is $_->{data}[0], 'x';
		ok $_->{data}[1] =~ /^xabc$1/;
		is scalar @{ $_->{data} }, 2;
	}
	if ($jobs_count == 1) {
		cmp_deeply [sort map { $_->{action} } @{ $q->{res} }], [sort map { action_str($_) } 1..$cnt];
	} else {
		is scalar @{ $q->{res} }, $cnt*$jobs_count;
	}
}

plan_tests 928 => sub {

	ok ! eval { App::MtAws::QueueJob::Iterator->new(maxcnt => 30); 1; };
	like $@, qr/iterator required/;

	{
		my $itt = App::MtAws::QueueJob::Iterator->new(maxcnt => 20, iterator => sub {});
		is $itt->{maxcnt}, 20, "one should be able to override maxcnt";
		$itt = App::MtAws::QueueJob::Iterator->new(iterator => sub {});
		is $itt->{maxcnt}, 30, "default maxcnt should be 30";
	}


	my $maxcnt = 7;
	lcg_srand 777654 => sub {

		alarm 180;
		for my $n (0, 1, 2, 5, $maxcnt - 1, $maxcnt, $maxcnt+1, $maxcnt*2, $maxcnt*2+1, $maxcnt*3, $maxcnt*3-1) {
			test_case_early_finish($maxcnt, $n, 1);
			test_late_finish($maxcnt, $n, 1);
		}
		alarm 0;

		for my $n (0, 1, 2, 3, 4, 5) {
			# this already protected by alarm
			test_random_finish($maxcnt, $n, 1, $_) for (1..$n+1);
			test_random_finish($maxcnt, $n, 2, $n+1);
			test_random_finish($maxcnt, $n, 3, $n+1);
		}
	};
};

1;