The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use Test::More;
use Test::Identity;
use Test::Refcount;

use t::MockConnection;
use Socket qw( pack_sockaddr_in inet_aton );

use Net::Async::CassandraCQL;
use Protocol::CassandraCQL::Result 0.06;

# Mock the ->_connect_node method
no warnings 'redefine';
my %conns;
local *Net::Async::CassandraCQL::_connect_node = sub {
   my $self = shift;
   my ( $connect_host, $connect_service ) = @_;
   $conns{$connect_host} = my $conn = t::MockConnection->new( $connect_host );
   return Future->new->done( $conn );
};

my $cass = Net::Async::CassandraCQL->new(
   host => "10.0.0.1",
);

my $f = $cass->connect;

ok( my $c = $conns{"10.0.0.1"}, 'Connected to 10.0.0.1' );

# Initial nodelist query
$c->send_nodelist(
   local => { dc => "DC1", rack => "rack1" },
   peers => {
      "10.0.0.2" => { dc => "DC1", rack => "rack1" },
   },
);

$f->get;

ok( $c->is_registered, 'Using 10.0.0.1 for events' );

# status changes
{
   my $nodeid;
   $cass->configure(
      on_node_up   => sub { ( undef, $nodeid ) = @_; },
      on_node_down => sub { ( undef, $nodeid ) = @_; },
   );

   # DOWN
   ok( !defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 does not yet have down_time' );

   $conns{"10.0.0.1"}->invoke_event(
      on_status_change => DOWN => pack_sockaddr_in( 0, inet_aton( "10.0.0.2" ) ),
   );

   ok( defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 has down_time after STATUS_CHANGE DOWN' );
   is( $nodeid, "10.0.0.2", '$nodeid to cluster on_node_down' );
   undef $nodeid;

   # UP
   $conns{"10.0.0.1"}->invoke_event(
      on_status_change => UP => pack_sockaddr_in( 0, inet_aton( "10.0.0.2" ) ),
   );

   ok( !defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 no longer has down_time after STATUS_CHANGE UP' );
   is( $nodeid, "10.0.0.2", '$nodeid to cluster on_node_up' );
}

# topology changes
{
   my $nodeid;
   $cass->configure(
      on_node_new    => sub { ( undef, $nodeid ) = @_; },
      on_node_removed => sub { ( undef, $nodeid ) = @_; },
   );

   # NEW
   ok( !defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 does not yet exist' );
   $conns{"10.0.0.1"}->invoke_event(
      on_topology_change => NEW_NODE => pack_sockaddr_in( 0, inet_aton( "10.0.0.4" ) ),
   );

   ok( defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 exists' );

   ok( my $q = $conns{"10.0.0.1"}->next_query, 'Peerlist query exists' );
   is( $q->[1], "SELECT peer, data_center, rack FROM system.peers WHERE peer = '10.0.0.4'", 'Peerlist query CQL' );
   $q->[2]->done( rows =>
      Protocol::CassandraCQL::Result->new(
         columns => [
            [ system => peers => peer        => "VARCHAR" ],
            [ system => peers => data_center => "VARCHAR" ],
            [ system => peers => rack        => "VARCHAR" ],
         ],
         rows => [
            [ "\x0a\0\0\4", "DC1", "rack1" ],
         ],
      )
   );

   is( $nodeid, "10.0.0.4", '$nodeid to cluster on_node_new' );
   ok( defined $cass->{nodes}{"10.0.0.4"}{data_center}, 'Node 10.0.0.4 has known DC' );
   undef $nodeid;

   # DELETE
   $conns{"10.0.0.1"}->invoke_event(
      on_topology_change => REMOVED_NODE => pack_sockaddr_in( 0, inet_aton( "10.0.0.4" ) ),
   );

   ok( !defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 no longer exists' );

   is( $nodeid, "10.0.0.4", '$nodeid to cluster on_node_removed' );
}


done_testing;