The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#define C_KINO_POSTINGLISTWRITER
#include "KinoSearch/Util/ToolSet.h"

#include "KinoSearch/Index/PostingListWriter.h"
#include "KinoSearch/Analysis/Inversion.h"
#include "KinoSearch/Index/Inverter.h"
#include "KinoSearch/Index/PolyReader.h"
#include "KinoSearch/Index/Posting.h"
#include "KinoSearch/Index/PostingPool.h"
#include "KinoSearch/Index/Segment.h"
#include "KinoSearch/Index/SegReader.h"
#include "KinoSearch/Index/Similarity.h"
#include "KinoSearch/Index/LexiconWriter.h"
#include "KinoSearch/Index/Snapshot.h"
#include "KinoSearch/Plan/Architecture.h"
#include "KinoSearch/Plan/FieldType.h"
#include "KinoSearch/Plan/Schema.h"
#include "KinoSearch/Store/Folder.h"
#include "KinoSearch/Store/InStream.h"
#include "KinoSearch/Store/OutStream.h"
#include "KinoSearch/Util/MemoryPool.h"

static size_t default_mem_thresh = 0x1000000;

int32_t PListWriter_current_file_format = 1;

// Open streams only if content gets added. 
static void
S_lazy_init(PostingListWriter *self);

// Return the PostingPool for this field, creating one if necessary.
static PostingPool*
S_lazy_init_posting_pool(PostingListWriter *self, int32_t field_num);

PostingListWriter*
PListWriter_new(Schema *schema, Snapshot *snapshot, Segment *segment,
                PolyReader *polyreader, LexiconWriter *lex_writer)
{
    PostingListWriter *self 
        = (PostingListWriter*)VTable_Make_Obj(POSTINGLISTWRITER);
    return PListWriter_init(self, schema, snapshot, segment, polyreader,
        lex_writer);
}

PostingListWriter*
PListWriter_init(PostingListWriter *self, Schema *schema, Snapshot *snapshot,
                 Segment *segment, PolyReader *polyreader,
                 LexiconWriter *lex_writer)
{
    DataWriter_init((DataWriter*)self, schema, snapshot, segment, polyreader);

    // Assign. 
    self->lex_writer = (LexiconWriter*)INCREF(lex_writer);

    // Init. 
    self->pools          = VA_new(Schema_Num_Fields(schema));
    self->mem_thresh     = default_mem_thresh;
    self->mem_pool       = MemPool_new(0);
    self->lex_temp_out   = NULL;
    self->post_temp_out  = NULL;

    return self;
}

static void
S_lazy_init(PostingListWriter *self)
{
    if (!self->lex_temp_out) {
        Folder  *folder         = self->folder;
        CharBuf *seg_name       = Seg_Get_Name(self->segment);
        CharBuf *lex_temp_path  = CB_newf("%o/lextemp", seg_name);
        CharBuf *post_temp_path = CB_newf("%o/ptemp", seg_name);
        CharBuf *skip_path      = CB_newf("%o/postings.skip", seg_name);

        // Open temp streams and final skip stream. 
        self->lex_temp_out  = Folder_Open_Out(folder, lex_temp_path);
        if (!self->lex_temp_out) { RETHROW(INCREF(Err_get_error())); }
        self->post_temp_out = Folder_Open_Out(folder, post_temp_path);
        if (!self->post_temp_out) { RETHROW(INCREF(Err_get_error())); }
        self->skip_out = Folder_Open_Out(folder, skip_path);
        if (!self->skip_out) { RETHROW(INCREF(Err_get_error())); }

        DECREF(skip_path);
        DECREF(post_temp_path);
        DECREF(lex_temp_path);
    }
}

static PostingPool*
S_lazy_init_posting_pool(PostingListWriter *self, int32_t field_num)
{
    PostingPool *pool = (PostingPool*)VA_Fetch(self->pools, field_num);
    if (!pool && field_num != 0) {
        CharBuf *field = Seg_Field_Name(self->segment, field_num);
        pool = PostPool_new(self->schema, self->snapshot, self->segment,
            self->polyreader, field, self->lex_writer, self->mem_pool, 
            self->lex_temp_out, self->post_temp_out, self->skip_out);
        VA_Store(self->pools, field_num, (Obj*)pool);
    }
    return pool;
}

void
PListWriter_destroy(PostingListWriter *self)
{
    DECREF(self->lex_writer);
    DECREF(self->mem_pool);
    DECREF(self->pools);
    DECREF(self->lex_temp_out);
    DECREF(self->post_temp_out);
    DECREF(self->skip_out);
    SUPER_DESTROY(self, POSTINGLISTWRITER);
}

void
PListWriter_set_default_mem_thresh(size_t mem_thresh)
{
    default_mem_thresh = mem_thresh;
}

int32_t
PListWriter_format(PostingListWriter *self)
{
    UNUSED_VAR(self);
    return PListWriter_current_file_format;
}

