# The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
package AnyEvent::Riak;

# ABSTRACT: non-blocking Riak client

use JSON;
use AnyEvent;
use AnyEvent::HTTP;
use Moose;

with qw/AnyEvent::Riak::Role::HTTPUtils AnyEvent::Riak::Role::CVCB/;

our $VERSION = '0.02';

# Base URL of the Riak node.
has host => (
    is      => 'rw',
    isa     => 'Str',
    default => 'http://127.0.0.1:8098',
);

# First URI segment used for bucket and object requests.
has path => (
    is      => 'rw',
    isa     => 'Str',
    default => 'riak',
);

# Path segment reserved for map/reduce requests (not used by the methods
# visible in this file).
has mapred_path => (
    is      => 'rw',
    isa     => 'Str',
    default => 'mapred',
);

# Default value of the `r` request parameter (fetch, and store with a key).
has r => (
    is      => 'rw',
    isa     => 'Int',
    default => 2,
);

# Default value of the `w` request parameter (store).
has w => (
    is      => 'rw',
    isa     => 'Int',
    default => 2,
);

# Default value of the `dw` request parameter (store).
has dw => (
    is      => 'rw',
    isa     => 'Int',
    default => 2,
);

# Ping the Riak node and report whether it answered.
#
# The trailing arguments are handed to _cvcb() as an array reference; it is
# expected to return the condition variable and the callback used below.  Any
# other argument (such as an options hash) is ignored by this method.
#
# The callback is invoked with 1 when the ping request comes back with HTTP
# status 200 and with 0 otherwise; whatever it returns is sent on the
# condition variable, which is also this method's return value.
sub is_alive {
    my ($self, @args) = @_;

    my ($cv, $cb) = $self->_cvcb(\@args);

    http_request(
        GET     => $self->_build_uri(['ping']),
        headers => $self->_build_headers(),
        sub {
            my (undef, $response) = @_;
            my $alive = $response->{Status} == 200 ? 1 : 0;
            $cv->send($cb->($alive));
        }
    );

    return $cv;
}

# Read the properties and/or the keys of a bucket.
#
# Arguments:
#   $bucket_name - name of the bucket to inspect
#   $options     - optional hash reference; `props` and `keys` are forwarded
#                  as query parameters and both default to 'true'
#   $cv, $cb     - optional trailing condition variable / callback, extracted
#                  by _cvcb()
#
# The callback receives the decoded JSON response when the request returns
# HTTP status 200 with a body, and undef otherwise.  Whatever the callback
# returns is sent on the condition variable, which is returned to the caller.
sub list_bucket {
    my $self        = shift;
    my $bucket_name = shift;

    my ($cv, $cb) = $self->_cvcb(\@_);
    my $options = shift || {};

    # Read the options without deleting them: the hash belongs to the caller
    # and may be reused for another request.
    my $params = {
        props => $options->{props} || 'true',
        keys  => $options->{keys}  || 'true',
    };

    http_request(
        GET     => $self->_build_uri([$self->path, $bucket_name], $params),
        headers => $self->_build_headers(),
        sub {
            my ($body, $headers) = @_;
            if ($body && $headers->{Status} == 200) {
                my $res = JSON::decode_json($body);
                $cv->send($cb->($res));
            }
            else {
                $cv->send($cb->(undef));
            }
        }
    );

    return $cv;
}

# Update the properties of a bucket.
#
# Arguments:
#   $bucket_name - name of the bucket to modify
#   $schema      - data structure sent to the server as the `props` member of
#                  the JSON request body
#   remaining    - passed to _cvcb() as an array reference to obtain the
#                  condition variable and the callback
#
# The callback receives 1 when the server answers with HTTP status 204 and 0
# for any other status.  Its return value is sent on the condition variable,
# which is returned to the caller.
sub set_bucket {
    my ($self, $bucket_name, $schema, @args) = @_;

    my ($cv, $cb) = $self->_cvcb(\@args);

    http_request(
        PUT     => $self->_build_uri([$self->path, $bucket_name]),
        headers => $self->_build_headers(),
        body    => JSON::encode_json({props => $schema}),
        sub {
            my (undef, $response) = @_;
            my $updated = $response->{Status} == 204 ? 1 : 0;
            $cv->send($cb->($updated));
        }
    );

    return $cv;
}

