The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define C_LUCY_POLYREADER
#include "Lucy/Util/ToolSet.h"

#include "Clownfish/HashIterator.h"
#include "Lucy/Index/PolyReader.h"
#include "Lucy/Document/HitDoc.h"
#include "Lucy/Index/DeletionsReader.h"
#include "Lucy/Index/IndexManager.h"
#include "Lucy/Index/Segment.h"
#include "Lucy/Index/SegReader.h"
#include "Lucy/Index/Snapshot.h"
#include "Lucy/Plan/FieldType.h"
#include "Lucy/Plan/Schema.h"
#include "Lucy/Store/Folder.h"
#include "Lucy/Store/FSFolder.h"
#include "Lucy/Store/Lock.h"
#include "Lucy/Util/Json.h"
#include "Lucy/Util/Freezer.h"
#include "Lucy/Util/IndexFileNames.h"

// Obtain/release the snapshot read lock and the deletion lock.  The obtain
// helpers return false (and leave the corresponding lock ivar NULL) when the
// lock cannot be acquired; the release helpers are no-ops if no lock is held.
static bool
S_obtain_read_lock(PolyReader *self, String *snapshot_file_name);
static bool
S_obtain_deletion_lock(PolyReader *self);
static void
S_release_read_lock(PolyReader *self);
static void
S_release_deletion_lock(PolyReader *self);

// Try to open all SegReaders.  Meant to be run under Err_trap(); on success
// `seg_readers` receives a new Vector of SegReaders owned by the caller.
// File-local helper, so it gets internal linkage like the other S_ helpers
// (the later definition inherits `static` from this prior declaration).
struct try_open_elements_context {
    PolyReader *self;
    Vector     *seg_readers;
};
static void
S_try_open_elements(void *context);

// Try to read a Snapshot file.  Argument bundle for S_try_read_snapshot(),
// which is invoked through Err_trap() so a read failure can be caught and
// retried against a newer snapshot.
struct try_read_snapshot_context {
    Snapshot *snapshot;  // Snapshot object to populate.
    Folder   *folder;    // Folder the snapshot file lives in.
    String   *path;      // Name of the snapshot file to read.
};
static void
S_try_read_snapshot(void *context);

// Try to open an individual SegReader.  Argument bundle for
// S_try_open_segreader(), also invoked through Err_trap().
struct try_open_segreader_context {
    Schema    *schema;
    Folder    *folder;
    Snapshot  *snapshot;
    Vector    *segments;  // All Segments of the snapshot.
    int32_t    seg_tick;  // Index into `segments` of the one to open.
    SegReader *result;    // Output: the new SegReader on success.
};
static void
S_try_open_segreader(void *context);

// Turn the `index` argument (a Folder or a path String) into a Folder.
static Folder*
S_derive_folder(Obj *index);

PolyReader*
PolyReader_new(Schema *schema, Folder *folder, Snapshot *snapshot,
               IndexManager *manager, Vector *sub_readers) {
    // Allocate a blank POLYREADER and delegate all setup to the initializer.
    PolyReader *blank = (PolyReader*)Class_Make_Obj(POLYREADER);
    PolyReader *ready = PolyReader_init(blank, schema, folder, snapshot,
                                        manager, sub_readers);
    return ready;
}

PolyReader*
PolyReader_open(Obj *index, Snapshot *snapshot, IndexManager *manager) {
    // Allocate a blank POLYREADER, then let do_open locate and load the index.
    PolyReader *blank = (PolyReader*)Class_Make_Obj(POLYREADER);
    PolyReader *opened = PolyReader_do_open(blank, index, snapshot, manager);
    return opened;
}

// Return the first non-NULL element of `array`, or NULL if every slot is
// empty (or the array has no elements).  The result is not incremented.
static Obj*
S_first_non_null(Vector *array) {
    const size_t count = Vec_Get_Size(array);
    Obj *found = NULL;
    for (size_t tick = 0; tick < count && found == NULL; tick++) {
        found = Vec_Fetch(array, tick);
    }
    return found;
}

