The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define C_LUCY_SORTFIELDWRITER
#define C_LUCY_SFWRITERELEM
#include "Lucy/Util/ToolSet.h"

#include "Lucy/Index/SortFieldWriter.h"
#include "Lucy/Index/Inverter.h"
#include "Lucy/Index/PolyReader.h"
#include "Lucy/Index/Segment.h"
#include "Lucy/Index/SegReader.h"
#include "Lucy/Index/Snapshot.h"
#include "Lucy/Index/SortCache.h"
#include "Lucy/Index/SortCache/NumericSortCache.h"
#include "Lucy/Index/SortCache/TextSortCache.h"
#include "Lucy/Index/SortReader.h"
#include "Lucy/Index/SortWriter.h"
#include "Lucy/Plan/FieldType.h"
#include "Lucy/Plan/Schema.h"
#include "Lucy/Store/Folder.h"
#include "Lucy/Store/InStream.h"
#include "Lucy/Store/OutStream.h"
#include "Lucy/Util/NumberUtils.h"
#include "Clownfish/Blob.h"
#include "Clownfish/Num.h"
#include "Clownfish/Util/Memory.h"
#include "Clownfish/Util/SortUtils.h"

// Prepare to read back a run.
static void
S_flip_run(SortFieldWriter *run, uint32_t sub_thresh, InStream *ord_in,
           InStream *ix_in, InStream *dat_in);

// Write out a sort cache.  Returns the number of unique values in the sort
// cache.
static int32_t
S_write_files(SortFieldWriter *self, OutStream *ord_out, OutStream *ix_out,
              OutStream *dat_out);

// Constructor for an element in the sort pool.
static SFWriterElem*
S_SFWriterElem_create(Obj *value, int32_t doc_id);

static int64_t
SI_increase_to_word_multiple(int64_t amount) {
    const int64_t remainder = amount % (int64_t)sizeof(void*);
    if (remainder) {
        amount += sizeof(void*);
        amount -= remainder;
    }
    return amount;
}

SortFieldWriter*
SortFieldWriter_new(Schema *schema, Snapshot *snapshot, Segment *segment,
                    PolyReader *polyreader, String *field,
                    Counter *counter, uint32_t mem_thresh,
                    OutStream *temp_ord_out, OutStream *temp_ix_out,
                    OutStream *temp_dat_out) {
    SortFieldWriter *self
        = (SortFieldWriter*)Class_Make_Obj(SORTFIELDWRITER);
    return SortFieldWriter_init(self, schema, snapshot, segment, polyreader,
                                field, counter, mem_thresh, temp_ord_out,
                                temp_ix_out, temp_dat_out);
}

