The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use IO::Async::Test;

use Test::More;
use Test::Fatal;
use Test::Refcount;

use File::Temp qw( tempdir );
use Time::HiRes qw( sleep );

use IO::Async::Function;

use IO::Async::OS;

use IO::Async::Loop;

use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1;

my $loop = IO::Async::Loop->new_builtin;

testing_loop( $loop );

# by future
{
   my $function = IO::Async::Function->new(
      min_workers => 1,
      max_workers => 1,
      code => sub { return $_[0] + $_[1] },
   );

   ok( defined $function, '$function defined' );
   isa_ok( $function, "IO::Async::Function", '$function isa IO::Async::Function' );

   is_oneref( $function, '$function has refcount 1' );

   $loop->add( $function );

   is_refcount( $function, 2, '$function has refcount 2 after $loop->add' );

   is( $function->workers, 1, '$function has 1 worker' );
   is( $function->workers_busy, 0, '$function has 0 workers busy' );
   is( $function->workers_idle, 1, '$function has 1 workers idle' );

   my $future = $function->call(
      args => [ 10, 20 ],
   );

   isa_ok( $future, "Future", '$future' );

   is_refcount( $function, 2, '$function has refcount 2 after ->call' );

   is( $function->workers_busy, 1, '$function has 1 worker busy after ->call' );
   is( $function->workers_idle, 0, '$function has 0 worker idle after ->call' );

   wait_for { $future->is_ready };

   my ( $result ) = $future->get;

   is( $result, 30, '$result after call returns by future' );

   is( $function->workers_busy, 0, '$function has 0 workers busy after call returns' );
   is( $function->workers_idle, 1, '$function has 1 workers idle after call returns' );

   $loop->remove( $function );
}

# by callback
{
   my $function = IO::Async::Function->new(
      min_workers => 1,
      max_workers => 1,
      code => sub { return $_[0] + $_[1] },
   );

   $loop->add( $function );

   my $result;

   $function->call(
      args => [ 10, 20 ],
      on_return => sub { $result = shift },
      on_error  => sub { die "Test failed early - @_" },
   );

   wait_for { defined $result };

   is( $result, 30, '$result after call returns by callback' );

   $loop->remove( $function );
}

# Test queueing
{
   my $function = IO::Async::Function->new(
      min_workers => 1,
      max_workers => 1,
      code => sub { return $_[0] + $_[1] },
   );

   $loop->add( $function );

   my @result;

   my $f1 = $function->call(
      args => [ 1, 2 ],
      on_return => sub { push @result, shift },
      on_error  => sub { die "Test failed early - @_" },
   );
   my $f2 = $function->call(
      args => [ 3, 4 ],
      on_return => sub { push @result, shift },
      on_error  => sub { die "Test failed early - @_" },
   );

   is( $function->workers, 1, '$function->workers is still 1 after 2 calls' );

   isa_ok( $f1, "Future", '$f1' );
   isa_ok( $f2, "Future", '$f2' );

   wait_for { @result == 2 };

   is_deeply( \@result, [ 3, 7 ], '@result after both calls return' );

   is( $function->workers, 1, '$function->workers is still 1 after 2 calls return' );

   $loop->remove( $function );
}

# References
{
   my $function = IO::Async::Function->new(
      code => sub { return ref( $_[0] ), \$_[1] },
   );

   $loop->add( $function );

   my @result;

   $function->call(
      args => [ \'a', 'b' ],
      on_return => sub { @result = @_ },
      on_error  => sub { die "Test failed early - @_" },
   );

   wait_for { scalar @result };

   is_deeply( \@result, [ 'SCALAR', \'b' ], 'Call and result preserves references' );

   $loop->remove( $function );
}

# Exception throwing
{
   my $line = __LINE__ + 2;
   my $function = IO::Async::Function->new(
      code => sub { die shift },
   );

   $loop->add( $function );

   my $err;

   my $f = $function->call(
      args => [ "exception name" ],
      on_return => sub { },
      on_error  => sub { $err = shift },
   );

   wait_for { defined $err };

   like( $err, qr/^exception name at \Q$0\E line \d+\.$/, '$err after exception' );

   is_deeply( [ $f->failure ],
              [ "exception name at $0 line $line.", error => ],
              '$f->failure after exception' );

   $loop->remove( $function );
}