// Install `sub_readers` as this PolyReader's SegReaders and rebuild the
// derived state: per-sub-reader start offsets, total doc_max, one aggregated
// DataReader per component API, and the cached deletion count.
static void
S_init_sub_readers(PolyReader *self, Vector *sub_readers) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    const size_t num_readers = Vec_Get_Size(sub_readers);
    int32_t *seg_starts
        = (int32_t*)MALLOCATE(num_readers * sizeof(int32_t));
    Hash *by_api = Hash_new(0);

    // Replace any previously installed sub-readers and offsets.
    DECREF(ivars->sub_readers);
    DECREF(ivars->offsets);
    ivars->sub_readers = (Vector*)INCREF(sub_readers);

    // First pass: record where each sub-reader starts, sum up doc_max, and
    // bucket every component DataReader by API name, keeping each one in the
    // slot matching its sub-reader.
    ivars->doc_max = 0;
    for (size_t tick = 0; tick < num_readers; tick++) {
        SegReader *seg_reader = (SegReader*)Vec_Fetch(sub_readers, tick);
        Hash *seg_components = SegReader_Get_Components(seg_reader);
        seg_starts[tick] = ivars->doc_max;
        ivars->doc_max += SegReader_Doc_Max(seg_reader);

        HashIterator *comp_iter = HashIter_new(seg_components);
        while (HashIter_Next(comp_iter)) {
            String     *api_name = HashIter_Get_Key(comp_iter);
            DataReader *part     = (DataReader*)HashIter_Get_Value(comp_iter);
            Vector *bucket = (Vector*)Hash_Fetch(by_api, api_name);
            if (bucket == NULL) {
                bucket = Vec_new(num_readers);
                Hash_Store(by_api, api_name, (Obj*)bucket);
            }
            Vec_Store(bucket, tick, INCREF(part));
        }
        DECREF(comp_iter);
    }
    // The I32Array takes ownership of the malloc'd buffer.
    ivars->offsets = I32Arr_new_steal(seg_starts, num_readers);

    // Second pass: ask a representative reader of each API to build an
    // aggregator spanning all sub-readers; store any that it produces.
    HashIterator *api_iter = HashIter_new(by_api);
    while (HashIter_Next(api_iter)) {
        String *api_name = HashIter_Get_Key(api_iter);
        Vector *bucket   = (Vector*)HashIter_Get_Value(api_iter);
        DataReader *exemplar
            = (DataReader*)CERTIFY(S_first_non_null(bucket), DATAREADER);
        DataReader *combined
            = DataReader_Aggregator(exemplar, bucket, ivars->offsets);
        if (combined != NULL) {
            CERTIFY(combined, DATAREADER);
            Hash_Store(ivars->components, api_name, (Obj*)combined);
        }
    }
    DECREF(api_iter);
    DECREF(by_api);

    // Cache the deletion count, or zero if there is no DeletionsReader.
    DeletionsReader *del_reader = (DeletionsReader*)Hash_Fetch(
        ivars->components, Class_Get_Name(DELETIONSREADER));
    if (del_reader != NULL) {
        ivars->del_count = DelReader_Del_Count(del_reader);
    }
    else {
        ivars->del_count = 0;
    }
}

// Initialize a PolyReader.  When `sub_readers` is supplied, each element must
// be a SegReader; their Segments are passed to the IndexReader initializer and
// the sub-readers are installed.  Otherwise the reader starts out empty.
PolyReader*
PolyReader_init(PolyReader *self, Schema *schema, Folder *folder,
                Snapshot *snapshot, IndexManager *manager,
                Vector *sub_readers) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    ivars->doc_max   = 0;
    ivars->del_count = 0;

    if (!sub_readers) {
        IxReader_init((IndexReader*)self, schema, folder, snapshot,
                      NULL, -1, manager);
        ivars->sub_readers = Vec_new(0);
        ivars->offsets     = I32Arr_new_steal(NULL, 0);
        return self;
    }

    // Collect the Segment backing each sub-reader.
    const size_t num_segs = Vec_Get_Size(sub_readers);
    Vector *segments = Vec_new(num_segs);
    for (size_t tick = 0; tick < num_segs; tick++) {
        Obj *candidate = Vec_Fetch(sub_readers, tick);
        SegReader *seg_reader = (SegReader*)CERTIFY(candidate, SEGREADER);
        Segment *segment = SegReader_Get_Segment(seg_reader);
        Vec_Push(segments, INCREF(segment));
    }
    IxReader_init((IndexReader*)self, schema, folder, snapshot,
                  segments, -1, manager);
    DECREF(segments);
    S_init_sub_readers(self, sub_readers);
    return self;
}

