# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#
# This file is part of Riak-Light
#
# This software is copyright (c) 2013 by Weborama.
#
# This is free software; you can redistribute it and/or modify it under
# the same terms as the Perl 5 programming language system itself.
#
## no critic (RequireUseStrict, RequireUseWarnings)
package Riak::Light;
{
    # Module version, assigned through the fully-qualified package variable.
    $Riak::Light::VERSION = '0.053';
}
## use critic

use 5.012000;
use Riak::Light::PBC;
use Riak::Light::Driver;
use Params::Validate qw(validate_pos SCALAR CODEREF);
use English qw(-no_match_vars );
use Scalar::Util qw(blessed);
use IO::Socket;
use Const::Fast;
use JSON;
use Carp;
use Moo;
use MooX::Types::MooseLike::Base qw<Num Str Int Bool Maybe>;
use namespace::autoclean;

# ABSTRACT: Fast and lightweight Perl client for Riak

# Endpoint of the Riak PBC interface; both attributes are required.
has port    => ( is => 'ro', isa => Int,  required => 1 );
has host    => ( is => 'ro', isa => Str,  required => 1 );
# Quorum-style settings, each defaulting to 2. `r` is sent with fetch
# requests and `dw` with delete requests; `w` is declared but not read by
# any code visible in this file.
has r       => ( is => 'ro', isa => Int,  default  => sub {2} );
has w       => ( is => 'ro', isa => Int,  default  => sub {2} );
has dw      => ( is => 'ro', isa => Int,  default  => sub {2} );
# When true (the default) errors are raised with croak; when false they are
# stored in $@ and the failing operation returns undef.
has autodie => ( is => 'ro', isa => Bool, default  => sub {1} );
# General timeout (0.5 by default); passed to IO::Socket::INET when
# connecting and used as the fallback for in_timeout / out_timeout.
has timeout => ( is => 'ro', isa => Num,  default  => sub {0.5} );
# Read and write timeouts, built lazily by _build_in_timeout and
# _build_out_timeout when not supplied to the constructor.
has in_timeout  => ( is => 'lazy' );
has out_timeout => ( is => 'lazy' );

# Builder for the lazy in_timeout attribute: falls back to the general
# timeout value.
sub _build_in_timeout {
    my ($self) = @_;

    return $self->timeout;
}

# Builder for the lazy out_timeout attribute: falls back to the general
# timeout value.
sub _build_out_timeout {
    my ($self) = @_;

    return $self->timeout;
}

# Class name of the socket wrapper used for I/O timeouts. When it is undef,
# _build_socket returns the plain IO::Socket::INET socket instead.
has timeout_provider => (
    is => 'ro', isa => Maybe [Str],
    default => sub {'Riak::Light::Timeout::Select'}
);

# Riak::Light::Driver instance, built on demand by _build_driver.
has driver => ( is => 'lazy' );

# Builder for the lazy driver attribute: wraps a freshly built socket in a
# Riak::Light::Driver.
sub _build_driver {
    my ($self) = @_;

    return Riak::Light::Driver->new( socket => $self->_build_socket() );
}

# Open the TCP connection to host:port and, when a timeout provider is
# configured, wrap the socket in an instance of that provider class.
#
# Croaks when the connection cannot be established. Returns either the plain
# IO::Socket::INET socket (timeout_provider is undef) or the provider object.
sub _build_socket {
    my ($self) = @_;

    my $host = $self->host;
    my $port = $self->port;

    my $connection = IO::Socket::INET->new(
        PeerHost => $host,
        PeerPort => $port,
        Timeout  => $self->timeout,
    );

    if ( !defined $connection ) {
        croak "Error ($!), can't connect to $host:$port";
    }

    # Without a provider there is nothing to wrap the socket in.
    my $provider = $self->timeout_provider;
    return $connection unless defined $provider;

    use Module::Load qw(load);
    load $provider;

    # TODO: add a easy way to inject this proxy
    return $provider->new(
        socket      => $connection,
        in_timeout  => $self->in_timeout,
        out_timeout => $self->out_timeout,
    );
}

# Constructor hook: touch the lazy driver attribute so it is built as soon
# as the object is created.
sub BUILD {
    my ($self) = @_;

    return $self->driver;
}

# Operation names, used as keys of the table returned by _CODES.
const my $PING     => 'ping';
const my $GET      => 'get';
const my $PUT      => 'put';
const my $DEL      => 'del';
const my $GET_KEYS => 'get_keys';