# max_workers
{
   my $count = 0;

   my $function = IO::Async::Function->new(
      max_workers => 1,
      code => sub { $count++; die "$count\n" },
      exit_on_die => 0,
   );

   $loop->add( $function );

   my @errs;
   $function->call(
      args => [],
      on_return => sub { },
      on_error  => sub { push @errs, shift },
   );
   $function->call(
      args => [],
      on_return => sub { },
      on_error  => sub { push @errs, shift },
   );

   undef @errs;
   wait_for { scalar @errs == 2 };

   is_deeply( \@errs, [ "1", "2" ], 'Closed variables preserved when exit_on_die => 0' );

   $loop->remove( $function );
}

# exit_on_die
{
   my $count = 0;

   my $function = IO::Async::Function->new(
      max_workers => 1,
      code => sub { $count++; die "$count\n" },
      exit_on_die => 1,
   );

   $loop->add( $function );

   my @errs;
   $function->call(
      args => [],
      on_return => sub { },
      on_error  => sub { push @errs, shift },
   );
   $function->call(
      args => [],
      on_return => sub { },
      on_error  => sub { push @errs, shift },
   );

   undef @errs;
   wait_for { scalar @errs == 2 };

   is_deeply( \@errs, [ "1", "1" ], 'Closed variables preserved when exit_on_die => 1' );

   $loop->remove( $function );
}

# restart after exit
SKIP: {
   skip "This Perl does not support fork()", 4
      if not IO::Async::OS->HAVE_POSIX_FORK;

   my $function = IO::Async::Function->new(
      model => "fork",
      min_workers => 0,
      max_workers => 1,
      code => sub { $_[0] ? exit shift : return 0 },
   );

   $loop->add( $function );

   my $err;

   $function->call(
      args => [ 16 ],
      on_return => sub { $err = "" },
      on_error  => sub { $err = [ @_ ] },
   );

   wait_for { defined $err };

   # Not sure what reason we might get - need to check both
   ok( $err->[0] eq "closed" || $err->[0] eq "exit", '$err->[0] after child death' )
      or diag( 'Expected "closed" or "exit", found ' . $err->[0] );

   is( scalar $function->workers, 0, '$function->workers is now 0' );

   $function->call(
      args => [ 0 ],
      on_return => sub { $err = "return" },
      on_error  => sub { $err = [ @_ ] },
   );

   is( scalar $function->workers, 1, '$function->workers is now 1 again' );

   undef $err;
   wait_for { defined $err };

   is( $err, "return", '$err is "return" after child nondeath' );

   $loop->remove( $function );
}

## Now test that parallel runs really are parallel
{
   # touch $dir/$n in each worker, touch $dir/done to finish it
   sub touch
   {
      my ( $file ) = @_;

      open( my $fh, ">", $file ) or die "Cannot write $file - $!";
      close( $fh );
   }

   my $function = IO::Async::Function->new(
      min_workers => 3,
      code => sub {
         my ( $dir, $n ) = @_;
         my $file = "$dir/$n";

         touch( $file );

         # Wait for synchronisation
         sleep 0.1 while ! -e "$dir/done";

         unlink( $file );

         return $n;
      },
   );

   $loop->add( $function );

   is( scalar $function->workers, 3, '$function->workers is 3' );

   my $dir = tempdir( CLEANUP => 1 );

   my %ret;

   foreach my $id ( 1, 2, 3 ) {
      $function->call(
         args => [ $dir, $id ],
         on_return => sub { $ret{$id} = shift },
         on_error  => sub { die "Test failed early - @_" },
      );
   }

   wait_for { -e "$dir/1" and -e "$dir/2" and -e "$dir/3" };

   ok( 1, 'synchronise files created' );

   # Synchronize deleting them;
   touch( "$dir/done" );

   undef %ret;
   wait_for { keys %ret == 3 };

   unlink( "$dir/done" );

   is_deeply( \%ret, { 1 => 1, 2 => 2, 3 => 3 }, 'ret keys after parallel run' );

   is( scalar $function->workers, 3, '$function->workers is still 3' );

   $loop->remove( $function );
}