// Close every SegReader, then run the parent class's Close.
void
PolyReader_Close_IMP(PolyReader *self) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    Vector *readers = ivars->sub_readers;
    const size_t count = Vec_Get_Size(readers);
    for (size_t tick = 0; tick < count; tick++) {
        SegReader_Close((SegReader*)Vec_Fetch(readers, tick));
    }
    PolyReader_Close_t super_close
        = SUPER_METHOD_PTR(POLYREADER, LUCY_PolyReader_Close);
    super_close(self);
}

// Drop the references this subclass owns, then hand off to the superclass.
void
PolyReader_Destroy_IMP(PolyReader *self) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    Vector   *readers = ivars->sub_readers;
    I32Array *starts  = ivars->offsets;
    DECREF(readers);
    DECREF(starts);
    SUPER_DESTROY(self, POLYREADER);
}

static void
S_try_read_snapshot(void *context) {
    struct try_read_snapshot_context *args
        = (struct try_read_snapshot_context*)context;
    Snapshot_Read_File(args->snapshot, args->folder, args->path);
}

static void
S_try_open_segreader(void *context) {
    struct try_open_segreader_context *args
        = (struct try_open_segreader_context*)context;
    args->result = SegReader_new(args->schema, args->folder, args->snapshot,
                                 args->segments, args->seg_tick);
}

/* Err_trap() callback used by PolyReader_do_open.  Using the file list of
 * the reader's current Snapshot, it:
 *   1. picks the "schema_*.json" file with the highest generation and
 *      replaces the reader's Schema with the one loaded from it;
 *   2. reads the metadata of every valid segment listed and sorts the
 *      resulting Segments;
 *   3. opens one SegReader per segment.
 *
 * `context` is a struct try_open_elements_context.  On success,
 * `args->seg_readers` holds a new Vector of SegReaders which the caller must
 * DECREF.  Every failure is reported by throwing.  If the SegReader loop fails,
 * `args->seg_readers` is reset to NULL before rethrowing.
 *
 * NOTE(review): the explicit cleanups below cover only failures reported via
 * return values.  If a callee (Json_slurp_json, Freezer_load, Seg_Read_File)
 * throws instead, `files` / `segments` are not DECREF'd here.
 */
void
S_try_open_elements(void *context) {
    struct try_open_elements_context *args
        = (struct try_open_elements_context*)context;
    PolyReader *self              = args->self;
    PolyReaderIVARS *const ivars  = PolyReader_IVARS(self);
    Vector     *files             = Snapshot_List(ivars->snapshot);
    Folder     *folder            = PolyReader_Get_Folder(self);
    uint32_t    num_segs          = 0;
    uint64_t    latest_schema_gen = 0;
    String     *schema_file       = NULL;

    // Find schema file, count segments.  Only a schema whose generation is
    // strictly greater than the best seen so far (starting at 0) is chosen.
    for (size_t i = 0, max = Vec_Get_Size(files); i < max; i++) {
        String *entry = (String*)Vec_Fetch(files, i);

        if (Seg_valid_seg_name(entry)) {
            num_segs++;
        }
        else if (Str_Starts_With_Utf8(entry, "schema_", 7)
                 && Str_Ends_With_Utf8(entry, ".json", 5)
                ) {
            uint64_t gen = IxFileNames_extract_gen(entry);
            if (gen > latest_schema_gen) {
                latest_schema_gen = gen;
                schema_file       = entry;
            }
        }
    }

    // Read Schema.  `schema_file` points into `files`, so error messages
    // that mention it are built before `files` is released.
    if (!schema_file) {
        DECREF(files);
        THROW(ERR, "Can't find a schema file.");
    }
    else {
        Obj *dump = Json_slurp_json(folder, schema_file);
        if (dump) { // read file successfully
            DECREF(ivars->schema);
            ivars->schema = (Schema*)CERTIFY(Freezer_load(dump), SCHEMA);
            DECREF(dump);
            schema_file = NULL;
        }
        else {
            String *mess = MAKE_MESS("Failed to parse %o", schema_file);
            DECREF(files);
            Err_throw_mess(ERR, mess);
        }
    }

    Vector *segments = Vec_new(num_segs);
    for (size_t i = 0, max = Vec_Get_Size(files); i < max; i++) {
        String *entry = (String*)Vec_Fetch(files, i);

        // Create a Segment for each segmeta.
        if (Seg_valid_seg_name(entry)) {
            int64_t seg_num = (int64_t)IxFileNames_extract_gen(entry);
            Segment *segment = Seg_new(seg_num);

            // Bail if reading the file fails (probably because it's been
            // deleted and a new snapshot file has been written so we need to
            // retry).
            if (Seg_Read_File(segment, folder)) {
                Vec_Push(segments, (Obj*)segment);
            }
            else {
                // Build the message before `files` (which owns `entry`) goes.
                String *mess = MAKE_MESS("Failed to read %o", entry);
                DECREF(segment);
                DECREF(segments);
                DECREF(files);
                Err_throw_mess(ERR, mess);
            }
        }
    }

    // Sort the segments by age.
    Vec_Sort(segments);

    // Open individual SegReaders.  Each attempt is trapped so that partial
    // results can be released before the error propagates.
    struct try_open_segreader_context seg_context;
    seg_context.schema   = PolyReader_Get_Schema(self);
    seg_context.folder   = folder;
    seg_context.snapshot = PolyReader_Get_Snapshot(self);
    seg_context.segments = segments;
    seg_context.result   = NULL;
    args->seg_readers = Vec_new(num_segs);
    Err *error = NULL;
    for (int32_t seg_tick = 0; seg_tick < (int32_t)num_segs; seg_tick++) {
        seg_context.seg_tick = seg_tick;
        error = Err_trap(S_try_open_segreader, &seg_context);
        if (error) {
            break;
        }
        // Ownership of the new SegReader moves into the output Vector.
        Vec_Push(args->seg_readers, (Obj*)seg_context.result);
        seg_context.result = NULL;
    }

    DECREF(segments);
    DECREF(files);
    if (error) {
        // Discard any SegReaders opened so far, then propagate the failure.
        DECREF(args->seg_readers);
        args->seg_readers = NULL;
        RETHROW(error);
    }
}

