The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings;
use FindBin;
use lib ("$FindBin::RealBin/lib");
use Test::More;

BEGIN {
    use_ok('Test::AQWrapper');
}

sub showResults {
    diag("results: ". join(" ", @_));
}

{
    my @results = ();
    my $q; $q = new_ok('Test::AQWrapper', [concurrency => 1, worker => sub {
        my ($task, $cb) = @_;
        $q->check();
        push(@results, $task);
        $cb->(uc($task));
    }]);
    $q->saturated(sub {
        $q->check();
        push(@results, "S");
    });
    $q->empty(sub {
        $q->check();
        push(@results, "E");
    });
    $q->drain(sub {
        $q->check();
        push(@results, "D");
    });

    note('--- event callbacks: local specs');
    @results = ();
    $q->clearCounter();
    $q->push('a', sub { $q->finish });
    is_deeply(\@results, [qw(S E a D)], '"saturated" before "empty".');
    @results = ();
    $q->push('b', sub { $q->finish });
    is_deeply(\@results, [qw(S E b D)], 'every task goes into the queue first. "empty" fires even if the task is served immediately.');
    $q->check(0, 0, 2, 2);


    note('--- event callbacks (concurrency 1)');
    @results = ();
    $q->clearCounter();
    $q->push($_, sub { $q->check(); push(@results, @_); $q->finish }) foreach qw(a b c);
    is_deeply(\@results, [qw(S E a A D S E b B D S E c C D)], "results OK.") or
        showResults(@results);
    $q->check(0, 0, 3, 3);
    

    note('--- event callbacks (concurrency 3)');
    @results = ();
    $q->clearCounter();
    $q->concurrency(3);
    $q->push($_, sub { $q->check(); push(@results, @_); $q->finish }) foreach qw(w x y z);
    is_deeply(\@results, [qw(E w W D E x X D E y Y D E z Z D)], "results OK") or
        showResults(@results);
    $q->check(0, 0, 4, 4);

    note('--- push inside finish callbacks (concurrency 1)');
    @results = ();
    $q->clearCounter();
    $q->concurrency(1);
    $q->push('a', sub {
        $q->check(0, 1, 0, 1);
        $q->push('b', sub {
            $q->check(1, 1, 1, 3);
            $q->push('d', sub {
                $q->check(0, 1, 3, 4);
                $q->finish;
            });
            $q->finish;
        });
        $q->push('c', sub {
            $q->check(1, 1, 2, 4);
            $q->finish;
        });
        $q->finish;
    });
    is_deeply(\@results, [qw(S E a b c E d D)], '"saturated" fires only when "running" increases to the max.') or
        showResults(@results);
    $q->check(0, 0, 4, 4);

    note('--- push inside finish callbacks (concurrency 3)');
    @results = ();
    $q->clearCounter();
    $q->concurrency(3);
    $q->push('a', sub {
        $q->check(0, 1, 0, 1);
        $q->push('b', sub {
            $q->check(0, 2, 0, 2);
            $q->push('c', sub {
                $q->check(0, 3, 0, 3);
                $q->push('d', sub {
                    $q->check(2, 3, 1, 6);
                    $q->finish;
                });
                $q->push('e', sub {
                    $q->check(1, 3, 2, 6);
                    $q->finish;
                });
                $q->push("f", sub {
                    $q->check(0, 3, 3, 6);
                    $q->finish;
                });
                $q->finish;
            });
            $q->push('g', sub {
                $q->check(0, 3, 4, 7);
                $q->push('h', sub {
                    $q->check(0, 3, 5, 8);
                    $q->push('i', sub {
                        $q->check(0, 3, 6, 9);
                        $q->finish;
                    });
                    $q->finish;
                });
                $q->finish;
            });
            $q->finish;
        });
        $q->finish;
    });
    is_deeply(\@results, [qw(E a E b S E c d e E f S E g E h E i D)]) or
        showResults(@results);
    $q->check(0, 0, 9, 9);

    note('--- "empty" event keeps firing until its "saturated"');
    $q->concurrency(3);
    $q->clearCounter;
    @results = ();
    $q->push("a", sub {
        $q->push("b", sub {
            $q->push("c", sub {
                $q->push($_, sub { $q->finish }) foreach qw(d e f g);
                $q->finish;
            });
            $q->finish;
        });
        $q->finish;
    });
    is_deeply(\@results, [qw(E a E b S E c d e f E g D)],
              'results OK. "empty" event keeps firing until "saturated"') or
                  showResults(@results);
}

