The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk

package Protocol::CassandraCQL::Client;

use strict;
use warnings;

our $VERSION = '0.11';

use base qw( IO::Socket::IP );

use Carp;

use Protocol::CassandraCQL qw(
   :opcodes :results
   send_frame recv_frame FLAG_COMPRESS
);
use Protocol::CassandraCQL::Frame;
use Protocol::CassandraCQL::Frames qw(
   build_startup_frame
   build_credentials_frame
   build_query_frame

   parse_error_frame
   parse_authenticate_frame
   parse_result_frame
);
use Protocol::CassandraCQL::Result;

use Compress::Snappy qw( compress decompress );

use constant DEFAULT_CQL_PORT => 9042;

use constant MAX_SUPPORTED_VERSION => 2;

=head1 NAME

C<Protocol::CassandraCQL::Client> - a minimal Cassandra CQL client

=head1 SYNOPSIS

 use Protocol::CassandraCQL::Client;
 use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM );

 my $cass = Protocol::CassandraCQL::Client->new(
    PeerHost => "localhost",
    Keyspace => "my-keyspace",
 );

 my ( undef, $result ) = $cass->query( "SELECT v FROM numbers" );

 foreach my $row ( $result->rows_hash ) {
    say "We have a number $row->{v}";
 }

=head1 DESCRIPTION

This subclass of L<IO::Socket::IP> implements a client that can execute
queries on a Cassandra CQL database. It is not intended as a complete client,
is simply provides enough functionallity to test that the protocol handling is
working, and is used to implement the bundled F<examples/cqlsh> utility.

For a more complete client, see instead L<Net::Async::CassandraCQL>.

=cut

=head1 CONSTRUCTOR

=cut

=head2 $cass = Protocol::CassandraCQL::Client->new( %args )

Takes the following arguments in addition to those accepted by
L<IO::Socket::IP>:

=over 8

=item Username => STRING

=item Password => STRING

Authentication credentials if required by the server.

=item Keyspace => STRING

If defined, selects the keyspace to C<USE> after connection.

=item CQLVersion => INT

If defined, sets the CQL protocol version that will be negotiated. If omitted
will default to 1.

=back

=cut

sub new
{
   my $class = shift;
   my %args = @_ == 1 ? ( PeerHost => $_[0] ) : @_;

   $args{PeerService} ||= DEFAULT_CQL_PORT;

   my $self = $class->SUPER::new( %args ) or return;

   ${*$self}{Cassandra_version} = $args{CQLVersion} // 1; # default 1
   $self->_version <= MAX_SUPPORTED_VERSION or
      croak "CQLVersion too high - maximum supported is " . MAX_SUPPORTED_VERSION;

   $self->startup( %args );
   $self->use_keyspace( $args{Keyspace} ) if defined $args{Keyspace};

   return $self;
}

sub _version
{
   my $self = shift;
   return ${*$self}{Cassandra_version};
}

=head1 METHODS

=cut

=head2 ( $result_op, $result_frame ) = $cass->send_message( $opcode, $frame )

Sends a message with the given opcode and L<Protocol::CassandraCQL::Frame> for
the message body. Waits for a response to be received, and returns it.

If the response opcode is C<OPCODE_ERROR> then the error message string is
thrown directly as an exception; this method will only return in non-error
cases.

=cut