// For test suite.  When PolyReader_race_condition_debug1 is non-NULL,
// PolyReader_do_open renames a file named "temp" in the index folder (if one
// exists) to this name right after choosing/locking a snapshot file.  This
// simulates an index update racing with the open.  Each such pass increments
// PolyReader_debug1_num_passes.
String* PolyReader_race_condition_debug1 = NULL;
int32_t  PolyReader_debug1_num_passes     = 0;

/* Open `index` (a Folder, or a String path turned into an FSFolder) into
 * `self`.
 *
 * If `snapshot` is supplied, its file is used; otherwise the most recent
 * snapshot file in the folder is chosen.  If there is none, the index is
 * treated as empty and `self` is returned without sub-readers.
 *
 * When `manager` is supplied, a deletion lock is held while opening, and a
 * read lock is taken on the chosen snapshot file.  The read lock is kept
 * when the open succeeds; the deletion lock is always released.
 *
 * An Indexer may replace files while we open them.  Failing to read the
 * snapshot, or to open its elements, is therefore retried whenever the
 * snapshot generation is newer than the previous attempt.  Otherwise the
 * error is rethrown.
 *
 * Every error exit releases the locks acquired here, so a failed open never
 * leaves a deletion or read lock behind.  On lock-acquisition failure `self`
 * is DECREF'd before throwing.
 */