# Response codes that _parse_response treats specially. The GET and
# GET_KEYS values repeat the response_code entries listed in _CODES.
const my $ERROR_RESPONSE_CODE    => 0;
const my $GET_RESPONSE_CODE      => 10;
const my $GET_KEYS_RESPONSE_CODE => 18;

# Look up the request/response code pair for an operation name.
#
# Returns a hashref with `request_code` and `response_code`, or undef when
# the operation name is unknown.
sub _CODES {
    my ($operation) = @_;

    my %codes_for = (
        $PING     => { request_code => 1,  response_code => 2 },
        $GET      => { request_code => 9,  response_code => 10 },
        $PUT      => { request_code => 11, response_code => 12 },
        $DEL      => { request_code => 13, response_code => 14 },
        $GET_KEYS => { request_code => 17, response_code => 18 },
    );

    return $codes_for{$operation};
}

# Reset $@ before every public operation. With autodie disabled, errors are
# reported through $@, so a caller inspecting $@ after a call must never see
# an error left over from an earlier call. The list covers every public
# operation (previously get_raw, exists, get_keys and put_raw were missing
# and could leave a stale error in $@ after a successful call).
before [qw(ping get get_raw exists get_keys put put_raw del)] => sub {
    undef $@    ## no critic (RequireLocalizedPunctuationVars)
};

# Send a ping request (empty body) and return the result of
# _parse_response.
sub ping {
    my ($self) = @_;

    my %request = (
        operation => $PING,
        body      => q(),
    );

    return $self->_parse_response(%request);
}

# Ping the server inside an eval so that a dying ping is reported as a
# false result instead of an exception.
sub is_alive {
    my ($self) = @_;

    # The eval stays in the return expression so the caller's context
    # reaches it unchanged.
    return eval { $self->ping };
}

# List the keys of a bucket, invoking the callback once per key.
#
# Arguments: bucket name and a code reference (validated as CODEREF).
# Returns the result of _parse_response.
sub get_keys {
    my ( $self, $bucket, $callback ) =
      validate_pos( @_, 1, 1, { type => CODEREF } );

    my $encoded_request = RpbListKeysReq->encode( { bucket => $bucket } );

    my %request = (
        key       => "*",
        bucket    => $bucket,
        operation => $GET_KEYS,
        body      => $encoded_request,
        extra     => { callback => $callback },
    );

    return $self->_parse_response(%request);
}

# Fetch the value stored under bucket/key without JSON decoding.
sub get_raw {
    my ( $self, $bucket, $key ) = validate_pos( @_, 1, 1, 1 );

    my %fetch_options = ( decode => 0 );

    return $self->_fetch( $bucket, $key, %fetch_options );
}

# Fetch the value stored under bucket/key, asking for JSON decoding.
sub get {
    my ( $self, $bucket, $key ) = validate_pos( @_, 1, 1, 1 );

    my %fetch_options = ( decode => 1 );

    return $self->_fetch( $bucket, $key, %fetch_options );
}

# Report whether something is stored under bucket/key, using a fetch with
# head => 1 and no decoding. Returns true when the fetch yields a defined
# value.
sub exists {
    my ( $self, $bucket, $key ) = validate_pos( @_, 1, 1, 1 );

    my $stored = $self->_fetch( $bucket, $key, decode => 0, head => 1 );

    return defined $stored;
}

# Build and send a get request for bucket/key.
#
# %extra may carry `head` (forwarded in the request) and `decode` (consumed
# later by the fetch handler); a copy of %extra travels along as `extra`.
sub _fetch {
    my ( $self, $bucket, $key, %extra ) = @_;

    my %get_request = (
        r      => $self->r,
        key    => $key,
        bucket => $bucket,
        head   => $extra{head},
    );
    my $encoded_request = RpbGetReq->encode( \%get_request );

    return $self->_parse_response(
        key       => $key,
        bucket    => $bucket,
        operation => $GET,
        body      => $encoded_request,
        extra     => {%extra},
    );
}

# Store a scalar value as-is under bucket/key.
#
# The value must be a plain scalar; the content type is optional and
# defaults to 'plain/text'.
sub put_raw {
    my ( $self, $bucket, $key, $raw_value, $content_type ) = validate_pos(
        @_,
        1, 1, 1,
        { type    => SCALAR },
        { default => 'plain/text' },
    );

    return $self->_store( $bucket, $key, $raw_value, $content_type );
}