SortFieldWriter*
SortFieldWriter_init(SortFieldWriter *self, Schema *schema,
                     Snapshot *snapshot, Segment *segment,
                     PolyReader *polyreader, String *field,
                     Counter *counter, uint32_t mem_thresh,
                     OutStream *temp_ord_out, OutStream *temp_ix_out,
                     OutStream *temp_dat_out) {
    // Init.
    SortEx_init((SortExternal*)self);
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    ivars->null_ord        = -1;
    ivars->count           = 0;
    ivars->ord_start       = 0;
    ivars->ord_end         = 0;
    ivars->ix_start        = 0;
    ivars->ix_end          = 0;
    ivars->dat_start       = 0;
    ivars->dat_end         = 0;
    ivars->run_cardinality = -1;
    ivars->run_max         = -1;
    ivars->sort_cache      = NULL;
    ivars->doc_map         = NULL;
    ivars->sorted_ids      = NULL;
    ivars->run_tick        = 1;
    ivars->ord_width       = 0;

    // Assign.
    ivars->field        = Str_Clone(field);
    ivars->schema       = (Schema*)INCREF(schema);
    ivars->snapshot     = (Snapshot*)INCREF(snapshot);
    ivars->segment      = (Segment*)INCREF(segment);
    ivars->polyreader   = (PolyReader*)INCREF(polyreader);
    ivars->counter      = (Counter*)INCREF(counter);
    ivars->temp_ord_out = (OutStream*)INCREF(temp_ord_out);
    ivars->temp_ix_out  = (OutStream*)INCREF(temp_ix_out);
    ivars->temp_dat_out = (OutStream*)INCREF(temp_dat_out);
    ivars->mem_thresh   = mem_thresh;

    // Derive.
    ivars->field_num = Seg_Field_Num(segment, field);
    FieldType *type = (FieldType*)CERTIFY(
                          Schema_Fetch_Type(ivars->schema, field), FIELDTYPE);
    ivars->type    = (FieldType*)INCREF(type);
    ivars->prim_id = FType_Primitive_ID(type);
    ivars->mem_per_entry = Class_Get_Obj_Alloc_Size(SFWRITERELEM);
    if (ivars->prim_id == FType_TEXT) {
        ivars->mem_per_entry += Class_Get_Obj_Alloc_Size(STRING);
        ivars->var_width = true;
    }
    else if (ivars->prim_id == FType_BLOB) {
        ivars->mem_per_entry += Class_Get_Obj_Alloc_Size(BLOB);
        ivars->var_width = true;
    }
    else {
        ivars->mem_per_entry += Class_Get_Obj_Alloc_Size(FLOAT);
        ivars->var_width = false;
    }

    return self;
}

void
SortFieldWriter_Destroy_IMP(SortFieldWriter *self) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    DECREF(ivars->field);
    DECREF(ivars->schema);
    DECREF(ivars->snapshot);
    DECREF(ivars->segment);
    DECREF(ivars->polyreader);
    DECREF(ivars->type);
    DECREF(ivars->counter);
    DECREF(ivars->temp_ord_out);
    DECREF(ivars->temp_ix_out);
    DECREF(ivars->temp_dat_out);
    DECREF(ivars->ord_in);
    DECREF(ivars->ix_in);
    DECREF(ivars->dat_in);
    DECREF(ivars->sort_cache);
    DECREF(ivars->doc_map);
    FREEMEM(ivars->sorted_ids);
    SUPER_DESTROY(self, SORTFIELDWRITER);
}

int32_t
SortFieldWriter_Get_Null_Ord_IMP(SortFieldWriter *self) {
    return SortFieldWriter_IVARS(self)->null_ord;
}

int32_t
SortFieldWriter_Get_Ord_Width_IMP(SortFieldWriter *self) {
    return SortFieldWriter_IVARS(self)->ord_width;
}

void
SortFieldWriter_Add_IMP(SortFieldWriter *self, int32_t doc_id, Obj *value) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    Counter *counter   = ivars->counter;
    Counter_Add(counter, (int64_t)ivars->mem_per_entry);
    if (ivars->prim_id == FType_TEXT) {
        int64_t size = (int64_t)Str_Get_Size((String*)value) + 1;
        size = SI_increase_to_word_multiple(size);
        Counter_Add(counter, size);
    }
    else if (ivars->prim_id == FType_BLOB) {
        int64_t size = (int64_t)Blob_Get_Size((Blob*)value) + 1;
        size = SI_increase_to_word_multiple(size);
        Counter_Add(counter, size);
    }
    SFWriterElem *elem = S_SFWriterElem_create(Obj_Clone(value), doc_id);
    SortFieldWriter_Feed(self, (Obj*)elem);
    ivars->count++;
}

void
SortFieldWriter_Add_Segment_IMP(SortFieldWriter *self, SegReader *reader,
                                I32Array *doc_map, SortCache *sort_cache) {
    if (!sort_cache) { return; }
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    SortFieldWriter *run
        = SortFieldWriter_new(ivars->schema, ivars->snapshot, ivars->segment,
                              ivars->polyreader, ivars->field, ivars->counter,
                              ivars->mem_thresh, NULL, NULL, NULL);
    SortFieldWriterIVARS *const run_ivars = SortFieldWriter_IVARS(run);
    run_ivars->sort_cache = (SortCache*)INCREF(sort_cache);
    run_ivars->doc_map    = (I32Array*)INCREF(doc_map);
    run_ivars->run_max    = SegReader_Doc_Max(reader);
    run_ivars->run_cardinality = SortCache_Get_Cardinality(sort_cache);
    run_ivars->null_ord   = SortCache_Get_Null_Ord(sort_cache);
    SortFieldWriter_Add_Run(self, (SortExternal*)run);
}

