# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use Test::More;
use Test::HexString;

use IO::Async::Test;
use IO::Async::OS;
use IO::Async::Loop;
use IO::Async::Stream;

use Net::Async::CassandraCQL::Connection;
use Protocol::CassandraCQL qw( CONSISTENCY_ANY CONSISTENCY_ONE CONSISTENCY_TWO );

# One loop drives the whole script; testing_loop() hands it to
# IO::Async::Test, whose wait_for / wait_for_stream helpers are used by every
# test block below.
my $loop = IO::Async::Loop->new;
testing_loop( $loop );

# A connected socket pair: $S1 is given to the connection under test as its
# handle, while the tests read from and write to $S2, playing the server end.
my ( $S1, $S2 ) = IO::Async::OS->socketpair
   or die "Cannot create socket pair - $!";

my $conn = Net::Async::CassandraCQL::Connection->new( handle => $S1 );
$loop->add( $conn );

# ->startup
#
# Runs the STARTUP handshake twice against the fake server end ($S2):
#   1. a plain STARTUP answered directly with READY;
#   2. after configuring a username and password, a STARTUP answered with
#      AUTHENTICATE, to which the connection must reply with CREDENTIALS
#      before the final READY completes the future.
#
# Each wait_for_stream threshold is the whole frame - the 8 byte header plus
# the body length announced in that header - so a frame that arrives in
# several reads is never compared half-received, and none of its bytes are
# left in the socket to be misread as the start of the following frame.
{
   my $f = $conn->startup;

   my $stream = "";
   # Body is 0x2b == 43 bytes: 2 byte pair count plus the two option pairs
   wait_for_stream { length $stream >= 8 + 43 } $S2 => $stream;

   # OPCODE_STARTUP
   # Since this map will contain 2 elements, we'll have to detect the order
   # and supply the appropriate bytestring
   is_hexstr( $stream,
              "\x01\x00\x01\x01\0\0\0\x2b" .
                 "\x00\x02" . ( $stream =~ m/CQL_VERSION.*COMPRESSION/
                    ? "\x00\x0bCQL_VERSION\x00\x053.0.5\x00\x0bCOMPRESSION\x00\x06Snappy"
                    : "\x00\x0bCOMPRESSION\x00\x06Snappy\x00\x0bCQL_VERSION\x00\x053.0.5" ),
              'stream after ->startup' );

   # OPCODE_READY
   $S2->syswrite( "\x81\x00\x01\x02\0\0\0\0" );

   wait_for { $f->is_ready };

   is_deeply( [ $f->get ], [],
              '->startup->get returns nothing' );

   $conn->configure( username => "the-user", password => "the-pass" );

   $f = $conn->startup;

   $stream = "";
   # This STARTUP is expected to be the same 8 + 43 byte frame verified
   # above; its content is not re-checked, but all of it must be drained so
   # the CREDENTIALS comparison below starts on a frame boundary
   wait_for_stream { length $stream >= 8 + 43 } $S2 => $stream;

   # OPCODE_AUTHENTICATE
   $S2->syswrite( "\x81\x00\x01\x03\0\0\0\x31" .
      "\0\x2forg.apache.cassandra.auth.PasswordAuthenticator" );

   $stream = "";
   # Body is 0x2a == 42 bytes: 2 byte pair count plus the two credential pairs
   wait_for_stream { length $stream >= 8 + 42 } $S2 => $stream;

   # OPCODE_CREDENTIALS
   is_hexstr( $stream,
              "\x01\x00\x01\x04\0\0\0\x2a" .
                 "\0\2\0\x08password\0\x08the-pass" .
                     "\0\x08username\0\x08the-user",
              'stream has credentials after authentication required' );

   # OPCODE_READY
   $S2->syswrite( "\x81\x00\x01\x02\0\0\0\0" );

   wait_for { $f->is_ready };

   is_deeply( [ $f->get ], [],
              '->startup->get returns nothing after authentication' );
}

# ->options
#
# Sends an OPTIONS request (header only, zero-length body) and answers it
# with a SUPPORTED frame holding two option names, each with a one-element
# list of values. The future must yield that data as a single HASH reference.
{
   my $options_f = $conn->options;

   my $sent = "";
   wait_for_stream { length $sent >= 8 } $S2 => $sent;

   # OPCODE_OPTIONS
   is_hexstr( $sent,
              "\x01\x00\x01\x05\0\0\0\0",
              'stream after ->options' );

   # OPCODE_SUPPORTED
   $S2->syswrite( "\x81\x00\x01\x06\0\0\0\x2f\0\2" .
                  "\x00\x0bCOMPRESSION\0\1\x00\x06snappy" .
                  "\x00\x0bCQL_VERSION\0\1\x00\x053.0.0" );

   wait_for { $options_f->is_ready };

   # Scalar assignment keeps ->get in scalar context
   my $options = $options_f->get;
   is_deeply( $options,
              { COMPRESSION => ["snappy"], CQL_VERSION => ["3.0.0"] },
              '->options->get returns HASH of options' );
}

