The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#
# This file is part of AnyEvent-Riak
#
# This software is copyright (c) 2014 by Damien Krotkine.
#
# This is free software; you can redistribute it and/or modify it under
# the same terms as the Perl 5 programming language system itself.
#
package AnyEvent::Riak;
$AnyEvent::Riak::VERSION = '0.93';
# ABSTRACT: AnyEvent ProtocolBuffers Riak Client

use 5.010;
use strict;
use warnings;
use AnyEvent::Riak::PBC;
use Types::Standard -types;
use Carp;
require bytes;
use Moo;

use AnyEvent::Handle;

my $message_codes = {
  RpbErrorResp => 0,
  RpbPingReq => 1,
  RpbPingResp => 2,
  RpbGetClientIdReq => 3,
  RpbGetClientIdResp => 4,
  RpbSetClientIdReq => 5,
  RpbSetClientIdResp => 6,
  RpbGetServerInfoReq => 7,
  RpbGetServerInfoResp => 8,
  RpbGetReq => 9,
  RpbGetResp => 10,
  RpbPutReq => 11,
  RpbPutResp => 12,
  RpbDelReq => 13,
  RpbDelResp => 14,
  RpbListBucketsReq => 15,
  RpbListBucketsResp => 16,
  RpbListKeysReq => 17,
  RpbListKeysResp => 18,
  RpbGetBucketReq => 19,
  RpbGetBucketResp => 20,
  RpbSetBucketReq => 21,
  RpbSetBucketResp => 22,
  RpbMapRedReq => 23,
  RpbMapRedResp => 24,
  RpbIndexReq => 25,
  RpbIndexResp => 26,
  RpbSearchQueryReq => 27,
  RbpSearchQueryResp => 28,
  RpbResetBucketReq => 29,
  RpbResetBucketResp => 30,
  RpbGetBucketTypeReq => 31,
  RpbSetBucketTypeResp => 32,
  RpbCSBucketReq => 40,
  RpbCSUpdateReq => 41,
  RpbCounterUpdateReq => 50,
  RpbCounterUpdateResp => 51,
  RpbCounterGetReq => 52,
  RpbCounterGetResp => 53,
  RpbYokozunaIndexGetReq => 54,
  RpbYokozunaIndexGetResp => 55,
  RpbYokozunaIndexPutReq => 56,
  RpbYokozunaIndexPutResp => 57,
  RpbYokozunaSchemaGetReq => 58,
  RpbYokozunaSchemaGetResp => 59,
  RpbYokozunaSchemaPutReq => 60,
  DtFetchReq => 80,
  DtFetchResp => 81,
  DtUpdateReq => 82,
  DtUpdateResp => 83,
  RpbAuthReq => 253,
  RpbAuthResp => 254,
  RpbStartTls => 255,
};


has host                => ( is => 'ro', isa => Str,  default => sub { '127.0.0.1'} );
has port                => ( is => 'ro', isa => Int,  default => sub { 8087 } );
has on_connect          => ( is => 'ro', isa => CodeRef,  required => 1 );
has on_connect_error    => ( is => 'ro', isa => CodeRef,  required => 1 );
has connect_timeout     => ( is => 'ro', isa => Num,  default  => sub {5} );
has timeout             => ( is => 'ro', isa => Num,  default  => sub {5} );
has no_delay            => ( is => 'ro', isa => Bool, default  => sub {0} );

has _handle => ( is => 'ro', lazy => 1, clearer => 1, builder => sub {
    my ($self) = @_;
    my ($host, $port) = ($self->host, $self->port);

    my $on_connect = $self->on_connect;
    my $on_connect_error = $self->on_connect_error;
    my $c_timeout = $self->connect_timeout;
    my $no_delay = $self->no_delay;

    AnyEvent::Handle->new (
      connect  => [$host, $port],
      no_delay => $no_delay,
      on_connect => $on_connect,
      on_connect_error => $on_connect_error,
      on_prepare => sub { $c_timeout },
      on_error => sub { my ($handle, $fatal, $message) = @_;
                        croak "Panic: no special on_error has been set yet! error: $message"; },
    );

});


sub BUILD {
    my ($self) = @_;
    $self->_handle();
}


sub close {
    my ($self, $callback) = @_;
    defined $callback && ref($callback) eq 'CODE'
      or croak "last parameter must be a CoderRef callback";
    $self->_handle->low_water_mark(0);
    $self->_handle->on_drain( sub { shutdown($_[0]{fh}, 1); $callback->(); } );
    $self->_clear_handle();
    return;
}

# we don't want DESTROY to be autoloaded
sub DESTROY { }

### Deal with common, general case, Riak commands
our $AUTOLOAD;