static int32_t
S_calc_width(int32_t cardinality) {
    if (cardinality <= 0x00000002)      { return 1; }
    else if (cardinality <= 0x00000004) { return 2; }
    else if (cardinality <= 0x0000000F) { return 4; }
    else if (cardinality <= 0x000000FF) { return 8; }
    else if (cardinality <= 0x0000FFFF) { return 16; }
    else                                { return 32; }
}

static void
S_write_ord(void *ords, int32_t width, int32_t doc_id, int32_t ord) {
    switch (width) {
        case 1:
            if (ord) { NumUtil_u1set(ords, (uint32_t)doc_id); }
            else     { NumUtil_u1clear(ords, (uint32_t)doc_id); }
            break;
        case 2:
            NumUtil_u2set(ords, (uint32_t)doc_id, (uint8_t)ord);
            break;
        case 4:
            NumUtil_u4set(ords, (uint32_t)doc_id, (uint8_t)ord);
            break;
        case 8: {
                uint8_t *ints = (uint8_t*)ords;
                ints[doc_id] = (uint8_t)ord;
            }
            break;
        case 16: {
                uint8_t *bytes = (uint8_t*)ords;
                bytes += (size_t)doc_id * sizeof(uint16_t);
                NumUtil_encode_bigend_u16((uint16_t)ord, &bytes);
            }
            break;
        case 32: {
                uint8_t *bytes = (uint8_t*)ords;
                bytes += (size_t)doc_id * sizeof(uint32_t);
                NumUtil_encode_bigend_u32((uint32_t)ord, &bytes);
            }
            break;
        default:
            THROW(ERR, "Invalid width: %i32", width);
    }
}

static void
S_write_val(Obj *val, int8_t prim_id, OutStream *ix_out, OutStream *dat_out,
            int64_t dat_start) {
    if (val) {
        switch (prim_id & FType_PRIMITIVE_ID_MASK) {
            case FType_TEXT: {
                    String *string = (String*)val;
                    int64_t dat_pos = OutStream_Tell(dat_out) - dat_start;
                    OutStream_Write_I64(ix_out, dat_pos);
                    OutStream_Write_Bytes(dat_out, Str_Get_Ptr8(string),
                                          Str_Get_Size(string));
                    break;
                }
            case FType_BLOB: {
                    Blob *blob = (Blob*)val;
                    int64_t dat_pos = OutStream_Tell(dat_out) - dat_start;
                    OutStream_Write_I64(ix_out, dat_pos);
                    OutStream_Write_Bytes(dat_out, Blob_Get_Buf(blob),
                                          Blob_Get_Size(blob));
                    break;
                }
            case FType_INT32: {
                    int32_t i32 = (int32_t)Int_Get_Value((Integer*)val);
                    OutStream_Write_I32(dat_out, i32);
                    break;
                }
            case FType_INT64: {
                    int64_t i64 = Int_Get_Value((Integer*)val);
                    OutStream_Write_I64(dat_out, i64);
                    break;
                }
            case FType_FLOAT32: {
                    float f32 = (float)Float_Get_Value((Float*)val);
                    OutStream_Write_F32(dat_out, f32);
                    break;
                }
            case FType_FLOAT64: {
                    double f64 = Float_Get_Value((Float*)val);
                    OutStream_Write_F64(dat_out, f64);
                    break;
                }
            default:
                THROW(ERR, "Unrecognized primitive id: %i32", (int32_t)prim_id);
        }
    }
    else {
        switch (prim_id & FType_PRIMITIVE_ID_MASK) {
            case FType_TEXT:
            case FType_BLOB: {
                    int64_t dat_pos = OutStream_Tell(dat_out) - dat_start;
                    OutStream_Write_I64(ix_out, dat_pos);
                }
                break;
            case FType_INT32:
                OutStream_Write_I32(dat_out, 0);
                break;
            case FType_INT64:
                OutStream_Write_I64(dat_out, 0);
                break;
            case FType_FLOAT64:
                OutStream_Write_F64(dat_out, 0.0);
                break;
            case FType_FLOAT32:
                OutStream_Write_F32(dat_out, 0.0f);
                break;
            default:
                THROW(ERR, "Unrecognized primitive id: %i32", (int32_t)prim_id);
        }
    }
}

