The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use IO::Async::Test;

use Test::More;
use Test::Fatal;
use Test::Refcount;

use Fcntl qw( SEEK_SET SEEK_END );
use File::Temp qw( tempfile );

use IO::Async::Loop;

use IO::Async::OS;

use IO::Async::FileStream;

# Multiplier applied to every FileStream polling interval in this file.
# Setting TEST_QUICK_TIMERS in the environment shrinks it from 1 to 0.1.
use constant AUT => ( $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1 );

# A single loop is shared by all the test cases below; it is also registered
# with IO::Async::Test, which is where wait_for comes from.
my $loop = IO::Async::Loop->new_builtin;
testing_loop( $loop );

# Creates a scratch file from the template "tmpfile.XXXXXX" and returns
#   ( $read_handle, $write_handle, $filename )
# The read handle is the one tempfile() opened; the write handle comes from
# reopening the same path with ">" and has autoflush switched on. The file is
# created with UNLINK => 1. Dies if the path cannot be reopened for writing.
sub mkhandles
{
   my ( $reader, $path ) = tempfile( "tmpfile.XXXXXX", UNLINK => 1 );

   open( my $writer, ">", $path )
      or die "Cannot reopen file for writing - $!";
   $writer->autoflush( 1 );

   return ( $reader, $writer, $path );
}

# Basic construction, refcounts, and reading a line written after the stream
# has been added to the loop
{
   my ( $rd, $wr ) = mkhandles;

   my @lines;
   my $initial_size;

   # Moves every complete line out of the buffer into @lines and then
   # returns 0
   my $collect_lines = sub {
      my ( undef, $buffref ) = @_;

      while( $$buffref =~ s/^(.*\n)// ) {
         push @lines, $1;
      }

      return 0;
   };

   my $filestream = IO::Async::FileStream->new(
      interval => 0.1 * AUT,
      read_handle => $rd,
      on_read => $collect_lines,
      # Second argument is the size; the first (the stream itself) is unused
      on_initial => sub { $initial_size = $_[1] },
   );

   ok( defined $filestream, '$filestream defined' );
   isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );

   is_oneref( $filestream, 'reading $filestream has refcount 1 initially' );

   $loop->add( $filestream );

   is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' );

   # The file was empty when the stream was added
   is( $initial_size, 0, '$initial_size is 0' );

   $wr->syswrite( "message\n" );

   # Nothing is delivered until the loop has had a chance to run
   is_deeply( \@lines, [], '@lines before wait' );

   wait_for { @lines > 0 };

   is_deeply( \@lines, [ "message\n" ], '@lines after wait' );

   $loop->remove( $filestream );
}

# on_initial: the callback is given the size of the content already in the
# file, and that content is still delivered to on_read
{
   my ( $rd, $wr ) = mkhandles;

   # Written before the stream exists, so it counts as initial content
   my $preexisting = "Some initial content\n";
   $wr->syswrite( $preexisting );

   my @lines;
   my $initial_size;

   my $collect_lines = sub {
      my ( undef, $buffref ) = @_;

      while( $$buffref =~ s/^(.*\n)// ) {
         push @lines, $1;
      }

      return 0;
   };

   my $filestream = IO::Async::FileStream->new(
      interval => 0.1 * AUT,
      read_handle => $rd,
      on_read => $collect_lines,
      on_initial => sub { $initial_size = $_[1] },
   );

   $loop->add( $filestream );

   # 21 is the byte length of the line written above
   is( $initial_size, 21, '$initial_size is 21' );

   $wr->syswrite( "More content\n" );

   wait_for { @lines > 0 };

   is_deeply( \@lines, [ "Some initial content\n", "More content\n" ], 'All content is visible' );

   $loop->remove( $filestream );
}

# seek_to_last: start reading just after the final "\n" of the existing
# content, so a partial trailing line is returned whole once it is finished
{
   my ( $rd, $wr ) = mkhandles;

   $wr->syswrite( "Some skipped content\nWith a partial line" );

   my @lines;

   my $filestream = IO::Async::FileStream->new(
      interval => 0.1 * AUT,
      read_handle => $rd,
      on_read => sub {
         my ( undef, $buffref ) = @_;

         # Take one complete line per call; return 1 after taking one,
         # 0 when there is no complete line left
         if( $$buffref =~ s/^(.*\n)// ) {
            push @lines, $1;
            return 1;
         }

         return 0;
      },
      on_initial => sub {
         my ( $stream ) = @_;

         # Give it a tiny block size, forcing it to have to seek harder to find the \n
         my $found = $stream->seek_to_last( "\n", blocksize => 8 );
         ok( $found, 'FileStream successfully seeks to last \n' );
      },
   );

   $loop->add( $filestream );

   # Complete the line that was left dangling above
   $wr->syswrite( " finished here\n" );

   wait_for { @lines > 0 };

   is_deeply( \@lines, [ "With a partial line finished here\n" ], 'Partial line completely returned' );

   $loop->remove( $filestream );
}

# on_initial can skip content: seeking to the end of the file from the
# callback means only later writes are delivered
{
   my ( $rd, $wr ) = mkhandles;

   $wr->syswrite( "Some skipped content\n" );

   my @lines;

   my $filestream = IO::Async::FileStream->new(
      interval => 0.1 * AUT,
      read_handle => $rd,
      on_read => sub {
         my ( undef, $buffref ) = @_;

         # One complete line per call
         if( $$buffref =~ s/^(.*\n)// ) {
            push @lines, $1;
            return 1;
         }

         return 0;
      },
      # $_[0] is the stream itself
      on_initial => sub { $_[0]->seek( 0, SEEK_END ) },
   );

   $loop->add( $filestream );

   $wr->syswrite( "Additional content\n" );

   wait_for { @lines > 0 };

   is_deeply( \@lines, [ "Additional content\n" ], 'Initial content is skipped' );

   $loop->remove( $filestream );
}

