The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use IO::Async::Loop;
use Net::Async::CassandraCQL;
use Protocol::CassandraCQL qw( :consistencies );

use Getopt::Long;

# Command-line settings, initialised to their defaults; GetOptions overwrites
# whichever ones the user supplies.
my $HOST      = "localhost";   # --host / -h
my $USERNAME;                  # --user / -u
my $PASSWORD;                  # --pass / -p
my $KEYSPACE;                  # --keyspace / -k (mandatory, checked below)
my $INTERVAL  = 1;             # --interval / -i, delay between pings
my $RECREATE;                  # --recreate, drop and re-create the table first
my $PRIMARIES = 1;             # --primaries / -P, passed to the client

GetOptions(
   'host|h=s'      => \$HOST,
   'user|u=s'      => \$USERNAME,
   'pass|p=s'      => \$PASSWORD,
   'keyspace|k=s'  => \$KEYSPACE,
   'interval|i=f'  => \$INTERVAL,
   'recreate'      => \$RECREATE,
   'primaries|P=i' => \$PRIMARIES,
) or exit 1;

# There is no sensible default keyspace, so refuse to run without one.
die "Need a --keyspace\n" unless defined $KEYSPACE;

# Event loop that drives all of the asynchronous I/O in this script.
my $loop = IO::Async::Loop->new;

# Client settings are taken directly from the command-line options. An
# ordered list (not a hash) keeps the arguments in their written order.
my @cass_args = (
   host      => $HOST,
   username  => $USERNAME,
   password  => $PASSWORD,
   keyspace  => $KEYSPACE,
   primaries => $PRIMARIES,
);

my $cass = Net::Async::CassandraCQL->new( @cass_args );
$loop->add( $cass );

# Block until the connection attempt has finished; ->get raises an exception
# if the connect future failed.
{
   my $connect_f = $cass->connect;
   $connect_f->get;
}

# We don't use counter columns for this test, because they don't appear to
# work properly when multiple Cassandra nodes are hosted on the same physical
# machine.

# With --recreate, start from an empty "pinger" table.
if( $RECREATE ) {
   # Wait for the DROP to finish but deliberately ignore its outcome:
   # $loop->await only waits for the future and never throws. A failure here
   # most likely just means the table did not exist yet.
   $loop->await( $cass->query( "DROP TABLE pinger", CONSISTENCY_ALL ) );

   # The CREATE must succeed; ->get raises an exception if it failed.
   $cass->query( "CREATE TABLE pinger (key text PRIMARY KEY, count int)", CONSISTENCY_ALL )->get;

   # Pause for 10 seconds before using the new table (presumably to let the
   # schema change settle across the cluster). Wait via the event loop rather
   # than a blocking sleep() so the client's connections keep being serviced,
   # matching how the main loop waits between pings.
   $loop->delay_future( after => 10 )->get;
}

# Seed (or reset) the single counter row; ->get raises an exception if the
# write failed.
{
   my $seed_f = $cass->query(
      "INSERT INTO pinger (key, count) VALUES ('pingcount', 0)",
      CONSISTENCY_ALL,
   );
   $seed_f->get;
}

# Prepare the two statements used by the main loop. Both prepares are started
# before waiting, and needs_all yields their results in the same order.
my $select_smt;
my $update_smt;
{
   my @prepare_fs = map { $cass->prepare( $_ ) } (
      "SELECT count FROM pinger WHERE key = 'pingcount'",
      "UPDATE pinger SET count = ? WHERE key = 'pingcount'",
   );

   ( $select_smt, $update_smt ) = Future->needs_all( @prepare_fs )->get;
}

# Main ping loop, runs until the process is killed. Each pass reads the
# counter, prints it, writes back counter+1, then waits $INTERVAL seconds.
# Query failures are reported with warn and do not stop the loop.
for(;;) {
   $cass->_debug_print_nodes;

   # Read the current count and wait for the reply without throwing.
   my $select_f = $select_smt->execute( [], CONSISTENCY_QUORUM );
   until( $select_f->is_ready ) {
      $select_f->await;
   }

   my $select_err = $select_f->failure;
   if( !$select_err ) {
      # The first element of the result list is not needed here.
      my ( undef, $rows ) = $select_f->get;
      my $current = $rows->row_array(0)->[0];
      print "COUNT $current\n";

      # Write the incremented value back, again tolerating failure.
      my $update_f = $update_smt->execute( [ $current + 1 ], CONSISTENCY_QUORUM );
      until( $update_f->is_ready ) {
         $update_f->await;
      }

      my $update_err = $update_f->failure;
      warn $update_err if $update_err;
   }
   else {
      warn $select_err;
   }

   # Pause between pings while still running the event loop.
   my $pause_f = $loop->delay_future( after => $INTERVAL );
   $pause_f->get;
}