package Catmandu::Store::ElasticSearch::Bag;

use Catmandu::Sane;
use Moo;
use Catmandu::Hits;
use Catmandu::Store::ElasticSearch::Searcher;
use Catmandu::Store::ElasticSearch::CQL;

with 'Catmandu::Bag';
with 'Catmandu::Searchable';

has buffer_size => (is => 'ro', lazy => 1, builder => 'default_buffer_size');
has _bulk       => (is => 'ro', lazy => 1, builder => '_build_bulk');
has cql_mapping => (is => 'ro');
has on_error    => (is => 'ro', default => sub { 'IGNORE' });

sub default_buffer_size { 100 }

sub _build_bulk {
    my ($self) = @_;
    my %args = (
        index     => $self->store->index_name,
        type      => $self->name,
        max_count => $self->buffer_size,
        on_error  => \&{$self->on_error},
    );
    if ($self->log->is_debug) {
        $args{on_success} = sub {
            my ($action, $res, $i) = @_; # TODO return doc instead of index
            $self->log->debug($res);
        };
    }
    $self->store->es->bulk_helper(%args);
}

sub generator {
    my ($self) = @_;
    sub {
        state $scroll = $self->store->es->scroll_helper(
            index       => $self->store->index_name,
            type        => $self->name,
            search_type => 'scan',
            size        => $self->buffer_size, # TODO divide by number of shards
            body        => {
                query => {match_all => {}},
            },
        );
        my $data = $scroll->next // return;
        $data->{_source};
    };
}

sub count {
    my ($self) = @_;
    $self->store->es->count(
        index => $self->store->index_name,
        type  => $self->name,
    )->{count};
}

sub get { # TODO ignore missing
    my ($self, $id) = @_;
    try {
        $self->store->es->get_source(
            index => $self->store->index_name,
            type  => $self->name,
            id    => $id,
        );
   } catch_case [
       'Search::Elasticsearch::Error::Missing' => sub { undef }
   ];
}

sub add {
    my ($self, $data) = @_;
    $self->_bulk->index({
        id     => $data->{_id},
        source => $data,
    });
}

sub delete {
    my ($self, $id) = @_;
    $self->_bulk->delete({id => $id});
}

sub delete_all { # TODO refresh
    my ($self) = @_;
    my $es = $self->store->es;
    $es->delete_by_query(
        index => $self->store->index_name,
        type  => $self->name,
        body  => {
            query => {match_all => {}},
        },
    );
}

sub delete_by_query { # TODO refresh
    my ($self, %args) = @_;
    my $es = $self->store->es;
    $es->delete_by_query(
        index => $self->store->index_name,
        type  => $self->name,
        body  => {
            query => $args{query},
        },
    );
}

sub commit {
    my ($self) = @_;
    $self->_bulk->flush;
}

sub search {
    my ($self, %args) = @_;

    my $start = delete $args{start};
    my $limit = delete $args{limit};
    my $bag   = delete $args{reify};

    if ($bag) {
        $args{fields} = [];
    }

    my $res = $self->store->es->search(
        index => $self->store->index_name,
        type  => $self->name,
        body  => {
            %args,
            from => $start,
            size => $limit,
        },
    );

    my $docs = $res->{hits}{hits};

    my $hits = {
        start => $start,
        limit => $limit,
        total => $res->{hits}{total},
    };

    if ($bag) {
        $hits->{hits} = [ map { $bag->get($_->{_id}) } @$docs ];
    } elsif ($args{fields}) {
        $hits->{hits} = [ map { $_->{fields} || {} } @$docs ];
    } else {
        $hits->{hits} = [ map { $_->{_source} } @$docs ];
    }

    $hits = Catmandu::Hits->new($hits);

    for my $key (qw(facets suggest)) {
        $hits->{$key} = $res->{$key} if exists $args{$key};
    }

    if ($args{highlight}) {
        for my $hit (@$docs) {
            if (my $hl = $hit->{highlight}) {
                $hits->{highlight}{$hit->{_id}} = $hl;
            }
        }
    }

    $hits;
}

sub searcher {
    my ($self, %args) = @_;
    Catmandu::Store::ElasticSearch::Searcher->new(%args, bag => $self);
}

sub translate_sru_sortkeys {
    my ($self, $sortkeys) = @_;
    [ grep { defined $_ } map { $self->_translate_sru_sortkey($_) } split /\s+/, $sortkeys ];
}

sub _translate_sru_sortkey {
    my ($self, $sortkey) = @_;
    my ($field, $schema, $asc) = split /,/, $sortkey;
    $field || return;
    if (my $map = $self->cql_mapping) {
        $field = lc $field;
        $field =~ s/(?<=[^_])_(?=[^_])//g if $map->{strip_separating_underscores};
        $map = $map->{indexes} || return;
        $map = $map->{$field}  || return;
        $map->{sort} || return;
        if (ref $map->{sort} && $map->{sort}{field}) {
            $field = $map->{sort}{field};
        } elsif (ref $map->{field}) {
            $field = $map->{field}->[0];
        } elsif ($map->{field}) {
            $field = $map->{field};
        }
    }
    $asc //= 1;
    +{ $field => $asc ? 'asc' : 'desc' };
}

sub translate_cql_query {
    my ($self, $query) = @_;
    Catmandu::Store::ElasticSearch::CQL->new(mapping => $self->cql_mapping)->parse($query);
}

sub normalize_query {
    my ($self, $query) = @_;
    if (ref $query) {
        $query;
    } elsif ($query) {
        {query_string => {query => $query}};
    } else {
        {match_all => {}};
    }
}

=head1 SEE ALSO

L<Catmandu::Bag>, L<Catmandu::Searchable>

=cut

1;