package KinoSearch1::Util::SortExternal;
use strict;
use warnings;
use KinoSearch1::Util::ToolSet;
use base qw( KinoSearch1::Util::CClass );
# Declare the constructor's named arguments and their default values.
# NOTE(review): init_instance_vars is inherited/imported (CClass or ToolSet);
# presumably it fills %instance_vars below, which new() uses both to validate
# (verify_args) and to default its arguments -- confirm.
BEGIN {
__PACKAGE__->init_instance_vars(
# constructor args
invindex => undef,      # used to create, delete and reopen the ".srt" file
seg_name => undef,      # the scratch file is named "$seg_name.srt"
mem_threshold => 2**24, # approx. bytes of cached items (16 MiB) allowed
                        # before a sorted run is flushed to disk
);
}
our %instance_vars;
# Constructor.
#
# Validates the named arguments against %instance_vars, layers them over the
# registered defaults, starts a fresh "$seg_name.srt" scratch file (deleting
# any stale one), and passes everything to the XS constructor _new().
sub new {
    my $class = shift;
    verify_args( \%instance_vars, @_ );
    my %args = ( %instance_vars, @_ );
    $class = ref($class) || $class;

    my $invindex = $args{invindex};
    my $filename = "$args{seg_name}.srt";

    # always begin with an empty scratch file
    if ( $invindex->file_exists($filename) ) {
        $invindex->delete_file($filename);
    }
    my $outstream = $invindex->open_outstream($filename);

    return _new(
        $class,            $outstream,
        $args{invindex},   $args{seg_name},
        $args{mem_threshold},
    );
}
# Prepare to start fetching sorted results.
#
# Ends the feeding phase. If no run has been flushed to disk yet, the cache
# is simply sorted in memory. Otherwise whatever remains in the cache is
# written out as one final run. The scratch file's outstream is then closed
# and reopened as an instream, and fetch() is switched on.
sub sort_all {
    my $self = shift;

    # deal with any items in the cache right now
    if ( $self->_get_num_runs ) {
        # runs already on disk: spill the remainder as one more run
        $self->_sort_run;
    }
    else {
        # never exceeded mem_threshold, so sort in-memory
        $self->_sort_cache;
    }

    # done adding elements, so close file and reopen as an instream
    $self->_get_outstream->close;
    my $filename = $self->_get_seg_name . ".srt";
    my $invindex = $self->_get_invindex;
    $self->_set_instream( $invindex->open_instream($filename) );

    # allow fetching now that we're set up
    $self->_enable_fetch;
}
# Close the instream that sort_all() opened on the scratch file.
sub close {
    my $self     = shift;
    my $instream = $self->_get_instream;
    return $instream->close;
}
1;
__END__
__XS__
MODULE = KinoSearch1 PACKAGE = KinoSearch1::Util::SortExternal
=for comment
Private constructor used by the Perl-level new(). It allocates a SortExternal
C struct via Kino1_SortEx_new and returns it wrapped in a new mortal
reference blessed into the given class. The struct is freed by DESTROY.
=cut
void
_new(class, outstream_sv, invindex_sv, seg_name_sv, mem_threshold)
char *class;
SV *outstream_sv;
SV *invindex_sv;
SV *seg_name_sv;
I32 mem_threshold;
PREINIT:
SortExternal *sortex;
PPCODE:
sortex = Kino1_SortEx_new(outstream_sv, invindex_sv, seg_name_sv,
mem_threshold);
ST(0) = sv_newmortal();
sv_setref_pv( ST(0), class, (void*)sortex );
XSRETURN(1);
=for comment
Add one or more items to the sort pool. Each argument's string buffer
(SvPVX/SvCUR) is handed to the C feed method. Arguments that do not
currently hold a string (SvPOK false, e.g. undef or a plain number) are
silently skipped.
=cut
void
feed(sortex, ...)
SortExternal *sortex;
PREINIT:
I32 i;
PPCODE:
for (i = 1; i < items; i++) {
SV const * item_sv = ST(i);
if (!SvPOK(item_sv))
continue;
sortex->feed(sortex, SvPVX(item_sv), SvCUR(item_sv));
}
=for comment
Fetch the next sorted item from the sort pool. sort_all must be called first;
until then the C fetch method confesses. Returns the item as a new string, or
undef once the pool is exhausted. The C ByteBuf is destroyed once its bytes
have been copied into the returned SV.
=cut
SV*
fetch(sortex)
SortExternal *sortex;
PREINIT:
ByteBuf *bb;
CODE:
bb = sortex->fetch(sortex);
if (bb == NULL) {
RETVAL = newSV(0);
}
else {
RETVAL = newSVpvn(bb->ptr, bb->size);
Kino1_BB_destroy(bb);
}
OUTPUT: RETVAL
=for comment
Sort all items currently in memory, in place. Nothing is written to disk.
=cut
void
_sort_cache(sortex)
SortExternal *sortex;
PPCODE:
Kino1_SortEx_sort_cache(sortex);
=for comment
Sort everything in memory and write the sorted elements to disk, creating a
SortExRun C object. Does nothing if the cache is empty.
=cut
void
_sort_run(sortex);
SortExternal *sortex;
PPCODE:
Kino1_SortEx_sort_run(sortex);
=for comment
Turn on fetching. Until this is called, fetch() confesses with "can't call
fetch before sort_all".
=cut
void
_enable_fetch(sortex)
SortExternal *sortex;
PPCODE:
Kino1_SortEx_enable_fetch(sortex);
=for comment
Accessors for fields of the C struct; the case labels below match the ALIAS
numbers.
_set_outstream and _set_instream release the previously stored SV, store a
copy of the new one, re-extract the underlying C stream struct, and then fall
through to return a copy of the new value.
Setting num_runs, invindex or seg_name is not allowed and confesses.
Every getter returns a fresh SV copied from the stored value.
NOTE(review): the setters read ST(1) without checking items; presumably
KINO_START_SET_OR_GET_SWITCH validates the argument count -- confirm.
=cut
SV*
_set_or_get(sortex, ...)
SortExternal *sortex;
ALIAS:
_set_outstream = 1
_get_outstream = 2
_set_instream = 3
_get_instream = 4
_set_num_runs = 5
_get_num_runs = 6
_set_invindex = 7
_get_invindex = 8
_set_seg_name = 9
_get_seg_name = 10
CODE:
{
KINO_START_SET_OR_GET_SWITCH
case 1: SvREFCNT_dec(sortex->outstream_sv);
sortex->outstream_sv = newSVsv( ST(1) );
Kino1_extract_struct(sortex->outstream_sv, sortex->outstream,
OutStream*, "KinoSearch1::Store::OutStream");
/* fall through */
case 2: RETVAL = newSVsv(sortex->outstream_sv);
break;
case 3: SvREFCNT_dec(sortex->instream_sv);
sortex->instream_sv = newSVsv( ST(1) );
Kino1_extract_struct(sortex->instream_sv, sortex->instream,
InStream*, "KinoSearch1::Store::InStream");
/* fall through */
case 4: RETVAL = newSVsv(sortex->instream_sv);
break;
case 5: Kino1_confess("can't set num_runs");
/* fall through */
case 6: RETVAL = newSViv(sortex->num_runs);
break;
case 7: Kino1_confess("can't set_invindex");
/* fall through */
case 8: RETVAL = newSVsv(sortex->invindex_sv);
break;
case 9: Kino1_confess("can't set_seg_name");
/* fall through */
case 10: RETVAL = newSVsv(sortex->seg_name_sv);
break;
KINO_END_SET_OR_GET_SWITCH
}
OUTPUT: RETVAL
=for comment
Free the C struct. This releases its references to Perl objects, any items
still held in memory, and all remaining runs.
=cut
void
DESTROY(sortex)
SortExternal *sortex;
PPCODE:
Kino1_SortEx_destroy(sortex);
__H__
#ifndef H_KINOSEARCH_UTIL_SORT_EXTERNAL
#define H_KINOSEARCH_UTIL_SORT_EXTERNAL 1
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "KinoSearch1StoreInStream.h"
#include "KinoSearch1StoreOutStream.h"
#include "KinoSearch1UtilByteBuf.h"
#include "KinoSearch1UtilCClass.h"
#include "KinoSearch1UtilMemManager.h"
/* A sorted run: a contiguous byte range of the scratch file written by
 * Kino1_SortEx_sort_run (each item stored as a VInt length followed by its
 * bytes), plus an in-memory cache of items read back from that range during
 * the merge phase.
 */
typedef struct sortexrun {
double start;         /* file offset where the run begins */
double file_pos;      /* file offset of the next item not yet read back */
double end;           /* file offset where the run ends */
ByteBuf **cache;      /* items read back from disk */
I32 cache_cap;        /* allocated limit for cache */
I32 cache_elems;      /* number of elems in cache */
I32 cache_pos;        /* index of the first elem not yet moved to the
                       * main cache */
I32 slice_size;       /* number of elems from cache_pos onward that sort at
                       * or before the current endpost (set by
                       * Kino1_SortEx_define_slice) */
} SortExRun;
/* External sorter: items are fed into an in-memory cache. When the cache's
 * estimated size reaches mem_threshold it is sorted and written to disk as a
 * run, and the runs are merged back together as items are fetched.
 */
typedef struct sortexternal {
ByteBuf **cache; /* item cache, both incoming and outgoing */
I32 cache_cap; /* allocated limit for cache */
I32 cache_elems; /* number of elems in cache */
I32 cache_pos; /* index of current element in cache */
ByteBuf **scratch; /* memory for use by mergesort */
I32 scratch_cap; /* allocated limit for scratch */
I32 mem_threshold; /* bytes of mem allowed for cache */
I32 cache_bytes; /* estimated bytes of mem occupied by cache */
I32 run_cache_limit; /* bytes of mem allowed each run cache */
SortExRun **runs; /* the num_runs runs written to disk so far */
I32 num_runs;
SV *outstream_sv; /* Perl OutStream object runs are written to ... */
OutStream *outstream; /* ... and the C struct extracted from it */
SV *instream_sv; /* Perl InStream object installed by sort_all ... */
InStream *instream; /* ... and the C struct runs are read back through */
SV *invindex_sv; /* kept for the Perl layer (_get_invindex) */
SV *seg_name_sv; /* kept for the Perl layer (_get_seg_name) */
void (*feed) (struct sortexternal*, char*, I32); /* Kino1_SortEx_feed */
ByteBuf* (*fetch)(struct sortexternal*); /* fetch_death until enable_fetch,
                                          * then Kino1_SortEx_fetch */
} SortExternal;
SortExternal* Kino1_SortEx_new(SV*, SV*, SV*, I32);
void Kino1_SortEx_feed(SortExternal*, char*, I32);
ByteBuf* Kino1_SortEx_fetch(SortExternal*);
ByteBuf* Kino1_SortEx_fetch_death(SortExternal*);
void Kino1_SortEx_enable_fetch(SortExternal*);
void Kino1_SortEx_sort_cache(SortExternal*);
void Kino1_SortEx_sort_run(SortExternal*);
void Kino1_SortEx_destroy(SortExternal*);
#endif /* include guard */
__C__
#include "KinoSearch1UtilSortExternal.h"
static SortExRun* Kino1_SortEx_new_run(double, double);
static void Kino1_SortEx_grow_bufbuf(ByteBuf***, I32, I32);
static I32 Kino1_SortEx_refill_run(SortExternal*, SortExRun*);
static void Kino1_SortEx_refill_cache(SortExternal*);
static void Kino1_SortEx_merge_runs(SortExternal*);
static ByteBuf* Kino1_SortEx_find_endpost(SortExternal*);
static I32 Kino1_SortEx_define_slice(SortExRun*, ByteBuf*);
static void Kino1_SortEx_mergesort(ByteBuf**, ByteBuf**, I32);
static void Kino1_SortEx_msort(ByteBuf**, ByteBuf**, U32, U32);
static void Kino1_SortEx_merge(ByteBuf**, U32, ByteBuf**, U32,
ByteBuf**);
static void Kino1_SortEx_clear_cache(SortExternal*);
static void Kino1_SortEx_clear_run_cache(SortExRun*);
static void Kino1_SortEx_destroy_run(SortExRun*);
#define KINO_PER_ITEM_OVERHEAD (sizeof(ByteBuf) + sizeof(ByteBuf*))
/* Create a SortExternal.
 *
 * outstream_sv  - Perl object wrapping the KinoSearch1::Store::OutStream that
 *                 sorted runs are written to.
 * invindex_sv   - invindex object; stored so the Perl layer can retrieve it
 *                 via _get_invindex.
 * seg_name_sv   - segment name; stored for _get_seg_name.
 * mem_threshold - once the estimated size of the cached items reaches this
 *                 many bytes, feed() flushes them to disk as a sorted run.
 *
 * The SV arguments are copied with newSVsv(), so the struct owns its own
 * references; Kino1_SortEx_destroy releases them.
 */
SortExternal*
Kino1_SortEx_new(SV *outstream_sv, SV *invindex_sv, SV *seg_name_sv,
                 I32 mem_threshold) {
    SortExternal *sortex;
    OutStream    *outstream;

    /* Extract the OutStream before allocating anything. If the argument is
     * unusable, Kino1_extract_struct presumably croaks, and nothing
     * allocated here has to leak. */
    Kino1_extract_struct(outstream_sv, outstream,
        OutStream*, "KinoSearch1::Store::OutStream");

    /* allocate */
    Kino1_New(0, sortex, 1, SortExternal);
    Kino1_New(0, sortex->cache, 100, ByteBuf*);
    Kino1_New(0, sortex->runs, 1, SortExRun*);

    /* init -- set every field explicitly, including the pointers nothing
     * else initializes before first use */
    sortex->scratch     = NULL;
    sortex->scratch_cap = 0;
    sortex->cache_cap   = 100;
    sortex->cache_elems = 0;
    sortex->cache_pos   = 0;
    sortex->cache_bytes = 0;
    sortex->runs[0]     = NULL;
    sortex->num_runs    = 0;
    /* No instream until sort_all() installs one. Hold an SV we own, because
     * _set_instream and Kino1_SortEx_destroy both SvREFCNT_dec it. */
    sortex->instream    = NULL;
    sortex->instream_sv = newSV(0);
    sortex->feed        = Kino1_SortEx_feed;
    sortex->fetch       = Kino1_SortEx_fetch_death;

    /* assign */
    sortex->outstream_sv  = newSVsv(outstream_sv);
    sortex->outstream     = outstream;
    sortex->invindex_sv   = newSVsv(invindex_sv);
    sortex->seg_name_sv   = newSVsv(seg_name_sv);
    sortex->mem_threshold = mem_threshold;

    /* derive */
    sortex->run_cache_limit = mem_threshold / 2;

    return sortex;
}
/* Create a new SortExRun object */
static SortExRun*
Kino1_SortEx_new_run(double start, double end) {
SortExRun *run;
/* allocate */
Kino1_New(0, run, 1, SortExRun);
Kino1_New(0, run->cache, 100, ByteBuf*);
/* init */
run->cache_cap = 100;
run->cache_elems = 0;
run->cache_pos = 0;
/* assign */
run->start = start;
run->file_pos = start;
run->end = end;
return run;
}
/* Add one item to the cache.
 *
 * A new ByteBuf is created from (ptr, len) with Kino1_BB_new_string and
 * appended to the cache, which is grown first if it is full. The item's
 * estimated memory cost (len + 1 + KINO_PER_ITEM_OVERHEAD) is added to
 * cache_bytes. Once that reaches mem_threshold, the cache is sorted and
 * flushed to disk as a new run.
 */
void
Kino1_SortEx_feed(SortExternal* sortex, char* ptr, I32 len) {
    /* add room for more cache elements if needed */
    if (sortex->cache_elems >= sortex->cache_cap) {
        /* add 100, plus 12.5% (one eighth) of the current capacity */
        sortex->cache_cap = sortex->cache_cap + 100 + (sortex->cache_cap / 8);
        Kino1_Renew(sortex->cache, sortex->cache_cap, ByteBuf*);
    }
    sortex->cache[ sortex->cache_elems ] = Kino1_BB_new_string(ptr, len);
    sortex->cache_elems++;

    /* Track memory consumed. The +1 presumably covers the NUL stored after
     * the bytes; refill_run writes one at ptr[len]. */
    sortex->cache_bytes += KINO_PER_ITEM_OVERHEAD;
    sortex->cache_bytes += len + 1;

    /* check if it's time to flush the cache */
    if (sortex->cache_bytes >= sortex->mem_threshold)
        Kino1_SortEx_sort_run(sortex);
}
/* Return the next item in sort order, or NULL once everything has been
 * fetched. The caller takes ownership of the returned ByteBuf. Moving
 * cache_pos past it means clear_cache will no longer destroy it.
 */
ByteBuf*
Kino1_SortEx_fetch(SortExternal *sortex) {
    /* current batch used up? pull the next batch out of the runs */
    if (!(sortex->cache_pos < sortex->cache_elems))
        Kino1_SortEx_refill_cache(sortex);

    if (sortex->cache_elems <= 0)
        return NULL;

    return sortex->cache[ sortex->cache_pos++ ];
}
/* Placeholder fetch method. Kino1_SortEx_new installs it, and it stays in
 * place until Kino1_SortEx_enable_fetch runs. It always confesses; the
 * return statement only satisfies the compiler.
 */
ByteBuf*
Kino1_SortEx_fetch_death(SortExternal *sortex) {
    Kino1_confess("can't call fetch before sort_all");
    return (ByteBuf*)NULL;
}
/* Swap the "confess" placeholder for the real fetch implementation. */
void
Kino1_SortEx_enable_fetch(SortExternal *sortex) {
    sortex->fetch = &Kino1_SortEx_fetch;
}
/* Make sure *bb_buf, an array of ByteBuf pointers currently able to hold
 * `current` elements, can hold at least `desired`. When it has to grow, it
 * is resized to exactly `desired`. The new size is not recorded anywhere,
 * so callers that keep a capacity field must update it themselves.
 */
static void
Kino1_SortEx_grow_bufbuf(ByteBuf ***bb_buf, I32 current, I32 desired) {
    if (desired <= current)
        return;
    Kino1_Renew(*bb_buf, desired, ByteBuf*);
}
/* Sort the main cache (cache[0 .. cache_elems-1]) in place. First, grow the
 * scratch buffer that mergesort uses if it is too small.
 */
void
Kino1_SortEx_sort_cache(SortExternal *sortex) {
    /* grow_bufbuf doesn't record the new size, so track it here. Otherwise
     * scratch_cap never moves from its initial 0 and every call reallocates
     * the scratch buffer. */
    if (sortex->scratch_cap < sortex->cache_elems) {
        Kino1_SortEx_grow_bufbuf(&sortex->scratch, sortex->scratch_cap,
            sortex->cache_elems);
        sortex->scratch_cap = sortex->cache_elems;
    }
    Kino1_SortEx_mergesort(sortex->cache, sortex->scratch,
        sortex->cache_elems);
}
/* Sort everything currently in the cache and append it to the outstream as
 * a new run, with each item written as a VInt length followed by its bytes.
 * Then free the cached items and register a SortExRun covering the run's
 * byte range. Does nothing if the cache is empty.
 */
void
Kino1_SortEx_sort_run(SortExternal *sortex) {
    OutStream  *outstream = sortex->outstream;
    ByteBuf   **cache, **cache_end;
    SortExRun  *run;
    double      start, end;

    /* bail if there's nothing in the cache */
    if (sortex->cache_bytes == 0)
        return;

    /* mark start of run */
    start = outstream->tell(outstream);

    /* write sorted items to file */
    Kino1_SortEx_sort_cache(sortex);
    cache_end = sortex->cache + sortex->cache_elems;
    for (cache = sortex->cache; cache < cache_end; cache++) {
        ByteBuf *bb = *cache;
        outstream->write_vint(outstream, bb->size);
        outstream->write_bytes(outstream, bb->ptr, bb->size);
    }

    /* clear the cache */
    Kino1_SortEx_clear_cache(sortex);

    /* mark end of run and build a new SortExRun object */
    end = outstream->tell(outstream);
    run = Kino1_SortEx_new_run(start, end);

    /* Grow the runs array and bump num_runs only now that the run exists.
     * If anything above croaks (e.g. a failed write), num_runs will never
     * count a slot holding an uninitialized pointer, which
     * Kino1_SortEx_destroy would otherwise try to free. */
    Kino1_Renew(sortex->runs, sortex->num_runs + 1, SortExRun*);
    sortex->runs[ sortex->num_runs ] = run;
    sortex->num_runs++;

    /* recalculate the size allowed for each run's cache (minimum 64 KiB) */
    sortex->run_cache_limit = (sortex->mem_threshold / 2) / sortex->num_runs;
    if (sortex->run_cache_limit < 65536)
        sortex->run_cache_limit = 65536;
}
/* Recover sorted items from disk, up to the allowable memory limit.
 *
 * If the run still has unconsumed items in its cache (cache_pos <
 * cache_elems), nothing is read, and the count of unconsumed items is
 * returned.
 *
 * Otherwise the run's cache is reset, and items are read from
 * sortex->instream starting at run->file_pos. Reading continues until the
 * run's end offset is reached or the bytes recovered exceed
 * sortex->run_cache_limit.
 *
 * Returns the number of items now in the run's cache; 0 means the run is
 * exhausted. Confesses if the stream ends up positioned past the run's end.
 */
static I32
Kino1_SortEx_refill_run(SortExternal* sortex, SortExRun *run) {
InStream *instream;
double end;
I32 run_cache_bytes = 0;
int num_elems = 0; /* number of items recovered */
I32 len;
ByteBuf *bb;
I32 run_cache_limit;
/* see if we actually need to refill */
if (run->cache_elems - run->cache_pos)
return run->cache_elems - run->cache_pos;
else
Kino1_SortEx_clear_run_cache(run);
/* make local copies */
instream = sortex->instream;
run_cache_limit = sortex->run_cache_limit;
end = run->end;
/* resume where the previous refill of this run stopped */
instream->seek(instream, run->file_pos);
while (1) {
/* bail if we've read everything in this run */
if (instream->tell(instream) >= end) {
/* make sure we haven't read too much */
if (instream->tell(instream) > end) {
UV pos = instream->tell(instream);
Kino1_confess(
"read past end of run: %"UVuf", %"UVuf, pos, (UV)end );
}
break;
}
/* bail if we've hit the ceiling for this run's cache; the check comes
 * before the read, so the cache may overshoot the limit by one item and
 * at least one item is read whenever the run is not exhausted */
if (run_cache_bytes > run_cache_limit)
break;
/* retrieve and decode len; allocate a ByteBuf and recover the string */
len = instream->read_vint(instream);
bb = Kino1_BB_new(len);
instream->read_bytes(instream, bb->ptr, len);
bb->ptr[len] = '\0';
/* add to the run's cache, growing it by 100 plus one eighth if full */
if (num_elems == run->cache_cap) {
run->cache_cap = run->cache_cap + 100 + (run->cache_cap / 8);
Kino1_Renew(run->cache, run->cache_cap, ByteBuf*);
}
run->cache[ num_elems ] = bb;
/* track how much we've read so far (same cost estimate as feed) */
num_elems++;
run_cache_bytes += len + 1 + KINO_PER_ITEM_OVERHEAD;
}
/* reset the cache array position and length; remember file pos */
run->cache_elems = num_elems;
run->cache_pos = 0;
run->file_pos = instream->tell(instream);
return num_elems;
}
/* Refill the main cache, drawing from the caches of all runs.
 *
 * Any unfetched items still in the main cache are destroyed first. A run
 * whose cache cannot be refilled from disk is exhausted, so it is destroyed
 * and removed. If runs remain, every cached item that is guaranteed to
 * precede anything not yet read from disk is merged into the main cache in
 * sorted order. "Guaranteed" here means it sorts at or before the
 * "endpost". If no runs remain, the main cache is left empty.
 */
static void
Kino1_SortEx_refill_cache(SortExternal *sortex) {
    ByteBuf   *endpost;
    SortExRun *run;
    I32        i = 0;
    I32        total = 0;

    /* free all the existing ByteBufs, as they've been fetched by now */
    Kino1_SortEx_clear_cache(sortex);

    /* make sure all runs have at least one item in the cache */
    while (i < sortex->num_runs) {
        run = sortex->runs[i];
        if (   (run->cache_elems > run->cache_pos)
            || (Kino1_SortEx_refill_run(sortex, run))
        ) {
            i++;
        }
        else {
            /* discard empty runs, filling the hole with the last run */
            Kino1_SortEx_destroy_run(run);
            sortex->num_runs--;
            sortex->runs[i] = sortex->runs[ sortex->num_runs ];
            sortex->runs[ sortex->num_runs ] = NULL;
        }
    }
    if (!sortex->num_runs)
        return;

    /* move as many items as possible into the sorting cache */
    endpost = Kino1_SortEx_find_endpost(sortex);
    for (i = 0; i < sortex->num_runs; i++) {
        total += Kino1_SortEx_define_slice(sortex->runs[i], endpost);
    }

    /* Make sure we have enough room in both the main cache and the scratch.
     * grow_bufbuf doesn't record the new size, so do it here. Otherwise the
     * stale capacities force a realloc whenever total exceeds them. */
    if (sortex->cache_cap < total) {
        Kino1_SortEx_grow_bufbuf(&sortex->cache, sortex->cache_cap, total);
        sortex->cache_cap = total;
    }
    if (sortex->scratch_cap < total) {
        Kino1_SortEx_grow_bufbuf(&sortex->scratch, sortex->scratch_cap, total);
        sortex->scratch_cap = total;
    }
    Kino1_SortEx_merge_runs(sortex);
    sortex->cache_elems = total;
}
/* Merge all the items which are "in-range" from all the Runs into the main
 * cache.
 *
 * Each run contributes the slice_size items that start at its cache_pos, as
 * set by Kino1_SortEx_define_slice. Those slices are copied back-to-back
 * into sortex->cache, and each run's cache_pos is advanced past its slice.
 *
 * Every slice is already sorted, so neighbouring slices are then merged
 * pairwise, round after round, until a single sorted slice remains.
 * sortex->scratch must hold at least as many pointers as the slices'
 * combined size.
 */
static void
Kino1_SortEx_merge_runs(SortExternal *sortex) {
    SortExRun  *run;
    ByteBuf  ***slice_starts;
    ByteBuf   **cache = sortex->cache;
    I32        *slice_sizes;
    I32         i, j, slice_size, num_slices = 0;

    Kino1_New(0, slice_starts, sortex->num_runs, ByteBuf**);
    Kino1_New(0, slice_sizes, sortex->num_runs, I32);

    /* Copy all the elements in range into the cache, recording where each
     * non-empty slice begins and how long it is. */
    for (i = 0; i < sortex->num_runs; i++) {
        run = sortex->runs[i];
        slice_size = run->slice_size;
        if (slice_size == 0)
            continue;
        slice_sizes[num_slices]  = slice_size;
        slice_starts[num_slices] = cache;
        Copy( (run->cache + run->cache_pos), cache, slice_size, ByteBuf* );
        run->cache_pos += slice_size;
        cache += slice_size;
        num_slices++;
    }

    /* exploit previous sorting, rather than sort cache naively */
    while (num_slices > 1) {
        /* One round: merge slices (0,1), (2,3), ... When the number of
         * slices is odd, the LAST slice is carried into the next round
         * unchanged. */
        i = 0;
        j = 0;
        while (i < num_slices) {
            if (num_slices - i >= 2) {
                /* Merge two consecutive slices, which are adjacent in
                 * memory, via the scratch buffer, then copy the result back
                 * over both. */
                slice_size = slice_sizes[i] + slice_sizes[i+1];
                Kino1_SortEx_merge(slice_starts[i], slice_sizes[i],
                    slice_starts[i+1], slice_sizes[i+1], sortex->scratch);
                slice_sizes[j]  = slice_size;
                slice_starts[j] = slice_starts[i];
                Copy(sortex->scratch, slice_starts[j], slice_size, ByteBuf*);
                i += 2;
            }
            else {
                /* odd one out: move single slice pointer */
                slice_sizes[j]  = slice_sizes[i];
                slice_starts[j] = slice_starts[i];
                i += 1;
            }
            j += 1;
        }
        num_slices = j;
    }

    Kino1_Safefree(slice_starts);
    Kino1_Safefree(slice_sizes);
}
/* Return the item that closes the next safe merge window.
 *
 * This is the smallest of the runs' last-cached items. Nothing still on
 * disk can sort before it, so every cached item at or before it may be
 * merged. Confesses if any run's cache is empty.
 */
static ByteBuf*
Kino1_SortEx_find_endpost(SortExternal *sortex) {
    ByteBuf *endpost = NULL;
    int      idx;

    for (idx = 0; idx < sortex->num_runs; idx++) {
        SortExRun *run = sortex->runs[idx];
        ByteBuf   *last;

        if (run->cache_pos == run->cache_elems || run->cache_elems < 1)
            Kino1_confess("find_endpost encountered an empty run cache");

        last = run->cache[ run->cache_elems - 1 ];

        /* the first run seeds the endpost; later runs may lower it */
        if (idx == 0 || Kino1_BB_compare(last, endpost) < 0)
            endpost = last;
    }
    return endpost;
}
/* Record, in run->slice_size, how many items starting at run->cache_pos
 * sort at or before the endpost, and return that count. The run's cache is
 * sorted, so a binary search finds the boundary.
 */
static I32
Kino1_SortEx_define_slice(SortExRun *run, ByteBuf *endpost) {
    ByteBuf **items = run->cache;
    I32 at_or_below = run->cache_pos - 1; /* last index known <= endpost */
    I32 above       = run->cache_elems;   /* first index known > endpost */

    while (above - at_or_below > 1) {
        I32 probe = (at_or_below + above) / 2;
        if (Kino1_BB_compare(items[probe], endpost) > 0)
            above = probe;
        else
            at_or_below = probe;
    }

    /* at_or_below only ever increases from cache_pos - 1. If nothing
     * qualifies, it is still cache_pos - 1, and this yields 0. */
    run->slice_size = at_or_below - run->cache_pos + 1;
    return run->slice_size;
}
/* Sort bufbuf[0 .. buf_size-1] with a standard merge sort. scratch is the
 * working space and must hold at least buf_size pointers. An empty array is
 * left untouched.
 */
static void
Kino1_SortEx_mergesort(ByteBuf **bufbuf, ByteBuf **scratch, I32 buf_size) {
    if (buf_size != 0)
        Kino1_SortEx_msort(bufbuf, scratch, 0, buf_size - 1);
}
/* Standard top-down merge sort of bufbuf[left .. right] (inclusive).
 * scratch is the merge buffer and must hold at least right - left + 1
 * pointers.
 */
static void
Kino1_SortEx_msort(ByteBuf **bufbuf, ByteBuf **scratch, U32 left, U32 right) {
    U32 mid;

    if (right <= left)
        return;

    /* Split into [left, mid-1] and [mid, right].
     * left + (right-left)/2 equals (left+right)/2 but cannot overflow.
     * Keeping mid unsigned avoids mixing I32 and U32 in the size arithmetic
     * below. */
    mid = left + (right - left) / 2 + 1;
    Kino1_SortEx_msort(bufbuf, scratch, left, mid - 1);
    Kino1_SortEx_msort(bufbuf, scratch, mid, right);
    Kino1_SortEx_merge( (bufbuf + left), (mid - left),
                        (bufbuf + mid), (right - mid + 1), scratch);
    Copy( scratch, (bufbuf + left), (right - left + 1), ByteBuf* );
}
/* Merge two sorted arrays, which need not be contiguous, into dest.
 * dest must have room for left_size + right_size pointers. Ties are taken
 * from the left array, which keeps the merge stable. Copying the result back
 * into the sources is the caller's job.
 */
static void
Kino1_SortEx_merge(ByteBuf **left_ptr, U32 left_size,
                   ByteBuf **right_ptr, U32 right_size,
                   ByteBuf **dest) {
    ByteBuf **const left_end  = left_ptr + left_size;
    ByteBuf **const right_end = right_ptr + right_size;

    while (left_ptr < left_end && right_ptr < right_end) {
        if (Kino1_BB_compare(*left_ptr, *right_ptr) <= 0)
            *dest++ = *left_ptr++;
        else
            *dest++ = *right_ptr++;
    }

    /* at most one of these tails is non-empty */
    while (left_ptr < left_end)
        *dest++ = *left_ptr++;
    while (right_ptr < right_end)
        *dest++ = *right_ptr++;
}
/* Empty the main cache and reset its bookkeeping. Entries before cache_pos
 * were already handed out by fetch and now belong to the caller, so only the
 * remaining ones are destroyed.
 */
static void
Kino1_SortEx_clear_cache(SortExternal *sortex) {
    I32 idx;

    for (idx = sortex->cache_pos; idx < sortex->cache_elems; idx++)
        Kino1_BB_destroy(sortex->cache[idx]);

    sortex->cache_elems = 0;
    sortex->cache_pos   = 0;
    sortex->cache_bytes = 0;
}
/* Empty a run's cache. Entries before cache_pos were already copied into
 * the main cache by merge_runs, which now owns them, so only the rest are
 * destroyed.
 */
static void
Kino1_SortEx_clear_run_cache(SortExRun *run) {
    I32 idx;

    for (idx = run->cache_pos; idx < run->cache_elems; idx++)
        Kino1_BB_destroy(run->cache[idx]);

    run->cache_pos   = 0;
    run->cache_elems = 0;
}
/* Free a SortExternal, in this order:
 *   1. Drop its references to Perl objects.
 *   2. Destroy any unfetched cached items.
 *   3. Destroy all remaining runs.
 *   4. Free the struct itself.
 */
void
Kino1_SortEx_destroy(SortExternal *sortex) {
    I32 idx;

    /* delegate to Perl garbage collector */
    SvREFCNT_dec(sortex->outstream_sv);
    SvREFCNT_dec(sortex->instream_sv);
    SvREFCNT_dec(sortex->invindex_sv);
    SvREFCNT_dec(sortex->seg_name_sv);

    /* unfetched items, then the pointer arrays themselves */
    Kino1_SortEx_clear_cache(sortex);
    Kino1_Safefree(sortex->cache);
    Kino1_Safefree(sortex->scratch);

    /* every remaining run, then the array that held them */
    idx = 0;
    while (idx < sortex->num_runs)
        Kino1_SortEx_destroy_run(sortex->runs[idx++]);
    Kino1_Safefree(sortex->runs);

    /* free me */
    Kino1_Safefree(sortex);
}
/* Free a run: its unconsumed cached items, its cache array, then itself. */
static void
Kino1_SortEx_destroy_run(SortExRun *run) {
    Kino1_SortEx_clear_run_cache(run);   /* ByteBufs not yet handed off */
    Kino1_Safefree(run->cache);          /* the pointer array */
    Kino1_Safefree(run);
}
__POD__
==begin devdocs
==head1 NAME
KinoSearch1::Util::SortExternal - external sorting
==head1 DESCRIPTION
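External merge sort of byte strings using lexical comparison. Items are sorted in memory, spilled to disk as sorted runs whenever the memory threshold is exceeded, and merged back together as they are fetched.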
==head1 COPYRIGHT
Copyright 2005-2010 Marvin Humphrey
==head1 LICENSE, DISCLAIMER, BUGS, etc.
See L<KinoSearch1> version 1.01.
==end devdocs
==cut