package Net::Amazon::DynamoDB;
=head1 NAME
Net::Amazon::DynamoDB - Simple interface for Amazon DynamoDB
=head1 DESCRIPTION
Simple to use interface for Amazon DynamoDB
If you want an ORM-like interface with real objects to work with, this is implementation is not for you. If you just want to access DynamoDB in a simple/quick manner - you are welcome.
See L<https://github.com/ukautz/Net-Amazon-DynamoDB> for latest release.
=head1 SYNOPSIS
my $ddb = Net::Amazon::DynamoDB->new(
access_key => $my_access_key,
secret_key => $my_secret_key,
tables => {
# table with only hash key
sometable => {
hash_key => 'id',
attributes => {
id => 'N',
name => 'S'
}
},
# table with hash and reange key key
othertable => {
hash_key => 'id',
range_key => 'range_id',
attributes => {
id => 'N',
range_id => 'N',
attrib1 => 'S',
attrib2 => 'S'
}
}
}
);
# create both tables with 10 read and 5 write unites
$ddb->exists_table( $_ ) || $ddb->create_table( $_, 10, 5 )
for qw/ sometable othertable /;
# insert something into tables
$ddb->put_item( sometable => {
id => 5,
name => 'bla'
} ) or die $ddb->error;
$ddb->put_item( sometable => {
id => 5,
range_id => 7,
attrib1 => 'It is now '. localtime(),
attrib2 => 'Or in unix timstamp '. time(),
} ) or die $ddb->error;
=cut
use Moose;
use v5.10;
use version 0.74; our $VERSION = qv( "v0.1.16" );
use Carp qw/ croak /;
use Data::Dumper;
use DateTime::Format::HTTP;
use DateTime::Format::Strptime;
use DateTime;
use Digest::SHA qw/ sha1_hex sha256_hex sha384_hex sha256 hmac_sha256_base64 /;
use HTTP::Request;
use JSON;
use LWP::UserAgent;
use LWP::ConnCache;
use Net::Amazon::AWSSign;
use Time::HiRes qw/ usleep /;
use XML::Simple qw/ XMLin /;
use Encode;
=head1 CLASS ATTRIBUTES
=head2 tables
The table definitions
=cut
has tables => ( isa => 'HashRef[HashRef]', is => 'rw', required => 1, trigger => sub {
my ( $self ) = @_;
# check table
while( my ( $table, $table_ref ) = each %{ $self->{ tables } } ) {
# determine primary keys
my @check_pk = ( 'hash' );
push @check_pk, 'range'
if defined $table_ref->{ range_key };
# check primary keys
foreach my $check_pk( @check_pk ) {
my $key_pk = "${check_pk}_key";
my $name_pk = $table_ref->{ $key_pk };
croak "Missing '$key_pk' attribute in '$table' table definition\n"
unless defined $table_ref->{ $key_pk };
croak "Missing $check_pk key attribute in '$table' table attribute declaration: "
. "{ $table => { attributes => { '$name_pk' => 'S|N' } }\n"
unless defined $table_ref->{ attributes }->{ $name_pk };
croak "Wrong data type for $check_pk key attribute. Got '$table_ref->{ attributes }->{ $name_pk }',"
. " expect 'S' or 'N'"
unless $table_ref->{ attributes }->{ $name_pk } =~ /^(S|N)$/;
}
# check attributes
while( my( $attr_name, $attr_type ) = each %{ $table_ref->{ attributes } } ) {
croak "Wrong data type for attribute '$attr_name' in table '$table': Got '$attr_type' was"
. " expecting 'S' or 'N' or 'SS' or 'NS'"
unless $attr_type =~ /^[NS]S?$/;
}
}
# no need to go further, if no namespace given
return unless $self->namespace;
# update table definitions with namespace
my %new_table = ();
my $updated = 0;
foreach my $table( keys %{ $self->{ tables } } ) {
my $table_updated = index( $table, $self->namespace ) == 0 ? $table : $self->_table_name( $table );
$new_table{ $table_updated } = $self->{ tables }->{ $table };
$updated ++ unless $table_updated eq $table;
}
if ( $updated ) {
$self->{ tables } = \%new_table;
}
} );
=head2 use_keep_alive
Use keep_alive connections to AWS (Uses C<LWP::ConnCache> experimental mechanism). 0 to disable, positive number sets value for C<LWP::UserAgent> attribute 'keep_alive'
Default: 0
=cut
has use_keep_alive => ( isa => 'Int', is => 'rw', default => 0 );
=head2 lwp
Contains C<LWP::UserAgent> instance.
=cut
has lwp => ( isa => 'LWP::UserAgent', is => 'rw', lazy => 1, default => sub { my ($self) = @_; LWP::UserAgent->new( timeout => 5, keep_alive => $self->use_keep_alive ) } );
has _lwpcache => ( isa => 'LWP::ConnCache', is => 'ro', lazy => 1, default => sub { my ($self) = @_; $self->lwp->conn_cache(); } );
=head2 json
Contains C<JSON> instance for decoding/encoding json.
JSON object needs to support: canonical, allow_nonref and utf8
=cut
has json => ( isa => 'JSON', is => 'rw', default => sub { JSON->new()->canonical( 1 )->allow_nonref( 1 )->utf8( 1 ) }, trigger => sub {
shift->json->canonical( 1 )->allow_nonref( 1 )->utf8( 1 );
} );
=head2 host
DynamoDB API Hostname
Default: dynamodb.us-east-1.amazonaws.com
=cut
has host => ( isa => 'Str', is => 'rw', default => 'dynamodb.us-east-1.amazonaws.com' );
=head2 access_key
AWS API access key
Required!
=cut
has access_key => ( isa => 'Str', is => 'rw', required => 1 );
=head2 secret_key
AWS API secret key
Required!
=cut
has secret_key => ( isa => 'Str', is => 'rw', required => 1 );
=head2 api_version
AWS API Version. Use format "YYYYMMDD"
Default: 20111205
=cut
has api_version => ( isa => 'Str', is => 'rw', default => '20111205' );
=head2 read_consistent
Whether reads (get_item, batch_get_item) consistent per default or not. This does not affect scan_items or query_items, which are always eventually consistent.
Default: 0 (eventually consistent)
=cut
has read_consistent => ( isa => 'Bool', is => 'rw', default => 0 );
=head2 namespace
Table prefix, prepended before table name on usage
Default: ''
=cut
has namespace => ( isa => 'Str', is => 'ro', default => '' );
=head2 raise_error
Whether database errors (eg 4xx Response from DynamoDB) raise errors or not.
Default: 0
=cut
has raise_error => ( isa => 'Bool', is => 'rw', default => 0 );
=head2 max_retries
Amount of retries a query will be tries if ProvisionedThroughputExceededException is raised until final error.
Default: 0 (do only once, no retries)
=cut
has max_retries => ( isa => 'Int', is => 'rw', default => 1 );
=head2 derive_table
Whether we parse results using table definition (faster) or without a known definition (still requires table definition for indexes)
Default: 0
=cut
has derive_table => ( isa => 'Bool', is => 'rw', default => 0 );
=head2 retry_timeout
Wait period in seconds between tries. Float allowed.
Default: 0.1 (100ms)
=cut
has retry_timeout => ( isa => 'Num', is => 'rw', default => 0.1 );
=head2 cache
Cache object using L<Cache> interface, eg L<Cache::File> or L<Cache::Memcached>
If set, caching is used for get_item, put_item, update_item and batch_get_item.
Default: -
=cut
has cache => ( isa => 'Cache', is => 'rw', predicate => 'has_cache' );
=head2 cache_disabled
If cache is set, you still can disable it per default and enable it per operation with "use_cache" option (see method documentation)
This way you have a default no-cache policy, but still can use cache in choosen operations.
Default: 0
=cut
has cache_disabled => ( isa => 'Bool', is => 'rw', default => 0 );
=head2 cache_key_method
Which one to use. Either sha1_hex, sha256_hex, sha384_hex or coderef
Default: sha1_hex
=cut
has cache_key_method => ( is => 'rw', default => sub { \&Digest::SHA::sha1_hex }, trigger => sub {
my ( $self, $method ) = @_;
if ( ( ref( $method ) ) ne 'CODE' ) {
if ( $method eq 'sha1_hex' ) {
$self->{ cache_key_method } = \&Digest::SHA::sha1_hex();
}
elsif ( $method eq 'sha256_hex' ) {
$self->{ cache_key_method } = \&Digest::SHA::sha256_hex();
}
elsif ( $method eq 'sha384_hex' ) {
$self->{ cache_key_method } = \&Digest::SHA::sha384_hex();
}
}
} );
#
# _aws_signer
# Contains C<Net::Amazon::AWSSign> instance.
#
has _aws_signer => ( isa => 'Net::Amazon::AWSSign', is => 'rw', predicate => '_has_aws_signer' );
#
# _security_token_url
# URL for receiving security token
#
has _security_token_url => ( isa => 'Str', is => 'rw', default => 'https://sts.amazonaws.com/?Action=GetSessionToken&Version=2011-06-15' );
#
# _credentials
# Contains credentials received by GetSession
#
has _credentials => ( isa => 'HashRef[Str]', is => 'rw', predicate => '_has_credentials' );
#
# _credentials_expire
# Time of credentials exiration
#
has _credentials_expire => ( isa => 'DateTime', is => 'rw' );
#
# _error
# Contains credentials received by GetSession
#
has _error => ( isa => 'Str', is => 'rw', predicate => '_has_error' );
=head1 METHODS
=head2 create_table $table_name, $read_amount, $write_amount
Create a new Table. Returns description of the table
my $desc_ref = $ddb->create_table( 'table_name', 10, 5 )
$desc_ref = {
count => 123, # amount of "rows"
status => 'CREATING', # or 'ACTIVE' or 'UPDATING' or some error state?
created => 1328893776, # timestamp
read_amount => 10, # amount of read units
write_amount => 5, # amount of write units
hash_key => 'id', # name of the hash key attribute
hash_key_type => 'S', # or 'N',
#range_key => 'id', # name of the hash key attribute (optional)
#range_key_type => 'S', # or 'N' (optional)
}
=cut
sub create_table {
my ( $self, $table, $read_amount, $write_amount ) = @_;
$table = $self->_table_name( $table );
$read_amount ||= 10;
$write_amount ||= 5;
# check & get table definition
my $table_ref = $self->_check_table( "create_table", $table );
# init create definition
my %create = (
TableName => $table,
ProvisionedThroughput => {
ReadCapacityUnits => $read_amount + 0,
WriteCapacityUnits => $write_amount + 0,
}
);
# build keys
$create{ KeySchema } = {
HashKeyElement => {
AttributeName => $table_ref->{ hash_key },
AttributeType => $table_ref->{ attributes }->{ $table_ref->{ hash_key } }
}
};
if ( defined $table_ref->{ range_key } ) {
$create{ KeySchema }->{ RangeKeyElement } = {
AttributeName => $table_ref->{ range_key },
AttributeType => $table_ref->{ attributes }->{ $table_ref->{ range_key } }
};
}
# perform create
my ( $res, $res_ok, $json_ref ) = $self->request( CreateTable => \%create );
# got res
if ( $res_ok && defined $json_ref->{ TableDescription } ) {
return {
status => $json_ref->{ TableDescription }->{ TableStatus },
created => int( $json_ref->{ TableDescription }->{ CreationDateTime } ),
read_amount => $json_ref->{ TableDescription }->{ ProvisionedThroughput }->{ ReadCapacityUnits },
write_amount => $json_ref->{ TableDescription }->{ ProvisionedThroughput }->{ WriteCapacityUnits },
hash_key => $json_ref->{ Table }->{ KeySchema }->{ HashKeyElement }->{ AttributeName },
hash_key_type => $json_ref->{ Table }->{ KeySchema }->{ HashKeyElement }->{ AttributeType },
( defined $json_ref->{ Table }->{ KeySchema }->{ RangeKeyElement }
? (
range_key => $json_ref->{ Table }->{ KeySchema }->{ RangeKeyElement }->{ AttributeName },
range_key_type => $json_ref->{ Table }->{ KeySchema }->{ RangeKeyElement }->{ AttributeType },
)
: ()
),
}
}
# set error
$self->error( 'create_table failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 delete_table $table
Delete an existing (and defined) table.
Returns bool whether table is now in deleting state (succesfully performed)
=cut
sub delete_table {
my ( $self, $table ) = @_;
$table = $self->_table_name( $table );
# check & get table definition
my $table_ref = $self->_check_table( delete_table => $table );
# perform create
my ( $res, $res_ok, $json_ref ) = $self->request( DeleteTable => { TableName => $table } );
# got result
if ( $res_ok && defined $json_ref->{ TableDescription } ) {
return $json_ref->{ TableDescription }->{ TableStatus } eq 'DELETING';
}
# set error
$self->error( 'delete_table failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 describe_table $table
Returns table information
my $desc_ref = $ddb->describe_table( 'my_table' );
$desc_ref = {
existing => 1,
size => 123213, # data size in bytes
count => 123, # amount of "rows"
status => 'ACTIVE', # or 'DELETING' or 'CREATING' or 'UPDATING' or some error state
created => 1328893776, # timestamp
read_amount => 10, # amount of read units
write_amount => 5, # amount of write units
hash_key => 'id', # name of the hash key attribute
hash_key_type => 'S', # or 'N',
#range_key => 'id', # name of the hash key attribute (optional)
#range_key_type => 'S', # or 'N' (optional)
}
If no such table exists, return is
{
existing => 0
}
=cut
sub describe_table {
my ( $self, $table ) = @_;
$table = $self->_table_name( $table );
# check table definition
$self->_check_table( "describe_table", $table );
my ( $res, $res_ok, $json_ref ) = $self->request( DescribeTable => { TableName => $table } );
# got result
if ( $res_ok ) {
if ( defined $json_ref->{ Table } ) {
no warnings 'uninitialized';
return {
existing => 1,
size => $json_ref->{ Table }->{ TableSizeBytes },
count => $json_ref->{ Table }->{ ItemCount },
status => $json_ref->{ Table }->{ TableStatus },
created => int( $json_ref->{ Table }->{ CreationDateTime } ),
read_amount => $json_ref->{ Table }->{ ProvisionedThroughput }->{ ReadCapacityUnits },
write_amount => $json_ref->{ Table }->{ ProvisionedThroughput }->{ WriteCapacityUnits },
hash_key => $json_ref->{ Table }->{ KeySchema }->{ HashKeyElement }->{ AttributeName },
hash_key_type => $json_ref->{ Table }->{ KeySchema }->{ HashKeyElement }->{ AttributeType },
( defined $json_ref->{ Table }->{ KeySchema }->{ RangeKeyElement }
? (
range_key => $json_ref->{ Table }->{ KeySchema }->{ RangeKeyElement }->{ AttributeName },
range_key_type => $json_ref->{ Table }->{ KeySchema }->{ RangeKeyElement }->{ AttributeType },
)
: ()
),
};
}
else {
return {
existing => 0
}
}
}
# set error
$self->error( 'describe_table failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 update_table $table, $read_amount, $write_amount
Update read and write amount for a table
=cut
sub update_table {
my ( $self, $table, $read_amount, $write_amount ) = @_;
$table = $self->_table_name( $table );
my ( $res, $res_ok, $json_ref ) = $self->request( UpdateTable => {
TableName => $table,
ProvisionedThroughput => {
ReadCapacityUnits => $read_amount + 0,
WriteCapacityUnits => $write_amount + 0,
}
} );
if ( $res_ok ) {
return 1;
}
# set error
$self->error( 'update_table failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 exists_table $table
Returns bool whether table exists or not
=cut
sub exists_table {
my ( $self, $table ) = @_;
$table = $self->_table_name( $table );
# check table definition
$self->_check_table( "exists_table", $table );
my ( $res, $res_ok, $json_ref );
eval {
( $res, $res_ok, $json_ref ) = $self->request( DescribeTable => { TableName => $table } );
};
return defined $json_ref->{ Table } && defined $json_ref->{ Table }->{ ItemCount } ? 1 : 0
if $res_ok;
# set error
return 0;
}
=head2 list_tables
Returns tables names as arrayref (or array in array context)
=cut
sub list_tables {
my ( $self ) = @_;
my ( $res, $res_ok, $json_ref ) = $self->request( ListTables => {} );
if ( $res_ok ) {
my $ns_length = length( $self->namespace );
my @table_names = map {
substr( $_, $ns_length );
} grep {
! $self->namespace || index( $_, $self->namespace ) == 0
} @{ $json_ref->{ TableNames } };
return wantarray ? @table_names : \@table_names;
}
# set error
$self->error( 'list_tables failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 put_item $table, $item_ref, [$where_ref], [$args_ref]
Write a single item to table. All primary keys are required in new item.
# just write
$ddb->put_item( my_table => {
id => 123,
some_attrib => 'bla',
other_attrib => 'dunno'
} );
# write conditionally
$ddb->put_item( my_table => {
id => 123,
some_attrib => 'bla',
other_attrib => 'dunno'
}, {
some_attrib => { # only update, if some_attrib has the value 'blub'
value => 'blub'
},
other_attrib => { # only update, if a value for other_attrib exists
exists => 1
}
} );
=over
=item * $table
Name of the table
=item * $item_ref
Hashref containing the values to be inserted
=item * $where_ref [optional]
Filter containing expected values of the (existing) item to be updated
=item * $args_ref [optional]
HashRef with options
=over
=item * return_old
If true, returns old value
=item * no_cache
Force not using cache, if enabled per default
=item * use_cache
Force using cache, if disabled per default but setupped
=back
=back
=cut
sub put_item {
my ( $self, $table, $item_ref, $where_ref, $args_ref ) = @_;
$args_ref ||= {
return_old => 0,
no_cache => 0,
use_cache => 0,
max_retries => undef
};
$table = $self->_table_name( $table );
# check definition
my $table_ref = $self->_check_table( "put_item", $table );
# check primary keys
croak "put_item: Missing value for hash key '$table_ref->{ hash_key }'"
unless defined $item_ref->{ $table_ref->{ hash_key } }
&& length( $item_ref->{ $table_ref->{ hash_key } } );
# check other attributes
$self->_check_keys( "put_item: item values", $table, $item_ref );
# having where -> check now
$self->_check_keys( "put_item: where clause", $table, $where_ref ) if $where_ref;
# build put
my %put = (
TableName => $table,
Item => {}
);
# build the item
foreach my $key( keys %$item_ref ){
my $type = $self->_attrib_type( $table, $key );
my $value;
if ( $type eq 'SS' || $type eq 'NS' ) {
my @values = map { $_. '' } ( ref( $item_ref->{ $key } ) ? @{ $item_ref->{ $key } } : () );
$value = \@values;
}
else {
$value = $item_ref->{ $key } .'';
}
$put{ Item }->{ $key } = { $type => $value };
}
# build possible where clause
if ( $where_ref ) {
$self->_build_attrib_filter( $table, $where_ref, $put{ Expected } = {} );
}
# add return value, if set
$put{ ReturnValues } = 'ALL_OLD' if $args_ref->{ return_old };
# perform create
my ( $res, $res_ok, $json_ref ) = $self->request( PutItem => \%put, {
max_retries => $args_ref->{ max_retries },
} );
# get result
if ( $res_ok ) {
# clear cache
if ( $self->_cache_enabled( $args_ref ) ) {
my $cache_key = $self->_cache_key_single( $table, $item_ref );
$self->cache->remove( $cache_key );
}
if ( $args_ref->{ return_old } ) {
return defined $json_ref->{ Attributes }
? $self->_format_item( $table, $json_ref->{ Attributes } )
: undef;
}
else {
return $json_ref->{ ConsumedCapacityUnits } > 0;
}
}
# set error
$self->error( 'put_item failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 batch_write_item $tables_ref, [$args_ref]
Batch put / delete items into one ore more tables.
Caution: Each batch put / delete cannot process more operations than you have write capacity for the table.
Example:
my ( $ok, $unprocessed_count, $next_query_ref ) = $ddb->batch_write_item( {
table_name => {
put => [
{
attrib1 => "Value 1",
attrib2 => "Value 2",
},
# { .. } ..
],
delete => [
{
hash_key => "Hash Key Value",
range_key => "Range Key Value",
},
# { .. } ..
]
},
# table2_name => ..
} );
if ( $ok ) {
if ( $unprocessed_count ) {
print "Ok, but $unprocessed_count still not processed\n";
$ddb->batch_write_item( $next_query_ref );
}
else {
print "All processed\n";
}
}
=over
=item $tables_ref
HashRef in the form
{ table_name => { put => [ { attribs }, .. ], delete => [ { primary keys } ] } }
=item $args_ref
HashRef
=over
=item * process_all
Keep processing everything which is returned as unprocessed (if you send more operations than your
table has write capability or you surpass the max amount of operations OR max size of request (see AWS API docu)).
Caution: Error handling
Default: 0
=back
=back
=cut
sub batch_write_item {
my ( $self, $tables_ref, $args_ref ) = @_;
$args_ref ||= {
process_all => 0,
max_retries => undef
};
# check definition
my %table_map;
foreach my $table( keys %$tables_ref ) {
$table = $self->_table_name( $table );
my $table_ref = $self->_check_table( "batch_write_item", $table );
$table_map{ $table } = $table_ref;
}
my %write = ( RequestItems => {} );
foreach my $table( keys %table_map ) {
my $table_out = $self->_table_name( $table, 1 );
my $t_ref = $tables_ref->{ $table_out };
my $table_requests_ref = $write{ RequestItems }->{ $table } = [];
foreach my $operation( qw/ put delete / ) {
next unless defined $t_ref->{ $operation };
my @operations = ref( $t_ref->{ $operation } ) eq 'ARRAY'
? @{ $t_ref->{ $operation } }
: ( $t_ref->{ $operation } );
# put ..
if ( $operation eq 'put' ) {
foreach my $put_ref( @operations ) {
push @$table_requests_ref, { 'PutRequest' => { Item => my $request_ref = {} } };
# build the item
foreach my $key( keys %$put_ref ){
my $type = $self->_attrib_type( $table, $key );
my $value;
if ( $type eq 'SS' || $type eq 'NS' ) {
my @values = map { $_. '' } ( ref( $put_ref->{ $key } ) ? @{ $put_ref->{ $key } } : () );
$value = \@values;
}
else {
$value = $put_ref->{ $key } .'';
}
$request_ref->{ $key } = { $type => $value };
}
}
}
# delete ..
else {
foreach my $delete_ref( @operations ) {
push @$table_requests_ref, { 'DeleteRequest' => { Key => my $request_ref = {} } };
$self->_build_pk_filter( $table, $delete_ref, $request_ref );
}
}
}
}
# perform create
my ( $res, $res_ok, $json_ref ) = $self->request( BatchWriteItem => \%write, {
max_retries => $args_ref->{ max_retries },
} );
# having more to process
while ( $args_ref->{ process_all }
&& $res_ok
&& defined $json_ref->{ UnprocessedItems }
&& scalar( keys %{ $json_ref->{ UnprocessedItems } } )
) {
( $res, $res_ok, $json_ref ) = $self->request( BatchWriteItem => {
RequestItems => $json_ref->{ UnprocessedItems }
}, {
max_retries => $args_ref->{ max_retries },
} );
}
# count unprocessed
my $unprocessed_count = 0;
my %next_query;
if ( $res_ok && defined $json_ref->{ UnprocessedItems } ) {
foreach my $table( keys %{ $json_ref->{ UnprocessedItems } } ) {
my @operations = @{ $json_ref->{ UnprocessedItems }->{ $table } };
next unless @operations;
$unprocessed_count += scalar( @operations );
$next_query{ $table } = {};
foreach my $operation_ref( @operations ) {
my ( $item_ref, $operation_name ) = defined $operation_ref->{ PutRequest }
? ( $operation_ref->{ PutRequest }->{ Item }, 'put' )
: ( $operation_ref->{ DeleteRequest }->{ Key }, 'delete' );
#print Dumper( [ $operation_ref, $operation_name, $item_ref ] );
push @{ $next_query{ $table }->{ $operation_name } ||= [] },
$self->_format_item( $table, $item_ref )
}
}
}
return wantarray ? ( $res_ok, $unprocessed_count, \%next_query ) : $res_ok;
}
=head2 update_item $table, $update_ref, $where_ref, [$args_ref]
Update existing item in database. All primary keys are required in where clause
# update existing
$ddb->update_item( my_table => {
some_attrib => 'bla',
other_attrib => 'dunno'
}, {
id => 123,
} );
# write conditionally
$ddb->update_item( my_table => {
some_attrib => 'bla',
other_attrib => 'dunno'
}, {
id => 123,
some_attrib => { # only update, if some_attrib has the value 'blub'
value => 'blub'
},
other_attrib => { # only update, if a value for other_attrib exists
exists => 1
}
} );
=over
=item * $table
Name of the table
=item * $update_ref
Hashref containing the updates.
=over
=item * delete a single values
{ attribname => undef }
=item * replace a values
{
attribname1 => 'somevalue',
attribname2 => [ 1, 2, 3 ]
}
=item * add values (arrays only)
{ attribname => \[ 4, 5, 6 ] }
=back
=item * $where_ref [optional]
Filter HashRef
=item * $args_ref [optional]
HashRef of options
=over
=item * return_mode
Can be set to on of "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"
=item * no_cache
Force not using cache, if enabled per default
=item * use_cache
Force using cache, if disabled per default but setupped
=back
=back
=cut
sub update_item {
my ( $self, $table, $update_ref, $where_ref, $args_ref ) = @_;
$args_ref ||= {
return_mode => '',
no_cache => 0,
use_cache => 0,
max_retries => undef
};
$table = $self->_table_name( $table );
# check definition
my $table_ref = $self->_check_table( "update_item", $table );
croak "update_item: Cannot update hash key value, do not set it in update-clause"
if defined $update_ref->{ $table_ref->{ hash_key } };
croak "update_item: Cannot update range key value, do not set it in update-clause"
if defined $table_ref->{ range_key }
&& defined $update_ref->{ $table_ref->{ range_key } };
# check primary keys
croak "update_item: Missing value for hash key '$table_ref->{ hash_key }' in where-clause"
unless defined $where_ref->{ $table_ref->{ hash_key } }
&& length( $where_ref->{ $table_ref->{ hash_key } } );
croak "update_item: Missing value for range key '$table_ref->{ hash_key }' in where-clause"
if defined $table_ref->{ range_key } && !(
defined $where_ref->{ $table_ref->{ range_key } }
&& length( $where_ref->{ $table_ref->{ range_key } } )
);
# check other attributes
$self->_check_keys( "update_item: item values", $table, $update_ref );
croak "update_item: Cannot update hash key '$table_ref->{ hash_key }'. You have to delete and put the item!"
if defined $update_ref->{ $table_ref->{ hash_key } };
croak "update_item: Cannot update range key '$table_ref->{ hash_key }'. You have to delete and put the item!"
if defined $table_ref->{ range_key } && defined $update_ref->{ $table_ref->{ range_key } };
# having where -> check now
$self->_check_keys( "update_item: where clause", $table, $where_ref );
# build put
my %update = (
TableName => $table,
AttributeUpdates => {},
Key => {}
);
# build the item
foreach my $key( keys %$update_ref ) {
my $type = $self->_attrib_type( $table, $key );
my $value = $update_ref->{ $key };
# delete
if ( ! defined $value ) {
$update{ AttributeUpdates }->{ $key } = {
Action => 'DELETE'
};
}
# if ++N or --N on numeric type, ADD to get inc/dec behavior
elsif ( $type eq 'N' && $value =~ /^(--|\+\+)(\d+)$/ ) {
$update{ AttributeUpdates }->{ $key } = {
Value => { $type => ($1 eq '--') ? "-$2" : "$2" },
Action => 'ADD'
};
}
# replace for scalar
elsif ( $type eq 'N' || $type eq 'S' ) {
$update{ AttributeUpdates }->{ $key } = {
Value => { $type => $value. '' },
Action => 'PUT'
};
}
# replace or add for array types
elsif ( $type =~ /^[NS]S$/ ) {
# add \[ qw/ value1 value2 / ]
if ( ref( $value ) eq 'REF' ) {
$update{ AttributeUpdates }->{ $key } = {
Value => { $type => [ map { "$_" } @$$value ] },
Action => 'ADD'
};
}
# replace [ qw/ value1 value2 / ]
else {
$update{ AttributeUpdates }->{ $key } = {
Value => { $type => [ map { "$_" } @$value ] },
Action => 'PUT'
};
}
}
}
# build possible where clause
my %where = %$where_ref;
# primary key
$self->_build_pk_filter( $table, \%where, $update{ Key } );
# additional filters
if ( keys %where ) {
$self->_build_attrib_filter( $table, \%where, $update{ Expected } = {} );
}
# add return value, if set
if ( $args_ref->{ return_mode } ) {
$update{ ReturnValues } = "$args_ref->{ return_mode }" =~ /^(?:ALL_OLD|UPDATED_OLD|ALL_NEW|UPDATED_NEW)$/i
? uc( $args_ref->{ return_mode } )
: "ALL_OLD";
}
# perform create
my ( $res, $res_ok, $json_ref ) = $self->request( UpdateItem => \%update, {
max_retries => $args_ref->{ max_retries },
} );
# get result
if ( $res_ok ) {
# clear cache
if ( $self->_cache_enabled( $args_ref ) ) {
my $cache_key = $self->_cache_key_single( $table, $where_ref );
$self->cache->remove( $cache_key );
}
if ( $args_ref->{ return_mode } ) {
return defined $json_ref->{ Attributes }
? $self->_format_item( $table, $json_ref->{ Attributes } )
: undef;
}
else {
return $json_ref->{ ConsumedCapacityUnits } > 0;
}
}
# set error
$self->error( 'put_item failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 get_item $table, $pk_ref, [$args_ref]
Read a single item by hash (and range) key.
# only with hash key
my $item1 = $ddb->get_item( my_table => { id => 123 } );
print "Got $item1->{ some_key }\n";
# with hash and range key, also consistent read and only certain attributes in return
my $item2 = $ddb->get_item( my_other_table =>, {
id => $hash_value, # the hash value
title => $range_value # the range value
}, {
consistent => 1,
attributes => [ qw/ attrib1 attrib2 ]
} );
print "Got $item2->{ attrib1 }\n";
=over
=item * $table
Name of the table
=item * $pk_ref
HashRef containing all primary keys
# only hash key
{
$hash_key => $hash_value
}
# hash and range key
{
$hash_key => $hash_value,
$range_key => $range_value
}
=item * $args_ref [optional]
HashRef of options
=over
=item * consistent
Whether read shall be consistent. If set to 0 and read_consistent is globally enabled, this read will not be consistent
=item * attributes
ArrayRef of attributes to read. If not set, all attributes are returned.
=item * no_cache
Force not using cache, if enabled per default
=item * use_cache
Force using cache, if disabled per default but setupped
=back
=back
=cut
sub get_item {
my ( $self, $table, $pk_ref, $args_ref ) = @_;
$table = $self->_table_name( $table );
$args_ref ||= {
consistent => undef,
attributes => undef,
no_cache => 0,
use_cache => 0,
max_retries => undef
};
$args_ref->{ consistent } //= $self->read_consistent;
# check definition
my $table_ref = $self->_check_table( "get_item", $table );
# check primary keys
croak "get_item: Missing value for hash key '$table_ref->{ hash_key }'"
unless defined $pk_ref->{ $table_ref->{ hash_key } }
&& length( $pk_ref->{ $table_ref->{ hash_key } } );
croak "get_item: Missing value for Range Key '$table_ref->{ range_key }'"
if defined $table_ref->{ range_key } && !(
defined $pk_ref->{ $table_ref->{ range_key } }
&& length( $pk_ref->{ $table_ref->{ hash_key } } )
);
# use cache
my $use_cache = $self->_cache_enabled( $args_ref );
my $cache_key;
if ( $use_cache ) {
$cache_key = $self->_cache_key_single( $table, $pk_ref );
my $cached = $self->cache->thaw( $cache_key );
return $cached if defined $cached;
}
# build get
my %get = (
TableName => $table,
( defined $args_ref->{ attributes } ? ( AttributesToGet => $args_ref->{ attributes } ) : () ),
ConsistentRead => $args_ref->{ consistent } ? \1 : \0,
Key => {
HashKeyElement => {
$self->_attrib_type( $table, $table_ref->{ hash_key } ) =>
$pk_ref->{ $table_ref->{ hash_key } }
}
}
);
# add range key ?
if ( defined $table_ref->{ range_key } ) {
$get{ Key }->{ RangeKeyElement } = {
$self->_attrib_type( $table, $table_ref->{ range_key } ) =>
$pk_ref->{ $table_ref->{ range_key } }
};
}
# perform create
my ( $res, $res_ok, $json_ref ) = $self->request( GetItem => \%get, {
max_retries => $args_ref->{ max_retries },
} );
# return on success
my $item_ref = $self->_format_item( $table, $json_ref->{ Item } ) if $res_ok && defined $json_ref->{ Item };
if ( $use_cache ) {
$self->cache->freeze( $cache_key, $item_ref ) if $item_ref;
}
return $item_ref;
# return on success, but nothing received
return undef if $res_ok;
# set error
$self->error( 'get_item failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 batch_get_item $tables_ref, [$args_ref]
Read multiple items (possible accross multiple tables) identified by their hash and range key (if required).
my $res = $ddb->batch_get_item( {
table_name => [
{ $hash_key => $value1 },
{ $hash_key => $value2 },
{ $hash_key => $value3 },
],
other_table_name => {
keys => [
{ $hash_key => $value1, $range_key => $rvalue1 },
{ $hash_key => $value2, $range_key => $rvalue2 },
{ $hash_key => $value3, $range_key => $rvalue3 },
],
attributes => [ qw/ attrib1 attrib2 / ]
]
} );
foreach my $table( keys %$res ) {
foreach my $item( @{ $res->{ $table } } ) {
print "$item->{ some_attrib }\n";
}
}
=over
=item $tables_ref
HashRef of tablename => primary key ArrayRef
=item $args_ref
HashRef
=over
=item * process_all
Batch request might not fetch all requested items at once. This switch enforces
to batch get the unprocessed items.
Default: 0
=back
=back
=cut
sub batch_get_item {
my ( $self, $tables_ref, $args_ref ) = @_;
$args_ref ||= {
max_retries => undef,
process_all => undef,
consistent => undef
};
$args_ref->{ consistent } //= $self->read_consistent();
# check definition
my %table_map;
foreach my $table( keys %$tables_ref ) {
$table = $self->_table_name( $table );
my $table_ref = $self->_check_table( "batch_get_item", $table );
$table_map{ $table } = $table_ref;
}
my %get = ( RequestItems => {} );
foreach my $table( keys %table_map ) {
my $table_out = $self->_table_name( $table, 1 );
my $t_ref = $tables_ref->{ $table_out };
# init items for table
$get{ RequestItems }->{ $table } = {};
# init / get keys
my $k_ref = $get{ RequestItems }->{ $table }->{ Keys } = [];
my @keys = ref( $t_ref ) eq 'ARRAY'
? @$t_ref
: @{ $t_ref->{ keys } };
# get mapping for table
my $m_ref = $table_map{ $table };
# get hash key
my $hash_key = $m_ref->{ hash_key };
my $hash_key_type = $self->_attrib_type( $table, $hash_key );
# get range key?
my ( $range_key, $range_key_type );
if ( defined $m_ref->{ range_key } ) {
$range_key = $m_ref->{ range_key };
$range_key_type = $self->_attrib_type( $table, $range_key );
}
# build request items
foreach my $key_ref( @keys ) {
push @$k_ref, {
HashKeyElement => { $hash_key_type => $key_ref->{ $hash_key }. '' },
( defined $range_key ? ( RangeKeyElement => { $range_key_type => $key_ref->{ $range_key }. '' } ) : () )
};
}
# having attributes limitation?
if ( ref( $t_ref ) eq 'HASH' && defined $t_ref->{ attributes } ) {
$get{ RequestItems }->{ $table }->{ AttributesToGet } = $t_ref->{ attributes };
}
# using consistent read?
if ( $args_ref->{ consistent } ) {
$get{ RequestItems }->{ $table }->{ ConsistentRead } = \1;
}
}
# perform create
my ( $res, $res_ok, $json_ref ) = $self->request( BatchGetItem => \%get, {
max_retries => $args_ref->{ max_retries },
} );
# return on success
if ( $res_ok && defined $json_ref->{ Responses } ) {
if ( $args_ref->{ process_all } && defined( my $ukeys_ref = $json_ref->{ UnprocessedKeys } ) ) {
while ( $ukeys_ref ) {
( $res, $res_ok, my $ujson_ref ) = $self->request( BatchGetItem =>
{
RequestItems => $ukeys_ref
}, {
max_retries => $args_ref->{ max_retries },
} );
if ( $res_ok && defined $ujson_ref->{ Responses } ) {
foreach my $table_out( keys %$tables_ref ) {
my $table = $self->_table_name( $table_out );
if ( defined $ujson_ref->{ Responses }->{ $table } && defined $ujson_ref->{ Responses }->{ $table }->{ Items } ) {
$json_ref->{ Responses }->{ $table } ||= {};
push @{ $json_ref->{ Responses }->{ $table }->{ Items } ||= [] },
@{ $ujson_ref->{ Responses }->{ $table }->{ Items } };
}
}
}
$ukeys_ref = $res_ok && defined $ujson_ref->{ UnprocessedKeys }
? $ujson_ref->{ UnprocessedKeys }
: undef;
}
}
my %res;
foreach my $table_out( keys %$tables_ref ) {
my $table = $self->_table_name( $table_out );
next unless defined $json_ref->{ Responses }->{ $table } && defined $json_ref->{ Responses }->{ $table }->{ Items };
my $items_ref = $json_ref->{ Responses }->{ $table };
$res{ $table_out } = [];
foreach my $item_ref( @{ $items_ref->{ Items } } ) {
my %res_item;
foreach my $attrib( keys %$item_ref ) {
my $type = $self->_attrib_type( $table, $attrib );
$res_item{ $attrib } = $item_ref->{ $attrib }->{ $type };
}
push @{ $res{ $table_out } }, \%res_item;
}
}
return \%res;
}
# set error
$self->error( 'batch_get_item failed: '. $self->_extract_error_message( $res ) );
return ;
}
=head2 delete_item $table, $where_ref, [$args_ref]
Deletes a single item by primary key (hash or hash+range key).
# only with hash key
=over
=item * $table
Name of the table
=item * $where_ref
HashRef containing at least primary key. Can also contain additional attribute filters
=item * $args_ref [optional]
HashRef containing options
=over
=item * return_old
Bool whether return old, just deleted item or not
Default: 0
=item * no_cache
Force not using cache, if enabled per default
=item * use_cache
Force using cache, if disabled per default but setupped
=back
=back
=cut
sub delete_item {
my ( $self, $table, $where_ref, $args_ref ) = @_;
$args_ref ||= {
return_old => 0,
no_cache => 0,
use_cache => 0,
max_retries => undef
};
$table = $self->_table_name( $table );
# check definition
my $table_ref = $self->_check_table( "delete_item", $table );
# check primary keys
croak "delete_item: Missing value for hash key '$table_ref->{ hash_key }'"
unless defined $where_ref->{ $table_ref->{ hash_key } }
&& length( $where_ref->{ $table_ref->{ hash_key } } );
croak "delete_item: Missing value for Range Key '$table_ref->{ range_key }'"
if defined $table_ref->{ range_key } && ! (
defined $where_ref->{ $table_ref->{ range_key } }
&& length( $where_ref->{ $table_ref->{ range_key } } )
);
# check other attributes
$self->_check_keys( "delete_item: where-clause", $table, $where_ref );
# build delete
my %delete = (
TableName => $table,
Key => {},
( $args_ref->{ return_old } ? ( ReturnValues => 'ALL_OLD' ) : () )
);
# setup pk
my %where = %$where_ref;
# for hash key
my $hash_value = delete $where{ $table_ref->{ hash_key } };
$delete{ Key }->{ HashKeyElement } = {
$self->_attrib_type( $table, $table_ref->{ hash_key } ) => $hash_value
};
# for range key
if ( defined $table_ref->{ range_key } ) {
my $range_value = delete $where{ $table_ref->{ range_key } };
$delete{ Key }->{ RangeKeyElement } = {
$self->_attrib_type( $table, $table_ref->{ range_key } ) => $range_value
};
}
# build filter for other attribs
if ( keys %where ) {
$self->_build_attrib_filter( $table, \%where, $delete{ Expected } = {} );
}
# perform create
my ( $res, $res_ok, $json_ref ) = $self->request( DeleteItem => \%delete, {
max_retries => $args_ref->{ max_retries },
} );
if ( $res_ok ) {
# use cache
if ( $self->_cache_enabled( $args_ref ) ) {
my $cache_key = $self->_cache_key_single( $table, $where_ref );
$self->cache->remove( $cache_key );
}
if ( defined $json_ref->{ Attributes } ) {
my %res;
foreach my $attrib( $self->_attribs( $table ) ) {
next unless defined $json_ref->{ Attributes }->{ $attrib };
$res{ $attrib } = $json_ref->{ Attributes }->{ $attrib }->{ $self->_attrib_type( $table, $attrib ) };
}
return \%res;
}
return {};
}
$self->error( 'delete_item failed: '. $self->_extract_error_message( $res ) );
return;
}
=head2 query_items $table, $where, $args
Search in a table with hash AND range key.
my ( $count, $items_ref, $next_start_keys_ref )
= $ddb->qyery_items( some_table => { id => 123, my_range_id => { GT => 5 } } );
print "Found $count items, where last id is ". $items_ref->[-1]->{ id }. "\n";
# iterate through al all "pages"
my $next_start_keys_ref;
do {
( my $count, my $items_ref, $next_start_keys_ref )
= $ddb->qyery_items( some_table => { id => 123, my_range_id => { GT => 5 } }, {
start_key => $next_start_keys_ref
} );
} while( $next_start_keys_ref );
=over
=item * $table
Name of the table
=item * $where
Search condition. Has to contain a value of the primary key and a search-value for the range key.
Search-value for range key can be formated in two ways
=over
=item * Scalar
Eg
{ $range_key_name => 123 }
Performs and EQ (equal) search
=item * HASHREF
Eg
{ $range_key_name => { GT => 1 } }
{ $range_key_name => { CONTAINS => "Bla" } }
{ $range_key_name => { IN => [ 1, 2, 5, 7 ] } }
See L<http://docs.amazonwebservices.com/amazondynamodb/latest/developerguide/API_Query.html>
=back
=item * $args
{
limit => 5,
consistent => 0,
backward => 0,
#start_key => { .. }
attributes => [ qw/ attrib1 attrib2 / ],
#count => 1
}
HASHREF containing:
=over
=item * limit
Amount of items to return
Default: unlimited
=item * consistent
If set to 1, consistent read is performed
Default: 0
=item * backward
Whether traverse index backward or forward.
Default: 0 (=forward)
=item * start_key
Contains start key, as return in C<LastEvaluatedKey> from previous query. Allows to iterate above a table in pages.
{ $hash_key => 5, $range_key => "something" }
=item * attributes
Return only those attributes
[ qw/ attrib attrib2 / ]
=item * count
Instead of returning the actual result, return the count.
Default: 0 (=return result)
=item * all
Iterate through all pages (see link to API above) and return them all.
Can take some time. Also: max_retries might be needed to set, as a scan/query create lot's of read-units, and an immediate reading of the next "pages" lead to an Exception due to too many reads.
Default: 0 (=first "page" of items)
=back
=back
=cut
sub query_items {
my ( $self, $table, $filter_ref, $args_ref ) = @_;
my $table_orig = $table;
$table = $self->_table_name( $table );
$args_ref ||= {
limit => undef, # amount of items
consistent => 0, # default: eventually, not hard, conistent
backward => 0, # default: forward
start_key => undef, # eg { pk_name => 123, pk_other => 234 }
attributes => undef, # eq [ qw/ attrib1 attrib2 / ]
count => 0, # returns amount instead of the actual result
all => 0, # read all entries (runs possibly multiple queries)
max_retries => undef, # overwrite default max rewrites
};
# check definition
croak "query_items: Table '$table' does not exist in table definition"
unless defined $self->tables->{ $table };
my $table_ref = $self->tables->{ $table };
# die "query_items: Can run query_items only on tables with range key! '$table' does not have a range key.."
# unless defined $table_ref->{ range_key };
# build put
my %query = (
TableName => $table,
ConsistentRead => $args_ref->{ consistent } ? \1 : \0,
ScanIndexForward => $args_ref->{ backward } ? \0 : \1,
( defined $args_ref->{ limit } ? ( Limit => $args_ref->{ limit } ) : () ),
);
# using filter
my %filter = %$filter_ref;
if ( defined $filter{ $table_ref->{ hash_key } } ) {
croak "query_items: Missing hash key value in filter-clause"
unless defined $filter{ $table_ref->{ hash_key } };
$query{ HashKeyValue } = {
$self->_attrib_type( $table, $table_ref->{ hash_key } ) =>
( delete $filter{ $table_ref->{ hash_key } } ) . ''
};
}
# adding range to filter
if ( defined $table_ref->{ range_key }) {
croak "query_items: Missing range key value in filter-clause"
unless defined $filter{ $table_ref->{ range_key } };
# r_ref = { GT => 1 } OR { BETWEEN => [ 1, 5 ] } OR { EQ => [ 1 ] } OR 5 FOR { EQ => 5 }
my $r_ref = delete $filter{ $table_ref->{ range_key } };
$r_ref = { EQ => $r_ref } unless ref( $r_ref );
my ( $op, $vals_ref ) = %$r_ref;
$vals_ref = [ $vals_ref ] unless ref( $vals_ref );
my $type = $self->_attrib_type( $table, $table_ref->{ range_key } );
$query{ RangeKeyCondition } = {
AttributeValueList => [ map {
{ $type => $_. '' }
} @$vals_ref ],
ComparisonOperator => uc( $op )
};
}
# too much keys
croak "query_items: Cannot use keys ". join( ', ', sort keys %filter ). " in in filter - only hash and range key allowed."
if keys %filter;
# with start key?
if( defined( my $start_key_ref = $args_ref->{ start_key } ) ) {
$self->_check_keys( "query_items: start_key", $table, $start_key_ref );
my $e_ref = $query{ ExclusiveStartKey } = {};
# add hash key
if ( defined $start_key_ref->{ $table_ref->{ hash_key } } ) {
my $type = $self->_attrib_type( $table, $table_ref->{ hash_key } );
$e_ref->{ HashKeyElement } = { $type => $start_key_ref->{ $table_ref->{ hash_key } } };
}
# add range key?
if ( defined $table_ref->{ range_key } && defined $start_key_ref->{ $table_ref->{ range_key } } ) {
my $type = $self->_attrib_type( $table, $table_ref->{ range_key } );
$e_ref->{ RangeKeyElement } = { $type => $start_key_ref->{ $table_ref->{ range_key } } };
}
}
# only certain attributes
if ( defined( my $attribs_ref = $args_ref->{ attributes } ) ) {
my @keys = $self->_check_keys( "query_items: attributes", $table, $attribs_ref );
$query{ AttributesToGet } = \@keys;
}
# or count?
elsif ( $args_ref->{ count } ) {
$query{ Count } = \1;
}
# perform query
#print Dumper( { QUERY => \%query } );
my ( $res, $res_ok, $json_ref ) = $self->request( Query => \%query, {
max_retries => $args_ref->{ max_retries },
} );
# format & return result
if ( $res_ok && defined $json_ref->{ Items } ) {
my @res;
foreach my $from_ref( @{ $json_ref->{ Items } } ) {
push @res, $self->_format_item( $table, $from_ref );
}
my $count = $json_ref->{ Count };
# build start key for return or use
my $next_start_key_ref;
if ( defined $json_ref->{ LastEvaluatedKey } ) {
$next_start_key_ref = {};
# add hash key to start key
my $hash_type = $self->_attrib_type( $table, $table_ref->{ hash_key } );
$next_start_key_ref->{ $table_ref->{ hash_key } } = $json_ref->{ LastEvaluatedKey }->{ HashKeyElement }->{ $hash_type };
# add range key to start key
if ( defined $table_ref->{ range_key } && defined $json_ref->{ LastEvaluatedKey }->{ RangeKeyElement } ) {
my $range_type = $self->_attrib_type( $table, $table_ref->{ range_key } );
$next_start_key_ref->{ $table_ref->{ range_key } } = $json_ref->{ LastEvaluatedKey }->{ RangeKeyElement }->{ $range_type };
}
}
# cycle through all?
if ( $args_ref->{ all } && $next_start_key_ref ) {
# make sure we do not run into a loop by comparing last and current start key
my $new_start_key = join( ';', map { sprintf( '%s=%s', $_, $next_start_key_ref->{ $_ } ) } sort keys %$next_start_key_ref );
my %key_cache = defined $args_ref->{ _start_key_cache } ? %{ $args_ref->{ _start_key_cache } } : ();
#print Dumper( { STARTKEY => $next_start_key_ref, LASTEVAL => $json_ref->{ LastEvaluatedKey }, KEYS => [ \%key_cache, $new_start_key ] } );
if ( ! defined $key_cache{ $new_start_key } ) {
$key_cache{ $new_start_key } = 1;
# perform sub-query
my ( $sub_count, $sub_res_ref ) = $self->query_items( $table_orig, $filter_ref, {
%$args_ref,
_start_key_cache => \%key_cache,
start_key => $next_start_key_ref
} );
#print Dumper( { SUB_COUNT => $sub_count } );
# add result
if ( $sub_count ) {
$count += $sub_count;
push @res, @$sub_res_ref;
}
}
}
return wantarray ? ( $count, \@res, $next_start_key_ref ) : \@res;
}
# error
$self->error( 'query_items failed: '. $self->_extract_error_message( $res ) );
return;
}
=head2 scan_items $table, $filter, $args
Performs scan on table. The result is B<eventually consistent>. Non hash or range keys are allowed in the filter.
See query_items for argument description.
Main difference to query_items: A whole table scan is performed, which is much slower. Also the amount of data scanned is limited in size; see L<http://docs.amazonwebservices.com/amazondynamodb/latest/developerguide/API_Scan.html>
=cut
sub scan_items {
my ( $self, $table, $filter_ref, $args_ref ) = @_;
my $table_orig = $table;
$table = $self->_table_name( $table );
$args_ref ||= {
limit => undef, # amount of items
start_key => undef, # eg { hash_key => 1, range_key => "bla" }
attributes => undef, # eq [ qw/ attrib1 attrib2 / ]
count => 0, # returns amount instead of the actual result
all => 0, # read all entries (runs possibly multiple queries)
max_retries => undef, # overwrite default max retries
};
# check definition
croak "scan_items: Table '$table' does not exist in table definition"
unless defined $self->tables->{ $table };
my $table_ref = $self->tables->{ $table };
# build put
my %query = (
TableName => $table,
ScanFilter => {},
( defined $args_ref->{ limit } ? ( Limit => $args_ref->{ limit } ) : () ),
);
# using filter
if ( $filter_ref && keys %$filter_ref ) {
my @filter_keys = $self->_check_keys( "scan_items: filter keys", $table, $filter_ref );
my $s_ref = $query{ ScanFilter };
foreach my $key( @filter_keys ) {
my $type = $self->_attrib_type( $table, $key );
my $val_ref = $filter_ref->{ $key };
my $rvalue = ref( $val_ref ) || '';
if ( $rvalue eq 'HASH' ) {
my ( $op, $value ) = %$val_ref;
$s_ref->{ $key } = {
AttributeValueList => [ { $type => $value. '' } ],
ComparisonOperator => uc( $op )
};
}
elsif( $rvalue eq 'ARRAY' ) {
$s_ref->{ $key } = {
AttributeValueList => [ { $type => $val_ref } ],
ComparisonOperator => 'IN'
};
}
else {
$s_ref->{ $key } = {
AttributeValueList => [ { $type => $val_ref. '' } ],
ComparisonOperator => 'EQ'
};
}
}
}
# with start key?
if( defined( my $start_key_ref = $args_ref->{ start_key } ) ) {
$self->_check_keys( "scan_items: start_key", $table, $start_key_ref );
my $e_ref = $query{ ExclusiveStartKey } = {};
# add hash key
if ( defined $start_key_ref->{ $table_ref->{ hash_key } } ) {
my $type = $self->_attrib_type( $table, $table_ref->{ hash_key } );
$e_ref->{ HashKeyElement } = { $type => $start_key_ref->{ $table_ref->{ hash_key } } };
}
# add range key?
if ( defined $table_ref->{ range_key } && defined $start_key_ref->{ $table_ref->{ range_key } } ) {
my $type = $self->_attrib_type( $table, $table_ref->{ range_key } );
$e_ref->{ RangeKeyElement } = { $type => $start_key_ref->{ $table_ref->{ range_key } } };
}
}
# only certain attributes
if ( defined( my $attribs_ref = $args_ref->{ attributes } ) ) {
my @keys = $self->_check_keys( "scan_items: attributes", $table, $attribs_ref );
$query{ AttributesToGet } = \@keys;
}
# or count?
elsif ( $args_ref->{ count } ) {
$query{ Count } = \1;
}
# perform query
my ( $res, $res_ok, $json_ref ) = $self->request( Scan => \%query, {
max_retries => $args_ref->{ max_retries },
} );
# format & return result
if ( $res_ok && defined $json_ref->{ Items } ) {
my @res;
foreach my $from_ref( @{ $json_ref->{ Items } } ) {
push @res, $self->_format_item( $table, $from_ref );
}
my $count = $json_ref->{ Count };
# build start key for return or use
my $next_start_key_ref;
if ( defined $json_ref->{ LastEvaluatedKey } ) {
$next_start_key_ref = {};
# add hash key to start key
my $hash_type = $self->_attrib_type( $table, $table_ref->{ hash_key } );
$next_start_key_ref->{ $table_ref->{ hash_key } } = $json_ref->{ LastEvaluatedKey }->{ HashKeyElement }->{ $hash_type };
# add range key to start key
if ( defined $table_ref->{ range_key } && defined $json_ref->{ LastEvaluatedKey }->{ RangeKeyElement } ) {
my $range_type = $self->_attrib_type( $table, $table_ref->{ range_key } );
$next_start_key_ref->{ $table_ref->{ range_key } } = $json_ref->{ LastEvaluatedKey }->{ RangeKeyElement }->{ $range_type };
}
}
# cycle through all?
if ( $args_ref->{ all } && $next_start_key_ref ) {
# make sure we do not run into a loop by comparing last and current start key
my $new_start_key = join( ';', map { sprintf( '%s=%s', $_, $next_start_key_ref->{ $_ } ) } sort keys %$next_start_key_ref );
my %key_cache = defined $args_ref->{ _start_key_cache } ? %{ $args_ref->{ _start_key_cache } } : ();
#print Dumper( { STARTKEY => $next_start_key_ref, LASTEVAL => $json_ref->{ LastEvaluatedKey }, KEYS => [ \%key_cache, $new_start_key ] } );
if ( ! defined $key_cache{ $new_start_key } ) {
$key_cache{ $new_start_key } = 1;
# perform sub-query
my ( $sub_count, $sub_res_ref ) = $self->scan_items( $table_orig, $filter_ref, {
%$args_ref,
_start_key_cache => \%key_cache,
start_key => $next_start_key_ref
} );
#print Dumper( { SUB_COUNT => $sub_count } );
# add result
if ( $sub_count ) {
$count += $sub_count;
push @res, @$sub_res_ref;
}
}
}
return wantarray ? ( $count, \@res, $next_start_key_ref ) : \@res;
}
# error
$self->error( 'scan_items failed: '. $self->_extract_error_message( $res ) );
return;
}
=head2 request
Arbitrary request to DynamoDB API
=cut
sub request {
my ( $self, $target, $json, $args_ref ) = @_;
$args_ref ||= {
max_retries => undef
};
# assure security token existing
unless( $self->_init_security_token() ) {
my %error = ( error => $self->error() );
return wantarray ? ( undef, 0, \%error ) : \%error;
}
# convert to string, if required
$json = $self->json->encode( $json ) if ref $json;
# get date
my $http_date = DateTime::Format::HTTP->format_datetime( DateTime->now );
# build signable content
#$json is already utf8 encoded via json encode
my $sign_content = encode_utf8(join( "\n",
'POST', '/', '',
'host:'. $self->host,
'x-amz-date:'. $http_date,
'x-amz-security-token:'. $self->_credentials->{ SessionToken },
'x-amz-target:DynamoDB_20111205.'. $target,
''
)) . "\n" . $json ;
my $signature = hmac_sha256_base64( sha256( $sign_content ), $self->_credentials->{ SecretAccessKey } );
$signature .= '=' while( length( $signature ) % 4 != 0 );
# build request
my $request = HTTP::Request->new( POST => 'http://'. $self->host. '/' );
# .. setup headers
$request->header( host => $self->host );
$request->header( 'x-amz-date' => $http_date );
$request->header( 'x-amz-target', 'DynamoDB_'. $self->api_version. '.'. $target );
$request->header( 'x-amzn-authorization' => join( ',',
'AWS3 AWSAccessKeyId='. $self->_credentials->{ AccessKeyId },
'Algorithm=HmacSHA256',
'SignedHeaders=host;x-amz-date;x-amz-security-token;x-amz-target',
'Signature='. $signature
) );
$request->header( 'x-amz-security-token' => $self->_credentials->{ SessionToken } );
$request->header( 'content-type' => 'application/x-amz-json-1.0' );
# .. add content
$request->content( $json );
my ( $json_ref, $response );
my $tries = defined $args_ref->{ max_retries }
? $args_ref->{ max_retries }
: $self->max_retries + 1;
while( 1 ) {
# run request
$response = $self->lwp->request( $request );
$ENV{ DYNAMO_DB_DEBUG } && warn Dumper( $response );
$ENV{ DYNAMO_DB_DEBUG_KEEPALIVE } && warn " LWP keepalives in use: ", scalar($self->_lwpcache()->get_connections()), "/", $self->_lwpcache()->total_capacity(), "\n";
# get json
$json_ref = $response
? eval { $self->json->decode( $response->decoded_content ) } || { error => "Failed to parse JSON result" }
: { error => "Failed to get result" };
if ( defined $json_ref->{ __type } && $json_ref->{ __type } =~ /ProvisionedThroughputExceededException/ && $tries-- > 0 ) {
$ENV{ DYNAMO_DB_DEBUG_RETRY } && warn "Retry $target: $json\n";
usleep( $self->retry_timeout * 1_000_000 );
next;
}
last;
}
# handle error
if ( defined $json_ref->{ error } && $json_ref->{ error } ) {
$self->error( $json_ref->{ error } );
}
# handle exception
elsif ( defined $json_ref->{ __type } && $json_ref->{ __type } =~ /Exception/ && $json_ref->{ Message } ) {
$self->error( $json_ref->{ Message } );
}
return wantarray ? ( $response, $response ? $response->is_success : 0, $json_ref ) : $json_ref;
}
=head2 error [$str]
Get/set last error
=cut
sub error {
my ( $self, $str ) = @_;
if ( $str ) {
croak $str if $self->raise_error();
$self->_error( $str );
}
return $self->_error if $self->_has_error;
return ;
}
#
# _init_security_token
# Creates new temporary security token (, access and secret key), if not exist
#
sub _init_security_token {
my ( $self ) = @_;
# wheter has valid credentials
if ( $self->_has_credentials() ) {
my $dt = DateTime->now( time_zone => 'local' )->add( seconds => 5 );
return 1 if $dt < $self->_credentials_expire;
}
# build aws signed request
$self->_aws_signer( Net::Amazon::AWSSign->new(
$self->access_key, $self->secret_key ) )
unless $self->_has_aws_signer;
my $url = $self->_aws_signer->addRESTSecret( $self->_security_token_url );
# get token
my $res = $self->lwp->get( $url );
# got response
if ( $res->is_success) {
my $content = $res->decoded_content;
my $result_ref = XMLin( $content );
# got valid result
if( ref $result_ref && defined $result_ref->{ GetSessionTokenResult }
&& defined $result_ref->{ GetSessionTokenResult }
&& defined $result_ref->{ GetSessionTokenResult }->{ Credentials }
) {
# SessionToken, AccessKeyId, Expiration, SecretAccessKey
my $cred_ref = $result_ref->{ GetSessionTokenResult }->{ Credentials };
if ( ref( $cred_ref )
&& defined $cred_ref->{ SessionToken }
&& defined $cred_ref->{ AccessKeyId }
&& defined $cred_ref->{ SecretAccessKey }
&& defined $cred_ref->{ Expiration }
) {
# parse expiration date
my $pattern = DateTime::Format::Strptime->new(
pattern => '%FT%T',
time_zone => 'UTC'
);
my $expire = $pattern->parse_datetime( $cred_ref->{ Expiration } );
$expire->set_time_zone( 'local' );
$self->_credentials_expire( $expire );
# set credentials
$self->_credentials( $cred_ref );
return 1;
}
}
else {
$self->error( "Failed to fetch credentials: ". $res->status_line. " ($content)" );
}
}
else {
my $content = eval { $res->decoded_content } || "No Content";
$self->error( "Failed to fetch credentials: ". $res->status_line. " ($content)" );
}
return 0;
}
#
# _check_table $table
# Check whether table exists and returns definition
#
sub _check_table {
my ( $self, $meth, $table ) = @_;
unless( $table ) {
$table = $meth;
$meth = "check_table";
}
croak "$meth: Table '$table' not defined"
unless defined $self->tables->{ $table };
return $self->tables->{ $table };
}
#
# _check_keys $meth, $table, $key_ref
# Check attributes. Dies on invalid (not registererd) attributes.
#
sub _check_keys {
my ( $self, $meth, $table, $key_ref ) = @_;
my $table_ref = $self->_check_table( $meth, $table );
my @keys = ref( $key_ref )
? ( ref( $key_ref ) eq 'ARRAY'
? @$key_ref
: keys %$key_ref
)
: ( $key_ref )
;
my @invalid_keys = grep { ! defined $table_ref->{ attributes }->{ $_ } } @keys;
croak "$meth: Invalid keys: ". join( ', ', @invalid_keys )
if @invalid_keys;
return wantarray ? @keys : \@keys;
}
#
# _build_pk_filter $table, $where_ref, $node_ref
# Build attribute filter "HashKeyElement" and "RangeKeyElement".
# Hash key and range key will be deleted from where clause
#
sub _build_pk_filter {
my ( $self, $table, $where_ref, $node_ref ) = @_;
# primary key
my $table_ref = $self->_check_table( $table );
my $hash_value = delete $where_ref->{ $table_ref->{ hash_key } };
my $hash_type = $self->_attrib_type( $table, $table_ref->{ hash_key } );
$node_ref->{ HashKeyElement } = { $hash_type => $hash_value . '' };
if ( defined $table_ref->{ range_key } ) {
my $range_value = delete $where_ref->{ $table_ref->{ range_key } };
my $range_type = $self->_attrib_type( $table, $table_ref->{ range_key } );
$node_ref->{ RangeKeyElement } = { $range_type => $range_value . '' };
}
}
#
# _build_attrib_filter $table, $where_ref, $node_ref
# Build attribute filter "Expected" from given where-clause-ref
# {
# attrib1 => 'somevalue', # -> { attrib1 => { Value => { S => 'somevalue' } } }
# attrib2 => \1, # -> { attrib2 => { Exists => true } }
# attrib3 => { # -> { attrib3 => { Value => { S => 'bla' } } }
# value => 'bla'
# }
# }
#
sub _build_attrib_filter {
my ( $self, $table, $where_ref, $node_ref ) = @_;
my $table_ref = $self->_check_table( $table );
foreach my $key( keys %$where_ref ){
my $type = $table_ref->{ attributes }->{ $key };
my %cur;
unless( ref( $where_ref->{ $key } ) ) {
$where_ref->{ $key } = { value => $where_ref->{ $key } };
}
if ( ref( $where_ref->{ $key } ) eq 'SCALAR' ) {
$cur{ Exists } = $where_ref->{ $key };
}
else {
if ( defined( my $value = $where_ref->{ $key }->{ value } ) ) {
$cur{ Value } = { $type => $value. '' };
}
if ( defined $where_ref->{ $key }->{ exists } ) {
$cur{ Exists } = $where_ref->{ $key }->{ exists } ? \1 : \0;
}
}
$node_ref->{ $key } = \%cur if keys %cur;
}
}
#
# _attrib_type $table, $key
# Returns type ("S", "N", "NS", "SS") of existing attribute in table
#
sub _attrib_type {
my ( $self, $table, $key ) = @_;
my $table_ref = $self->_check_table( $table );
return defined $table_ref->{ attributes }->{ $key } ? $table_ref->{ attributes }->{ $key } : "S";
}
#
# _attribs $table
# Returns list of attributes in table
#
sub _attribs {
my ( $self, $table ) = @_;
my $table_ref = $self->_check_table( $table );
return sort keys %{ $table_ref->{ attributes } };
}
#
# _format_item $table, $from_ref
#
# Formats result item into simpler format
# {
# attrib => { S => "bla" }
# }
#
# to
# {
# attrib => 'bla'
# }
#
sub _format_item {
my ( $self, $table, $from_ref ) = @_;
my $table_ref = $self->_check_table( format_item => $table );
my %formatted;
if ( defined $from_ref->{ HashKeyElement } ) {
my @keys = ( 'hash' );
push @keys, 'range' if defined $table_ref->{ range_key };
foreach my $key( @keys ) {
my $key_name = $table_ref->{ "${key}_key" };
my $key_type = $table_ref->{ attributes }->{ $key_name };
$formatted{ $key_name } = $from_ref->{ ucfirst( $key ). 'KeyElement' }->{ $key_type };
}
}
else {
if ( $self->derive_table() ) {
while ( my ( $key, $value ) = each %$from_ref ) {
$formatted{$key} = ( $value->{'S'} || $value->{'N'} || $value->{'NS'} || $value->{'SS'} );
}
}
else {
while( my( $attrib, $type ) = each %{ $table_ref->{ attributes } } ) {
next unless defined $from_ref->{ $attrib };
$formatted{ $attrib } = $from_ref->{ $attrib }->{ $type };
}
}
}
return \%formatted;
}
#
# _table_name
# Returns prefixed table name
#
sub _table_name {
my ( $self, $table, $remove ) = @_;
return $remove ? substr( $table, length( $self->namespace ) ) : $self->namespace. $table;
}
#
# _extract_error_message
#
sub _extract_error_message {
my ( $self, $response ) = @_;
my $msg = '';
if ( $response ) {
my $json = eval { $self->json->decode( $response->decoded_content ) } || { error => "Failed to parse JSON result" };
if ( defined $json->{ __type } ) {
$msg = join( ' ** ',
"ErrorType: $json->{ __type }",
"ErrorMessage: $json->{ message }",
);
}
else {
$msg = $json->{ error };
}
}
else {
$msg = 'No response received. DynamoDB down?'
}
}
#
# _cache_enabled
#
sub _cache_enabled {
my ( $self, $args_ref ) = @_;
return $self->has_cache && ! $args_ref->{ no_cache }
&& ( $args_ref->{ use_cache } || ! $self->cache_disabled );
}
#
# _cache_key_single
#
sub _cache_key_single {
my ( $self, $table, $hash_ref ) = @_;
my $table_ref = $self->_check_table( $table );
my @keys = ( $table_ref->{ hash_key } );
push @keys, $table_ref->{ range_key } if defined $table_ref->{ range_key };
my %pk = map { ( $_ => $hash_ref->{ $_ } || '' ) } @keys;
return $self->_cache_key( $table, 'single', \%pk );
}
#
# _cache_key
#
sub _cache_key {
my ( $self, $table, $name, $id_ref ) = @_;
my $method = $self->cache_key_method();
return sprintf( '%s-%s-%s', $table, $name, $method->( $self->json->encode( $id_ref ) ) );
}
__PACKAGE__->meta->make_immutable;
=head1 AUTHOR
=over
=item * Ulrich Kautz <uk@fortrabbit.de>
=item * Thanks to MadHacker L<http://stackoverflow.com/users/1139526/madhacker> (the signing code in request method)
=item * Benjamin Abbott-Scoot <benjamin@abbott-scott.net> (Keep Alive patch)
=back
=head1 COPYRIGHT
Copyright (c) 2012 the L</AUTHOR> as listed above
=head1 LICENCSE
Same license as Perl itself.
=cut
1;