The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package ElasticSearch;
$ElasticSearch::VERSION = '0.68';
use strict;
use warnings FATAL => 'all';
use Any::URI::Escape qw(uri_escape);
use Carp;
use constant {
    ONE_REQ     => 1,
    ONE_OPT     => 2,
    ONE_ALL     => 3,
    MULTI_ALL   => 4,
    MULTI_BLANK => 5,
    MULTI_REQ   => 6,

use constant {
    CMD_NONE          => [],
    CMD_INDEX_TYPE_ID => [ index => ONE_REQ, type => ONE_REQ, id => ONE_REQ ],
    CMD_INDEX_TYPE_id => [ index => ONE_REQ, type => ONE_REQ, id => ONE_OPT ],
    CMD_INDEX_type_ID => [ index => ONE_REQ, type => ONE_ALL, id => ONE_REQ ],
    CMD_Index           => [ index => ONE_OPT ],
    CMD_index           => [ index => MULTI_BLANK ],
    CMD_indices         => [ index => MULTI_ALL ],
    CMD_INDICES         => [ index => MULTI_REQ ],
    CMD_INDEX           => [ index => ONE_REQ ],
    CMD_INDEX_TYPE      => [ index => ONE_REQ, type => ONE_REQ ],
    CMD_INDEX_type      => [ index => ONE_REQ, type => MULTI_BLANK ],
    CMD_index_TYPE      => [ index => MULTI_ALL, type => ONE_REQ ],
    CMD_index_types     => [ index => MULTI_ALL, type => MULTI_REQ ],
    CMD_INDICES_TYPE    => [ index => MULTI_REQ, type => ONE_REQ ],
    CMD_index_type      => [ index => MULTI_ALL, type => MULTI_BLANK ],
    CMD_index_then_type => [ index => ONE_OPT, type => ONE_OPT ],
    CMD_RIVER           => [ river => ONE_REQ ],
    CMD_nodes           => [ node  => MULTI_BLANK ],
    CMD_NAME            => [ name  => ONE_REQ ],
    CMD_INDEX_PERC      => [ index => ONE_REQ, percolator => ONE_REQ ],

    CONSISTENCY => [ 'enum', [ 'one', 'quorum', 'all' ] ],
    REPLICATION => [ 'enum', [ 'async', 'sync' ] ],
    SEARCH_TYPE => [
        [   'dfs_query_then_fetch', 'dfs_query_and_fetch',
            'query_then_fetch',     'query_and_fetch',
            'count',                'scan'
    IGNORE_INDICES => [ 'enum', [ 'missing', 'none' ] ],


our %QS_Format = (
    boolean  => '1 | 0',
    duration => "'5m' | '10s'",
    optional => "'scalar value'",
    flatten  => "'scalar' or ['scalar_1', 'scalar_n']",
    'int'    => "integer",
    string   => sub {
        my $k = shift;
        return $k eq 'preference'
            ? '_local | _primary | _primary_first | $string'
            : $k eq 'percolate' || $k eq 'q' ? '$query_string'
            : $k eq 'scroll_id' ? '$scroll_id'
            : $k eq 'df'        ? '$default_field'
            :                     '$string';
    float   => 'float',
    enum    => sub { join " | ", @{ $_[1][1] } },
    coderef => 'sub {..} | "IGNORE"',

our %QS_Formatter = (
    boolean => sub {
        my $key = shift;
        my $val = $_[0] ? $_[1] : $_[2];
        return unless defined $val;
        return ref $val ? $val : [ $key, $val ? 'true' : 'false' ];
    duration => sub {
        my ( $k, $t ) = @_;
        return unless defined $t;
        return [ $k, $t ] if $t =~ /^\d+([smh]|ms)$/i;
        die "$k '$t' is not in the form $QS_Format{duration}\n";
    flatten => sub {
        my $key = shift;
        my $array = shift or return;
        return [ $key, ref $array ? join( ',', @$array ) : $array ];
    'int' => sub {
        my $key = shift;
        my $int = shift;
        return unless defined $int;
        eval { $int += 0; 1 } or die "'$key' is not an integer";
        return [ $key, $int ];
    'float' => sub {
        my $key   = shift;
        my $float = shift;
        return unless defined $float;
        $key = shift if @_;
        eval { $float += 0; 1 } or die "'$key' is not a float";
        return [ $key, $float ];
    'string' => sub {
        my $key    = shift;
        my $string = shift;
        return unless defined $string;
        return [ $key, $string ];
    'coderef' => sub {
        my $key     = shift;
        my $coderef = shift;
        return unless defined $coderef;
        unless ( ref $coderef ) {
            die "'$key' is not a code ref or the string 'IGNORE'"
                unless $coderef eq 'IGNORE';
            $coderef = sub { };
        return [ $key, $coderef ];
    'enum' => sub {
        my $key = shift;
        my $val = shift;
        return unless defined $val;
        my $vals = $_[0];
        for (@$vals) {
            return [ $key, $val ] if $val eq $_;
        die "Unrecognised value '$val'. Allowed values: "
            . join( ', ', @$vals );



sub get {
        {   cmd => CMD_INDEX_type_ID,
            qs  => {
                fields         => ['flatten'],
                ignore_missing => [ 'boolean', 1 ],
                preference     => ['string'],
                refresh        => [ 'boolean', 1 ],
                routing        => ['string'],
                parent         => ['string'],

sub exists : method {
        {   method => 'HEAD',
            cmd    => CMD_INDEX_TYPE_ID,
            qs     => {
                preference => ['string'],
                refresh    => [ 'boolean', 1 ],
                routing    => ['string'],
                parent     => ['string'],
            fixup => sub { $_[1]->{qs}{ignore_missing} = 1 }

sub mget {
    my ( $self, $params ) = parse_params(@_);

    $params->{$_} ||= $self->{_default}{$_} for qw(index type);

    if ( $params->{index} ) {
        if ( my $ids = delete $params->{ids} ) {
            $self->throw( 'Param', 'mget',
                'Cannot specify both ids and docs in mget()' )
                if $params->{docs};
            $params->{docs} = [ map { +{ _id => $_ } } @$ids ];
    else {
        $self->throw( 'Param',
            'Cannot specify a type for mget() without specifying index' )
            if $params->{type};
        $self->throw( 'Param',
            'Use of the ids param with mget() requires an index' )
            if $params->{ids};

    my $filter;
        {   cmd     => [ index => ONE_OPT, type => ONE_OPT ],
            postfix => '_mget',
            data => { docs => 'docs' },
            qs   => {
                fields         => ['flatten'],
                filter_missing => [ 'boolean', 1 ],
            fixup => sub {
                $_[1]->{skip} = [] unless @{ $_[1]{data}{docs} };
                $filter = delete $_[1]->{qs}{filter_missing};
            post_process => sub {
                my $result = shift;
                my $docs   = $result->{docs};
                return $filter ? [ grep { $_->{exists} } @$docs ] : $docs;

my %Index_Defn = (
    cmd => CMD_INDEX_TYPE_id,
    qs  => {
        consistency => CONSISTENCY,
        create      => [ 'boolean', [ op_type => 'create' ] ],
        parent      => ['string'],
        percolate   => ['string'],
        refresh     => [ 'boolean', 1 ],
        replication => REPLICATION,
        routing     => ['string'],
        timeout     => ['duration'],
        timestamp   => ['string'],
        ttl         => ['int'],
        version     => ['int'],
        version_type => [ 'enum', [ 'internal', 'external' ] ],
    data  => { data => 'data' },
    fixup => sub {
        my $data = $_[1]{data}{data};
        $_[1]{data} = ref $data eq 'HASH' ? $data : \$data;

sub index {
    my ( $self, $params ) = parse_params(@_);
    $self->_index( 'index', \%Index_Defn, $params );

sub set {
    my ( $self, $params ) = parse_params(@_);
    $self->_index( 'set', \%Index_Defn, $params );

sub create {
    my ( $self, $params ) = parse_params(@_);
    $self->_index( 'create', \%Index_Defn, { %$params, create => 1 } );

sub _index {
    my $self = shift;
    $_[1]->{method} = $_[2]->{id} ? 'PUT' : 'POST';

sub update {
        {   method  => 'POST',
            cmd     => CMD_INDEX_TYPE_ID,
            postfix => '_update',
            data    => {
                script => ['script'],
                params => ['params'],
                doc    => ['doc'],
                upsert => ['upsert'],
            qs => {
                consistency       => CONSISTENCY,
                fields            => ['flatten'],
                ignore_missing    => [ 'boolean', 1 ],
                parent            => ['string'],
                percolate         => ['string'],
                retry_on_conflict => ['int'],
                routing           => ['string'],
                timeout           => ['duration'],
                replication       => REPLICATION,

sub delete {
        {   method => 'DELETE',
            cmd    => CMD_INDEX_TYPE_ID,
            qs     => {
                consistency    => CONSISTENCY,
                ignore_missing => [ 'boolean', 1 ],
                refresh        => [ 'boolean', 1 ],
                parent         => ['string'],
                routing        => ['string'],
                version        => ['int'],
                replication    => REPLICATION,

sub analyze {
        {   method  => 'GET',
            cmd     => CMD_Index,
            postfix => '_analyze',
            qs      => {
                text         => ['string'],
                analyzer     => ['string'],
                tokenizer    => ['string'],
                filters      => ['flatten'],
                field        => ['string'],
                format       => [ 'enum', [ 'detailed', 'text' ] ],
                prefer_local => [ 'boolean', undef, 0 ],


sub bulk {
    my $self = shift;
    $self->_bulk( 'bulk', $self->_bulk_params( 'actions', @_ ) );

sub _bulk {
    my ( $self, $method, $params ) = @_;
    my %callbacks;
    my $actions = $params->{actions} || [];

        {   cmd     => CMD_index_then_type,
            method  => 'POST',
            postfix => '_bulk',
            qs      => {
                consistency => CONSISTENCY,
                replication => REPLICATION,
                refresh     => [ 'boolean', 1 ],
                on_conflict => ['coderef'],
                on_error    => ['coderef'],
            data  => { actions => 'actions' },
            fixup => sub {
                die "Cannot specify type without index"
                    if $params->{type} && !$params->{index};
                $_[1]->{data} = $self->_bulk_request($actions);
                $_[1]->{skip} = { actions => [], results => [] }
                    unless ${ $_[1]->{data} };
                $callbacks{$_} = delete $_[1]->{qs}{$_}
                    for qw(on_error on_conflict);
            post_process => sub {
                $self->_bulk_response( \%callbacks, $actions, @_ );

sub bulk_index  { shift->_bulk_action( 'index',  @_ ) }
sub bulk_create { shift->_bulk_action( 'create', @_ ) }
sub bulk_delete { shift->_bulk_action( 'delete', @_ ) }

sub _bulk_action {
    my $self   = shift;
    my $action = shift;
    my $params = $self->_bulk_params( 'docs', @_ );
        = [ map { +{ $action => $_ } } @{ delete $params->{docs} } ];
    return $self->_bulk( "bulk_$action", $params );

sub _bulk_params {
    my $self = shift;
    my $key  = shift;

    return { $key => [], @_ } unless ref $_[0];
        ref $_[0] eq 'ARRAY' ? { $key => $_[0] } : { $key => [], %{ $_[0] } }
        unless @_ > 1;

    carp "The method signature for bulk methods has changed. "
        . "Please check the docs.";

    if ( ref $_[0] eq 'ARRAY' ) {
        my $first = shift;
        my $params = ref $_[0] ? shift : {@_};
        $params->{$key} = $first;
        return $params;
    return { $key => \@_ };

my %Bulk_Actions = (
    'delete' => {
        index        => ONE_OPT,
        type         => ONE_OPT,
        id           => ONE_REQ,
        parent       => ONE_OPT,
        routing      => ONE_OPT,
        version      => ONE_OPT,
        version_type => ONE_OPT,
    'index' => {
        index        => ONE_OPT,
        type         => ONE_OPT,
        id           => ONE_OPT,
        data         => ONE_REQ,
        routing      => ONE_OPT,
        parent       => ONE_OPT,
        percolate    => ONE_OPT,
        timestamp    => ONE_OPT,
        ttl          => ONE_OPT,
        version      => ONE_OPT,
        version_type => ONE_OPT,
$Bulk_Actions{create} = $Bulk_Actions{index};

sub _bulk_request {
    my $self    = shift;
    my $actions = shift;

    my $json      = $self->transport->JSON;
    my $indenting = $json->get_indent;

    my $json_docs = '';
    my $error;
    eval {
        for my $data (@$actions)
            die "'actions' must be an ARRAY ref of HASH refs"
                unless ref $data eq 'HASH';

            my ( $action, $params ) = %$data;
            $action ||= '';
            my $defn = $Bulk_Actions{$action}
                || die "Unknown action '$action'";

            my %metadata;
            $params = {%$params};
            delete @{$params}{qw(_score sort)};
            $params->{data} ||= delete $params->{_source}
                if $params->{_source};

            for my $key ( keys %$defn ) {
                my $val = delete $params->{$key};
                $val = delete $params->{"_$key"} unless defined $val;
                unless ( defined $val ) {
                    next if $defn->{$key} == ONE_OPT;
                    die "Missing required param '$key' for action '$action'";
                $metadata{"_$key"} = $val;
            die "Unknown params for bulk action '$action': "
                . join( ', ', keys %$params )
                if keys %$params;

            my $data = delete $metadata{_data};
            my $request = $json->encode( { $action => \%metadata } ) . "\n";
            if ($data) {
                $data = $json->encode($data) if ref $data eq 'HASH';
                $request .= $data . "\n";
            $json_docs .= $request;
    } or $error = $@ || 'Unknown error';

    die $error if $error;

    return \$json_docs;

sub _bulk_response {
    my $self      = shift;
    my $callbacks = shift;
    my $actions   = shift;
    my $results   = shift;

    my $items = ref($results) eq 'HASH' && $results->{items}
        || $self->throw( 'Request', 'Malformed response to bulk query',
        $results );

    my ( @errors, %matches );
    my ( $on_conflict, $on_error ) = @{$callbacks}{qw(on_conflict on_error)};

    for ( my $i = 0; $i < @$actions; $i++ ) {
        my ( $action, $item ) = ( %{ $items->[$i] } );
        if ( my $match = $item->{matches} ) {
            push @{ $matches{$_} }, $item for @$match;

        my $error = $items->[$i]{$action}{error} or next;
        if (    $on_conflict
            and $error =~ /
                    | DocumentAlreadyExistsException
            $on_conflict->( $action, $actions->[$i]{$action}, $error, $i );
        elsif ($on_error) {
            $on_error->( $action, $actions->[$i]{$action}, $error, $i );
        else {
            push @errors, { action => $actions->[$i], error => $error };

    return {
        actions => $actions,
        results => $items,
        matches => \%matches,
        took    => $results->{took},
        ( @errors ? ( errors => \@errors ) : () )


sub _to_dsl {
    my $self = shift;
    my $ops  = shift;
    my $builder;
    foreach my $clause (@_) {
        while ( my ( $old, $new ) = each %$ops ) {
            my $src = delete $clause->{$old} or next;
            die "Cannot specify $old and $new parameters.\n"
                if $clause->{$new};
            $builder ||= $self->builder;
            my $method = $new eq 'query' ? 'query' : 'filter';
            my $sub_clause = $builder->$method($src) or next;
            $clause->{$new} = $sub_clause->{$method};

sub _data_fixup {
    my $self = shift;
    my $data = shift;
    $self->_to_dsl( { queryb => 'query', filterb => 'filter' }, $data );

    my $facets = $data->{facets} or return;
    die "(facets) must be a HASH ref" unless ref $facets eq 'HASH';
    $facets = $data->{facets} = {%$facets};
    for ( values %$facets ) {
        die "All (facets) must be HASH refs" unless ref $_ eq 'HASH';
        $_ = my $facet = {%$_};
        $self->_to_dsl( {
                queryb        => 'query',
                filterb       => 'filter',
                facet_filterb => 'facet_filter'

sub _query_fixup {
    my $self = shift;
    my $args = shift;
    $self->_to_dsl( { queryb => 'query' }, $args->{data} );
    if ( my $query = delete $args->{data}{query} ) {
        my ( $k, $v ) = %$query;
        $args->{data}{$k} = $v;

sub _warmer_fixup {
    my ( $self, $args ) = @_;
    my $warmers = $args->{data}{warmers} or return;
    $warmers = $args->{data}{warmers} = {%$warmers};
    for ( values %$warmers ) {
        $_ = {%$_};
        my $source = $_->{source} or next;
        $_->{source} = $source = {%$source};


my %Search_Data = (
    explain       => ['explain'],
    facets        => ['facets'],
    fields        => ['fields'],
    filter        => ['filter'],
    filterb       => ['filterb'],
    from          => ['from'],
    highlight     => ['highlight'],
    indices_boost => ['indices_boost'],
    min_score     => ['min_score'],
    script_fields => ['script_fields'],
    size          => ['size'],
    'sort'        => ['sort'],
    track_scores  => ['track_scores'],

my %Search_Defn = (
    cmd     => CMD_index_type,
    postfix => '_search',
    data    => {
        query          => ['query'],
        queryb         => ['queryb'],
        partial_fields => ['partial_fields']
    qs => {
        search_type    => SEARCH_TYPE,
        ignore_indices => IGNORE_INDICES,
        preference     => ['string'],
        routing        => ['flatten'],
        timeout        => ['duration'],
        scroll         => ['duration'],
        stats          => ['flatten'],
        version        => [ 'boolean', 1 ]
    fixup => sub { $_[0]->_data_fixup( $_[1]->{data} ) },

my %SearchQS_Defn = (
    cmd     => CMD_index_type,
    postfix => '_search',
    qs      => {
        q                => ['string'],
        df               => ['string'],
        analyze_wildcard => [ 'boolean', 1 ],
        analyzer         => ['string'],
        default_operator => [ 'enum', [ 'OR', 'AND' ] ],
        explain                  => [ 'boolean', 1 ],
        fields                   => ['flatten'],
        from                     => ['int'],
        ignore_indices           => IGNORE_INDICES,
        lenient                  => [ 'boolean', 1 ],
        lowercase_expanded_terms => [ 'boolean', 1 ],
        min_score                => ['float'],
        preference               => ['string'],
        quote_analyzer           => ['string'],
        quote_field_suffix       => ['string'],
        routing                  => ['flatten'],
        scroll                   => ['duration'],
        search_type              => SEARCH_TYPE,
        size                     => ['int'],
        'sort'                   => ['flatten'],
        stats                    => ['flatten'],
        timeout                  => ['duration'],
        version                  => [ 'boolean', 1 ],

my %Query_Defn = (
    data => {
        query  => ['query'],
        queryb => ['queryb'],
    deprecated => {
        bool               => ['bool'],
        boosting           => ['boosting'],
        constant_score     => ['constant_score'],
        custom_score       => ['custom_score'],
        dis_max            => ['dis_max'],
        field              => ['field'],
        field_masking_span => ['field_masking_span'],
        filtered           => ['filtered'],
        flt                => [ 'flt', 'fuzzy_like_this' ],
        flt_field          => [ 'flt_field', 'fuzzy_like_this_field' ],
        fuzzy              => ['fuzzy'],
        has_child          => ['has_child'],
        ids                => ['ids'],
        match_all          => ['match_all'],
        mlt                => [ 'mlt', 'more_like_this' ],
        mlt_field          => [ 'mlt_field', 'more_like_this_field' ],
        prefix             => ['prefix'],
        query_string       => ['query_string'],
        range              => ['range'],
        span_first         => ['span_first'],
        span_near          => ['span_near'],
        span_not           => ['span_not'],
        span_or            => ['span_or'],
        span_term          => ['span_term'],
        term               => ['term'],
        terms              => [ 'terms', 'in' ],
        text               => ['text'],
        text_phrase        => ['text_phrase'],
        text_phrase_prefix => ['text_phrase_prefix'],
        top_children       => ['top_children'],
        wildcard           => ['wildcard'],

sub search   { shift()->_do_action( 'search',   \%Search_Defn,   @_ ) }
sub searchqs { shift()->_do_action( 'searchqs', \%SearchQS_Defn, @_ ) }

sub msearch {
    my $self    = shift;
    my $params  = $self->parse_params(@_);
    my $queries = $params->{queries} || [];

    my $order;
    if ( ref $queries eq 'HASH' ) {
        $order = {};
        my $i = 0;
        my @queries;
        for ( sort keys %$queries ) {
            $order->{$_} = $i++;
            push @queries, $queries->{$_};
        $queries = \@queries;

        {   cmd     => CMD_index_type,
            method  => 'GET',
            postfix => '_msearch',
            qs      => { search_type => SEARCH_TYPE },
            data    => { queries => 'queries' },
            fixup   => sub {
                my ( $self, $args ) = @_;
                $args->{data} = $self->_msearch_queries($queries);
                $args->{skip} = $order ? {} : [] unless ${ $args->{data} };
            post_process => sub {
                my $responses = shift->{responses};
                return $responses unless $order;
                return {
                    map { $_ => $responses->[ $order->{$_} ] }
                        keys %$order

my %MSearch = (
    ( map { $_ => 'h' } 'index', 'type', keys %{ $Search_Defn{qs} } ),
    ( map { $_ => 'b' } 'version', keys %{ $Search_Defn{data} } )
delete $MSearch{scroll};

sub _msearch_queries {
    my $self    = shift;
    my $queries = shift;

    my $json      = $self->transport->JSON;
    my $indenting = $json->get_indent;

    my $json_docs = '';
    my $error;
    eval {
        for my $query (@$queries)
            die "'queries' must contain HASH refs\n"
                unless ref $query eq 'HASH';

            my %request = ( h => {}, b => {} );
            for ( keys %$query ) {
                my $dest = $MSearch{$_}
                    or die "Unknown param for msearch: $_\n";
                $request{$dest}{$_} = $query->{$_};

            # flatten arrays
            for (qw(index type stats routing)) {
                $request{h}{$_} = join ",", @{ $request{h}{$_} }
                    if ref $request{h}{$_} eq 'ARRAY';
            $self->_data_fixup( $request{b} );
            $json_docs .= $json->encode( $request{h} ) . "\n"
                . $json->encode( $request{b} ) . "\n";
    } or $error = $@ || 'Unknown error';

    die $error if $error;

    return \$json_docs;

sub validate_query {
        {   cmd     => CMD_index_type,
            postfix => '_validate/query',
            data    => {
                query  => ['query'],
                queryb => ['queryb'],
            qs => {
                q              => ['string'],
                explain        => [ 'boolean', 1 ],
                ignore_indices => IGNORE_INDICES,
            fixup => sub {
                my $args = $_[1];
                if ( defined $args->{qs}{q} ) {
                    die "Cannot specify q and query/queryb parameters.\n"
                        if %{ $args->{data} };
                    delete $args->{data};
                else {
                    eval { _query_fixup(@_); 1 } or do {
                        die $@ if $@ =~ /Cannot specify queryb and query/;

sub explain {
        {   cmd     => CMD_INDEX_TYPE_ID,
            postfix => '_explain',
            data    => {
                query  => ['query'],
                queryb => ['queryb'],
            qs => {
                preference               => ['string'],
                routing                  => ['string'],
                q                        => ['string'],
                df                       => ['string'],
                analyzer                 => ['string'],
                analyze_wildcard         => [ 'boolean', 1 ],
                default_operator         => [ 'enum', [ 'OR', 'AND' ] ],
                fields                   => ['flatten'],
                lowercase_expanded_terms => [ 'boolean', undef, 0 ],
                lenient => [ 'boolean', 1 ],
            fixup => sub {
                my $args = $_[1];
                if ( defined $args->{qs}{q} ) {
                    die "Cannot specify q and query/queryb parameters.\n"
                        if %{ $args->{data} };
                    delete $args->{data};
                else {
                    $_[0]->_data_fixup( $args->{data} );

sub scroll {
        {   cmd    => [],
            prefix => '_search/scroll',
            qs     => {
                scroll_id => ['string'],
                scroll    => ['duration'],

sub scrolled_search {
    my $self = shift;
    require ElasticSearch::ScrolledSearch;
    return ElasticSearch::ScrolledSearch->new( $self, @_ );

sub delete_by_query {
        {   %Search_Defn,
            method  => 'DELETE',
            postfix => '_query',
            qs      => {
                consistency => CONSISTENCY,
                replication => REPLICATION,
                routing     => ['flatten'],
            fixup => sub {
                die "Missing required param 'query' or 'queryb'\n"
                    unless %{ $_[1]->{data} };

sub count {
        {   %Search_Defn,
            postfix => '_count',
            qs => {
                routing        => ['flatten'],
                ignore_indices => IGNORE_INDICES,
            fixup => sub {
                delete $_[1]{data} unless %{ $_[1]{data} };

sub mlt {
        {   cmd    => CMD_INDEX_TYPE_ID,
            method => 'GET',
            qs     => {
                mlt_fields         => ['flatten'],
                pct_terms_to_match => [ 'float', 'percent_terms_to_match' ],
                min_term_freq      => ['int'],
                max_query_terms    => ['int'],
                stop_words         => ['flatten'],
                min_doc_freq       => ['int'],
                max_doc_freq       => ['int'],
                min_word_len       => ['int'],
                max_word_len       => ['int'],
                boost_terms        => ['float'],
                routing            => ['flatten'],
                search_indices     => ['flatten'],
                search_from        => ['int'],
                search_size        => ['int'],
                search_type        => SEARCH_TYPE,
                search_types       => ['flatten'],
                search_scroll      => ['string'],
            postfix => '_mlt',
            data    => {
                explain       => ['explain'],
                facets        => ['facets'],
                fields        => ['fields'],
                filter        => ['filter'],
                filterb       => ['filterb'],
                highlight     => ['highlight'],
                indices_boost => ['indices_boost'],
                min_score     => ['min_score'],
                script_fields => ['script_fields'],
                'sort'        => ['sort'],
                track_scores  => ['track_scores'],
            fixup => sub {
                shift()->_to_dsl( { filterb => 'filter' }, $_[0]->{data} );

sub create_percolator {
        {   cmd    => CMD_INDEX_PERC,
            prefix => '_percolator',
            method => 'PUT',
            data   => {
                query  => ['query'],
                queryb => ['queryb'],
                data   => ['data']
            fixup => sub {
                my $self = shift;
                my $args = shift;
                $self->_to_dsl( { queryb => 'query' }, $args->{data} );
                die('create_percolator() requires either the query or queryb param'
                ) unless $args->{data}{query};
                die 'The "data" param cannot include a "query" key'
                    if $args->{data}{data}{query};
                $args->{data} = {
                    query => $args->{data}{query},
                    %{ $args->{data}{data} }

sub delete_percolator {
        {   cmd    => CMD_INDEX_PERC,
            prefix => '_percolator',
            method => 'DELETE',
            qs     => { ignore_missing => [ 'boolean', 1 ], }

sub get_percolator {
        {   cmd          => CMD_INDEX_PERC,
            prefix       => '_percolator',
            method       => 'GET',
            qs           => { ignore_missing => [ 'boolean', 1 ], },
            post_process => sub {
                my $result = shift;
                return $result
                    unless ref $result eq 'HASH';
                return {
                    index      => $result->{_type},
                    percolator => $result->{_id},
                    query      => delete $result->{_source}{query},
                    data       => $result->{_source},

sub percolate {
        {   cmd     => CMD_INDEX_TYPE,
            postfix => '_percolate',
            method  => 'GET',
            qs      => { prefer_local => [ 'boolean', undef, 0 ] },
            data    => { doc => 'doc', query => ['query'] },


sub index_status {
        {   cmd     => CMD_index,
            postfix => '_status',
            qs      => {
                recovery       => [ 'boolean', 1 ],
                snapshot       => [ 'boolean', 1 ],
                ignore_indices => IGNORE_INDICES,

sub index_stats {
        {   cmd     => CMD_index,
            postfix => '_stats',
            qs      => {
                docs     => [ 'boolean', 1, 0 ],
                store    => [ 'boolean', 1, 0 ],
                indexing => [ 'boolean', 1, 0 ],
                get      => [ 'boolean', 1, 0 ],
                search   => [ 'boolean', 1, 0 ],
                clear    => [ 'boolean', 1 ],
                all      => [ 'boolean', 1 ],
                merge    => [ 'boolean', 1 ],
                flush    => [ 'boolean', 1 ],
                refresh  => [ 'boolean', 1 ],
                types    => ['flatten'],
                groups   => ['flatten'],
                level => [ 'enum', [qw(shards)] ],
                ignore_indices => IGNORE_INDICES,

sub index_segments {
        {   cmd     => CMD_index,
            postfix => '_segments',
            qs      => { ignore_indices => IGNORE_INDICES, }

sub create_index {
        {   method  => 'PUT',
            cmd     => CMD_INDEX,
            postfix => '',
            data    => {
                settings => ['settings'],
                mappings => ['mappings'],
                warmers  => ['warmers'],
            fixup => \&_warmer_fixup

sub delete_index {
        {   method  => 'DELETE',
            cmd     => CMD_INDICES,
            qs      => { ignore_missing => [ 'boolean', 1 ], },
            postfix => ''

sub index_exists {
        {   method => 'HEAD',
            cmd    => CMD_index,
            fixup  => sub { $_[1]->{qs}{ignore_missing} = 1 }

sub open_index {
        {   method  => 'POST',
            cmd     => CMD_INDEX,
            postfix => '_open'

sub close_index {
        {   method  => 'POST',
            cmd     => CMD_INDEX,
            postfix => '_close'

sub aliases {
    my ( $self, $params ) = parse_params(@_);
    my $actions = $params->{actions};
    if ( defined $actions && ref $actions ne 'ARRAY' ) {
        $params->{actions} = [$actions];

        {   prefix => '_aliases',
            method => 'POST',
            cmd    => [],
            data   => { actions => 'actions' },
            fixup  => sub {
                my $self    = shift;
                my $args    = shift;
                my @actions = @{ $args->{data}{actions} };
                for (@actions) {
                    my ( $key, $value ) = %$_;
                    $value = {%$value};
                    $self->_to_dsl( { filterb => 'filter' }, $value );
                    $_ = { $key => $value };
                $args->{data}{actions} = \@actions;

sub get_aliases {
        {   postfix => '_aliases',
            cmd     => CMD_index,
            qs      => { ignore_missing => [ 'boolean', 1 ] },

sub create_warmer {
        {   method  => 'PUT',
            cmd     => CMD_index_type,
            postfix => '_warmer/',
            data    => {
                warmer        => 'warmer',
                facets        => ['facets'],
                filter        => ['filter'],
                filterb       => ['filterb'],
                script_fields => ['script_fields'],
                'sort'        => ['sort'],
                query         => ['query'],
                queryb        => ['queryb'],
            fixup => sub {
                my ( $self, $args ) = @_;
                $args->{cmd} .= delete $args->{data}{warmer};
                $self->_data_fixup( $args->{data} );

sub warmer {
    my ( $self, $params ) = parse_params(@_);
    $params->{warmer} = '*'
        unless defined $params->{warmer} and length $params->{warmer};

        {   method  => 'GET',
            cmd     => CMD_indices,
            postfix => '_warmer/',
            data    => { warmer => ['warmer'] },
            qs      => { ignore_missing => [ 'boolean', 1 ] },
            fixup   => sub {
                my ( $self, $args ) = @_;
                $args->{cmd} .= delete $args->{data}{warmer};

sub delete_warmer {
        {   method  => 'DELETE',
            cmd     => CMD_INDICES,
            postfix => '_warmer/',
            data    => { warmer => 'warmer' },
            qs      => { ignore_missing => [ 'boolean', 1 ] },
            fixup   => sub {
                my ( $self, $args ) = @_;
                $args->{cmd} .= delete $args->{data}{warmer};

sub create_index_template {
        {   method => 'PUT',
            cmd    => CMD_NAME,
            prefix => '_template',
            data   => {
                template => 'template',
                settings => ['settings'],
                mappings => ['mappings'],
                warmers  => ['warmers'],
                order    => ['order'],
            fixup => \&_warmer_fixup

sub delete_index_template {
        {   method => 'DELETE',
            cmd    => CMD_NAME,
            prefix => '_template',
            qs     => { ignore_missing => [ 'boolean', 1 ] },

sub index_template {
        {   method => 'GET',
            cmd    => CMD_NAME,
            prefix => '_template',

sub flush_index {
        {   method  => 'POST',
            cmd     => CMD_index,
            postfix => '_flush',
            qs      => {
                refresh        => [ 'boolean', 1 ],
                full           => [ 'boolean', 1 ],
                ignore_indices => IGNORE_INDICES,

sub refresh_index {
        {   method  => 'POST',
            cmd     => CMD_index,
            postfix => '_refresh',
            qs      => { ignore_indices => IGNORE_INDICES, }

sub optimize_index {
        {   method  => 'POST',
            cmd     => CMD_index,
            postfix => '_optimize',
            qs      => {
                only_deletes =>
                    [ 'boolean', [ only_expunge_deletes => 'true' ] ],
                max_num_segments => ['int'],
                refresh          => [ 'boolean', undef, 0 ],
                flush            => [ 'boolean', undef, 0 ],
                wait_for_merge   => [ 'boolean', undef, 0 ],
                ignore_indices   => IGNORE_INDICES,

sub snapshot_index {
        {   method  => 'POST',
            cmd     => CMD_index,
            postfix => '_gateway/snapshot',
            qs      => { ignore_indices => IGNORE_INDICES, }

sub gateway_snapshot {
        {   method  => 'POST',
            cmd     => CMD_index,
            postfix => '_gateway/snapshot'

sub put_mapping {
    my ( $self, $params ) = parse_params(@_);
    my %defn = (
        data       => { mapping => 'mapping' },
        deprecated => {
            dynamic           => ['dynamic'],
            dynamic_templates => ['dynamic_templates'],
            properties        => ['properties'],
            _all              => ['_all'],
            _analyzer         => ['_analyzer'],
            _boost            => ['_boost'],
            _id               => ['_id'],
            _index            => ['_index'],
            _meta             => ['_meta'],
            _parent           => ['_parent'],
            _routing          => ['_routing'],
            _source           => ['_source'],

    $defn{deprecated}{mapping} = undef
        if !$params->{mapping} && grep { exists $params->{$_} }
            keys %{ $defn{deprecated} };

    my $type = $params->{type} || $self->{_default}{type};
        {   method  => 'PUT',
            cmd     => CMD_index_TYPE,
            postfix => '_mapping',
            qs      => { ignore_conflicts => [ 'boolean', 1 ] },
            fixup => sub {
                my $args = $_[1];
                my $mapping = $args->{data}{mapping} || $args->{data};
                $args->{data} = { $type => $mapping };

sub delete_mapping {
    my ( $self, $params ) = parse_params(@_);

        {   method => 'DELETE',
            cmd    => CMD_INDICES_TYPE,
            qs     => { ignore_missing => [ 'boolean', 1 ], }

sub mapping {
    my ( $self, $params ) = parse_params(@_);

        {   method  => 'GET',
            cmd     => CMD_index_type,
            postfix => '_mapping',
            qs      => { ignore_missing => [ 'boolean', 1 ], }

sub type_exists {
        {   method => 'HEAD',
            cmd    => CMD_index_types,
            qs     => { ignore_indices => IGNORE_INDICES, },
            fixup  => sub { $_[1]->{qs}{ignore_missing} = 1 }

sub clear_cache {
        {   method  => 'POST',
            cmd     => CMD_index,
            postfix => '_cache/clear',
            qs      => {
                id             => [ 'boolean', 1 ],
                filter         => [ 'boolean', 1 ],
                field_data     => [ 'boolean', 1 ],
                bloom          => [ 'boolean', 1 ],
                fields         => ['flatten'],
                ignore_indices => IGNORE_INDICES,

sub index_settings {
    my ( $self, $params ) = parse_params(@_);

        {   method  => 'GET',
            cmd     => CMD_index,
            postfix => '_settings'

sub update_index_settings {
    my ( $self, $params ) = parse_params(@_);

        {   method  => 'PUT',
            cmd     => CMD_index,
            postfix => '_settings',
            data    => { index => 'settings' }


sub create_river {
    my ( $self, $params ) = parse_params(@_);
    my $type = $params->{type}
        or $self->throw( 'Param', 'No river type specified', $params );
    my $data = { type => 'type', index => ['index'], $type => [$type] };
        {   method  => 'PUT',
            prefix  => '_river',
            cmd     => CMD_RIVER,
            postfix => '_meta',
            data    => $data

sub get_river {
    my ( $self, $params ) = parse_params(@_);
        {   method  => 'GET',
            prefix  => '_river',
            cmd     => CMD_RIVER,
            postfix => '_meta',
            qs      => { ignore_missing => [ 'boolean', 1 ] }

sub delete_river {
    my ( $self, $params ) = parse_params(@_);
        {   method => 'DELETE',
            prefix => '_river',
            cmd    => CMD_RIVER,

sub river_status {
    my ( $self, $params ) = parse_params(@_);
        {   method  => 'GET',
            prefix  => '_river',
            cmd     => CMD_RIVER,
            postfix => '_status',
            qs      => { ignore_missing => [ 'boolean', 1 ] }


sub cluster_state {
        {   prefix => '_cluster/state',
            qs     => {
                filter_blocks        => [ 'boolean', 1 ],
                filter_nodes         => [ 'boolean', 1 ],
                filter_metadata      => [ 'boolean', 1 ],
                filter_routing_table => [ 'boolean', 1 ],
                filter_indices       => ['flatten'],


sub current_server_version {
        {   cmd          => CMD_NONE,
            prefix       => '',
            post_process => sub {
                return shift->{version};

sub nodes {
        {   prefix => '_cluster/nodes',
            cmd    => CMD_nodes,
            qs     => {
                settings    => [ 'boolean', 1 ],
                http        => [ 'boolean', 1 ],
                jvm         => [ 'boolean', 1 ],
                network     => [ 'boolean', 1 ],
                os          => [ 'boolean', 1 ],
                process     => [ 'boolean', 1 ],
                thread_pool => [ 'boolean', 1 ],
                transport   => [ 'boolean', 1 ],

sub nodes_stats {
        {   prefix  => '_cluster/nodes',
            postfix => 'stats',
            cmd     => CMD_nodes,
            qs      => {
                indices     => [ 'boolean', 1, 0 ],
                clear       => [ 'boolean', 1 ],
                all         => [ 'boolean', 1 ],
                fs          => [ 'boolean', 1 ],
                http        => [ 'boolean', 1 ],
                jvm         => [ 'boolean', 1 ],
                network     => [ 'boolean', 1 ],
                os          => [ 'boolean', 1 ],
                process     => [ 'boolean', 1 ],
                thread_pool => [ 'boolean', 1 ],
                transport   => [ 'boolean', 1 ],

sub shutdown {
        {   method  => 'POST',
            prefix  => '_cluster/nodes',
            cmd     => CMD_nodes,
            postfix => '_shutdown',
            qs      => { delay => ['duration'] }

sub restart {
        {   method  => 'POST',
            prefix  => '_cluster/nodes',
            cmd     => CMD_nodes,
            postfix => '_restart',
            qs      => { delay => ['duration'] }

sub cluster_health {
        {   prefix => '_cluster/health',
            cmd    => CMD_index,
            qs     => {
                level           => [ 'enum', [qw(cluster indices shards)] ],
                wait_for_status => [ 'enum', [qw(green yellow red)] ],
                wait_for_relocating_shards => ['int'],
                wait_for_nodes             => ['string'],
                timeout                    => ['duration']

sub cluster_settings {
    my ( $self, $params ) = parse_params(@_);

        {   method  => 'GET',
            cmd     => CMD_NONE,
            postfix => '_cluster/settings'

sub update_cluster_settings {
    my ( $self, $params ) = parse_params(@_);

        {   method  => 'PUT',
            cmd     => CMD_NONE,
            postfix => '_cluster/settings',
            data    => {
                persistent => ['persistent'],
                transient  => ['transient']

sub cluster_reroute {
    my ( $self, $params ) = parse_params(@_);
    $params->{commands} = [ $params->{commands} ]
        if $params->{commands} and ref( $params->{commands} ) ne 'ARRAY';

        {   prefix => '_cluster/reroute',
            cmd    => [],
            method => 'POST',
            data   => { commands => ['commands'] },
            qs     => { dry_run => [ 'boolean', 1 ], },


sub camel_case {
    my $self = shift;
    if (@_) {
        if ( shift() ) {
            $self->{_base_qs}{case} = 'camelCase';
        else {
            delete $self->{_base_qs}{case};
    return $self->{_base_qs}{case} ? 1 : 0;

sub error_trace {
    my $self = shift;
    if (@_) {
        if ( shift() ) {
            $self->{_base_qs}{error_trace} = 'true';
        else {
            delete $self->{_base_qs}{error_trace};
    return $self->{_base_qs}{error_trace} ? 1 : 0;


sub _do_action {
    my $self            = shift;
    my $action          = shift || '';
    my $defn            = shift || {};
    my $original_params = $self->parse_params(@_);

    my $error;

    my $params = {%$original_params};
    my %args = ( method => $defn->{method} || 'GET' );
    $args{as_json} = delete $params->{as_json};

    eval {
            = $self->_build_cmd( $params, @{$defn}{qw(prefix cmd postfix)} );
        $args{qs} = $self->_build_qs( $params, $defn->{qs} );
            = $self->_build_data( $params, @{$defn}{ 'data', 'deprecated' } );
        if ( my $fixup = $defn->{fixup} ) {
            $fixup->( $self, \%args );
        die "Unknown parameters: " . join( ', ', keys %$params ) . "\n"
            if keys %$params;
    } or $error = $@ || 'Unknown error';

    $args{post_process} = $defn->{post_process};
    if ($error) {
        die $error if ref $error;
            $error . $self->_usage( $action, $defn ),
            { params => $original_params }
    if ( my $skip = $args{skip} ) {
        return $self->transport->skip_request( $args{as_json}, $skip );
    return $self->request( \%args );

sub _usage {
    my $self   = shift;
    my $action = shift;
    my $defn   = shift;

    my $usage = "Usage for '$action()':\n";
    my @cmd = @{ $defn->{cmd} || [] };
    while ( my $key = shift @cmd ) {
        my $type = shift @cmd;
        my $arg_format
            = $type == ONE_REQ ? "\$$key"
            : $type == ONE_OPT ? "\$$key"
            :                    "\$$key | [\$${key}_1,\$${key}_n]";

        my $required
            = ( $type == ONE_REQ or $type == MULTI_REQ )
            ? 'required'
            : 'optional';
        $usage .= sprintf( "  - %-26s =>  %-45s # %s\n",
            $key, $arg_format, $required );

    if ( my $data = $defn->{data} ) {
        my @keys = sort { $a->[0] cmp $b->[0] }
            map { ref $_ ? [ $_->[0], 'optional' ] : [ $_, 'required' ] }
            values %$data;

        for (@keys) {
            $usage .= sprintf(
                "  - %-26s =>  %-45s # %s\n",
                $_->[0], '{' . $_->[0] . '}',

    if ( my $qs = $defn->{qs} ) {
        for ( sort keys %$qs ) {
            my $arg_format = $QS_Format{ $qs->{$_}[0] };
            my @extra;
            $arg_format = $arg_format->( $_, $qs->{$_} )
                if ref $arg_format;
            if ( length($arg_format) > 45 ) {
                ( $arg_format, @extra ) = split / [|] /, $arg_format;
            $usage .= sprintf( "  - %-26s =>  %-45s # optional\n", $_,
                $arg_format );
            $usage .= ( ' ' x 34 ) . " | $_\n" for @extra;

    return $usage;

sub _build_qs {
    my $self   = shift;
    my $params = shift;
    my $defn   = shift || {};
    my %qs     = %{ $self->{_base_qs} };
    foreach my $key ( keys %$defn ) {
        my ( $format_name, @args ) = @{ $defn->{$key} || [] };
        $format_name ||= '';

        next unless exists $params->{$key};

        my $formatter = $QS_Formatter{$format_name}
            or die "Unknown QS formatter '$format_name'";

        my $val = $formatter->( $key, delete $params->{$key}, @args )
            or next;
        $qs{ $val->[0] } = $val->[1];
    return \%qs;

sub _build_data {
    my $self   = shift;
    my $params = shift;
    my $defn   = shift or return;

    if ( my $deprecated = shift ) {
        $defn = { %$defn, %$deprecated };

    my %data;
KEY: while ( my ( $key, $source ) = each %$defn ) {
        next unless defined $source;
        if ( ref $source eq 'ARRAY' ) {
            foreach (@$source) {
                my $val = delete $params->{$_};
                next unless defined $val;
                $data{$key} = $val;
                next KEY;
        else {
            $data{$key} = delete $params->{$source}
                or die "Missing required param '$source'\n";
    return \%data;

sub _build_cmd {
    my $self   = shift;
    my $params = shift;
    my ( $prefix, $defn, $postfix ) = @_;

    my @defn = ( @{ $defn || [] } );
    my @cmd;
    while (@defn) {
        my $key  = shift @defn;
        my $type = shift @defn;

        my $val
            = exists $params->{$key}
            ? delete $params->{$key}
            : $self->{_default}{$key};

        $val = '' unless defined $val;

        if ( ref $val eq 'ARRAY' ) {
            die "'$key' must be a single value\n"
                if $type <= ONE_ALL;
            $val = join ',', @$val;
        unless ( length $val ) {
            next if $type == ONE_OPT || $type == MULTI_BLANK;
            die "Param '$key' is required\n"
                if $type == ONE_REQ || $type == MULTI_REQ;
            $val = '_all';
        push @cmd, uri_escape($val);

    return join '/', '', grep {defined} ( $prefix, @cmd, $postfix );