sub send_message
{
   my $self = shift;
   my ( $opcode, $frame ) = @_;

   {
      my $flags = 0;
      my $body = $frame->bytes;

      my $body_compressed = compress( $body );
      if( length $body_compressed < length $body ) {
         $body = $body_compressed;
         $flags |= FLAG_COMPRESS;
      }

      send_frame( $self, $self->_version, $flags, 0, $opcode, $body );
   }

   my ( $version, $flags, $streamid, $result_op, $body ) = recv_frame( $self ) or croak "Unable to ->recv: $!";

   $version & 0x80 or croak "Expected response frame to have RESPONSE bit set";
   $version &= 0x7f;

   $version <= $self->_version or
      croak sprintf "Received message version too high to parse (%d)", $version;

   if( $flags & FLAG_COMPRESS ) {
      $body = decompress( $body );
      $flags &= ~FLAG_COMPRESS;
   }
   $flags == 0 or
      croak sprintf "Unexpected flags 0x%02x", $flags;

   $streamid == 0 or
      croak "Unexpected stream ID $streamid";

   my $response = Protocol::CassandraCQL::Frame->new( $body );

   if( $result_op == OPCODE_ERROR ) {
      my ( undef, $message ) = parse_error_frame( $version, $response );
      croak "OPCODE_ERROR: $message";
   }

   # Version check after OPCODE_ERROR in case of "insupported version" error
   $version == $self->_version or
      croak sprintf "Unexpected message version %#02x", $version;

   return ( $result_op, $response );
}

sub startup
{
   my $self = shift;
   my %args = @_;

   my ( $op, $response ) = $self->send_message( OPCODE_STARTUP,
      build_startup_frame( $self->_version, options => {
         CQL_VERSION => "3.0.5",
         COMPRESSION => "Snappy",
      } ),
   );

   if( $op == OPCODE_AUTHENTICATE ) {
      my ( $authenticator ) = parse_authenticate_frame( $self->_version, $response );
      if( $authenticator eq "org.apache.cassandra.auth.PasswordAuthenticator" ) {
         defined $args{Username} and defined $args{Password} or
            croak "Cannot authenticate without a username/password";

         ( $op, $response ) = $self->send_message( OPCODE_CREDENTIALS,
            build_credentials_frame( $self->_version, credentials => {
               username => $args{Username},
               password => $args{Password},
            } )
         );
      }
      else {
         croak "Unrecognised authenticator $authenticator";
      }
   }

   $op == OPCODE_READY or croak "Expected OPCODE_READY";
}

=head2 ( $type, $result ) = $cass->query( $cql, $consistency )

Performs a CQL query and returns the result, as decoded by
L<Protocol::CassandraCQL::Frames/parse_result_frame>.

For C<USE> queries, the type is C<RESULT_SET_KEYSPACE> and C<$result> is a
string giving the name of the new keyspace.

For C<CREATE>, C<ALTER> and C<DROP> queries, the type is
C<RESULT_SCHEMA_CHANGE> and C<$result> is a 3-element ARRAY reference
containing the type of change, the keyspace and the table name.

For C<SELECT> queries, the type is C<RESULT_ROWS> and C<$result> is an
instance of L<Protocol::CassandraCQL::Result> containing the returned row
data.

For other queries, such as C<INSERT>, C<UPDATE> and C<DELETE>, the method
returns C<RESULT_VOID> and C<$result> is C<undef>.

=cut

sub query
{
   my $self = shift;
   my ( $cql, $consistency ) = @_;

   my ( $op, $response ) = $self->send_message( OPCODE_QUERY,
      build_query_frame( $self->_version, cql => $cql, consistency => $consistency )
   );

   $op == OPCODE_RESULT or croak "Expected OPCODE_RESULT";
   return parse_result_frame( $self->_version, $response );
}

=head2 ( $type, $result ) = $cass->use_keyspace( $keyspace )

A convenient shortcut to the C<USE $keyspace> query which escapes the keyspace
name.

=cut

sub use_keyspace
{
   my $self = shift;
   my ( $keyspace ) = @_;

   # CQL's "quoting" handles any character except quote marks, which have to
   # be doubled
   $keyspace =~ s/"/""/g;

   $self->query( qq(USE "$keyspace"), 0 );
}

=head1 TODO

=over 8

=item *

Consider how the server's maximum supported CQL version can be detected on
startup. This is made hard by the fact that the server closes the connection
if the version is too high, so we'll have to reconnect it.

=back

=cut

=head1 SPONSORS

This code was paid for by

=over 2

=item *

Perceptyx L<http://www.perceptyx.com/>

=item *

Shadowcat Systems L<http://www.shadow.cat>

=back

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;