# Store a value under bucket/key, JSON-encoding it first when the content
# type is 'application/json' (which is also the default content type).
sub put {
    my ( $self, $bucket, $key, $value, $content_type ) =
      validate_pos( @_, 1, 1, 1, 1, { default => 'application/json' } );

    # Any other content type is stored untouched.
    my $payload = $value;
    if ( $content_type eq 'application/json' ) {
        $payload = encode_json($value);
    }

    return $self->_store( $bucket, $key, $payload, $content_type );
}

# Build and send a put request carrying an already-encoded scalar value.
#
# NOTE(review): the `w` and `dw` attributes are not included in the
# request built here -- confirm whether that is intended.
sub _store {
    my ( $self, $bucket, $key, $encoded_value, $content_type ) =
      validate_pos( @_, 1, 1, 1, { type => SCALAR }, 1 );

    my %content = (
        value        => $encoded_value,
        content_type => $content_type,
    );
    my %put_request = (
        key     => $key,
        bucket  => $bucket,
        content => \%content,
    );
    my $encoded_request = RpbPutReq->encode( \%put_request );

    return $self->_parse_response(
        key       => $key,
        bucket    => $bucket,
        operation => $PUT,
        body      => $encoded_request,
    );
}

# Build and send a delete request for bucket/key.
sub del {
    my ( $self, $bucket, $key ) = validate_pos( @_, 1, 1, 1 );

    # NOTE(review): the request's `rw` field is filled from the `dw`
    # attribute -- confirm this mapping is intended.
    my %delete_request = (
        key    => $key,
        bucket => $bucket,
        rw     => $self->dw,
    );
    my $encoded_request = RpbDelReq->encode( \%delete_request );

    return $self->_parse_response(
        key       => $key,
        bucket    => $bucket,
        operation => $DEL,
        body      => $encoded_request,
    );
}

# Send one request through the driver and turn the reply (or replies) into
# the value handed back to the public API.
#
# Named arguments:
#   operation - operation name, looked up through _CODES
#   body      - encoded request body
#   bucket    - bucket name; used in error messages and by the fetch handler
#   key       - key name; same usage as bucket
#   extra     - optional hashref; its `callback` entry is invoked for each
#               listed key, and the hashref itself is passed on to
#               _process_riak_fetch
#
# Returns the fetched value for a get response, 1 for any other successful
# response, or whatever _process_generic_error / _process_riak_error return
# on failure.
sub _parse_response {
    my ( $self, %args ) = @_;

    my $operation = $args{operation};

    my $request_code  = _CODES($operation)->{request_code};
    my $expected_code = _CODES($operation)->{response_code};

    my $request_body = $args{body};
    my $extra        = $args{extra};
    my $bucket       = $args{bucket};
    my $key          = $args{key};
    my $callback     = $extra->{callback};

    # A false result from the driver means the request was not performed;
    # report it with the current $ERRNO.
    $self->driver->perform_request(
        code => $request_code,
        body => $request_body
      )
      or return $self->_process_generic_error(
        $ERRNO, $operation, $bucket,
        $key
      );

    # Only a key listing reads more than one response: for every other
    # operation $done starts out true and the loop body runs exactly once.
    my $done = $expected_code != $GET_KEYS_RESPONSE_CODE;
    my $response;
    do {
        $response = $self->driver->read_response();

        if ( !defined $response ) {
            # Read failure: fabricate a response carrying the error so the
            # checks after the loop report it.
            $response = { code => -1, body => undef, error => $ERRNO };
            $done = 1;
        }
        elsif ( !$done
            && $response->{code} == $GET_KEYS_RESPONSE_CODE )
        {
            # One chunk of a key listing: pass each key to the callback and
            # keep reading until the message flags itself as done.
            my $obj = RpbListKeysResp->decode( $response->{body} );

            my $keys = $obj->keys;

            if ($keys) {
                $callback->($_) foreach ( @{$keys} );
            }

            $done = $obj->done;
        }
        elsif ( !$done ) {
            # A key listing answered with some other response code: stop
            # reading and let the checks below classify the response.
            $done = 1;
        }
    } while ( !$done );

    my $response_code  = $response->{code};
    my $response_body  = $response->{body};
    my $response_error = $response->{error};

    # a response carrying an `error` entry (set above on read failure) is
    # reported as a generic error
    return $self->_process_generic_error(
        $response_error, $operation, $bucket,
        $key
    ) if defined $response_error;

    # any code that is neither the expected one nor the Riak error code is
    # reported as an unexpected response
    return $self->_process_generic_error(
        "Unexpected Response Code in (got: $response_code, expected: $expected_code)",
        $operation, $bucket, $key
      )
      if $response_code != $expected_code
          and $response_code != $ERROR_RESPONSE_CODE;

    # the Riak error code: decode the body and report its message
    return $self->_process_riak_error(
        $response_body, $operation, $bucket,
        $key
    ) if $response_code == $ERROR_RESPONSE_CODE;

    # return the result from fetch
    return $self->_process_riak_fetch( $response_body, $bucket, $key, $extra )
      if $response_code == $GET_RESPONSE_CODE;

    1    # return true value for every other successful operation
}