int
SortFieldWriter_Compare_IMP(SortFieldWriter *self, Obj **ptr_a, Obj **ptr_b) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    SFWriterElemIVARS *a = SFWriterElem_IVARS(*(SFWriterElem**)ptr_a);
    SFWriterElemIVARS *b = SFWriterElem_IVARS(*(SFWriterElem**)ptr_b);
    int32_t comparison
        = FType_null_back_compare_values(ivars->type, a->value, b->value);
    if (comparison == 0) { comparison = a->doc_id - b->doc_id; }
    return comparison;
}

static void
S_lazy_init_sorted_ids(SortFieldWriter *self) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    if (ivars->sorted_ids) { return; }

    // Counting sort. Could be optimized by working directly on the
    // ordinal arrays.

    SortCache *sort_cache      = ivars->sort_cache;
    int32_t    run_cardinality = ivars->run_cardinality;
    int32_t    run_max         = ivars->run_max;

    // Count.
    int32_t *counts
        = (int32_t*)CALLOCATE((size_t)run_cardinality, sizeof(int32_t));
    for (int32_t doc_id = 0; doc_id <= run_max; ++doc_id) {
        int32_t ord = SortCache_Ordinal(sort_cache, doc_id);
        ++counts[ord];
    }

    // Compute partial sums.
    int32_t sum = 0;
    for (int32_t ord = 0; ord < run_cardinality; ++ord) {
        int32_t count = counts[ord];
        counts[ord] = sum;
        sum += count;
    }

    // Distribute.
    int32_t *sorted_ids
        = (int32_t*)MALLOCATE(((size_t)run_max + 1) * sizeof(int32_t));
    for (int32_t doc_id = 0; doc_id <= run_max; ++doc_id) {
        int32_t ord = SortCache_Ordinal(sort_cache, doc_id);
        int32_t pos = counts[ord]++;
        sorted_ids[pos] = doc_id;
    }

    ivars->sorted_ids = sorted_ids;
    FREEMEM(counts);
}

void
SortFieldWriter_Flush_IMP(SortFieldWriter *self) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);

    // Don't add a run unless we have data to put in it.
    if (SortFieldWriter_Buffer_Count(self) == 0) { return; }

    OutStream *const temp_ord_out = ivars->temp_ord_out;
    OutStream *const temp_ix_out  = ivars->temp_ix_out;
    OutStream *const temp_dat_out = ivars->temp_dat_out;

    SortFieldWriter_Sort_Buffer(self);
    SortFieldWriter *run
        = SortFieldWriter_new(ivars->schema, ivars->snapshot, ivars->segment,
                              ivars->polyreader, ivars->field, ivars->counter,
                              ivars->mem_thresh, NULL, NULL, NULL);
    SortFieldWriterIVARS *const run_ivars = SortFieldWriter_IVARS(run);

    // Record stream starts and align.
    run_ivars->ord_start = OutStream_Align(temp_ord_out, sizeof(int64_t));
    if (ivars->var_width) {
        run_ivars->ix_start  = OutStream_Align(temp_ix_out, sizeof(int64_t));
    }
    run_ivars->dat_start = OutStream_Align(temp_dat_out, sizeof(int64_t));

    // Have the run borrow the array of elems.
    run_ivars->buffer   = ivars->buffer;
    run_ivars->buf_max  = ivars->buf_max;
    run_ivars->buf_tick = ivars->buf_tick;
    run_ivars->buf_cap  = ivars->buf_cap;

    // Write files, record stats.
    run_ivars->run_max = (int32_t)Seg_Get_Count(ivars->segment);
    run_ivars->run_cardinality = S_write_files(run, temp_ord_out, temp_ix_out,
                                               temp_dat_out);

    // Reclaim the buffer from the run and empty it.
    run_ivars->buffer    = NULL;
    run_ivars->buf_max   = 0;
    run_ivars->buf_tick  = 0;
    run_ivars->buf_cap   = 0;
    ivars->buf_tick = ivars->buf_max;
    SortFieldWriter_Clear_Buffer(self);

    // Record stream ends.
    run_ivars->ord_end = OutStream_Tell(temp_ord_out);
    if (ivars->var_width) {
        run_ivars->ix_end  = OutStream_Tell(temp_ix_out);
    }
    run_ivars->dat_end = OutStream_Tell(temp_dat_out);

    // Add the run to the array.
    SortFieldWriter_Add_Run(self, (SortExternal*)run);
}