# ->query returning void
#
# The QUERY body is the CQL text behind a 4 byte length, followed by the
# 2 byte consistency level. The RESULT sent back has kind 1, for which the
# future is expected to complete with an empty list.
{
   my $insert_f = $conn->query(
      "INSERT INTO things (name) VALUES ('thing')",
      CONSISTENCY_ANY,
   );

   my $sent = "";
   wait_for_stream { length $sent >= 8 + 48 } $S2 => $sent;

   # OPCODE_QUERY
   is_hexstr( $sent,
              "\x01\x00\x01\x07\0\0\0\x30" .
                 "\0\0\0\x2aINSERT INTO things (name) VALUES ('thing')\0\0",
              'stream after ->query INSERT' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\4\0\0\0\1" );

   wait_for { $insert_f->is_ready };

   my @got = $insert_f->get;
   is_deeply( \@got, [],
              '->query returns nothing' );
}

# ->query returning rows
#
# Issues a SELECT and replies with a RESULT of kind 2 whose payload is column
# metadata for two columns (a and b of table test.c) followed by a single
# row. The future must yield the string "rows" and then a result object
# reporting 2 columns and 1 row.
{
   my $select_f = $conn->query( "SELECT a,b FROM c", CONSISTENCY_ONE );

   my $sent = "";
   wait_for_stream { length $sent >= 8 + 23 } $S2 => $sent;

   # OPCODE_QUERY
   is_hexstr( $sent,
              "\x01\x00\x01\x07\0\0\0\x17" .
                 "\0\0\0\x11SELECT a,b FROM c\0\1",
              'stream after ->query SELECT' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\x34\0\0\0\2" .
                     "\0\0\0\1\0\0\0\2\0\4test\0\1c\0\1a\x00\x0D\0\1b\x00\x09" . # metadata
                     "\0\0\0\1" . # row count
                     "\0\0\0\5hello\0\0\0\4\0\0\0\x64" # row 0
                  );

   wait_for { $select_f->is_ready };

   # First element names the kind of result, second is the result object
   my ( $type, $result ) = $select_f->get;

   is( $type, "rows", '->query SELECT returns rows' );
   is( $result->columns, 2, '$result->columns' );
   is( $result->rows, 1, '$result->rows' );
}

# ->query returning set_keyspace
#
# Issues "USE test" and replies with a RESULT of kind 3 carrying the keyspace
# name; the future must yield the pair ( keyspace => "test" ).
{
   my $f = $conn->query( "USE test", CONSISTENCY_ANY );

   my $stream = "";
   # Wait for the whole frame: the body is 0x0e == 14 bytes (4 byte length,
   # 8 bytes of CQL, 2 byte consistency). A smaller threshold could let the
   # comparison below run on a partially received frame.
   wait_for_stream { length $stream >= 8 + 14 } $S2 => $stream;

   # OPCODE_QUERY
   is_hexstr( $stream,
              "\x01\x00\x01\x07\0\0\0\x0e" .
                 "\0\0\0\x08USE test\0\0",
              'stream after ->query USE' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\x0a\0\0\0\3\0\4test" );

   wait_for { $f->is_ready };

   is_deeply( [ $f->get ], [ keyspace => "test" ],
              '->query USE returns keyspace' );
}

# ->query returning schema_change
#
# Issues "DROP TABLE users" and replies with a RESULT of kind 5 carrying three
# strings (change, keyspace, table); the future must yield
# ( schema_change => [ those three strings ] ).
{
   my $f = $conn->query( "DROP TABLE users", CONSISTENCY_ANY );

   my $stream = "";
   # Wait for the whole frame: the body is 0x16 == 22 bytes (4 byte length,
   # 16 bytes of CQL, 2 byte consistency). A smaller threshold could let the
   # comparison below run on a partially received frame.
   wait_for_stream { length $stream >= 8 + 22 } $S2 => $stream;

   # OPCODE_QUERY
   is_hexstr( $stream,
              "\x01\x00\x01\x07\0\0\0\x16" .
                 "\0\0\0\x10DROP TABLE users\0\0",
              'stream after ->query DROP TABLE' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\x1a\0\0\0\5\0\7DROPPED\0\4test\0\5users" );

   wait_for { $f->is_ready };

   is_deeply( [ $f->get ], [ schema_change => [qw( DROPPED test users )] ],
              '->query DROP TABLE returns schema change' );
}

# ->prepare and ->execute
#
# First prepares a statement: the PREPARE body is just the CQL text behind a
# 4 byte length, and the RESULT of kind 4 returns a 16 byte statement ID plus
# metadata for one bind column. Then executes by that ID directly on the
# connection with one bind value, answered by a RESULT of kind 1 for which the
# future must complete with an empty list.
{
   my $prepare_f = $conn->prepare( "INSERT INTO t (f) = (?)" );

   my $prepare_sent = "";
   wait_for_stream { length $prepare_sent >= 8 + 27 } $S2 => $prepare_sent;

   # OPCODE_PREPARE
   is_hexstr( $prepare_sent,
              "\x01\x00\x01\x09\0\0\0\x1b" .
                 "\0\0\0\x17INSERT INTO t (f) = (?)",
              'stream after ->prepare' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\x2c\0\0\0\4" .
                     "\x00\x100123456789ABCDEF" .
                     "\0\0\0\1\0\0\0\1\0\4test\0\1t\0\1f\x00\x0D" );

   wait_for { $prepare_f->is_ready };

   my $prepared = $prepare_f->get;
   is( $prepared->id, "0123456789ABCDEF", '$query->id after ->prepare->get' );

   # ->execute directly
   my $execute_f = $conn->execute(
      "0123456789ABCDEF", [ "more-data" ], CONSISTENCY_ANY,
   );

   my $execute_sent = "";
   wait_for_stream { length $execute_sent >= 8 + 35 } $S2 => $execute_sent;

   # OPCODE_EXECUTE
   is_hexstr( $execute_sent,
              "\x01\x00\x01\x0A\0\0\0\x23" .
                 "\x00\x100123456789ABCDEF" .
                 "\x00\x01" . "\0\0\0\x09more-data" .
                 "\x00\x00",
              'stream after ->execute' );

   # OPCODE_RESULT
   $S2->syswrite( "\x81\x00\x01\x08\0\0\0\4\0\0\0\1" );

   wait_for { $execute_f->is_ready };

   my @got = $execute_f->get;
   is_deeply( \@got, [],
              '->execute returns nothing' );
}

done_testing;