The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#define C_KINO_BBSORTEX
#include "KinoSearch/Util/ToolSet.h"

#include "KinoSearch/Test/Util/BBSortEx.h"

#include "KinoSearch/Index/Segment.h"
#include "KinoSearch/Store/InStream.h"
#include "KinoSearch/Store/Folder.h"
#include "KinoSearch/Store/OutStream.h"

BBSortEx*
BBSortEx_new(uint32_t mem_threshold, VArray *external)
{
    BBSortEx *self = (BBSortEx*)VTable_Make_Obj(BBSORTEX);
    return BBSortEx_init(self, mem_threshold, external);
}

BBSortEx*
BBSortEx_init(BBSortEx *self, uint32_t mem_threshold, VArray *external)
{
    SortEx_init((SortExternal*)self, sizeof(Obj*));
    self->external_tick = 0;
    self->external = (VArray*)INCREF(external);
    self->mem_consumed = 0;
    BBSortEx_Set_Mem_Thresh(self, mem_threshold);
    return self;
}

void
BBSortEx_destroy(BBSortEx *self) 
{
    DECREF(self->external);
    SUPER_DESTROY(self, BBSORTEX);
}

void
BBSortEx_clear_cache(BBSortEx *self) 
{
    Obj **const cache = (Obj**)self->cache;
    for (uint32_t i = self->cache_tick, max = self->cache_max; i < max; i++) {
        DECREF(cache[i]);
    }
    self->mem_consumed = 0;
    BBSortEx_clear_cache_t super_clear_cache = (BBSortEx_clear_cache_t)
        SUPER_METHOD(self->vtable, SortEx, Clear_Cache);
    super_clear_cache(self);
}

void
BBSortEx_feed(BBSortEx *self, void *data)
{
    SortEx_feed((SortExternal*)self, data);

    // Flush() if necessary. 
    ByteBuf *bytebuf = (ByteBuf*)CERTIFY(*(ByteBuf**)data, BYTEBUF);
    self->mem_consumed += BB_Get_Size(bytebuf);
    if (self->mem_consumed >= self->mem_thresh) {
        BBSortEx_Flush(self);
    }
}

void
BBSortEx_flush(BBSortEx *self)
{
    uint32_t     cache_count = self->cache_max - self->cache_tick;
    Obj        **cache = (Obj**)self->cache;
    VArray      *elems;
    BBSortEx    *run;
    uint32_t     i;

    if (!cache_count) return;
    else elems = VA_new(cache_count);

    // Sort, then create a new run. 
    BBSortEx_Sort_Cache(self);
    for (i = self->cache_tick; i < self->cache_max; i++) {
        VA_Push(elems, cache[i]);
    }
    run = BBSortEx_new(0, elems);
    DECREF(elems);
    BBSortEx_Add_Run(self, (SortExternal*)run);

    // Blank the cache vars. 
    self->cache_tick += cache_count;
    BBSortEx_Clear_Cache(self);
}

uint32_t
BBSortEx_refill(BBSortEx *self)
{
    // Make sure cache is empty, then set cache tick vars. 
    if (self->cache_max - self->cache_tick > 0) {
        THROW(ERR, "Refill called but cache contains %u32 items",
            self->cache_max - self->cache_tick);
    }
    self->cache_tick = 0;
    self->cache_max  = 0;

    // Read in elements. 
    while (1) {
        ByteBuf *elem = NULL;

        if (self->mem_consumed >= self->mem_thresh) {
            self->mem_consumed = 0;
            break;
        }
        else if (self->external_tick >= VA_Get_Size(self->external)) {
            break;
        }
        else {
            elem = (ByteBuf*)VA_Fetch(self->external, self->external_tick);
            self->external_tick++;
            // Should be + sizeof(ByteBuf), but that's ok. 
            self->mem_consumed += BB_Get_Size(elem); 
        }

        if (self->cache_max == self->cache_cap) {
            BBSortEx_Grow_Cache(self,
                Memory_oversize(self->cache_max + 1, self->width));
        }
        Obj **cache = (Obj**)self->cache;
        cache[ self->cache_max++ ] = INCREF(elem);
    }

    return self->cache_max;
}

void
BBSortEx_flip(BBSortEx *self)
{
    uint32_t i;
    uint32_t run_mem_thresh = 65536;

    BBSortEx_Flush(self);

    // Recalculate the approximate mem allowed for each run. 
    uint32_t num_runs = VA_Get_Size(self->runs);
    if (num_runs) {
        run_mem_thresh = (self->mem_thresh / 2) / num_runs;
        if (run_mem_thresh < 65536)
            run_mem_thresh = 65536;
    }

    for (i = 0; i < num_runs; i++) {
        BBSortEx *run = (BBSortEx*)VA_Fetch(self->runs, i);
        BBSortEx_Set_Mem_Thresh(run, run_mem_thresh);
    }

    // OK to fetch now. 
    self->flipped = true;
}

int
BBSortEx_compare(BBSortEx *self, void *va, void *vb)
{
    UNUSED_VAR(self);
    return BB_compare( (ByteBuf**)va, (ByteBuf**)vb );
}

/* Copyright 2006-2011 Marvin Humphrey
 *
 * This program is free software; you can redistribute it and/or modify
 * under the same terms as Perl itself.
 */