uint32_t
SortFieldWriter_Refill_IMP(SortFieldWriter *self) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    if (!ivars->sort_cache) { return 0; }

    // Sanity check, then reset the buffer and prepare to start loading items.
    uint32_t buf_count = SortFieldWriter_Buffer_Count(self);
    if (buf_count) {
        THROW(ERR, "Refill called but buffer contains %u32 items",
              buf_count);
    }
    SortFieldWriter_Clear_Buffer(self);
    Counter_Reset(ivars->counter);
    S_lazy_init_sorted_ids(self);

    const int32_t    null_ord   = ivars->null_ord;
    I32Array *const  doc_map    = ivars->doc_map;
    SortCache *const sort_cache = ivars->sort_cache;

    uint32_t count = 0;
    while (ivars->run_tick <= ivars->run_max
           && Counter_Get_Value(ivars->counter) < ivars->mem_thresh
          ) {
        int32_t raw_doc_id = ivars->sorted_ids[ivars->run_tick];
        int32_t ord = SortCache_Ordinal(sort_cache, raw_doc_id);
        if (ord != null_ord) {
            int32_t remapped = doc_map
                               ? I32Arr_Get(doc_map, (size_t)raw_doc_id)
                               : raw_doc_id;
            if (remapped) {
                Obj *val = SortCache_Value(sort_cache, ord);
                SortFieldWriter_Add(self, remapped, val);
                count++;
                DECREF(val);
            }
        }
        ivars->run_tick++;
    }

    if (ivars->run_tick > ivars->run_max) {
        DECREF(ivars->sort_cache);
        ivars->sort_cache = NULL;
        FREEMEM(ivars->sorted_ids);
        ivars->sorted_ids = NULL;
    }

    return count;
}