PolyReader*
PolyReader_do_open(PolyReader *self, Obj *index, Snapshot *snapshot,
                   IndexManager *manager) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    Folder   *folder   = S_derive_folder(index);
    uint64_t  last_gen = 0;

    // `self` retains the folder, so our reference can be dropped right away.
    PolyReader_init(self, NULL, folder, snapshot, manager, NULL);
    DECREF(folder);

    if (manager) {
        if (!S_obtain_deletion_lock(self)) {
            DECREF(self);
            THROW(LOCKERR, "Couldn't get deletion lock");
        }
    }

    while (1) {
        String *target_snap_file;

        // If a Snapshot was supplied, use its file.
        if (snapshot) {
            target_snap_file = Snapshot_Get_Path(snapshot);
            if (!target_snap_file) {
                // Don't leave the deletion lock held on the way out.
                if (manager) { S_release_deletion_lock(self); }
                THROW(ERR, "Supplied snapshot objects must not be empty");
            }
            else {
                target_snap_file = (String*)INCREF(target_snap_file);
            }
        }
        else {
            // Otherwise, pick the most recent snap file.
            target_snap_file = IxFileNames_latest_snapshot(folder);

            // No snap file?  Looks like the index is empty.  We can stop now
            // and return an empty reader.
            if (!target_snap_file) { break; }
        }

        // Derive "generation" of this snapshot file from its name.
        uint64_t gen = IxFileNames_extract_gen(target_snap_file);

        // Get a read lock on the most recent snapshot file if indicated.
        if (manager) {
            if (!S_obtain_read_lock(self, target_snap_file)) {
                // Build the message first: it mentions target_snap_file,
                // which is released below along with the deletion lock.
                String *mess = MAKE_MESS("Couldn't get read lock for %o",
                                         target_snap_file);
                DECREF(target_snap_file);
                S_release_deletion_lock(self);
                DECREF(self);
                Err_throw_mess(LOCKERR, mess);
            }
        }

        // Testing only.
        if (PolyReader_race_condition_debug1) {
            String *temp = SSTR_WRAP_C("temp");
            if (Folder_Exists(folder, temp)) {
                bool success = Folder_Rename(folder, temp,
                                             PolyReader_race_condition_debug1);
                if (!success) {
                    // Capture the error before lock release can disturb it.
                    Err *rename_error = (Err*)INCREF(Err_get_error());
                    S_release_read_lock(self);
                    if (manager) { S_release_deletion_lock(self); }
                    DECREF(target_snap_file);
                    RETHROW(rename_error);
                }
            }
            PolyReader_debug1_num_passes++;
        }

        // If a Snapshot object was passed in, the file has already been read.
        // If that's not the case, we must read the file we just picked.
        if (!snapshot) {
            struct try_read_snapshot_context context;
            context.snapshot = ivars->snapshot;
            context.folder   = folder;
            context.path     = target_snap_file;
            Err *error = Err_trap(S_try_read_snapshot, &context);

            if (error) {
                S_release_read_lock(self);
                DECREF(target_snap_file);
                if (last_gen < gen) { // Index updated, so try again.
                    DECREF(error);
                    last_gen = gen;
                    continue;
                }
                else { // Real error.
                    if (manager) { S_release_deletion_lock(self); }
                    RETHROW(error);
                }
            }
        }

        /* It's possible, though unlikely, for an Indexer to delete files
         * out from underneath us after the snapshot file is read but before
         * we've got SegReaders holding open all the required files.  If we
         * failed to open something, see if we can find a newer snapshot file.
         * If we can, then the exception was due to the race condition.  If
         * not, we have a real exception, so throw an error. */
        struct try_open_elements_context context;
        context.self        = self;
        context.seg_readers = NULL;
        Err *error = Err_trap(S_try_open_elements, &context);
        if (error) {
            S_release_read_lock(self);
            DECREF(target_snap_file);
            if (last_gen < gen) { // Index updated, so try again.
                DECREF(error);
                last_gen = gen;
            }
            else { // Real error.
                if (manager) { S_release_deletion_lock(self); }
                RETHROW(error);
            }
        }
        else { // Succeeded.
            S_init_sub_readers(self, context.seg_readers);
            DECREF(context.seg_readers);
            DECREF(target_snap_file);
            break;
        }
    }

    if (manager) { S_release_deletion_lock(self); }

    return self;
}

// Accept either a Folder (returned with an extra reference) or a String path
// (wrapped in a new FSFolder).  Any other type throws.
static Folder*
S_derive_folder(Obj *index) {
    if (Obj_is_a(index, FOLDER)) {
        return (Folder*)INCREF(index);
    }
    if (Obj_is_a(index, STRING)) {
        return (Folder*)FSFolder_new((String*)index);
    }
    THROW(ERR, "Invalid type for 'index': %o", Obj_get_class_name(index));
    return NULL;
}

// Ask the IndexManager for a deletion lock, clear any stale instance of it,
// and try to acquire it.  On failure the ivar is reset to NULL.
static bool
S_obtain_deletion_lock(PolyReader *self) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    Lock *lock = IxManager_Make_Deletion_Lock(ivars->manager);
    ivars->deletion_lock = lock;
    Lock_Clear_Stale(lock);
    if (Lock_Obtain(lock)) {
        return true;
    }
    DECREF(lock);
    ivars->deletion_lock = NULL;
    return false;
}

// Ask the IndexManager for a read lock on `snapshot_file_name`, clear any
// stale instance of it, and try to acquire it.  On failure the ivar is reset
// to NULL.
static bool
S_obtain_read_lock(PolyReader *self, String *snapshot_file_name) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    Lock *lock = IxManager_Make_Snapshot_Read_Lock(ivars->manager,
                                                   snapshot_file_name);
    ivars->read_lock = lock;
    Lock_Clear_Stale(lock);
    if (Lock_Obtain(lock)) {
        return true;
    }
    DECREF(lock);
    ivars->read_lock = NULL;
    return false;
}

// Release and drop the read lock, if one is held.
static void
S_release_read_lock(PolyReader *self) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    Lock *lock = ivars->read_lock;
    if (lock == NULL) { return; }
    Lock_Release(lock);
    DECREF(lock);
    ivars->read_lock = NULL;
}

// Release and drop the deletion lock, if one is held.
static void
S_release_deletion_lock(PolyReader *self) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    Lock *lock = ivars->deletion_lock;
    if (lock == NULL) { return; }
    Lock_Release(lock);
    DECREF(lock);
    ivars->deletion_lock = NULL;
}

// Total number of documents spanned, including deleted ones.
int32_t
PolyReader_Doc_Max_IMP(PolyReader *self) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    return ivars->doc_max;
}

// Live document count: doc_max minus the cached deletion count.
int32_t
PolyReader_Doc_Count_IMP(PolyReader *self) {
    int32_t total   = PolyReader_IVARS(self)->doc_max;
    int32_t deleted = PolyReader_IVARS(self)->del_count;
    return total - deleted;
}

// Number of deleted documents cached when the sub-readers were installed.
int32_t
PolyReader_Del_Count_IMP(PolyReader *self) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    return ivars->del_count;
}

// Sub-reader start offsets; returns a new reference the caller must DECREF.
I32Array*
PolyReader_Offsets_IMP(PolyReader *self) {
    I32Array *starts = PolyReader_IVARS(self)->offsets;
    return (I32Array*)INCREF(starts);
}

// Return a clone of the SegReader list, owned by the caller.
Vector*
PolyReader_Seg_Readers_IMP(PolyReader *self) {
    Vector *readers = PolyReader_IVARS(self)->sub_readers;
    return (Vector*)Vec_Clone(readers);
}

// Return the internal SegReader list itself (no new reference).
Vector*
PolyReader_Get_Seg_Readers_IMP(PolyReader *self) {
    PolyReaderIVARS *const ivars = PolyReader_IVARS(self);
    return ivars->sub_readers;
}

/* Map `doc_id` to the index of the sub-reader that holds it.
 *
 * `offsets` holds each sub-reader's starting offset.  Given non-decreasing
 * offsets, the result is the largest index i with offsets[i] < doc_id,
 * clamped to 0.  An empty array yields 0.  (Presumes 1-based doc ids, so
 * that sub-reader i covers (offsets[i], offsets[i+1]] -- TODO confirm.)
 * Throws if the array is too large to index with int32_t.
 */
uint32_t
PolyReader_sub_tick(I32Array *offsets, int32_t doc_id) {
    size_t num_offsets = I32Arr_Get_Size(offsets);
    if (num_offsets > INT32_MAX) {
        THROW(ERR, "Unexpectedly large offsets array: %u64",
              (uint64_t)num_offsets);
    }
    if (num_offsets == 0) {
        return 0;
    }

    // Binary search, keeping offsets[below] < doc_id <= offsets[above]
    // (with virtual sentinels at -1 and count).
    const int32_t count = (int32_t)num_offsets;
    int32_t below = -1;
    int32_t above = count;
    while (above - below > 1) {
        int32_t probe = below + ((above - below) / 2);
        if (I32Arr_Get(offsets, (size_t)probe) < doc_id) {
            below = probe;
        }
        else {
            above = probe;
        }
    }

    // Step back to the last sub-reader starting before doc_id.
    int32_t tick = (above == count) ? count - 1 : above;
    for (; tick > 0; tick--) {
        if (I32Arr_Get(offsets, (size_t)tick) < doc_id) {
            break;
        }
    }

    if (tick < 0) { // Paranoid, but easier than proving correctness.
        THROW(ERR, "Internal error calculating sub-tick: %i32", tick);
    }
    return (uint32_t)tick;
}