# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use IO::Async::Test;

use Test::More;
use Test::Fatal;
use Test::Refcount;

use IO::File;
use Errno qw( EAGAIN EWOULDBLOCK );

use IO::Async::Loop;

use IO::Async::OS;

use IO::Async::Stream;

# One built-in loop is shared by every scenario in this file; it is also
# handed to testing_loop() before any test runs.
my $loop = IO::Async::Loop->new_builtin;
testing_loop( $loop );

# Two independent connected pairs: $S1 <-> $S2 and $S3 <-> $S4
my ( $S1, $S2 ) = IO::Async::OS->socketpair or die "Cannot create socket pair - $!";
my ( $S3, $S4 ) = IO::Async::OS->socketpair or die "Cannot create socket pair - $!";

# Need sockets in nonblocking mode
foreach my $sock ( $S1, $S2, $S3, $S4 ) {
   $sock->blocking( 0 );
}

# Test helper: make a single sysread() of up to 8192 bytes on $s.
#
# Returns the bytes that were read, or "" when the read failed with EAGAIN or
# EWOULDBLOCK (nothing available right now on a non-blocking handle).
# Dies with "Socket closed" on end-of-file, and with "Cannot sysread() - ..."
# on any other read error.
sub read_data
{
   my ( $s ) = @_;

   my $count = $s->sysread( my $chunk, 8192 );

   unless( defined $count ) {
      # sysread() itself failed; only "would block" is an acceptable reason
      return "" if $! == EAGAIN || $! == EWOULDBLOCK;
      die "Cannot sysread() - $!";
   }

   # A successful read of zero bytes means the peer has gone away
   die "Socket closed" if $count == 0;

   return $chunk;
}

# Scenario: a Stream whose read side and write side are different handles.
# It reads from $S2 and writes to $S3, which belong to different socket pairs.

# Complete lines handed to on_read are collected here
my @lines;

my $stream = IO::Async::Stream->new(
   read_handle => $S2,
   write_handle => $S3,
   on_read => sub {
      my $self = shift;
      my ( $buffref, $eof ) = @_;

      # Move every complete newline-terminated line out of the buffer and
      # into @lines; a trailing partial line is left in $$buffref
      push @lines, $1 while $$buffref =~ s/^(.*\n)//;
      return 0;
   },
);

# The refcount checks below verify that only this file and (once added) the
# loop hold a reference to the stream
is_oneref( $stream, 'split read/write $stream has refcount 1 initially' );

undef @lines;

$loop->add( $stream );

is_refcount( $stream, 2, 'split read/write $stream has refcount 2 after adding to Loop' );

# Data written through the stream goes out of its write handle ($S3), so it
# must show up on $S4 and must not show up on $S1 (the peer of the read handle)
$stream->write( "message\n" );

$loop->loop_once( 0.1 );

is( read_data( $S4 ), "message\n", '$S4 receives data from split stream' );
is( read_data( $S1 ), "",          '$S1 empty from split stream' );

# Data sent into $S1 arrives on the stream's read handle ($S2) and should be
# delivered to on_read as one complete line
$S1->syswrite( "reverse\n" );

$loop->loop_once( 0.1 );

is_deeply( \@lines, [ "reverse\n" ], '@lines on response to split stream' );

is_refcount( $stream, 2, 'split read/write $stream has refcount 2 before removing from Loop' );

$loop->remove( $stream );

is_oneref( $stream, 'split read/write $stream refcount 1 finally' );

undef $stream;

# Scenario: a Stream constructed without any handle, which is given one later
# via set_handle(), closed, and then reopened with a different handle.

# Everything received by on_read is accumulated here
my $buffer = "";
# Set to 1 by the on_closed callback
my $closed;

$stream = IO::Async::Stream->new(
   # No handle yet
   on_read => sub {
      my ( $self, $buffref, $eof ) = @_;
      # Consume the whole read buffer each time
      $buffer .= $$buffref;
      $$buffref =  "";
      return 0;
   },
   on_closed => sub {
      my ( $self ) = @_;
      $closed = 1;
   },
);

is_oneref( $stream, 'latehandle $stream has refcount 1 initially' );

$loop->add( $stream );

is_refcount( $stream, 2, 'latehandle $stream has refcount 2 after adding to Loop' );

# Writing before any handle has been set is expected to throw
ok( exception { $stream->write( "some text" ) },
    '->write on stream with no IO handle fails' );

$stream->set_handle( $S1 );

# Attaching a handle must not create any extra reference to the stream
is_refcount( $stream, 2, 'latehandle $stream has refcount 2 after setting a handle' );

$stream->write( "some text" );

$loop->loop_once( 0.1 );

# Read directly from the peer socket to see what the stream wrote
my $buffer2;
$S2->sysread( $buffer2, 8192 );

is( $buffer2, "some text", 'stream-written text appears' );

# Now the other direction: bytes sent by the peer should reach on_read
$S2->syswrite( "more text" );

wait_for { length $buffer };

is( $buffer, "more text", 'stream-read text appears' );

# The test expects the close to have completed by the time close_when_empty
# returns (presumably because no written data is still pending): on_closed
# has fired and the stream has left the loop
$stream->close_when_empty;

is( $closed, 1, 'closed after close' );

ok( !defined $stream->loop, 'Stream no longer member of Loop' );

is_oneref( $stream, 'latehandle $stream refcount 1 finally' );

# Now try re-opening the stream with a new handle, and check it continues to
# work

$loop->add( $stream );

$stream->set_handle( $S3 );

$stream->write( "more text" );

$loop->loop_once( 0.1 );

# Clear the previous contents so the check below only sees newly-read data
undef $buffer2;
$S4->sysread( $buffer2, 8192 );

is( $buffer2, "more text", 'stream-written text appears after reopen' );

$loop->remove( $stream );

undef $stream;

# Scenario: a Stream with pending write data whose peer closes, while the loop
# holds the only remaining reference to the Stream.

# A fresh socket pair replaces the earlier $S1/$S2
( $S1, $S2 ) = IO::Async::OS->socketpair or die "Cannot socketpair - $!";

$stream = IO::Async::Stream->new(
   handle => $S1,
   on_read => sub { },
);

# Queue some data before the stream is added to the loop, so there is
# something waiting to be written when the loop first runs
$stream->write( "hello" );

$loop->add( $stream );

is_refcount( $stream, 2, '$stream has two references' );
undef $stream; # Only ref is now in the Loop

$S2->close;

# $S1 should now be both read- and write-ready.
# No timeout is passed to loop_once here; the check is only that processing
# the ready handle does not throw.
ok( !exception { $loop->loop_once }, 'read+write-ready closed Stream doesn\'t die' );

# $stream was already undef'd above, so this has no further effect
undef $stream;

# Scenario: the new_for_stdio constructor should wire the Stream to the
# process's standard input and standard output.

# Avoid harmless warning in case -CS is in effect
binmode STDIN;

$stream = IO::Async::Stream->new_for_stdio;

# Each row is: accessor method, the glob it must return, name used in the
# test description
foreach my $case (
   [ read_handle  => \*STDIN,  "STDIN"  ],
   [ write_handle => \*STDOUT, "STDOUT" ],
) {
   my ( $accessor, $glob, $name ) = @$case;

   is( $stream->$accessor, $glob, "Stream->new_for_stdio->$accessor is $name" );
}

done_testing;