The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use Test::More;
use Test::Identity;
use Test::HexString;
use Test::Refcount;

use IO::Async::Test;
use IO::Async::OS;
use IO::Async::Loop;
use IO::Async::Stream;

use Net::Async::CassandraCQL;
use Net::Async::CassandraCQL::Connection;
use Protocol::CassandraCQL qw( CONSISTENCY_ANY CONSISTENCY_ONE CONSISTENCY_TWO );

use constant CQL_STRING => "INSERT INTO t (f) = (?)";

my $loop = IO::Async::Loop->new();
testing_loop( $loop );

my ( $S1, $S2 ) = IO::Async::OS->socketpair() or die "Cannot create socket pair - $!";

my $cass = Net::Async::CassandraCQL->new;

# CHEATING
$cass->add_child( my $conn = Net::Async::CassandraCQL::Connection->new(
   handle => $S1,
) );
$cass->{nodes} = { NODEID => {
      conn    => $conn,
      ready_f => Future->new->done( $conn ),
} };
$cass->{primary_ids} = { NODEID => 1 };
# END CHEATING

$loop->add( $cass );

# ->prepare and ->execute
{
   my $f = $cass->prepare( CQL_STRING );

   my $stream = "";
   wait_for_stream { length $stream >= 8 + 27 } $S2 => $stream;

   # OPCODE_PREPARE
   is_hexstr( $stream,
              "\x01\x00\x01\x09\0\0\0\x1b" .
                 "\0\0\0\x17INSERT INTO t (f) = (?)",
              'stream after ->prepare' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\x2c\0\0\0\4" .
                     "\x00\x100123456789ABCDEF" .
                     "\0\0\0\1\0\0\0\1\0\4test\0\1t\0\1f\x00\x0D" );

   wait_for { $f->is_ready };

   my $query = $f->get;
   undef $f;

   is_oneref( $query, '$query has refcount 1 after ->prepare' );

   is( $query->id, "0123456789ABCDEF", '$query->id after ->prepare->get' );
   is( $query->cql, "INSERT INTO t (f) = (?)", '$query->cql after ->prepare->get' );
   is( $query->params, 1, '$query->params' );
   is( scalar $query->param_name(0), "test.t.f", '$query->param_name(0)' );
   is( $query->param_type(0)->name, "VARCHAR", '$query->param_type(0)->name' );

   {
      my $f2 = $cass->prepare( CQL_STRING );

      ok( $f2->is_ready, 'Duplicate prepare is ready immediately' );

      identical( scalar $f2->get, $query, 'Duplicate prepare yields same object' );
   }

   # ->execute directly
   $f = $cass->execute( $query, [ "more-data" ], CONSISTENCY_ANY );

   $stream = "";
   wait_for_stream { length $stream >= 8 + 35 } $S2 => $stream;

   # OPCODE_EXECUTE
   is_hexstr( $stream,
              "\x01\x00\x01\x0A\0\0\0\x23" .
                 "\x00\x100123456789ABCDEF" .
                 "\x00\x01" . "\0\0\0\x09more-data" .
                 "\x00\x00",
              'stream after ->execute' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\4\0\0\0\1" );

   wait_for { $f->is_ready };

   is_deeply( [ $f->get ], [],
              '->execute returns nothing' );

   # ->execute via $query from ARRAY
   $f = $query->execute( [ "data-array" ], CONSISTENCY_ANY );

   $stream = "";
   wait_for_stream { length $stream >= 8 + 36 } $S2 => $stream;

   # OPCODE_EXECUTE
   is_hexstr( $stream,
              "\x01\x00\x01\x0A\0\0\0\x24" .
                 "\x00\x100123456789ABCDEF" .
                 "\x00\x01" . "\0\0\0\x0adata-array" .
                 "\x00\x00",
              'stream after $query->execute(ARRAY)' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\4\0\0\0\1" );

   wait_for { $f->is_ready };

   is_deeply( [ $f->get ], [],
              '->execute returns nothing' );

   # ->execute via $query from HASH
   $f = $query->execute( { f => "data-hash" }, CONSISTENCY_ANY );

   $stream = "";
   wait_for_stream { length $stream >= 8 + 35 } $S2 => $stream;

   # OPCODE_EXECUTE
   is_hexstr( $stream,
              "\x01\x00\x01\x0A\0\0\0\x23" .
                 "\x00\x100123456789ABCDEF" .
                 "\x00\x01" . "\0\0\0\x09data-hash" .
                 "\x00\x00",
              'stream after $query->execute(HASH)' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\4\0\0\0\1" );

   wait_for { $f->is_ready };

   is_deeply( [ $f->get ], [],
              '->execute returns nothing' );

   is_oneref( $query, '$query has refcount 1 before EOF' );
   undef $query;

   # Should now be weak with a timer
   # CHEATING
   ok( defined $cass->{queries_by_cql}{+CQL_STRING}{pacemaker},
       'Query has expiry timer' );

   # A second ->prepare should re-vivify it
   $f = $cass->prepare( CQL_STRING );

   ok( $f->is_ready, '->prepare again is ready immediately' );
   ok( !defined $cass->{queries_by_cql}{+CQL_STRING}{pacemaker},
       'Expiry timer cancelled after re-vivify' );

   # Now drop it one last time
   undef $f;
   undef $query;

   # Rather than wait for the timer, just fire it now
   $cass->{queries_by_cql}{+CQL_STRING}{ttl} = 0;
   $cass->{queries_by_cql}{+CQL_STRING}{pacemaker}->done;

   ok( !keys %{ $cass->{queries_by_cql} },
       '$cass has no more cached queries after timer expire' );
}

# CQL v2 returns result metadata from PREPARED, so we should use no_metadata on
# execute
{
   $cass->configure( cql_version => 2 );
   $conn->configure( cql_version => 2 );

   my $f = $cass->prepare( "SELECT a, b FROM table WHERE b = ?" );

   my $stream = "";
   wait_for_stream { length $stream >= 8 + 38 } $S2 => $stream; # TODO

   # OPCODE_PREPARE
   is_hexstr( $stream,
              "\x02\x00\x01\x09\0\0\0\x26" .
                 "\0\0\0\x22SELECT a, b FROM table WHERE b = ?",
              'stream after ->prepare v2' );

   # OPCODE_RESULT
   $S2->syswrite( "\x82\x00\x01\x08\0\0\0\x4f\0\0\0\4" .
                     "\x00\x100123456789ABCDFE" .
                     "\0\0\0\1\0\0\0\1\0\4test\0\5table\0\1b\x00\x0D" .
                     "\0\0\0\1\0\0\0\2\0\4test\0\5table\0\1a\x00\x09\0\1b\x00\x0D" );

   wait_for { $f->is_ready };

   my $query = $f->get;

   is( $query->params, 1, '$query->params' );
   ok( defined $query->result_meta, '$query->result_meta defined' );
   is( $query->result_meta->columns, 2, '$query->result_meta->columns' );

   # ->execute
   $f = $query->execute( [ "the-key" ], CONSISTENCY_ANY );

   $stream = "";
   wait_for_stream { length $stream >= 8 + 34 } $S2 => $stream;

   # OPCODE_EXECUTE
   is_hexstr( $stream,
              "\x02\x00\x01\x0A\0\0\0\x22" .
                 "\x00\x100123456789ABCDFE" .
                 "\x00\x00\x03\x00\x01" .
                    "\0\0\0\7the-key",
              'stream after $query->execute(ARRAY) for v2' );

   # OPCODE_RESULT
   $S2->syswrite( "\x82\x00\x01\x08\0\0\0\x23\0\0\0\2" .
                     "\0\0\0\4\0\0\0\2" .
                     "\0\0\0\1" .
                     "\0\0\0\4\x00\x00\x02\x46\0\0\0\7the-key" );

   wait_for { $f->is_ready };

   my ( $type, $result ) = $f->get;

   is( $result->columns, 2, '$result->columns' );
   is( $result->rows,    1, '$result->rows' );

   is( $result->column_shortname( 0 ), "a", '$result->column_shortname' );

   is_deeply( $result->row_array( 0 ),
              [ 0x0246, "the-key" ],
              '$result->row_array' );
}

done_testing;