# Decode the body of a get response and extract the stored value.
#
# Arguments:
#   $encoded_message - encoded response body
#   $bucket, $key    - used only for error reporting
#   $extra           - hashref; a true `decode` entry requests JSON decoding
#
# Returns the first content entry's value (JSON-decoded when its content
# type is 'application/json' and decoding was requested), or undef when the
# response carries no content. An undefined message is reported through
# _process_generic_error and that call's result is returned.
sub _process_riak_fetch {
    my ( $self, $encoded_message, $bucket, $key, $extra ) = @_;

    # Return right away: with autodie disabled _process_generic_error hands
    # back undef instead of dying, and an undefined message must not be
    # passed on to the decoder.
    return $self->_process_generic_error( "Undefined Message", 'get', $bucket,
        $key )
      unless defined $encoded_message;

    my $should_decode   = $extra->{decode};
    my $decoded_message = RpbGetResp->decode($encoded_message);

    my $content = $decoded_message->content;

    # Only the first content entry is used; an empty list is treated the
    # same way as missing content.
    if ( ref($content) eq 'ARRAY' and @{$content} ) {
        my $value        = $content->[0]->value;
        my $content_type = $content->[0]->content_type;

        # A missing content type never triggers JSON decoding.
        my $is_json =
          defined $content_type && $content_type eq 'application/json';

        return ( $is_json and $should_decode )
          ? decode_json($value)
          : $value;
    }

    undef;
}

# Decode an error response body and report it, with its code and message,
# through _process_generic_error. Returns that call's result.
sub _process_riak_error {
    my ( $self, $encoded_message, $operation, $bucket, $key ) = @_;

    my $riak_error = RpbErrorResp->decode($encoded_message);

    my $errmsg  = $riak_error->errmsg;
    my $errcode = $riak_error->errcode;

    my $description = "Riak Error (code: $errcode) '$errmsg'";

    return $self->_process_generic_error( $description, $operation, $bucket,
        $key );
}

# Report an error for an operation.
#
# Builds the message "Error in '<operation>' <extra>: <error>", where
# <extra> names the bucket and key for every operation except ping. With
# autodie enabled the message is thrown with croak; otherwise it is stored
# in $@ and undef is returned.
sub _process_generic_error {
    my ( $self, $error, $operation, $bucket, $key ) = @_;

    my $extra = q();
    if ( $operation ne 'ping' ) {
        $extra = "(bucket: $bucket, key: $key)";
    }

    my $error_message = "Error in '$operation' $extra: $error";

    if ( $self->autodie ) {
        croak $error_message;
    }

    $@ = $error_message;    ## no critic (RequireLocalizedPunctuationVars)

    # A real undef value (not an empty list) is handed back in any context.
    return undef;
}

1;


=pod

=head1 NAME

Riak::Light - Fast and lightweight Perl client for Riak

=head1 VERSION

version 0.053

=head1 SYNOPSIS

  use Riak::Light;

  # create a new instance - using pbc only
  my $client = Riak::Light->new(
    host => '127.0.0.1',
    port => 8087
  );

  $client->is_alive() or die "ops, riak is not alive";

  # store hashref into bucket 'foo', key 'bar'
  # will be serialized as 'application/json'
  $client->put( foo => bar => { baz => 1024 });

  # store text into bucket 'foo', key 'baz'
  $client->put( foo => baz => "sometext", 'text/plain');

  # fetch hashref from bucket 'foo', key 'bar'
  my $hash = $client->get( foo => 'bar');

  # delete hashref from bucket 'foo', key 'bar'
  $client->del(foo => 'bar');

  # list keys in stream
  $client->get_keys(foo => sub{
     my $key = $_[0];

     # you should use another client inside this callback!
     $another_client->del(foo => $key);
  });