# Read an object from a bucket.
#
# Arguments:
#   $bucket_name - bucket holding the object
#   $key         - key of the object
#   $options     - optional hash reference:
#                    params  => { r => ... }  overrides the default `r` value
#                    vtag    => ...           forwarded as the `vtag` parameter
#                    headers => { ... }       only If-None-Match,
#                                             If-Modified-Since and Accept are
#                                             forwarded
#   $cv, $cb     - optional trailing condition variable / callback, extracted
#                  by _cvcb()
#
# The callback receives the decoded JSON body when the request returns HTTP
# status 200 with a body, and 0 otherwise.  Whatever the callback returns is
# sent on the condition variable, which is returned to the caller.
sub fetch {
    my $self        = shift;
    my $bucket_name = shift;
    my $key         = shift;

    my ($cv, $cb) = $self->_cvcb(\@_);
    my $options = shift || {};

    # Work on read-only views of the caller's hash: nothing is deleted from
    # it and no sub-hash is autovivified inside it, so it can be reused.
    my $opt_params  = $options->{params}  || {};
    my $opt_headers = $options->{headers} || {};

    my $params = {r => $opt_params->{r} || $self->r};
    $params->{vtag} = $options->{vtag} if $options->{vtag};

    my $headers = {};
    foreach my $name (qw/If-None-Match If-Modified-Since Accept/) {
        $headers->{$name} = $opt_headers->{$name}
          if exists $opt_headers->{$name};
    }

    http_request(
        GET =>
          $self->_build_uri([$self->path, $bucket_name, $key], $params),
        headers => $self->_build_headers($headers),
        sub {
            my ($body, $headers) = @_;
            # XXX 300 && 304
            if ($body && $headers->{Status} == 200) {
                $cv->send($cb->(JSON::decode_json($body)));
            }
            else {
                $cv->send($cb->(0));
            }
        }
    );

    return $cv;
}

# Store an object in a bucket.
#
# Arguments:
#   $bucket_name - bucket receiving the object
#   $object      - data structure serialised to JSON and sent as request body
#   $options     - optional hash reference:
#                    key    => ...      key to store the object under; when
#                                       absent an empty key segment is used
#                    params => { w, dw, returnbody, r }
#                                       override the defaults (the object's
#                                       w / dw / r attributes and 'true' for
#                                       returnbody); `r` is only sent when a
#                                       key is given
#   $cv, $cb     - optional trailing condition variable / callback, extracted
#                  by _cvcb()
#
# The callback receives ($result, $headers) where $result is
#   - the decoded JSON body for a 200/201 response carrying a body,
#   - 1 for a 200/201 response without a body, or for a 204 response,
#   - 0 for any other status.
# Whatever the callback returns is sent on the condition variable, which is
# returned to the caller.
sub store {
    my $self        = shift;
    my $bucket_name = shift;
    my $object      = shift;

    my ($cv, $cb) = $self->_cvcb(\@_);
    my $options    = shift || {};
    my $opt_params = $options->{params} || {};
    my $key        = '';

    my $params = {
        w          => $opt_params->{w}          || $self->w,
        dw         => $opt_params->{dw}         || $self->dw,
        returnbody => $opt_params->{returnbody} || 'true',
    };

    if ($options->{key}) {
        # Read the key without deleting it: the options hash belongs to the
        # caller and may be reused.
        $key = $options->{key};
        $params->{r} = $opt_params->{r} || $self->r;
    }

    # XXX headers

    my $json = JSON::encode_json($object);

    http_request(
        POST => $self->_build_uri([$self->path, $bucket_name, $key,], $params),
        headers => $self->_build_headers(),
        body    => $json,
        sub {
            my ($body, $headers) = @_;
            my $status = $headers->{Status};
            my $result;
            if ($status == 201 || $status == 200) {
                # A successful response may come without a body; that is
                # still a success, not a failure.
                $result = $body ? JSON::decode_json($body) : 1;
            }
            elsif ($status == 204) {
                $result = 1;
            }
            else {
                $result = 0;
            }
            $cv->send($cb->($result, $headers));
        }
    );

    return $cv;
}