void
SortFieldWriter_Flip_IMP(SortFieldWriter *self) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    uint32_t num_items = SortFieldWriter_Buffer_Count(self);
    size_t   num_runs = Vec_Get_Size(ivars->runs);

    if (ivars->flipped) { THROW(ERR, "Can't call Flip() twice"); }
    ivars->flipped = true;

    // Sanity check.
    if (num_runs && num_items) {
        THROW(ERR, "Sanity check failed: num_runs: %u32 num_items: %u32",
              (uint32_t)num_runs, num_items);
    }

    if (num_items) {
        SortFieldWriter_Sort_Buffer(self);
    }
    else if (num_runs) {
        Folder  *folder = PolyReader_Get_Folder(ivars->polyreader);
        String *seg_name = Seg_Get_Name(ivars->segment);
        String *ord_path = Str_newf("%o/sort_ord_temp", seg_name);
        ivars->ord_in = Folder_Open_In(folder, ord_path);
        DECREF(ord_path);
        if (!ivars->ord_in) { RETHROW(INCREF(Err_get_error())); }
        if (ivars->var_width) {
            String *ix_path = Str_newf("%o/sort_ix_temp", seg_name);
            ivars->ix_in = Folder_Open_In(folder, ix_path);
            DECREF(ix_path);
            if (!ivars->ix_in) { RETHROW(INCREF(Err_get_error())); }
        }
        String *dat_path = Str_newf("%o/sort_dat_temp", seg_name);
        ivars->dat_in = Folder_Open_In(folder, dat_path);
        DECREF(dat_path);
        if (!ivars->dat_in) { RETHROW(INCREF(Err_get_error())); }

        // Assign streams and a slice of mem_thresh.
        uint32_t sub_thresh = ivars->mem_thresh / num_runs;
        if (sub_thresh < 65536) { sub_thresh = 65536; }
        for (size_t i = 0; i < num_runs; i++) {
            SortFieldWriter *run = (SortFieldWriter*)Vec_Fetch(ivars->runs, i);
            S_flip_run(run, sub_thresh, ivars->ord_in, ivars->ix_in,
                       ivars->dat_in);
        }
    }

    ivars->flipped = true;
}

static int32_t
S_write_files(SortFieldWriter *self, OutStream *ord_out, OutStream *ix_out,
              OutStream *dat_out) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);
    int8_t    prim_id   = ivars->prim_id;
    int32_t   doc_max   = (int32_t)Seg_Get_Count(ivars->segment);
    bool      has_nulls = ivars->count == doc_max ? false : true;
    size_t    size      = ((size_t)doc_max + 1) * sizeof(int32_t);
    int32_t  *ords      = (int32_t*)MALLOCATE(size);
    int32_t   ord       = 0;
    int64_t   dat_start = OutStream_Tell(dat_out);

    // Assign -1 as a stand-in for the NULL ord.
    for (int32_t i = 0; i <= doc_max; i++) {
        ords[i] = -1;
    }

    // Grab the first item and record its ord.  Add a dummy ord for invalid
    // doc id 0.
    SFWriterElem *elem = (SFWriterElem*)SortFieldWriter_Fetch(self);
    SFWriterElemIVARS *elem_ivars = SFWriterElem_IVARS(elem);
    if (elem_ivars->doc_id > doc_max) {
        THROW(ERR, "doc_id %i32 greater than doc_max %i32",
              elem_ivars->doc_id, doc_max);
    }
    ords[elem_ivars->doc_id] = ord;
    ords[0] = 0;

    // Build array of ords, write non-NULL sorted values.
    Obj *last_val = INCREF(elem_ivars->value);
    S_write_val(elem_ivars->value, prim_id, ix_out, dat_out, dat_start);
    DECREF(elem);
    while (NULL != (elem = (SFWriterElem*)SortFieldWriter_Fetch(self))) {
        elem_ivars = SFWriterElem_IVARS(elem);
        if (elem_ivars->value != last_val) {
            int32_t comparison
                = FType_Compare_Values(ivars->type, elem_ivars->value,
                                       last_val);
            if (comparison != 0) {
                ord++;
                S_write_val(elem_ivars->value, prim_id, ix_out, dat_out,
                            dat_start);
            }
            DECREF(last_val);
            last_val = INCREF(elem_ivars->value);
        }
        if (elem_ivars->doc_id > doc_max) {
            THROW(ERR, "doc_id %i32 greater than doc_max %i32",
                  elem_ivars->doc_id, doc_max);
        }
        ords[elem_ivars->doc_id] = ord;
        DECREF(elem);
    }
    DECREF(last_val);

    // If there are NULL values, write one now and record the NULL ord.
    if (has_nulls) {
        S_write_val(NULL, prim_id, ix_out, dat_out, dat_start);
        ord++;
        ivars->null_ord = ord;
    }
    int32_t null_ord = ivars->null_ord;

    // Write one extra file pointer so that we can always derive length.
    if (ivars->var_width) {
        OutStream_Write_I64(ix_out, OutStream_Tell(dat_out) - dat_start);
    }

    // Calculate cardinality and ord width.
    int32_t cardinality = ord + 1;
    ivars->ord_width     = S_calc_width(cardinality);
    int32_t ord_width   = ivars->ord_width;

    // Write ords.
    size_t byte_count = SIZE_MAX;
    switch (ord_width) {
        case 1:
            byte_count = (((size_t)doc_max + 1) + 7) / 8;
            break;
        case 2:
            byte_count = (((size_t)doc_max + 1) + 3) / 4;
            break;
        case 4:
            byte_count = (((size_t)doc_max + 1) + 1) / 2;
            break;
        case 8:
            byte_count = (size_t)doc_max + 1;
            break;
        case 16:
            byte_count = ((size_t)doc_max + 1) * 2;
            break;
        case 32:
            byte_count = ((size_t)doc_max + 1) * 4;
            break;
        default:
            THROW(ERR, "Invalid width: %i32", ord_width);

    }
    char *compressed_ords = (char*)CALLOCATE(byte_count, sizeof(char));
    for (int32_t i = 0; i <= doc_max; i++) {
        int32_t real_ord = ords[i] == -1 ? null_ord : ords[i];
        S_write_ord(compressed_ords, ord_width, i, real_ord);
    }
    OutStream_Write_Bytes(ord_out, compressed_ords, byte_count);
    FREEMEM(compressed_ords);

    FREEMEM(ords);
    return cardinality;
}