{
    note('--- push from the worker (concurrency 1)');
    my @results = ();
    my @tasks = qw(a b c d e f);
    my $q;
    my $shiftPush = sub {
        my $t = shift(@tasks);
        $q->push($t, sub { $q->finish }) if defined $t;
    };
    $q = new_ok('Test::AQWrapper', [
        concurrency => 1,
        worker => sub {
            my ($task, $cb) = @_;
            $q->check;
            push(@results, $task);
            $shiftPush->();
            $cb->();
        },
        saturated => sub { $q->check; push(@results, "S") },
        empty => sub { $q->check; push(@results, "E") },
        drain => sub { $q->check; push(@results, "D") },
    ]);
    $q->clearCounter();
    $q->push(shift(@tasks), sub { $q->finish });
    $q->check(0, 0, 6, 6);
    is_deeply(\@results, [qw(S E a E b E c E d E e E f D)],
              'results OK. No "saturated" event except for the beginning.') or
                  showResults(@results);

    
    note('--- push from the worker (concurrency 3)');
    $q->concurrency(3);
    @results = ();
    @tasks = 1..6;
    $q->clearCounter();
    $q->push(shift(@tasks), sub { $q->finish });
    $q->check(0, 0, 6, 6);
    is_deeply(\@results, [qw(E 1 E 2 S E 3 E 4 E 5 E 6 D)]) or
        showResults(@results);
    

    note('--- push in "empty" event (concurrency 1)');
    $q->concurrency(1);
    $q->worker(sub {
        my ($task, $cb) = @_;
        $q->check;
        push(@results, $task);
        $cb->();
    });
    $q->empty(sub {
        $q->check;
        push(@results, "E");
        $shiftPush->();
    });
    $q->clearCounter;
    @results = ();
    @tasks = 1..5;
    $q->push(shift(@tasks), sub { $q->finish });
    $q->check(0, 0, 5, 5);
    is_deeply(\@results, [qw(S E 1 E 2 E 3 E 4 E 5 D)], "results OK") or
        showResults(@results);

    note('--- push in "empty" event (concurrency 3)');
    $q->concurrency(3);
    $q->clearCounter;
    @results = ();
    @tasks = 1..6;
    $q->push(shift(@tasks), sub { $q->finish });
    $q->check(0, 0, 6, 6);
    is_deeply(\@results, [qw(E E S E 3 E 4 E 5 E 6 2 1 D)], 'early tasks filling the service quit last.') or
        showResults(@results);

    note('--- push in "saturated" event (concurrency 1)');
    $q->empty(sub { $q->check; push(@results, "E") });
    $q->saturated(sub {
        $q->check;
        push(@results, "S");
        $shiftPush->();
    });
    $q->concurrency(1);
    $q->clearCounter;
    @results = ();
    @tasks = 1..9;
    $q->push(shift(@tasks), sub { $q->finish });
    $q->check(0, 0, 2, 2);
    is_deeply(\@results, [qw(S 1 E 2 D)], '"saturated" event does not repeat.') or
        showResults(@results);

    note('--- push in "saturated" event (concurrency 3)');
    $q->concurrency(3);
    $q->clearCounter;
    @results = ();
    @tasks = 1..9;
    $q->push(shift(@tasks), sub {
        $q->check(0, 1, 0, 1);
        $q->push(shift(@tasks), sub {
            $q->check(0, 2, 0, 2);
            $q->push(shift(@tasks), sub {
                $q->check(1, 3, 0, 4);
                $q->finish;
            });
            $q->finish;
        });
        $q->finish;
    });
    $q->check(0, 0, 4, 4);
    is_deeply(\@results, [qw(E 1 E 2 S 3 E 4 D)],
              '"empty" does not fire before 3 because "saturated" event populates 4.') or
                  showResults(@results);

    note('--- push in "drain" event (concurrency 1)');
    $q->saturated(sub { $q->check; push(@results, "S") });
    $q->drain(sub {
        $q->check;
        push(@results, "D");
        $shiftPush->();
    });
    $q->concurrency(1);
    $q->clearCounter;
    @results = ();
    @tasks = 1..3;
    $q->push(shift(@tasks), sub { $q->finish });
    $q->check(0, 0, 3, 3);
    is_deeply(\@results, [qw(S E 1 D S E 2 D S E 3 D)], 'results OK. "drain" feeds the queue with tasks.') or
        showResults(@results);


    note('--- push in "drain" event (concurrency 3)');
    $q->concurrency(3);
    $q->clearCounter;
    @results = ();
    @tasks = 1..7;
    $q->push(shift(@tasks), sub {
        $q->check(0, 1, 0, 1);
        $q->push(shift(@tasks), sub {
            $q->check(0, 2, 0, 2);
            $q->push(shift(@tasks), sub {
                $q->check(0, 3, 0, 3);
                $q->push(shift(@tasks), sub {
                    $q->check(0, 3, 1, 4);
                    $q->finish;
                });
                $q->finish;
            });
            $q->finish;
        });
        $q->finish;
    });
    $q->check(0, 0, 7, 7);
    is_deeply(\@results, [qw(E 1 E 2 S E 3 E 4 D E 5 D E 6 D E 7 D)], 'results OK.') or
        showResults(@results);
}

{
    note('--- infinite concurrency');
    foreach my $conc_val (0, -10) {
        my @results = ();
        my $q; $q = new_ok('Test::AQWrapper', [
            concurrency => $conc_val, worker => sub {
                my ($task, $cb) = @_;
                $q->check;
                push(@results, $task);
                $cb->();
            },
            map { my $e = $_; $e => sub {
                $q->check;
                push(@results, $e);
            } } qw(saturated empty drain)
        ]);
        is($q->concurrency, $conc_val, "concurrency is $conc_val, meaning infinite concurrency.");
        
        my @tasks = (1 .. 15);
        my @orig_tasks = @tasks;
        my $finish_cb; $finish_cb = sub {
            my $t = shift(@tasks);
            $q->push($t, $finish_cb) if defined $t;
            $q->finish;
        };
        $q->push(shift(@tasks), $finish_cb);
        $q->check(0, 0, int(@orig_tasks), int(@orig_tasks));
        is_deeply(\@results, [(map { ("empty", $_) } @orig_tasks), "drain"],
              'results OK. Never saturated.') or
                  showResults(@results);
    }
}

done_testing();