sub AUTOLOAD {
  my $command = $AUTOLOAD;
  $command =~ s/.*://;

  my ($request_name, $request_code, $response_name, $response_code) = _command_to_req($command);
  my $method = sub { shift->_run_cmd($request_name, $request_code, $response_name, $response_code, @_) };

  # Save this method for future calls
  no strict 'refs';
  *$AUTOLOAD = $method;

  goto $method;
}

sub _command_to_req {
    my ($command) = @_;
    my $request_name  = 'Rpb' . ucfirst(_to_camel($command)) . 'Req';
    my $response_name = 'Rpb' . ucfirst(_to_camel($command)) . 'Resp';
    my $request_code  = $message_codes->{$request_name};
    my $response_code = $message_codes->{$response_name};
    defined $request_code && defined $response_code
      or croak "unknown method '$command'";
    return $request_name, $request_code, $response_name, $response_code;
}

sub _run_cmd {
    my $callback = pop;
    defined $callback && ref($callback) eq 'CODE'
      or croak "last parameter must be a CoderRef callback";
    my ( $self, $request_name, $request_code, $response_name, $expected_response_code, $args ) = @_;

    my $body = '';
    if (defined $args) {
        eval { $body = "$request_name"->encode($args); 1 }
          or return $callback->(undef, { error_code => -1, error_message => $@ });
    }

    my $handle = $self->_handle;
    $handle->on_error(sub {
        my ($handle, $fatal, $message) = @_;
        $fatal or $handle->destroy(); # force destroy even if non fatal
        $callback->(undef, { error_code => $!,
                             error_message => $message }) });

    $handle->timeout_reset;
    $handle->timeout($self->timeout);
    $handle->on_timeout(sub { $callback->(undef, { error_code => -1,
                                                   error_message => 'timeout' }) });
    $handle->push_write(  pack('N', bytes::length($body) + 1)
                        . pack('c', $request_code) . $body
                       );
    $handle->timeout_reset;

    $handle->push_read( chunk => 4, sub {
         my $len = unpack "N", $_[1];
         $_[0]->timeout_reset;
         $_[0]->unshift_read( chunk => $len, sub {
             $_[0]->timeout_reset;
             $_[0]->timeout(0);
             my ( $response_code, $response_body ) = unpack( 'c a*', $_[1] );

             if ($response_code == $message_codes->{RpbErrorResp}) {
                 my $decoded_message = RpbErrorResp->decode($response_body);
                 return $callback->(undef, { error_code => $decoded_message->errcode,
                                             error_message => $decoded_message->errmsg });
             }
             if ($response_code != $expected_response_code) {
                 return $callback->(undef, {
                   error_code => -2,
                   error_message =>   "wrong response (got: '$response_code', "
                                    . "expected: '$expected_response_code')" });
             }

             # my ($ret, $more_to_come) = ( 1, );

             my $result;
             if ($response_name) {
                 $result = $response_name->decode($response_body);
                 ref($result) eq $response_name
                   or return $callback->(undef, {
                     error_code => -2,
                     error_message =>   "wrong response (got: '" . ref($result) . "', "
                                      . "expected: '$response_name')" });
             } else {
                 $result = 1;
             }
             return $callback->($result);
         });
     });
    $handle->timeout_reset;
    return;
}

sub _to_camel {
    my ($str) = @_;
    $str =~ s/_([a-z])/uc($1)/ge;
    return $str;
}



# Now, some methods that are not generic

sub set_bucket {
    my ($request_name, $request_code, $response_name, $response_code) = _command_to_req('set_bucket');
    shift->_run_cmd($request_name, $request_code, undef, $response_code, @_);
}

sub reset_bucket {
    my ($request_name, $request_code, $response_name, $response_code) = _command_to_req('reset_bucket');
    shift->_run_cmd($request_name, $request_code, undef, $response_code, @_);
}

sub get_bucket_type {
    my $request_name = 'RpbGetBucketTypeReq';
    my $response_name = 'RpbGetBucketResp';
    shift->_run_cmd($request_name, $message_codes->{$request_name},
                    $response_name, $message_codes->{$response_name}, @_);
}

sub set_bucket_type {
    croak "not implemented yet";
}


sub get_client_id {
    croak "deprecated since Riak 1.4";
}


sub set_client_id {
    croak "deprecated since Riak 1.4";
}


1;

__END__

=pod

=encoding UTF-8

=head1 NAME

AnyEvent::Riak - AnyEvent ProtocolBuffers Riak Client

=head1 VERSION

version 0.93

