The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use Test::More;
use Test::Identity;
use Test::Refcount;

use t::MockConnection;

use Net::Async::CassandraCQL;
use Protocol::CassandraCQL::Result 0.06;

# Mock the ->_connect_node method
no warnings 'redefine';
my %conns;
local *Net::Async::CassandraCQL::_connect_node = sub {
   my $self = shift;
   my ( $connect_host, $connect_service ) = @_;
   $conns{$connect_host} = my $conn = t::MockConnection->new( $connect_host );
   $self->add_child( $conn );
   return Future->new->done( $conn );
};

my $cass = Net::Async::CassandraCQL->new(
   host => "10.0.0.1",
);

my $f = $cass->connect;

ok( my $c = $conns{"10.0.0.1"}, 'have a connection to 10.0.0.1' );

# Initial nodelist query
while( my $q = $c->next_query ) {
   if( $q->[1] eq "SELECT data_center, rack FROM system.local" ) {
      pass( "Query on system.local" );
      $q->[2]->done( rows =>
         Protocol::CassandraCQL::Result->new(
            columns => [
               [ system => local => data_center => "VARCHAR" ],
               [ system => local => rack        => "VARCHAR" ],
            ],
            rows => [
               [ "DC1", "rack1" ],
            ],
         )
      );
   }
   elsif( $q->[1] eq "SELECT peer, data_center, rack FROM system.peers" ) {
      pass( "Query on system.peers" );
      $q->[2]->done( rows =>
         Protocol::CassandraCQL::Result->new(
            columns => [
               [ system => peers => peer        => "VARCHAR" ],
               [ system => peers => data_center => "VARCHAR" ],
               [ system => peers => rack        => "VARCHAR" ],
            ],
            rows => [
               [ "\x0a\0\0\2", "DC1", "rack1" ],
            ],
         ),
      );
   }
   else {
      fail( "Unexpected initial query $q->[1]" );
   }
}

ok( $f->is_ready, '->connect now done after initial queries' );
$f->get;

my $query;
{
   my $f = $cass->prepare( "INSERT INTO t (f) = (?)" );

   ok( my $p = $c->next_prepare, '->prepare pending' );

   is( $p->[0], "10.0.0.1", 'nodeid of pending prepare' );
   is( $p->[1], "INSERT INTO t (f) = (?)", 'cql of pending prepare' );

   $p->[2]->done(
      Net::Async::CassandraCQL::Query->new(
         cassandra   => $cass,
         cql         => $p->[1],
         id          => "0123456789ABCDEF",
         params_meta => Protocol::CassandraCQL::ColumnMeta->new(
            columns   => [
               [ test => t => f => "VARINT" ],
            ],
         ),
      )
   );

   $query = $f->get;

   ok( defined $query, '$query defined after ->prepare->get' );

   undef $f;
   undef $p;
   is_oneref( $query, '$query has refcount 1 initially' );
}

# Fake closure
undef $conns{"10.0.0.1"};
$cass->_closed_node( "10.0.0.1" );

ok( $c = $conns{"10.0.0.2"}, 'new primary node picked' );

# Prepared statements
{
   ok( my $p = $c->next_prepare, 'prepare pending after reconnect' );

   is( $p->[0], "10.0.0.2", 'nodeid of pending prepare after reconnect' );
   is( $p->[1], "INSERT INTO t (f) = (?)", 'cql of pending prepare after reconnect' );

   $p->[2]->done;
}

# ->query after reconnect
{
   my $f = $cass->query( "GET THING", 0 );
   ok( defined $f, 'defined ->query after reconnect' );

   ok( my $q = $c->next_query, '->query after reconnect creates query' );

   like( $q->[0], qr/^10\.0\.0\.[23]$/, '$q conn' );
   is( $q->[1], "GET THING", '$q cql' );
   $q->[2]->done( result => "here" );

   is_deeply( [ $f->get ], [ result => "here" ], '$q result' );
}

# CHEATING
# $query->DESTROY will try to register it for late expiry on the underlying
# Cassandra object. We can convince it not to do that
undef $query->{cassandra};

{
   require IO::Async::Loop;
   my $loop = IO::Async::Loop->new;
   $loop->add( $cass );

   my $f = $cass->close_when_idle;

   identical( scalar $f->get, $cass, '->close_when_idle future yields $cass' );

   $loop->remove( $cass );
}

done_testing;