The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!perl -w
use strict;
use Test::More;
use Test::SharedFork;

use AnyEvent;
use AnyEvent::ForkManager;
use Time::HiRes;

my $MAX_WORKERS = 4;
my $JOB_COUNT   = $MAX_WORKERS * 5;
my $TEST_COUNT  =
    ($JOB_COUNT)     + # in child process tests
    ($JOB_COUNT)     + # start method is non-blocking tests
    ($JOB_COUNT * 2) + # on_start
    ($JOB_COUNT * 3) + # on_finish
    ($JOB_COUNT > $MAX_WORKERS ? (($JOB_COUNT - $MAX_WORKERS) * 2) : 0) + # on_enqueue
    ($JOB_COUNT > $MAX_WORKERS ? (($JOB_COUNT - $MAX_WORKERS) * 2) : 0) + # on_dequeue
    ($JOB_COUNT > $MAX_WORKERS ? (($JOB_COUNT - $MAX_WORKERS) * 2) : 0) + # on_working_max
    4;# wait_all_children
plan tests => $TEST_COUNT;

my $pm = AnyEvent::ForkManager->new(
    max_workers => $MAX_WORKERS,
    on_start    => sub{
        my($pm, $pid, $exit_code) = @_;

        note 'start on_start';
        cmp_ok $pm->num_workers, '<', $pm->max_workers, 'not working max';
        is $$, $pm->manager_pid, 'called by manager';
        note 'end   on_start';
    },
    on_finish => sub{
        my($pm, $pid, $status, $exit_code) = @_;

        note 'start on_finish';
        is $status >> 8, $exit_code, 'status';
        cmp_ok $pm->num_workers, '<', $pm->max_workers, 'not working max';
        is $$, $pm->manager_pid, 'called by manager';
        note 'end   on_finish';
    },
    on_enqueue => sub{
        my($pm, $exit_code) = @_;

        note 'start on_enqueue';
        is $pm->num_workers, $pm->max_workers, 'working max';
        is $$, $pm->manager_pid, 'called by manager';
        note 'end   on_start';
    },
    on_dequeue => sub{
        my($pm, $exit_code) = @_;

        note 'start on_dequeue';
        cmp_ok $pm->num_workers, '<', $pm->max_workers, 'not working max';
        is $$, $pm->manager_pid, 'called by manager';
        note 'end   on_dequeue';
    },
    on_working_max => sub{
        my($pm, $exit_code) = @_;

        note 'start on_working_max';
        is $pm->num_workers, $pm->max_workers, 'working max';
        is $$, $pm->manager_pid, 'called by manager';
        note 'end   on_working_max';
    }
);

my $cv = AnyEvent->condvar;

my @all_data = (1 .. $JOB_COUNT);
my $started_all_process = 0;
foreach my $exit_code (@all_data) {
    select undef, undef, undef, 0.07;
    my $start_time = Time::HiRes::gettimeofday;
    $pm->start(
        cb => sub {
            my($pm, $exit_code) = @_;
            select undef, undef, undef, 0.5;
            isnt $$, $pm->manager_pid, 'called by child';
            local $SIG{USR1} = sub { $started_all_process = 1; };
            until ($started_all_process) {}; # wait
            $pm->finish($exit_code);
            fail 'finish failed';
        },
        args => [$exit_code]
    );
    my $end_time = Time::HiRes::gettimeofday;
    cmp_ok $end_time - $start_time, '<', 0.3, 'non-blocking';
}
$started_all_process = 1;
$pm->signal_all_children('USR1');

my $start_time = Time::HiRes::gettimeofday;
$pm->wait_all_children(
    cb => sub {
        my($pm) = @_;
        note 'start wait_all_children callback';
        is $$, $pm->manager_pid, 'called by manager';
        is $pm->num_workers, 0, 'finished all child process';
        is $pm->num_queues,  0, 'empty all child process queue';
        note 'end   wait_all_children callback';
        $cv->send;
    },
);
my $end_time = Time::HiRes::gettimeofday;
cmp_ok $end_time - $start_time, '<', 0.1, 'non-blocking';

$cv->recv;