=head1 SYNOPSIS

  use AnyEvent::Riak;
  my $cv1 = AE::cv;
  my $client = AnyEvent::Riak->new(
    on_connect       => sub { $cv1->send },
    on_connect_error => sub { $cv1->croak($_[1])},
  );
  $cv1->recv;

  my $cv2
  $client->put({ bucket  => 'bucket_name',
                 key     => 'key_name',
                 content => { value => 'plip',
                              content_type => 'text/plain',
                            },
               },
               sub {
                   my ($result, $error) = @_;
                   $error and $cv2->croak(
                     sprintf("error %d: %s",
                     @{$error}{qw(error_code error_message)})
                   );
                   $cv2->send($result);
               });

  my $put_result = $cv2->recv();

  my $cv3 = AE::cv;
  $client->get({ bucket => 'bucket_name',
                 key => 'key_name',
               },
               sub {
                   my ($result, $error) = @_;
                   $error and $cv3->croak(
                     sprintf("error %d: %s",
                     @{$error}{qw(error_code error_message)})
                   );
                   $cv3->send($result);
               });

  my $get_result = $cv3->recv();

=head1 ATTRIBUTES

=head2 host => $hostname

Str, Required. Riak IP or hostname. Default to 127.0.0.1

=head2 port => $port_number

Int, Required. Port of the PBC interface. Default to 8087

=head2 on_connect => $cb->($handle, $host, $port, $retry->())

CodeRef, required. Executed on connection. Check out
L<AnyEvent::Handle#on_connect-cb--handle-host-port-retry>

=head2 on_connect_error => $cb->($handle, $message)

CodeRef, required. Executed when the connection could not be established. Check out
L<AnyEvent::Handle#on_connect_error-cb--handle-message>

=head2 connect_timeout => $fractional_seconds

Float, Default 5. Timeout for connection operation, in seconds. Set to 0 for no timeout.

=head2 timeout => $fractional_seconds

Float, Default 5. Timeout for read/write operation, in seconds. Set to 0 for no timeout.

=head2 no_delay => <boolean>

Boolean, Default 0. If set to a true value, TCP_NODELAY will be enabled on the
socket, which means deactivating Nagle's algorithm. Use only if you know what
you're doing.

=head1 METHODS

=head2 $client->close($cb)

This method will wait until everything has been written to the connection, then
close the connection, and then calls the callback without parameters. Use this
to properly close the connection, before destroying the client instance.

=head2 get_bucket

Get bucket properties request.

=over

=item bucket

required, string

=item type

optional, string

=back

=head2 set_bucket

Set bucket properties request

=over

=item bucket

required, string

=item props

required, RpbBucketProps

=item type

optional, string

=back

=head2 reset_bucket

Reset bucket properties request

=over

=item bucket

required, string

=item type

optional, string

=back

=head2 get_bucket_type

Get bucket properties request

=over

=item type

required, string

=back

=head2 set_bucket_type

Set bucket properties request

=over

=item type

required, string

=item props

required, RpbBucketProps

=back

=head2 get

Get Request - retrieve bucket/key

=over

=item bucket

required, string

=item key

required, string

=item r

optional, number

=item pr

optional, number

=item basic_quorum

optional, boolean

=item notfound_ok

optional, boolean

=item if_modified

optional, string

fail if the supplied vclock does not match

=item head

optional, boolean

return everything but the value

=item deletedvclock

optional, boolean

return the tombstone's vclock, if applicable

=item timeout

optional, number

=item sloppy_quorum

optional, boolean

Experimental, may change/disappear

=item n_val

optional, number

Experimental, may change/disappear

=item type

optional, string

Bucket type, if not set we assume the 'default' type

=back

=head2 put

Put request - if options.return_body is set then the updated metadata/data for the key will be returned.

=over

=item bucket

required, string

=item key

optional, string

=item vclock

optional, string

=item content

required, RpbContent

=item w

optional, number

=item dw

optional, number

=item return_body

optional, boolean

=item pw

optional, number

=item if_not_modified

optional, boolean

=item if_none_match

optional, boolean

=item return_head

optional, boolean

=item timeout

optional, number

=item asis

optional, boolean

=item sloppy_quorum

optional, boolean

Experimental, may change/disappear

=item n_val

optional, number

Experimental, may change/disappear

=item type

optional, string

Bucket type, if not set we assume the 'default' type

=back

=for Pod::Coverage BUILD

=for Pod::Coverage get_client_id

=for Pod::Coverage set_client_id

=begin NOT_IMPLEMENTED

=method auth

Authentication request

=over

=item user

required, string

=item password

required, string

=back

=method set_client_id

=over

=item client_id

required, string

Client id to use for this connection
=back

=end NOT_IMPLEMENTED

=begin NOT_IMPLEMENTED

=method del

Delete request

=over

=item bucket

required, string

=item key

required, string

=item rw

optional, number

=item vclock

optional, string

=item r

optional, number

=item w

optional, number

=item pr

optional, number

=item pw

optional, number

=item dw

optional, number