# Delete an object from a bucket.
#
# Arguments:
#   $bucket_name - bucket holding the object
#   $key         - key of the object to remove
#   $cv, $cb     - optional trailing condition variable / callback, extracted
#                  by _cvcb()
#
# The callback receives 1 when the server answers with HTTP status 204 and 0
# for any other status.  Whatever the callback returns is sent on the
# condition variable, which is returned to the caller.
sub delete {
    my $self        = shift;
    my $bucket_name = shift;
    my $key         = shift;

    # Hand the remaining arguments over by reference, exactly as every other
    # method of this class does when calling _cvcb().
    my ($cv, $cb) = $self->_cvcb(\@_);

    http_request(
        DELETE  => $self->_build_uri([$self->path, $bucket_name, $key],),
        headers => $self->_build_headers(),
        sub {
            my ($body, $headers) = @_;
            if ($headers->{Status} == 204) {
                $cv->send($cb->(1));
            }
            else {
                $cv->send($cb->(0));
            }
        }
    );

    return $cv;
}

no Moose;

1;



=pod

=head1 NAME

AnyEvent::Riak - non-blocking Riak client

=head1 VERSION

version 0.02

=head1 SYNOPSIS

    use AnyEvent::Riak;

    my $riak = AnyEvent::Riak->new(
        host => 'http://127.0.0.1:8098',
        path => 'riak',
    );

This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91.

For a complete description of the Riak REST API, please refer to L<https://wiki.basho.com/display/RIAK/REST+API>.

=head1 DESCRIPTION

AnyEvent::Riak is a non-blocking Riak client using C<AnyEvent>. This client allows you to connect to a Riak instance and to create, modify and delete Riak objects.

=head2 METHODS

=over 4

=item B<is_alive> ([$cv, $cb])

Check if the Riak server is alive. If the ping is successful, 1 is returned, else 0.

Options can be:

=over 4

=item B<headers>

A list of valid HTTP headers that will be sent with the query

=back

=item B<list_bucket> ($bucket_name, [$options, $cv, $cb])

Reads the bucket properties and/or keys.

    $riak->list_bucket(
        'mybucket',
        {props => 'true', keys => 'false'},
        sub {
            my $res = shift;
            ...
        }
      );

=item B<set_bucket> ($bucket_name, $schema, [$cv, $cb])

Sets bucket properties like n_val and allow_mult.

    $riak->set_bucket(
        'mybucket',
        {n_val => 5},
        sub {
            my $res = shift;
            ...;
        }
    );

=item B<fetch> ($bucket_name, $key, [$options, $cv, $cb])

Reads an object from a bucket.

    $riak->fetch(
        'mybucket', 'mykey',
        {params => {r => 2}, headers => {'If-Modified-Since' => $value}},
        sub {
            my $res = shift;
        }
    );

=item B<store> ($bucket_name, $object, [$options, $cv, $cb])

Stores a new object in a bucket.

    $riak->store(
        'mybucket', $object,
        {key => 'mykey', params => {w => 2}},
        sub {
            my $res = shift;
            ...
        }
    );

=item B<delete> ($bucket_name, $key, [$cv, $cb])

Deletes an object from a bucket.

    $riak->delete('mybucket', 'mykey', sub { my $res = shift;... });

=back

=head1 AUTHOR

  franck cuny <franck@lumberjaph.net>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2010 by linkfluence.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut


__END__