int32_t
SortFieldWriter_Finish_IMP(SortFieldWriter *self) {
    SortFieldWriterIVARS *const ivars = SortFieldWriter_IVARS(self);

    // Bail if there's no data.
    if (!SortFieldWriter_Peek(self)) { return 0; }

    int32_t  field_num = ivars->field_num;
    Folder  *folder    = PolyReader_Get_Folder(ivars->polyreader);
    String  *seg_name  = Seg_Get_Name(ivars->segment);

    // Open streams.
    String *ord_path = Str_newf("%o/sort-%i32.ord", seg_name, field_num);
    OutStream *ord_out = Folder_Open_Out(folder, ord_path);
    DECREF(ord_path);
    if (!ord_out) { RETHROW(INCREF(Err_get_error())); }
    OutStream *ix_out = NULL;
    if (ivars->var_width) {
        String *ix_path = Str_newf("%o/sort-%i32.ix", seg_name, field_num);
        ix_out = Folder_Open_Out(folder, ix_path);
        DECREF(ix_path);
        if (!ix_out) { RETHROW(INCREF(Err_get_error())); }
    }
    String *dat_path = Str_newf("%o/sort-%i32.dat", seg_name, field_num);
    OutStream *dat_out = Folder_Open_Out(folder, dat_path);
    DECREF(dat_path);
    if (!dat_out) { RETHROW(INCREF(Err_get_error())); }

    int32_t cardinality = S_write_files(self, ord_out, ix_out, dat_out);

    // Close streams.
    OutStream_Close(ord_out);
    if (ix_out) { OutStream_Close(ix_out); }
    OutStream_Close(dat_out);
    DECREF(dat_out);
    DECREF(ix_out);
    DECREF(ord_out);

    return cardinality;
}