# Truncation: when the file shrinks underneath the stream, on_truncated fires
# and reading carries on with whatever is written afterwards
{
   my ( $rd, $wr ) = mkhandles;

   my @lines;
   my $truncated;

   my $filestream = IO::Async::FileStream->new(
      interval => 0.1 * AUT,
      read_handle => $rd,
      on_read => sub {
         my ( undef, $buffref ) = @_;

         # One complete line per call; return 1 after taking one, 0 when
         # there is no complete line left
         return 0 unless $$buffref =~ s/^(.*\n)//;

         push @lines, $1;
         return 1;
      },
      on_truncated => sub { $truncated++ },
   );

   $loop->add( $filestream );

   $wr->syswrite( "Some original lines\nin the file\n" );

   wait_for { scalar @lines };

   # Empty the file and rewind the writer so the next write lands at offset 0.
   # Both steps are checked: if either failed silently the test would only
   # show up as an unexplained wait_for timeout further down.
   $wr->truncate( 0 )
      or die "Cannot truncate file - $!";
   defined( sysseek( $wr, 0, SEEK_SET ) )
      or die "Cannot seek to start of file - $!";
   $wr->syswrite( "And another\n" );

   wait_for { @lines == 3 };

   is( $truncated, 1, 'File content truncation detected' );
   is_deeply( \@lines,
      [ "Some original lines\n", "in the file\n", "And another\n" ],
      'All three lines read' );

   $loop->remove( $filestream );
}

# Follow by name: the stream is given a filename rather than a handle. After
# the file is renamed away and a new one created under the same name, the
# stream must deliver the tail of the old file followed by the new file.
SKIP: {
   # 7 is the number of assertions in this block; keep it in step with them
   skip "OS is unable to rename open files", 7 unless IO::Async::OS->HAVE_RENAME_OPEN_FILES;

   # The read handle is not needed here; the stream opens the file itself
   my ( undef, $wr, $filename ) = mkhandles;

   my @lines;

   my $filestream = IO::Async::FileStream->new(
      interval => 0.1 * AUT,
      filename => $filename,
      on_read => sub {
         my ( undef, $buffref ) = @_;

         push @lines, $1 while $$buffref =~ s/^(.*\n)//;
         return 0;
      },
   );

   ok( defined $filestream, '$filestream defined for filename' );
   isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );

   is_oneref( $filestream, 'reading $filestream has refcount 1 initially' );

   $loop->add( $filestream );

   is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' );

   $wr->syswrite( "message\n" );
   wait_for { scalar @lines };

   is_deeply( \@lines, [ "message\n" ], '@lines after wait' );
   shift @lines;

   # Finish off the old file, move it aside, and start a fresh file under
   # the original name
   $wr->syswrite( "last line of old file\n" );
   close $wr or die "Cannot close $filename - $!";
   rename( $filename, "$filename.old" ) or die "Cannot rename $filename - $!";

   # Clean up both names at exit. The defined() guard covers the case where
   # this block was skipped and $filename was never assigned.
   END { defined $filename and -f $filename and unlink $filename }
   END { defined $filename and -f "$filename.old" and unlink "$filename.old" }

   open $wr, ">", $filename or die "Cannot reopen $filename for writing - $!";
   $wr->syswrite( "first line of new file\n" );

   wait_for { scalar @lines };
   is( $lines[0], "last line of old file\n", '@lines sees last line of old file' );
   wait_for { scalar @lines >= 2 };
   is( $lines[1], "first line of new file\n", '@lines sees first line of new file' );

   $loop->remove( $filestream );
}

# Subclass: on_read supplied as a method of TestStream instead of as a
# constructor callback.
# @sub_lines is filled in by TestStream::on_read, defined after done_testing,
# so it has to be a file-scoped lexical rather than live inside the block.
my @sub_lines;

{
   my ( $rd, $wr ) = mkhandles;

   my %args = (
      interval => 0.1 * AUT,
      read_handle => $rd,
   );
   my $filestream = TestStream->new( %args );

   ok( defined $filestream, 'subclass $filestream defined' );
   isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );

   is_oneref( $filestream, 'subclass $filestream has refcount 1 initially' );

   $loop->add( $filestream );

   is_refcount( $filestream, 2, 'subclass $filestream has refcount 2 after adding to Loop' );

   $wr->syswrite( "message\n" );

   # Nothing is delivered until the loop has had a chance to run
   is_deeply( \@sub_lines, [], '@sub_lines before wait' );

   wait_for { @sub_lines > 0 };

   is_deeply( \@sub_lines, [ "message\n" ], '@sub_lines after wait' );

   $loop->remove( $filestream );
}

done_testing;

# Minimal IO::Async::FileStream subclass used by the "Subclass" test: it
# provides on_read as a method and records lines in the file-scoped
# @sub_lines array.
package TestStream;
use base qw( IO::Async::FileStream );

# Removes at most one complete line from the front of the buffer.
#   $buffref - reference to the buffer string
# Returns 1 after taking a line and pushing it onto @sub_lines, or 0 when the
# buffer holds no complete line.
sub on_read
{
   my ( $stream, $buffref ) = @_;

   if( $$buffref =~ s/^(.*\n)// ) {
      push @sub_lines, $1;
      return 1;
   }

   return 0;
}