void
PListWriter_add_inverted_doc(PostingListWriter *self, Inverter *inverter, 
                            int32_t doc_id)
{
    S_lazy_init(self);

    // Iterate over fields in document, adding the content of indexed fields
    // to their respective PostingPools.
    float doc_boost = Inverter_Get_Boost(inverter);
    Inverter_Iterate(inverter);
    int32_t field_num;
    while (0 != (field_num = Inverter_Next(inverter))) {
        FieldType *type = Inverter_Get_Type(inverter);
        if (FType_Indexed(type)) {
            Inversion   *inversion = Inverter_Get_Inversion(inverter);
            Similarity  *sim  = Inverter_Get_Similarity(inverter);
            PostingPool *pool = S_lazy_init_posting_pool(self, field_num);
            float length_norm 
                = Sim_Length_Norm(sim, Inversion_Get_Size(inversion));
            PostPool_Add_Inversion(pool, inversion, doc_id, doc_boost, 
                length_norm);
        }
    }

    // If our PostingPools have collectively passed the memory threshold,
    // flush all of them, then release all the RawPostings with a single
    // action.
    if (MemPool_Get_Consumed(self->mem_pool) > self->mem_thresh) {
        for (uint32_t i = 0, max = VA_Get_Size(self->pools); i < max; i++) {
            PostingPool *const pool = (PostingPool*)VA_Fetch(self->pools, i);
            if (pool) { PostPool_Flush(pool); }
        }
        MemPool_Release_All(self->mem_pool);
    }
}

void
PListWriter_add_segment(PostingListWriter *self, SegReader *reader,
                        I32Array *doc_map)
{
    Segment   *other_segment  = SegReader_Get_Segment(reader);
    Schema    *schema         = self->schema;
    Segment   *segment        = self->segment;
    VArray    *all_fields     = Schema_All_Fields(schema);
    S_lazy_init(self);

    for (uint32_t i = 0, max = VA_Get_Size(all_fields); i < max; i++) {
        CharBuf   *field    = (CharBuf*)VA_Fetch(all_fields, i);
        FieldType *type     = Schema_Fetch_Type(schema, field);
        int32_t old_field_num = Seg_Field_Num(other_segment, field);
        int32_t new_field_num = Seg_Field_Num(segment, field);

        if (!FType_Indexed(type)) { continue; }
        if (!old_field_num) { continue; } // not in old segment 
        if (!new_field_num) { THROW(ERR, "Unrecognized field: %o", field); }

        PostingPool *pool = S_lazy_init_posting_pool(self, new_field_num);
        PostPool_Add_Segment(pool, reader, doc_map, 
            (int32_t)Seg_Get_Count(segment));
    }

    // Clean up. 
    DECREF(all_fields);
}

void
PListWriter_finish(PostingListWriter *self)
{
    // If S_lazy_init was never called, we have no data, so bail out. 
    if (!self->lex_temp_out) { return; }

    Folder  *folder = self->folder;
    CharBuf *seg_name = Seg_Get_Name(self->segment);
    CharBuf *lex_temp_path  = CB_newf("%o/lextemp", seg_name);
    CharBuf *post_temp_path = CB_newf("%o/ptemp", seg_name);

    // Close temp streams. 
    OutStream_Close(self->lex_temp_out);
    OutStream_Close(self->post_temp_out);

    // Try to free up some memory. 
    for (uint32_t i = 0, max = VA_Get_Size(self->pools); i < max; i++) {
        PostingPool *pool = (PostingPool*)VA_Fetch(self->pools, i);
        if (pool) { PostPool_Shrink(pool); }
    }

    // Write postings for each field. 
    for (uint32_t i = 0, max = VA_Get_Size(self->pools); i < max; i++) {
        PostingPool *pool = (PostingPool*)VA_Delete(self->pools, i);
        if (pool) { 
            // Write out content for each PostingPool.  Let each PostingPool
            // use more RAM while finishing.  (This is a little dicy, because if
            // Shrink() was ineffective, we may double the RAM footprint.) 
            PostPool_Set_Mem_Thresh(pool, self->mem_thresh);
            PostPool_Flip(pool);
            PostPool_Finish(pool);
            DECREF(pool);
        }
    }

    // Store metadata. 
    Seg_Store_Metadata_Str(self->segment, "postings", 8, 
        (Obj*)PListWriter_Metadata(self));

    // Close down and clean up. 
    OutStream_Close(self->skip_out);
    if (!Folder_Delete(folder, lex_temp_path)) {
        THROW(ERR, "Couldn't delete %o", lex_temp_path);
    }
    if (!Folder_Delete(folder, post_temp_path)) {
        THROW(ERR, "Couldn't delete %o", post_temp_path);
    }
    DECREF(self->skip_out);
    self->skip_out     = NULL;
    DECREF(post_temp_path);
    DECREF(lex_temp_path);

    // Dispatch the LexiconWriter. 
    LexWriter_Finish(self->lex_writer);
}

/* Copyright 2006-2011 Marvin Humphrey
 *
 * This program is free software; you can redistribute it and/or modify
 * under the same terms as Perl itself.
 */