=head1 DESCRIPTION

Riak::Light is a very light (and fast) Perl client for Riak using the PBC interface. It supports only basic operations like ping, get, put and del. The timeout backend for I/O operations can be changed, and 'die' in case of error (autodie) can be suppressed through the configuration. There is no auto-reconnect option.

=head2 ATTRIBUTES

=head3 host

Riak ip or hostname. There is no default.

=head3 port

Port of the PBC interface. There is no default.

=head3 r

R value setting for this client. Default 2.

=head3 w

W value setting for this client. Default 2.

=head3 dw

DW value setting for this client. Default 2.

=head3 autodie

Boolean, if false each operation will return undef in case of error (stored in $@). Default is true.

=head3 timeout

Timeout for connection, write and read operations. Default is 0.5 seconds.

=head3 in_timeout

Timeout for read operations. Default is timeout value.

=head3 out_timeout

Timeout for write operations. Default is timeout value.

=head3 timeout_provider

Changes the backend used for I/O timeouts. The default value is Riak::Light::Timeout::Select. If set to undef, the plain IO::Socket::INET socket is used and only the connection timeout is supported.
IMPORTANT: in case of any timeout error, the socket between this client and the Riak server will be closed.
To support I/O timeouts you can choose one of these 5 options (or you can set undef to avoid I/O timeouts):

=over

=item * Riak::Light::Timeout::Alarm

uses alarm and Time::HiRes to control the I/O timeout. Does not work on Win32. (Not Safe)

=item * Riak::Light::Timeout::Time::Out

uses Time::Out and Time::HiRes to control the I/O timeout. Does not work on Win32. (Not Safe)

=item *  Riak::Light::Timeout::Select

uses IO::Select to control the I/O timeout

=item *  Riak::Light::Timeout::SelectOnWrite

uses IO::Select to control only Output Operations. Can block in Write Operations. Be Careful.

=item *  Riak::Light::Timeout::SetSockOpt

uses setsockopt to set SO_RCVTIMEO and SO_SNDTIMEO socket properties. Does not Work on NetBSD 6.0.

=back

=head3 driver

This is a Riak::Light::Driver instance, to be able to connect and perform requests to Riak over PBC interface.

=head2 METHODS

=head3 is_alive

  $client->is_alive() or warn "ops... something is wrong: $@";

Perform a ping operation. Will return false in case of error (will store in $@).

=head3 ping

  try { $client->ping() } catch { "ops... something is wrong: $_" };

Perform a ping operation. Will die in case of error.

=head3 get

  my $value_or_reference = $client->get(bucket => 'key');

Perform a fetch operation. Expects bucket and key names. Decodes the JSON into a Perl structure if the content_type is 'application/json'. If you need the raw data you can use L</get_raw>.

=head3 get_raw

  my $scalar_value = $client->get_raw(bucket => 'key');

Perform a fetch operation. Expects bucket and key names. Returns the raw data. If you need to decode the JSON, you should use L</get> instead.

=head3 exists

  $client->exists(bucket => 'key') or warn "key not found";

Perform a fetch operation with head => 1, and return true if there is something stored in the bucket/key.

=head3 put

  $client->put(bucket => key => { some_values => [1,2,3] });
  $client->put(bucket => key => 'text', 'plain/text');

Perform a store operation. Expects bucket and key names, the value and the content type (optional, default is 'application/json'). Will encode the structure as a JSON string if necessary. If you only need to store the raw data you can use L</put_raw> instead.

=head3 put_raw

  $client->put_raw(bucket => key => encode_json({ some_values => [1,2,3] }), 'application/json');
  $client->put_raw(bucket => key => 'text');

Perform a store operation. Expects bucket and key names, the value and the content type (optional, default is 'plain/text'). Will store the raw data without encoding it. If you need to encode the structure you can use L</put> instead.

=head3 del

  $client->del(bucket => key);

Perform a delete operation. Expects bucket and key names.

=head3 get_keys

  $client->get_keys(foo => sub{
     my $key = $_[0];

     # you should use another client inside this callback!
     $another_client->del(foo => $key);
  });

Perform a list keys operation. Receives a callback and calls it once for each key. You can't use this client inside the callback to perform other operations!

=head1 SEE ALSO

L<Net::Riak>

L<Data::Riak>

L<Data::Riak::Fast>

=head1 AUTHOR

Tiago Peczenyj <tiago.peczenyj@gmail.com>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by Weborama.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut


__END__