=item timeout

optional, number

=item sloppy_quorum

optional, boolean

Experimental, may change/disappear
=item n_val

optional, number

Experimental, may change/disappear
=item type

optional, string

Bucket type, if not set we assume the 'default' type
=back

=method list_buckets

List buckets request 

=over

=item timeout

optional, number

=item stream

optional, boolean

=item type

optional, string

Bucket type, if not set we assume the 'default' type
=back

=method list_keys

List keys in bucket request

=over

=item bucket

required, string

=item timeout

optional, number

=item type

optional, string

Bucket type, if not set we assume the 'default' type
=back

=method map_red

Map/Reduce request

=over

=item request

required, string

=item content_type

required, string

=back

=method index

Secondary Index query request

=over

=item bucket

required, string

=item index

required, string

=item qtype

required, one of 'eq', 'range'

=item key

optional, string

key here means equals value for index?
=item range_min

optional, string

=item range_max

optional, string

=item return_terms

optional, boolean

=item stream

optional, boolean

=item max_results

optional, number

=item continuation

optional, string

=item timeout

optional, number

=item type

optional, string

Bucket type, if not set we assume the 'default' type
=item term_regex

optional, string

=item pagination_sort

optional, boolean

Whether to use pagination sort for non-paginated queries=back
=method CS_bucket

 added solely for riak_cs currently for folding over a bucket and returning objects.

=over

=item bucket

required, string

=item start_key

required, string

=item end_key

optional, string

=item start_incl

optional, boolean

=item end_incl

optional, boolean

=item continuation

optional, string

=item max_results

optional, number

=item timeout

optional, number

=item type

optional, string

Bucket type, if not set we assume the 'default' type
=back

=method counter_update

Counter update request

=over

=item bucket

required, string

=item key

required, string

=item amount

required, sint64

=item w

optional, number

=item dw

optional, number

=item pw

optional, number

=item returnvalue

optional, boolean

=back

=method counter_get

 counter value

=over

=item bucket

required, string

=item key

required, string

=item r

optional, number

=item pr

optional, number

=item basic_quorum

optional, boolean

=item notfound_ok

optional, boolean

=back

=end NOT_IMPLEMENTED

=head1 RESPONSE OBJECTS

Results returned from various methods are blessed response objects from the
following types. Their attributes can be accessed using accessors (of the same
name), or using the response as a HashRef.

=head2 RpbErrorResp

Error response - may be generated for any Req

=over

=item errmsg

required, string

=item errcode

required, number

=back

=head2 RpbGetServerInfoResp

Get server info request - no message defined, just send RpbGetServerInfoReq message code

=over

=item node

optional, string

=item server_version

optional, string

=back

=head2 RpbGetBucketResp

Get bucket properties response

=over

=item props

required, RpbBucketProps

=back

=head2 RpbGetClientIdResp

Get ClientId Request - no message defined, just send RpbGetClientIdReq message code

=over

=item client_id

required, string

Client id in use for this connection

=back

=head2 RpbGetResp

Get Response - if the record was not found there will be no content/vclock

=over

=item content

repeated, RpbContent

=item vclock

optional, string

the opaque vector clock for the object

=item unchanged

optional, boolean

=back

=head2 RpbPutResp

Put response - same as get response with optional key if one was generated

=over

=item content

repeated, RpbContent

=item vclock

optional, string

the opaque vector clock for the object

=item key

optional, string

the key generated, if any

=back

=head2 RpbListBucketsResp

List buckets response - one or more of these packets will be sent the last one will have done set true (and may not have any buckets in it)

=over

=item buckets

repeated, string

=item done

optional, boolean

=back

=head2 RpbListKeysResp

List keys in bucket response - one or more of these packets will be sent the last one will have done set true (and may not have any keys in it)

=over

=item keys

repeated, string

=item done

optional, boolean

=back

=head2 RpbMapRedResp

Map/Reduce response one or more of these packets will be sent the last one will have done set true (and may not have phase/data in it)

=over

=item phase

optional, number

=item response

optional, string

=item done

optional, boolean

=back

=head2 RpbIndexResp

Secondary Index query response

=over

=item keys

repeated, string

=item results

repeated, RpbPair

=item continuation

optional, string

=item done

optional, boolean

=back

=head2 RpbCSBucketResp

 return for CS bucket fold

=over

=item objects

repeated, RpbIndexObject

=item continuation

optional, string

=item done

optional, boolean

=back

=head2 RpbCounterUpdateResp

Counter update response? No message | error response

=over

=item value

optional, sint64

=back

=head2 RpbCounterGetResp

Counter value response

=over

=item value

optional, sint64

=back

=head1 AUTHOR

Damien Krotkine <dams@cpan.org>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2014 by Damien Krotkine.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut