The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define C_LUCY_SEGWRITER
#include "Lucy/Util/ToolSet.h"

#include "Lucy/Index/SegWriter.h"
#include "Lucy/Document/Doc.h"
#include "Lucy/Plan/Schema.h"
#include "Lucy/Store/DirHandle.h"
#include "Lucy/Store/Folder.h"
#include "Lucy/Index/DeletionsWriter.h"
#include "Lucy/Index/Inverter.h"
#include "Lucy/Index/PolyReader.h"
#include "Lucy/Index/Segment.h"
#include "Lucy/Index/SegReader.h"
#include "Lucy/Index/Snapshot.h"
#include "Lucy/Plan/Architecture.h"

SegWriter*
SegWriter_new(Schema *schema, Snapshot *snapshot, Segment *segment,
              PolyReader *polyreader) {
    SegWriter *self = (SegWriter*)VTable_Make_Obj(SEGWRITER);
    return SegWriter_init(self, schema, snapshot, segment, polyreader);
}

SegWriter*
SegWriter_init(SegWriter *self, Schema *schema, Snapshot *snapshot,
               Segment *segment, PolyReader *polyreader) {
    Architecture *arch   = Schema_Get_Architecture(schema);
    DataWriter_init((DataWriter*)self, schema, snapshot, segment, polyreader);
    self->by_api   = Hash_new(0);
    self->inverter = Inverter_new(schema, segment);
    self->writers  = VA_new(16);
    Arch_Init_Seg_Writer(arch, self);
    return self;
}

void
SegWriter_destroy(SegWriter *self) {
    DECREF(self->inverter);
    DECREF(self->writers);
    DECREF(self->by_api);
    DECREF(self->del_writer);
    SUPER_DESTROY(self, SEGWRITER);
}

void
SegWriter_register(SegWriter *self, const CharBuf *api,
                   DataWriter *component) {
    CERTIFY(component, DATAWRITER);
    if (Hash_Fetch(self->by_api, (Obj*)api)) {
        THROW(ERR, "API %o already registered", api);
    }
    Hash_Store(self->by_api, (Obj*)api, (Obj*)component);
}

Obj*
SegWriter_fetch(SegWriter *self, const CharBuf *api) {
    return Hash_Fetch(self->by_api, (Obj*)api);
}

void
SegWriter_add_writer(SegWriter *self, DataWriter *writer) {
    VA_Push(self->writers, (Obj*)writer);
}

void
SegWriter_prep_seg_dir(SegWriter *self) {
    Folder  *folder   = SegWriter_Get_Folder(self);
    CharBuf *seg_name = Seg_Get_Name(self->segment);

    // Clear stale segment files from crashed indexing sessions.
    if (Folder_Exists(folder, seg_name)) {
        bool_t result = Folder_Delete_Tree(folder, seg_name);
        if (!result) {
            THROW(ERR, "Couldn't completely remove '%o'", seg_name);
        }
    }

    // Create the segment directory.
    bool_t result = Folder_MkDir(folder, seg_name);
    if (!result) { RETHROW(INCREF(Err_get_error())); }
}

void
SegWriter_add_doc(SegWriter *self, Doc *doc, float boost) {
    int32_t doc_id = (int32_t)Seg_Increment_Count(self->segment, 1);
    Inverter_Invert_Doc(self->inverter, doc);
    Inverter_Set_Boost(self->inverter, boost);
    SegWriter_Add_Inverted_Doc(self, self->inverter, doc_id);
}

void
SegWriter_add_inverted_doc(SegWriter *self, Inverter *inverter,
                           int32_t doc_id) {
    for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
        DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
        DataWriter_Add_Inverted_Doc(writer, inverter, doc_id);
    }
}

// Adjust current doc id. We create our own doc_count rather than rely on
// SegReader's number because the DeletionsWriter and the SegReader are
// probably out of sync.
static void
S_adjust_doc_id(SegWriter *self, SegReader *reader, I32Array *doc_map) {
    uint32_t doc_count = SegReader_Doc_Max(reader);
    for (uint32_t i = 1, max = I32Arr_Get_Size(doc_map); i < max; i++) {
        if (I32Arr_Get(doc_map, i) == 0) { doc_count--; }
    }
    Seg_Increment_Count(self->segment, doc_count);
}

void
SegWriter_add_segment(SegWriter *self, SegReader *reader, I32Array *doc_map) {
    // Bulk add the slab of documents to the various writers.
    for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
        DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
        DataWriter_Add_Segment(writer, reader, doc_map);
    }

    // Bulk add the segment to the DeletionsWriter, so that it can merge
    // previous segment files as necessary.
    DelWriter_Add_Segment(self->del_writer, reader, doc_map);

    // Adust the document id.
    S_adjust_doc_id(self, reader, doc_map);
}

void
SegWriter_merge_segment(SegWriter *self, SegReader *reader,
                        I32Array *doc_map) {
    Snapshot *snapshot = SegWriter_Get_Snapshot(self);
    CharBuf  *seg_name = Seg_Get_Name(SegReader_Get_Segment(reader));

    // Have all the sub-writers merge the segment.
    for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
        DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
        DataWriter_Merge_Segment(writer, reader, doc_map);
    }
    DelWriter_Merge_Segment(self->del_writer, reader, doc_map);

    // Remove seg directory from snapshot.
    Snapshot_Delete_Entry(snapshot, seg_name);

    // Adust the document id.
    S_adjust_doc_id(self, reader, doc_map);
}

void
SegWriter_delete_segment(SegWriter *self, SegReader *reader) {
    Snapshot *snapshot = SegWriter_Get_Snapshot(self);
    CharBuf  *seg_name = Seg_Get_Name(SegReader_Get_Segment(reader));

    // Have all the sub-writers delete the segment.
    for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
        DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
        DataWriter_Delete_Segment(writer, reader);
    }
    DelWriter_Delete_Segment(self->del_writer, reader);

    // Remove seg directory from snapshot.
    Snapshot_Delete_Entry(snapshot, seg_name);
}

void
SegWriter_finish(SegWriter *self) {
    CharBuf *seg_name = Seg_Get_Name(self->segment);

    // Finish off children.
    for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
        DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
        DataWriter_Finish(writer);
    }

    // Write segment metadata and add the segment directory to the snapshot.
    Snapshot *snapshot = SegWriter_Get_Snapshot(self);
    CharBuf *segmeta_filename = CB_newf("%o/segmeta.json", seg_name);
    Seg_Write_File(self->segment, self->folder);
    Snapshot_Add_Entry(snapshot, seg_name);
    DECREF(segmeta_filename);

    // Collapse segment files into compound file.
    Folder_Consolidate(self->folder, seg_name);
}

void
SegWriter_add_data_writer(SegWriter *self, DataWriter *writer) {
    VA_Push(self->writers, (Obj*)writer);
}

void
SegWriter_set_del_writer(SegWriter *self, DeletionsWriter *del_writer) {
    DECREF(self->del_writer);
    self->del_writer = (DeletionsWriter*)INCREF(del_writer);
}

DeletionsWriter*
SegWriter_get_del_writer(SegWriter *self) {
    return self->del_writer;
}