package Net::Riak::MapReduce;
# The distribution version is assigned to the package variable inside its
# own bare block.
{
  $Net::Riak::MapReduce::VERSION = '0.1702';
}

# ABSTRACT: Allows you to build up and run a map/reduce operation on Riak

use JSON;
use Moose;
use Scalar::Util;

use Data::Dumper;

use Net::Riak::LinkPhase;
use Net::Riak::MapReducePhase;

# Compose the parameterized base role, asking it for a required 'client'
# entry. NOTE(review): presumably this is what provides the ->client
# accessor used by run(); confirm against Net::Riak::Role::Base.
with 'Net::Riak::Role::Base' =>
  {classes => [{name => 'client', required => 1}]};

# Ordered list of phase objects that make up the query. The Array trait
# supplies the delegated helpers named under "handles"; auto_deref makes
# the accessor return a plain list when called in list context.
has phases => (
    is         => 'rw',
    isa        => 'ArrayRef[Object]',
    traits     => ['Array'],
    lazy       => 1,
    auto_deref => 1,
    default    => sub { return [] },
    handles    => {
        add_phase  => 'push',
        get_phase  => 'get',
        get_phases => 'elements',
        num_phases => 'count',
    },
);
# Bucket name used as the job input when a whole bucket has been added
# (set by add_bucket, read by run).
has inputs_bucket => (
    isa       => 'Str',
    is        => 'rw',
    predicate => 'has_inputs_bucket',
);
# Explicit job inputs: a list of array refs, appended to via add_input.
has inputs => (
    is      => 'rw',
    isa     => 'ArrayRef[ArrayRef]',
    traits  => ['Array'],
    default => sub { return [] },
    handles => { add_input => 'push' },
);
# Records how inputs were supplied; add_bucket sets it to 'bucket'.
has input_mode => (
    isa       => 'Str',
    is        => 'rw',
    predicate => 'has_input_mode',
);

# Add inputs to the job and return $self so that calls can be chained.
#
# Accepted forms:
#   add([...], [...], ...)     one or more array refs, each stored as an input
#   add($object)               a Net::Riak::Object (handled by add_object)
#   add($bucket)               a Net::Riak::Bucket, or a plain bucket name
#   add($bucket, $key, ...)    forwarded as-is to add_bucket_key_data
sub add {
    my $self = shift;
    my $arg  = shift;

    if (ref $arg eq 'ARRAY') {
        $self->add_input($arg);

        # Remaining arguments are further inputs; stop at the first false one.
        while (my $next = shift @_) {
            $self->add_input($next);
        }
        return $self;
    }

    if (@_) {
        $self->add_bucket_key_data($arg, @_);
        return $self;
    }

    # Only call ->isa on real objects. A bucket name is an arbitrary string:
    # using it as a method invocant dies for '' and misroutes names that
    # happen to match a loaded class.
    my $is_object = Scalar::Util::blessed($arg);

    if ($is_object && $arg->isa('Net::Riak::Object')) {
        $self->add_object($arg);
    }
    elsif ($is_object && $arg->isa('Net::Riak::Bucket')) {
        $self->add_bucket($arg->name);
    }
    else {
        $self->add_bucket($arg);
    }

    return $self;
}

# Register a single object as a job input, identified by the name of its
# bucket and by its key. Returns whatever add_bucket_key_data returns.
sub add_object {
    my $self = shift;
    my ($obj) = @_;

    return $self->add_bucket_key_data(
        $obj->bucket->name,
        $obj->key,
    );
}

# Append a [$bucket, $key, $data] triple to the job inputs.
#
# Croaks if a whole bucket has already been selected as the input
# (input_mode eq 'bucket'). Returns the result of add_input.
sub add_bucket_key_data {
    my ($self, $bucket, $key, $data) = @_;

    if ($self->has_input_mode && $self->input_mode eq 'bucket') {
        # Carp is not imported by this file, so load it and call croak
        # fully qualified.
        require Carp;
        Carp::croak("Already added a bucket, can't add an object");
    }

    return $self->add_input([$bucket, $key, $data]);
}

# Select a whole bucket as the job input: switch input_mode to 'bucket'
# and remember the bucket name. Returns $self for chaining.
sub add_bucket {
    my $self   = shift;
    my $bucket = shift;

    $self->input_mode('bucket');
    $self->inputs_bucket($bucket);

    return $self;
}

# Append a link phase to the job.
#
#   $bucket - bucket name to follow links into (defaults to '_')
#   $tag    - link tag to match (defaults to '_')
#   $keep   - true to keep this phase's results (defaults to false)
#
# Returns $self so that calls can be chained, like map and reduce.
sub link {
    my ($self, $bucket, $tag, $keep) = @_;

    my $phase = Net::Riak::LinkPhase->new(
        bucket => $bucket || '_',
        tag    => $tag    || '_',
        # Normalise any truthy/falsy value to a JSON boolean, as map does.
        keep   => $keep ? JSON::true : JSON::false,
    );
    $self->add_phase($phase);

    return $self;
}

# Append a map phase to the job.
#
#   $function - passed to the phase as its 'function'
#   %options  - keep => truthy to keep this phase's results
#               arg  => value handed to the phase as 'arg' (defaults to [])
#
# Returns $self for chaining.
sub map {
    my $self     = shift;
    my $function = shift;
    my %options  = @_;

    my $keep = JSON::false;
    $keep = JSON::true if $options{keep};

    $self->add_phase(
        Net::Riak::MapReducePhase->new(
            type     => 'map',
            function => $function,
            keep     => $keep,
            arg      => $options{arg} || [],
        )
    );

    return $self;
}

# Append a reduce phase to the job.
#
#   $function - passed to the phase as its 'function'
#   %options  - keep => truthy to keep this phase's results
#               arg  => value handed to the phase as 'arg' (defaults to [])
#
# Returns $self for chaining.
sub reduce {
    my ($self, $function, %options) = @_;

    my $map_reduce = Net::Riak::MapReducePhase->new(
        type     => 'reduce',
        function => $function,
        # Always hand the phase a JSON boolean (same as map), so that
        # keep => 1 is not passed through as a raw scalar.
        keep     => $options{keep} ? JSON::true : JSON::false,
        arg      => $options{arg} || [],
    );
    $self->add_phase($map_reduce);

    return $self;
}

# Build the job from the configured inputs and phases, execute it through
# the client and return the result.
#
#   $timeout - passed through unchanged to the client's execute_job
#
# Returns the client's result as-is, unless the last phase is a
# Net::Riak::LinkPhase: then each result row is wrapped in a Net::Riak::Link
# and an array ref of those links is returned.
sub run {
    my ($self, $timeout) = @_;

    my $query      = [];
    my $keep_flag  = 0;
    my $last_index = $self->num_phases - 1;

    foreach my $i (0 .. $last_index) {
        my $phase = $self->get_phase($i);

        # Make sure at least one phase keeps its results: if none of the
        # earlier phases does, force the last one to.
        if ($i == $last_index && !$keep_flag) {
            $phase->keep(JSON::true);
        }

        # Use the accessor's truth value; ->isa(JSON::true) treats a
        # boolean object as a class name and never matches.
        $keep_flag = 1 if $phase->keep;

        push @$query, $phase->to_array;
    }

    my $inputs;
    if (   $self->has_input_mode
        && $self->input_mode eq 'bucket'
        && $self->has_inputs_bucket)
    {
        $inputs = $self->inputs_bucket;
    }
    else {
        $inputs = $self->inputs;
    }

    my $job = {inputs => $inputs, query => $query};

    # Number of phases whose results are kept.
    my $kept_count = scalar(grep { $_->keep } $self->phases);

    my $result = $self->client->execute_job($job, $timeout, $kept_count);

    my @phases = $self->phases;
    if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {
        return $result;
    }

    # NOTE(review): Net::Riak::Link and Net::Riak::Bucket are not loaded by
    # this file; presumably they are loaded elsewhere - confirm.
    my $links = [];
    foreach my $row (@$result) {
        my $link = Net::Riak::Link->new(
            bucket => Net::Riak::Bucket->new(
                name   => $row->[0],
                client => $self->client,
            ),
            key    => $row->[1],
            tag    => $row->[2],
            client => $self->client,
        );
        push @$links, $link;
    }
    return $links;
}

1;

__END__

=pod

=head1 NAME

Net::Riak::MapReduce - Allows you to build up and run a map/reduce operation on Riak

=head1 VERSION

version 0.1702

=head1 SYNOPSIS

    use Net::Riak;

    my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" );
    my $bucket = $riak->bucket("Cats");

    my $query = $riak->add("Cats");
    $query->map(
        'function(v, d, a) { return [v]; }',
        arg => [qw/some params to your function/]
    );

    $query->reduce("function(v) { return [v];}");
    my $json = $query->run(10000);

    # can also be used like:

    my $query = Net::Riak::MapReduce->new(
        client => $riak->client
    );

    # named functions
    my $json = $query->add_bucket('Dogs')
        ->map('Riak.mapValuesJson')
        ->reduce('Your.SortFunction')
        ->run;

=head1 DESCRIPTION

The MapReduce object allows you to build up and run a map/reduce operation on Riak.

=head2 ATTRIBUTES

=over 4

=item B<phases>

=item B<inputs_bucket>

=item B<inputs>

=item B<input_mode>

=back

=head1 METHODS

=head2 add

arguments: L<Net::Riak::Bucket> / Bucket name /  L<Net::Riak::Object> / Array

return: a Net::Riak::MapReduce object

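Add inputs to a map/reduce operation. This method takes several forms, depending on the provided inputs. You can specify a L<Net::Riak::Object>, a L<Net::Riak::Bucket> or a bucket name string, a bucket name and key followed by optional additional data, or one or more array references that are each added as an input.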

Create a MapReduce job

    my $mapred = $riak->add( ["alice","p1"],["alice","p2"],["alice","p5"] );

Add your inputs to a MapReduce job

    $mapred->add( ["alice","p1"],["alice","p2"] );
    $mapred->add( "alice", "p5" );
    $mapred->add( $riak->bucket("alice")->get("p6") );

=head2 add_object

=head2 add_bucket_key_data

=head2 add_bucket

=head2 link

arguments: bucketname, tag, keep

return: $self

Add a link phase to the map/reduce operation.

The default value for bucket name is '_', which means all buckets.

The default value for tag is '_'.

The keep argument is a flag indicating whether to keep the results of this phase in the map/reduce output. (default false, unless this is the last phase of the operation)

=head2 map

arguments: $function, %options

return: self

    ->map("function () {..}", keep => 0, arg => ['foo', 'bar']);
    ->map('Riak.mapValuesJson'); # de-serializes data into JSON

Add a map phase to the map/reduce operation.

$function is either a named javascript function (ie: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')

%options is an optional associative array containing:

    language
    keep - flag
    arg - an arrayref of parameters for the JavaScript function

=head2 reduce

arguments: $function, %options

return: $self

    ->reduce("function () {..}", keep => 1, arg => ['foo', 'bar']);

Add a reduce phase to the map/reduce operation.

$function is either a named javascript function (ie: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')

=head2 run

arguments: $timeout

return: arrayref

Run the map/reduce operation and attempt to de-serialize the JSON response to a perl structure. Returns an arrayref of L<Net::Riak::Link> objects if the last phase is a link phase.

The timeout is given in milliseconds.

=head2 SEE ALSO

REST API

https://wiki.basho.com/display/RIAK/MapReduce

List of built-in named functions for map / reduce phases

http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496

=head1 AUTHOR

franck cuny <franck@lumberjaph.net>, robin edwards <robin.ge@gmail.com>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by linkfluence.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut