# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
package Protocol::CassandraCQL;
use strict;
use warnings;
our $VERSION = '0.12';
use Exporter 'import';
our @EXPORT_OK = qw(
parse_frame recv_frame
build_frame send_frame
lookup_consistency
);
=head1 NAME
C<Protocol::CassandraCQL> - wire protocol support functions for Cassandra CQL
=head1 DESCRIPTION
This module provides the basic constants and other support functions required
to communicate with a Cassandra database using C<CQL>. It is not in itself a
CQL client; it simply provides the necessary support functions to allow one to
be written. It supports the additions added by C<CQL> version 2.
For a complete client, see instead L<Net::Async::CassandraCQL>.
=cut
=head1 CONSTANTS
The following families of constants are defined, along with export tags:
=head2 FLAG_* (:flags)
Bitmask of flags used in message frames.
=head2 OPCODE_* (:opcodes)
Opcodes used in message frames.
=head2 QUERY_* (:queryflags)
Flag constants used in C<OPCODE_QUERY> frames.
=head2 BATCH_* (:batches)
Type constants used in C<OPCODE_BATCH> frames.
=head2 RESULT_* (:results)
Result codes used in C<OPCODE_RESULT> frames.
=head2 ROWS_* (:rowflags)
Flag constants used in C<RESULT_ROWS> frames.
=head2 TYPE_* (:types)
Type codes used in C<TYPE_ROWS> and C<TYPE_PREPARED> column metadata.
=head2 CONSISTENCY_* (:consistencies)
Consistency levels used in C<OPCODE_QUERY> and C<OPCODE_EXECUTE> frames.
=cut
# See also
# https://github.com/apache/cassandra/blob/cassandra-1.2/doc/native_protocol.spec
my %CONSTANTS = (
FLAG_COMPRESS => 0x01,
FLAG_TRACE => 0x02,
OPCODE_ERROR => 0x00,
OPCODE_STARTUP => 0x01,
OPCODE_READY => 0x02,
OPCODE_AUTHENTICATE => 0x03,
OPCODE_CREDENTIALS => 0x04,
OPCODE_OPTIONS => 0x05,
OPCODE_SUPPORTED => 0x06,
OPCODE_QUERY => 0x07,
OPCODE_RESULT => 0x08,
OPCODE_PREPARE => 0x09,
OPCODE_EXECUTE => 0x0A,
OPCODE_REGISTER => 0x0B,
OPCODE_EVENT => 0x0C,
OPCODE_BATCH => 0x0D,
OPCODE_AUTH_CHALLENGE => 0x0E,
OPCODE_AUTH_RESPONSE => 0x0F,
OPCODE_AUTH_SUCCESS => 0x10,
QUERY_VALUES => 0x0001,
QUERY_SKIP_METADATA => 0x0002,
QUERY_PAGE_SIZE => 0x0004,
QUERY_WITH_PAGING_STATE => 0x0008,
QUERY_WITH_SERIAL_CONSISTENCY => 0x0010,
BATCH_LOGGED => 0,
BATCH_UNLOGGED => 1,
BATCH_COUNTER => 2,
RESULT_VOID => 0x0001,
RESULT_ROWS => 0x0002,
RESULT_SET_KEYSPACE => 0x0003,
RESULT_PREPARED => 0x0004,
RESULT_SCHEMA_CHANGE => 0x0005,
ROWS_HAS_GLOBALTABLESPEC => 0x0001,
ROWS_HAS_MORE_PAGES => 0x0002,
ROWS_NO_METADATA => 0x0004,
TYPE_CUSTOM => 0x0000,
TYPE_ASCII => 0x0001,
TYPE_BIGINT => 0x0002,
TYPE_BLOB => 0x0003,
TYPE_BOOLEAN => 0x0004,
TYPE_COUNTER => 0x0005,
TYPE_DECIMAL => 0x0006,
TYPE_DOUBLE => 0x0007,
TYPE_FLOAT => 0x0008,
TYPE_INT => 0x0009,
TYPE_TEXT => 0x000A,
TYPE_TIMESTAMP => 0x000B,
TYPE_UUID => 0x000C,
TYPE_VARCHAR => 0x000D,
TYPE_VARINT => 0x000E,
TYPE_TIMEUUID => 0x000F,
TYPE_INET => 0x0010,
TYPE_LIST => 0x0020,
TYPE_MAP => 0x0021,
TYPE_SET => 0x0022,
CONSISTENCY_ANY => 0x0000,
CONSISTENCY_ONE => 0x0001,
CONSISTENCY_TWO => 0x0002,
CONSISTENCY_THREE => 0x0003,
CONSISTENCY_QUORUM => 0x0004,
CONSISTENCY_ALL => 0x0005,
CONSISTENCY_LOCAL_QUORUM => 0x0006,
CONSISTENCY_EACH_QUORUM => 0x0007,
CONSISTENCY_SERIAL => 0x0008,
CONSISTENCY_LOCAL_SERIAL => 0x0009,
CONSISTENCY_LOCAL_ONE => 0x000A,
);
require constant;
constant->import( $_, $CONSTANTS{$_} ) for keys %CONSTANTS;
push @EXPORT_OK, keys %CONSTANTS;
our %EXPORT_TAGS = (
'flags' => [ grep { m/^FLAG_/ } keys %CONSTANTS ],
'opcodes' => [ grep { m/^OPCODE_/ } keys %CONSTANTS ],
'queryflags' => [ grep { m/^QUERY_/ } keys %CONSTANTS ],
'batches' => [ grep { m/^BATCH_/ } keys %CONSTANTS ],
'results' => [ grep { m/^RESULT_/ } keys %CONSTANTS ],
'rowflags' => [ grep { m/^ROWS_/ } keys %CONSTANTS ],
'types' => [ grep { m/^TYPE_/ } keys %CONSTANTS ],
'consistencies' => [ grep { m/^CONSISTENCY_/ } keys %CONSTANTS ],
);
=head1 FUNCTIONS
=cut
=head2 ( $version, $flags, $streamid, $opcode, $body ) = parse_frame( $bytes )
Attempts to parse a complete message frame from the given byte string. If it
succeeds, it returns the header fields and the body as an opaque byte string.
If it fails, it returns an empty list.
If successful, it will remove the bytes of the message from the C<$bytes>
scalar, which must therefore be mutable.
=cut
sub parse_frame
{
return unless length $_[0] >= 8; # header length
my $bodylen = unpack( "x4 N", $_[0] );
return unless length $_[0] >= 8 + $bodylen;
# Now committed to extracting a frame
my ( $version, $flags, $streamid, $opcode ) = unpack( "C C C C x4", substr $_[0], 0, 8, "" );
my $body = substr $_[0], 0, $bodylen, "";
return ( $version, $flags, $streamid, $opcode, $body );
}
=head2 ( $version, $flags, $streamid, $opcode, $body ) = recv_frame( $fh )
Attempts to read a complete frame from the given filehandle, blocking until it
is available. If an IO error happens, returns an empty list. The results are
undefined if this method is called on a non-blocking filehandle.
=cut
sub recv_frame
{
my ( $fh ) = @_;
$fh->read( my $header, 8 ) or return;
my ( $version, $flags, $streamid, $opcode, $bodylen ) = unpack( "C C C C N", $header );
my $body = "";
$fh->read( $body, $bodylen ) or return if $bodylen;
return ( $version, $flags, $streamid, $opcode, $body );
}
=head2 $bytes = build_frame( $version, $flags, $streamid, $opcode, $body )
Returns a byte string containing a complete message with the given fields as
the header and body.
=cut
sub build_frame
{
my ( $version, $flags, $streamid, $opcode, $body ) = @_;
return pack "C C C C N a*", $version, $flags, $streamid, $opcode, length $body, $body;
}
=head2 send_frame( $fh, $version, $flags, $streamid, $opcode, $body )
Sends a complete frame to the given filehandle.
=cut
sub send_frame
{
my $fh = shift;
$fh->print( build_frame( @_ ) );
}
=head2 $consistency = lookup_consistency( $name )
Returns the C<CONSISTENCY_*> value for the given name (without the initial
C<CONSISTENCY_> prefix).
=cut
my %consvals = map { substr($_, 12) => __PACKAGE__->$_ } grep { m/^CONSISTENCY_/ } keys %CONSTANTS;
sub lookup_consistency
{
my ( $name ) = @_;
return $consvals{$name};
}
=head2 $name = typename( $type )
Returns the name of the given C<TYPE_*> value, without the initial C<TYPE_>
prefix.
=cut
my %typevals = map { substr($_, 5) => __PACKAGE__->$_ } grep { m/^TYPE_/ } keys %CONSTANTS;
my %typenames = reverse %typevals;
sub typename
{
my ( $type ) = @_;
return $typenames{$type};
}
=head1 TODO
=over 8
=item *
Reimplement L<Protocol::CassandraCQL::Frame> in XS code for better
performance.
=back
=cut
=head1 SPONSORS
This code was paid for by
=over 2
=item *
Perceptyx L<http://www.perceptyx.com/>
=item *
Shadowcat Systems L<http://www.shadow.cat>
=back
=head1 AUTHOR
Paul Evans <leonerd@leonerd.org.uk>
=cut
0x55AA;