static void
S_flip_run(SortFieldWriter *run, uint32_t sub_thresh, InStream *ord_in,
           InStream *ix_in, InStream *dat_in) {
    SortFieldWriterIVARS *const run_ivars = SortFieldWriter_IVARS(run);

    if (run_ivars->flipped) { THROW(ERR, "Can't Flip twice"); }
    run_ivars->flipped = true;

    // Get our own slice of mem_thresh.
    DECREF(run_ivars->counter);
    run_ivars->counter    = Counter_new();
    run_ivars->mem_thresh = sub_thresh;

    // Done if we already have a SortCache to read from.
    if (run_ivars->sort_cache) { return; }

    // Open the temp files for reading.
    String *seg_name  = Seg_Get_Name(run_ivars->segment);
    String *ord_alias = Str_newf("%o/sort_ord_temp-%i64-to-%i64", seg_name,
                                 run_ivars->ord_start, run_ivars->ord_end);
    InStream *ord_in_dupe
        = InStream_Reopen(ord_in, ord_alias, run_ivars->ord_start,
                          run_ivars->ord_end - run_ivars->ord_start);
    DECREF(ord_alias);
    InStream *ix_in_dupe = NULL;
    if (run_ivars->var_width) {
        String *ix_alias = Str_newf("%o/sort_ix_temp-%i64-to-%i64", seg_name,
                                    run_ivars->ix_start, run_ivars->ix_end);
        ix_in_dupe = InStream_Reopen(ix_in, ix_alias, run_ivars->ix_start,
                                     run_ivars->ix_end - run_ivars->ix_start);
        DECREF(ix_alias);
    }
    String *dat_alias = Str_newf("%o/sort_dat_temp-%i64-to-%i64", seg_name,
                                 run_ivars->dat_start, run_ivars->dat_end);
    InStream *dat_in_dupe
        = InStream_Reopen(dat_in, dat_alias, run_ivars->dat_start,
                          run_ivars->dat_end - run_ivars->dat_start);
    DECREF(dat_alias);

    // Get a SortCache.
    String *field = Seg_Field_Name(run_ivars->segment, run_ivars->field_num);
    switch (run_ivars->prim_id & FType_PRIMITIVE_ID_MASK) {
        case FType_TEXT:
            run_ivars->sort_cache = (SortCache*)TextSortCache_new(
                                  field, run_ivars->type, run_ivars->run_cardinality,
                                  run_ivars->run_max, run_ivars->null_ord,
                                  run_ivars->ord_width, ord_in_dupe,
                                  ix_in_dupe, dat_in_dupe);
            break;
        case FType_INT32:
            run_ivars->sort_cache = (SortCache*)I32SortCache_new(
                                  field, run_ivars->type, run_ivars->run_cardinality,
                                  run_ivars->run_max, run_ivars->null_ord,
                                  run_ivars->ord_width, ord_in_dupe,
                                  dat_in_dupe);
            break;
        case FType_INT64:
            run_ivars->sort_cache = (SortCache*)I64SortCache_new(
                                  field, run_ivars->type, run_ivars->run_cardinality,
                                  run_ivars->run_max, run_ivars->null_ord,
                                  run_ivars->ord_width, ord_in_dupe,
                                  dat_in_dupe);
            break;
        case FType_FLOAT32:
            run_ivars->sort_cache = (SortCache*)F32SortCache_new(
                                  field, run_ivars->type, run_ivars->run_cardinality,
                                  run_ivars->run_max, run_ivars->null_ord,
                                  run_ivars->ord_width, ord_in_dupe,
                                  dat_in_dupe);
            break;
        case FType_FLOAT64:
            run_ivars->sort_cache = (SortCache*)F64SortCache_new(
                                  field, run_ivars->type, run_ivars->run_cardinality,
                                  run_ivars->run_max, run_ivars->null_ord,
                                  run_ivars->ord_width, ord_in_dupe,
                                  dat_in_dupe);
            break;
        default:
            THROW(ERR, "No SortCache class for %o", run_ivars->type);
    }

    DECREF(ord_in_dupe);
    DECREF(ix_in_dupe);
    DECREF(dat_in_dupe);
}

/***************************************************************************/

static SFWriterElem*
S_SFWriterElem_create(Obj *value, int32_t doc_id) {
    SFWriterElem *self = (SFWriterElem*)Class_Make_Obj(SFWRITERELEM);
    SFWriterElemIVARS *ivars = SFWriterElem_IVARS(self);
    ivars->value  = value;
    ivars->doc_id = doc_id;
    return self;
}

void
SFWriterElem_Destroy_IMP(SFWriterElem *self) {
    SFWriterElemIVARS *ivars = SFWriterElem_IVARS(self);
    DECREF(ivars->value);
    SUPER_DESTROY(self, SFWRITERELEM);
}