# Test for idle timeout
{
   my $function = IO::Async::Function->new(
      min_workers => 0,
      max_workers => 1,
      idle_timeout => 2 * AUT,
      code => sub { return $_[0] },
   );

   $loop->add( $function );

   my $result;

   $function->call(
      args => [ 1 ],
      on_result => sub { $result = $_[0] },
   );

   wait_for { defined $result };

   is( $function->workers, 1, '$function has 1 worker after call' );

   my $waited;
   $loop->watch_time( after => 1 * AUT, code => sub { $waited++ } );

   wait_for { $waited };

   is( $function->workers, 1, '$function still has 1 worker after short delay' );

   undef $result;
   $function->call(
      args => [ 1 ],
      on_result => sub { $result = $_[0] },
   );

   wait_for { defined $result };

   undef $waited;
   $loop->watch_time( after => 3 * AUT, code => sub { $waited++ } );

   wait_for { $waited };

   is( $function->workers, 0, '$function has 0 workers after longer delay' );

   $loop->remove( $function );
}

# Restart
{
   my $value = 1;

   my $function = IO::Async::Function->new(
      code => sub { return $value },
   );

   $loop->add( $function );

   my $result;
   $function->call(
      args => [],
      on_return => sub { $result = shift },
      on_error  => sub { die "Test failed early - @_" },
   );

   wait_for { defined $result };

   is( $result, 1, '$result before restart' );

   $value = 2;
   $function->restart;

   undef $result;
   $function->call(
      args => [],
      on_return => sub { $result = shift },
      on_error  => sub { die "Test failed early - @_" },
   );

   wait_for { defined $result };

   is( $result, 2, '$result after restart' );

   undef $result;
   $function->call(
      args => [],
      on_return => sub { $result = shift },
      on_error  => sub { die "Test failed early - @_" },
   );

   $function->restart;

   wait_for { defined $result };

   is( $result, 2, 'call before restart still returns result' );

   $loop->remove( $function );
}

# max_worker_calls
{
   my $counter;
   my $function = IO::Async::Function->new(
      max_workers      => 1,
      max_worker_calls => 2,
      code => sub { return ++$counter; }
   );

   $loop->add( $function );

   my $result;
   $function->call(
      args => [],
      on_return => sub { $result = shift },
      on_error  => sub { die "Test failed early - @_" },
   );
   wait_for { defined $result };
   is( $result, 1, '$result from first call' );

   undef $result;
   $function->call(
      args => [],
      on_return => sub { $result = shift },
      on_error  => sub { die "Test failed early - @_" },
   );
   wait_for { defined $result };
   is( $result, 2, '$result from second call' );

   undef $result;
   $function->call(
      args => [],
      on_return => sub { $result = shift },
      on_error  => sub { die "Test failed early - @_" },
   );
   wait_for { defined $result };
   is( $result, 1, '$result from third call' );

   $loop->remove( $function );
}

# Cancellation of sent calls
{
   my $function = IO::Async::Function->new(
      max_workers => 1,
      code => sub {
         return 123;
      },
   );

   $loop->add( $function );

   my $f1 = $function->call( args => [] );
   $f1->cancel;

   my $f2 = $function->call( args => [] );

   wait_for { $f2->is_ready };

   is( scalar $f2->get, 123, 'Result of function call after cancelled call' );

   $loop->remove( $function );
}

# Cancellation of pending calls
{
   my $function = IO::Async::Function->new(
      max_workers => 1,
      code => do { my $state; sub {
         my $oldstate = $state;
         $state = shift;
         return $oldstate;
      } },
   );

   $loop->add( $function );

   # Queue 3 calls but immediately cancel the middle one
   my ( $f1, $f2, $f3 ) = map {
      $function->call( args => [ $_ ] )
   } 1 .. 3;

   $f2->cancel;

   wait_for { $f1->is_ready and $f3->is_ready };

   is( scalar $f1->get, undef, '$f1 result is undef' );
   is( scalar $f3->get, 1, '$f3 result is 1' );

   $loop->remove( $function );
}

done_testing;