#!/usr/bin/perl
#
# $Header: /Users/claude/fuzz/lib/Genezzo/Index/RCS/bt2.pm,v 7.1 2005/07/19 07:49:03 claude Exp claude $
#
# copyright (c) 2003, 2004 Jeffrey I Cohen, all rights reserved, worldwide
#
#
use strict;
use warnings;
package Genezzo::Index::bt2;
use Genezzo::Util;
use Genezzo::Block::Std;
use Genezzo::Block::RowDir;
use Genezzo::PushHash::PushHash;
use Genezzo::Block::RDBlk_NN;
use Genezzo::Block::RDBlkA;
use Genezzo::Block::RDBArray;
use Genezzo::BufCa::BufCa;
use Carp;
use warnings::register;
BEGIN {
use Exporter ();
our ($VERSION, @ISA, @EXPORT, @EXPORT_OK, %EXPORT_TAGS);
# set the version for version checking
# $VERSION = 1.00;
# if using RCS/CVS, this may be preferred
$VERSION = do { my @r = (q$Revision: 7.1 $ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r }; # must be all one line, for MakeMaker
@ISA = qw(Exporter);
@EXPORT = ();
%EXPORT_TAGS = (); # eg: TAG => [ qw!name1 name2! ],
# your exported package globals go here,
# as well as any optionally exported functions
@EXPORT_OK = qw($bt2numcmp $bt2strcmp
$bt2numeq $bt2streq);
};
BEGIN {
# Use Greg Bacon's design for array-based objects as demonstrated
# in Mark Rogaski's <wendigo> Tree::Ternary
#
# Left, right, and nodeid are the only ones that will be used in
# every node, the others will only be defined in the leftmost
# (original root).
#
# left, right: immediate left and right siblings
#
# leftmost node (original root) holds height, (current) root
# nodeid, and rightmost nodeid
my @ATTRIBUTES = qw(
A_LEFT
A_RIGHT
A_NODEID
A_HEIGHT
A_ROOT
A_RIGHTMOST
);
# (from Tree::Ternary)
# Construct the code to declare our constants, execute, and check for
# errors (this was so much simpler in Pascal!)
#
my $attrcode = join "\n",
map qq[ sub $ATTRIBUTES[$_] () { $_ } ],
0..$#ATTRIBUTES;
eval $attrcode;
if ($@) {
require Carp;
Carp::croak("Failed to initialize module index: $@\n");
}
sub attrname {
return undef
unless (scalar(@_));
return undef
if (($_[0] !~ /\d+/) || ($_[0] > $#ATTRIBUTES));
return $ATTRIBUTES[$_[0]];
}
};
# basic numeric and string comparison functions
our $bt2numcmp = sub
{
return ($_[0] < $_[1]);
};
our $bt2numeq = sub
{
return ($_[0] == $_[1]);
};
our $bt2strcmp = sub
{
return ($_[0] lt $_[1]);
};
our $bt2streq = sub
{
return ($_[0] eq $_[1]);
};
# Packing/UnPacking functions:
# pr1 - single scalar key, single scalar value
our $pr1 = sub { return PackRow(@_); } ;
our $upr1 = sub { return UnPackRow(@_, $Genezzo::Util::UNPACK_TEMPL_ARR); } ;
# pr2 - array key, single scalar value
our $pr2 = sub {
my @k1;
push @k1, @{$_[0]->[0]}; # get key portion
push @k1, $_[0]->[1]; # get value
# greet @k1;
return PackRow(\@k1);
} ;
our $upr2 = sub {
my @a1 = UnPackRow(@_, $Genezzo::Util::UNPACK_TEMPL_ARR);
my @entry;
# greet @a1;
$entry[1] = pop @a1; # remove value from end
$entry[0] = \@a1; # key in remainder of array
# greet @entry;
return @entry;
} ;
# pr3 - single scalar key, array value
our $pr3 = sub {
# greet "pr3";
my @k1;
push @k1, $_[0]->[0]; # get key portion
push @k1, @{$_[0]->[1]}; # get value
# greet @k1;
return PackRow(\@k1);
} ;
our $upr3 = sub {
# greet "upr3";
my @a1 = UnPackRow(@_, $Genezzo::Util::UNPACK_TEMPL_ARR);
my @entry;
# greet @a1;
$entry[0] = shift @a1; # key in front of array
$entry[1] = \@a1; # value in remainder of array
# greet @entry;
return @entry;
} ;
# pr4 - array key of specified length, variable values
our $pr4 = sub {
my @k1;
push @k1, @{$_[0]->[0]}; # get key portion
push @k1, $_[0]->[1] # get value
if (defined($_[0]->[1]));
# greet @k1;
return PackRow(\@k1);
} ;
our $upr4 = sub {
my @args = @_;
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
# NOTE: keycount is first argument
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
my $keycount = shift @args;
my @a1 = UnPackRow(@args, $Genezzo::Util::UNPACK_TEMPL_ARR);
my @entry;
# greet @a1;
# extract key vector of size keycount - remainder is value array
my $numelts = scalar(@a1);
if ($numelts == $keycount)
{
$entry[0] = \@a1; # key is entire array
$entry[1] = $a1[-1]; # value is last part of key column
}
elsif ($numelts == ($keycount + 1))
{
$entry[1] = pop @a1; # remove value from end
$entry[0] = \@a1; # key in remainder of array
}
else
{
# splice off the key portion
my @kk = splice(@a1, 0, $keycount);
$entry[0] = \@kk; # key
$entry[1] = \@a1; # value is an array
}
# greet @entry;
return @entry;
} ;
=head1 _build_cmp_and_eq
construct comparison/equality callbacks
my $cmp1 = sub
{
my ($k1, $k2) = @_;
# NOTE: use "spaceship" (-1,0,1) comparison with
# short-circuit OR (which returns 0 or VALUE, not 0 or 1)
# to perform multi-column key comparison
# a la Schwartzian Transform
return (
( ($k1->[0] <=> $k2->[0])
|| ($k1->[1] <=> $k2->[1])) == -1
);
};
my $eq1 = sub
{
my ($k1, $k2) = @_;
return (($k1->[0] == $k2->[0])
&& ($k1->[1] == $k2->[1])
);
};
=cut
# XXX: note - not a class or instance method
sub _build_cmp_and_eq
{
my $keyvec = shift @_;
my $lastcol = (scalar(@{$keyvec}) - 1);
my ($eq_expr, $cmp_expr) = ('(', '((');
for my $i (0..$lastcol)
{
unless (0 == $i)
{
$eq_expr .= ' && ';
$cmp_expr .= ' || ';
}
my $ix = $i . ']';
my $k1 = '$k1->[' . $ix;
my $k2 = '$k2->[' . $ix;
my ($eq_op, $cmp_op) =
($keyvec->[$i] =~ m/n/) ? (' == ', ' <=> ') : (' eq ', ' cmp ');
$eq_expr .= '(' . $k1 . $eq_op . $k2 . ')';
$cmp_expr .= '(' . $k1 . $cmp_op . $k2 . ')';
}
$eq_expr .= ')';
$cmp_expr .= ') == -1)';
my ($eq1, $cmp1);
my $eq_sub = '$eq1 = sub {my ($k1, $k2) = @_; ';
# if (1 && !$Genezzo::Util::QUIETWHISPER)
# {
# $eq_sub .= 'greet $k1, $k2; ';
# }
$eq_sub .= 'return (' . $eq_expr . ');};';
# greet $eq_sub;
eval $eq_sub;
if ($@)
{
whisper "failed to evaluate $eq_sub";
return undef;
}
my $cmp_sub = '$cmp1 = sub {my ($k1, $k2) = @_; ';
# if (1 && !$Genezzo::Util::QUIETWHISPER)
# {
# $cmp_sub .= 'greet $k1, $k2; ';
# }
$cmp_sub .= 'return (' . $cmp_expr . ');};';
# greet $cmp_sub;
eval $cmp_sub;
if ($@)
{
whisper "failed to evaluate $cmp_sub";
return undef;
}
return ($eq1, $cmp1);
} # end build cmp and eq
sub new
{ #sub new
# greet @_;
# whoami;
my $invocant = shift;
my $class = ref($invocant) || $invocant ;
my $self = { };
# my $self->{root} = ();
my %optional = (maxsize => 50, numblocks => 100,
blocksize => $Genezzo::Block::Std::DEFBLOCKSIZE,
compare => $bt2strcmp, equal => $bt2streq,
pack_fn => $pr1, unpack_fn => $upr1,
use_IOT => 0,
unique_key => 0,
);
my %args = (%optional,
@_);
$self->{maxsize} = $args{maxsize};
$self->{blocksize} = $args{blocksize};
# XXX XXX: max key len at 1/3 blocksize, need at least 2 keys per block
$self->{maxkeysize} = $args{blocksize} / 3;
$self->{maxblockno} = $args{numblocks};
# NOTE: index-organized tables have an array of values, versus a
# single value. Need to make distinction for case of link entries
# (single val) in branch versus branch entries (array val)
$self->{use_IOT} = $args{use_IOT}; # XXX XXX: needed for makenodeentry
if (exists($args{key_type}))
{
my $ktype = $args{key_type};
# greet $ktype;
if (ref($args{key_type}))
{
my $ref_type = ref($args{key_type});
# greet $ref_type;
if ($ref_type ne "ARRAY")
{
whisper "no packing function for $ref_type";
return 0;
}
my $use_keycount = 0;
if ($args{use_keycount})
{
# special packing/unpacking for case of multiple keys,
# variable number [ 0..n ] of values.
$self->{keycount} = scalar(@{$ktype});
$use_keycount = 1;
}
else
{
$self->{keycount} = 0;
}
if ($self->{use_IOT})
{
whisper "no packing function for IOT with array key";
return 0;
}
# construct callbacks for comparison, equality
my @foo = _build_cmp_and_eq($ktype);
return 0
unless (scalar(@foo) == 2);
my ($eq1, $cmp1) = @foo;
if ($use_keycount)
{
# packing function takes keycount
$self->{pack_fn} = $pr4;
$self->{unpack_fn} = $upr4;
}
else
{
# array key, single scalar value
$self->{pack_fn} = $pr2;
$self->{unpack_fn} = $upr2;
}
$self->{compare} = $cmp1;
$self->{equal} = $eq1;
}
else # either char or numeric scalar key
{
unless ($ktype =~ m/^(c|n)$/)
{
whisper "unknown key type $ktype";
return 0;
}
if ($self->{use_IOT})
{ # single scalar key, array value
$self->{pack_fn} = $pr3;
$self->{unpack_fn} = $upr3;
}
else
{ # single scalar key, single scalar value
$self->{pack_fn} = $pr1;
$self->{unpack_fn} = $upr1;
}
($self->{compare}, $self->{equal}) =
($ktype =~ m/^n$/) ?
($bt2numcmp, $bt2numeq)
: ($bt2strcmp, $bt2streq);
}
}
else # no key_type specified -
# get packing and comparison functions from @args
{
$self->{pack_fn} = $args{pack_fn};
$self->{unpack_fn} = $args{unpack_fn};
$self->{compare} = $args{compare};
$self->{equal} = $args{equal};
}
# force uniqueness via a check at insert time. Should have no
# duplicates in leaves or branches. Calling searchR at height
# zero in insertR should be okay, since the recursive calling
# convention is identical at the branch level for both functions.
$self->{unique_key} = $args{unique_key};
# whoami %args;
return undef
unless _more_init($self, %args);
return bless $self, $class;
} # end new
sub _more_init
{
my $self = shift;
$self->{maxnodeid} = 0;
$self->{blocknum} = 0;
if ($self->{maxblockno})
{
# don't create a buffer cache if numblocks = 0
# (Note: need a new getarr and make_new_block methods in
# subclass if no bc created...)
$self->{bc} =
Genezzo::BufCa::BufCa->new(blocksize => $self->{blocksize},
numblocks => $self->{maxblockno});
return 0
unless (defined($self->{bc}));
}
# $self->{maxblockno} = -1;
$self->{statistics} = {
count => 0,
lastkey_count => 0,
last_was_last => 0,
keysize => {max => 0, min => 0}
};
return 1;
}
sub stats
{
my $self = shift;
my %stats;
$stats{count} = $self->{statistics}->{count};
$stats{lastkey_count} = $self->{statistics}->{lastkey_count};
$stats{keysize} = $self->{statistics}->{keysize};
$stats{makeysize} = $self->{maxkeysize};
$stats{height} = $self->{height};
$stats{nodecount} = $self->{maxnodeid};
return %stats;
}
sub _pack_row
{
my $self = shift;
# packs an "entry", which is a reference to a 2 element array, a
# key/value or key/link pair. key and value can be scalars or
# vectors -- it is the responsiblity of the packing function to
# convert them to a byte string.
my $p1 = $self->{pack_fn};
return &$p1(@_);
# returns a byte string - flattened row
}
sub _unpack_row
{
my $self = shift;
# unpacks a byte string and returns an array. Callers assume the
# array is a two element key/value or key/link pair, which is
# identical the output of makenodeentry.
my $up1 = $self->{unpack_fn};
return &$up1($self->{keycount}, @_)
if ($self->{keycount});
return &$up1(@_);
}
sub _make_new_block
{
my $self = shift;
# whoami;
my $blocknum = $self->{blocknum};
$self->{blocknum} += 1;
$self->{maxnodeid} += 1;
return $blocknum;
}
sub _getarr
{
my ($self, $blocknum) = @_;
my @outi;
my $bceref = $self->{bc}->ReadBlock(blocknum => $blocknum);
push @outi, $bceref; # block stays pinned as long as bceref is in scope
# obtain the actual Buffer Cache Element
my $bce = $$bceref;
local $Genezzo::Block::Std::DEFBLOCKSIZE = $self->{blocksize};
my $buff = $bce->{bigbuf};
my %h1;
my $blockclass = "Genezzo::Block::RDBlkA";
# my $blockclass = "Genezzo::Block::RDBlk_NN";
# tie a hash using the buffer
my $tie_thing = tie %h1, $blockclass, (refbufstr => $buff,
pctfree => 0,);
# XXX XXX XXX: should be able to use pctfree=0, but get some
# errors in Index1.t; Not sure if the broken code is in bt2 or
# rdblock...
my @a1;
# tie an array using the rdblka tied hash
my %args1 = (RDBlockHash => $tie_thing, RDBlock_Class => $blockclass);
my $t2 = tie @a1, "Genezzo::Block::RDBArray", %args1;
push @outi, $tie_thing;
push @outi, \%h1;
# return the tied array first, then the bceref, then the tied hash
unshift @outi, \@a1;
return @outi;
}
sub _makenode
{
my $self = shift;
my %optional = (height => 0
);
my %args = (%optional,
@_);
# at height zero check if enough space to handle splits at every
# level
unless ($args{height})
{
return undef
unless $self->_spacecheck($self->{height});
}
# build the metadata:
# left, right, nodeid
my @foo = ('', '', $self->{maxnodeid});
my $blocknum = $self->_make_new_block();
# store metadata in buffer
my ($currarr, $curr_bce, $curr_ph) = $self->_getarr($blocknum);
$self->_SetMeta($curr_ph,\@foo);
return $blocknum;
}
sub _makenodeentry
{
my $self = shift;
# %optional,
my %args = (
@_);
# key, link|value
# greet %args;
return undef
unless (exists($args{key}));
if (exists($args{value}))
{
my @outi = ($args{key}, $args{value});
return \@outi;
}
elsif (exists($args{link}))
{
# convert link to array (to match value array) for IOT case
# so packing/unpacking functions can work correctly
my $link1 = ($self->{use_IOT}) ? [$args{link}] : $args{link};
my @outi = ($args{key}, $link1);
return \@outi;
}
# XXX XXX XXX : else value TBD - callback fn
return undef;
}
my %ins_stat = (
ins_fail => "insert failed",
split_ok => "insert failed, but split okay",
split_fail => "insert okay, but split failed",
no_joy => "failed badly"
);
sub _setMainMeta
{
my $self = shift;
my ($lftmost_arr, $lftmost_bce, $lftmost_ph) =
$self->_getarr($self->{leftmost});
my @lftmost_meta1 = $self->_GetMeta($lftmost_ph);
# greet @lftmost_meta1;
$lftmost_meta1[A_HEIGHT] = $self->{height};
$lftmost_meta1[A_ROOT] = $self->{root};
$lftmost_meta1[A_RIGHTMOST] = $self->{rightmost};
# greet @lftmost_meta1;
return ($self->_SetMeta($lftmost_ph,\@lftmost_meta1));
}
sub _getMainMeta
{
my ($self, $blocknum) = @_;
my ($lftmost_arr, $lftmost_bce, $lftmost_ph) =
$self->_getarr($blocknum);
my @lftmost_meta1 = $self->_GetMeta($lftmost_ph);
# greet @lftmost_meta1;
$self->{leftmost} = $blocknum;
$self->{height} = $lftmost_meta1[A_HEIGHT];
$self->{root} = $lftmost_meta1[A_ROOT];
$self->{rightmost} = $lftmost_meta1[A_RIGHTMOST];
# greet @lftmost_meta1;
return (1);
}
sub insert
{
my ($self, $key, $val, $val_TBD_callback) = @_;
my $entry = $self->_makenodeentry(key => $key, value => $val);
# XXX XXX: use pack_row to invoke val TBD callback, so skip length
# check until hit insertR for this case
my $keysize = length($self->_pack_row($entry));
unless ($keysize < $self->{maxkeysize})
{
whisper "key too long\n";
return 0;
}
unless (exists($self->{root}))
{
$self->{height} = 0;
$self->{root} = $self->_makenode();
# in this implementation, the original head remains the
# leftmost leaf node of the tree, so we can start a full
# forward scan of leaves from {leftmost}
$self->{leftmost} = $self->{root};
# start with rightmost at head, reset the rightmost on splits
# if necessary
$self->{rightmost} = $self->{root};
# store additional metadata in leftmost
$self->_setMainMeta();
}
my $head = $self->{root};
my @splithead = $self->_insertR($head, $entry, $self->{height});
return 1
unless (scalar(@splithead));
my $istat;
if (scalar(@splithead) > 1)
{
$istat = shift @splithead;
my $mess1 = $ins_stat{$istat};
if ($istat =~ m/split_fail/)
{
# split failed, but insert succeeded. Most likely out of
# free blocks, but still have left over space in existing
# blocks.
whisper $mess1, "\n";
return 1;
}
if ($istat =~ m/ins_fail|no_joy/)
{
use Data::Dumper;
shift @splithead;
my $kk = shift @splithead;
my $mess2 = shift @splithead;
# insert failed - either key was too large or failed to
# split at height zero (leaf node)
my $key_info;
$key_info = ("SCALAR" eq ref($kk)) ? $kk :
Dumper($kk);
$key_info =~ s/\n/ /g ; # no newlines from dumper
$key_info =~ s/\t/ /g ; # no tabs from dumper
whisper "$mess1 for key $key_info";
whisper $mess2;
return 0;
}
}
# the head was split. splithead is a new node to the right of
# current head. Build a new head with two children - the current
# head on the left, the splithead on the right.
my $newhead = $self->_makenode();
my ($nh_arr, $nh_bce, $nh_ph) = $self->_getarr($newhead);
for my $childnode ($head, $splithead[0])
{
my ($cnarr, $cn_bce) = $self->_getarr($childnode);
my @row = $self->_unpack_row($cnarr->[0]);
my $cnentry = $self->_makenodeentry(key => $row[0],
link => $childnode);
push (@{$nh_arr}, $self->_pack_row($cnentry));
}
$self->{root} = $newhead;
$self->{height} = $self->{height} + 1;
# store additional metadata in leftmost
$self->_setMainMeta();
# greet $newhead;
# split ok even though insert failed. Successfully built a new
# head, but must report an error
if (defined($istat))
{
my $mess1 = $ins_stat{$istat};
if ($istat =~ m/split_ok/)
{
# split ok, but insert failed. Valid if key is too large
# (greater than one-half block in size)
shift @splithead;
my $kk = shift @splithead;
my $mess2 = shift @splithead;
whisper $mess1, " for key ", $kk, "\n", $mess2, "\n";
return 0;
}
}
return 1;
}
# recursive insert returns array retval
#
# possible return statuses for success:
# @retval = () -> successful insert
# @retval = (new_right) -> successful insert, but node was split
#
# possible return statuses for failure:
# @retval = ('ins_fail', undef, key, message, ...)
# @retval = ('split_ok', new_right, key, message, ...)
# @retval = ('split_fail', undef, key, message, ...)
#
#
sub _insertR
{
my ($self, $currnode, $entry, $height) = @_;
# greet $entry;
# return 0
# unless (defined($currnode));
# return 0
# unless (defined($entry));
my ($currarr, $curr_bce, $curr_ph) = $self->_getarr($currnode);
my $arrsize = scalar(@{$currarr});
my $key = $entry->[0];
my $i = 0;
my (@retval, @err_stack);
my $icmp = $self->{compare}; # get the comparison function
if (0 == $height)
{
if ($self->{unique_key})
{
# test if key already exists
my @tempo = $self->_searchR($currnode, $key, $height,
$self->{equal},
$icmp, 1);
# greet @tempo;
if (scalar(@tempo))
{ # fail due to duplicate key
@retval = ('ins_fail', $currnode, $key,
"duplicate key found");
return @retval;
}
}
$i = $self->_insert_estimate($key, $arrsize, $height, $currarr, $icmp)
if ($arrsize > 5);
for (; $i < $arrsize; $i++)
{
# break if can insert key before
my @row = $self->_unpack_row($currarr->[$i]);
last
if (&$icmp ($key, $row[0]));
# if ($key < $row[0]);
}
}
else
{
$i = $self->_insert_estimate($key, $arrsize, $height, $currarr, $icmp)
if ($arrsize > 5);
for (; $i < $arrsize; $i++)
{
# use array->[i=0] as sentinel record
my @r1;
@r1 = $self->_unpack_row($currarr->[$i + 1])
if (($i + 1) < $arrsize);
if ((($i + 1) == $arrsize) ||
(&$icmp ($key, $r1[0])))
# ($key < $r1[0]))
{
my @r2 = $self->_unpack_row($currarr->[$i]);
$i++;
# link is array for IOT case
my $link1 = ($self->{use_IOT}) ? $r2[1]->[0] : $r2[1];
# insert recursively into the link and increment i
my @newnode =
$self->_insertR($link1, $entry, $height-1);
return @retval
unless (scalar(@newnode));
# a single value is just the newnode in arr[0], else
# have an error stack
if (scalar(@newnode) > 1)
{
my $istat = $newnode[0];
# if the insert failed just return the error stack
# or if the insert succeed but the split failed
# return as well.
return @newnode
if ($istat =~ m/ins_fail|split_fail|no_joy/);
# save the old return status info
push (@err_stack, @newnode);
# the recursive insert failed, but the split
# succeeded, so we need to handle it at this
# level. newnode is at arr[1], so shift it to
# arr[0]
shift @newnode;
}
# the insert split the node below us and returned the
# new node, so we need to add an entry in current node
# for the new node.
whisper "build a new entry\n";
my ($nnarr, $nn_bce) = $self->_getarr($newnode[0]);
# build a new entry
my @r3 = $self->_unpack_row($nnarr->[0]);
$key = $r3[0];
$entry = $self->_makenodeentry(key => $r3[0],
link => $newnode[0]);
last;
}
}
}
if (0 == $height)
{ # save the statistics
$self->{statistics}->{count}++;
if ($i == $arrsize)
{ # this insert appends a new last key
$self->{statistics}->{lastkey_count}++;
$self->{statistics}->{last_was_last} = 1;
}
else
{ # last insert was not last key in index
$self->{statistics}->{last_was_last} = 0;
}
}
my $ins_ok = 1;
my $left_arr = $currarr;
my $left_ph = $curr_ph;
my $left_size = $arrsize;
my $lt_bce = $curr_bce;
my ($new_right_node, $right_arr, $right_ph);
my $right_size = 0;
my $rt_bce;
my $pack_entry = $self->_pack_row($entry);
if (0 == $height)
{ # save key size statistics
my $keysize = length($pack_entry);
if ($keysize > $self->{statistics}->{keysize}->{max})
{
$self->{statistics}->{keysize}->{max} = $keysize;
}
elsif ($keysize < $self->{statistics}->{keysize}->{min})
{
$self->{statistics}->{keysize}->{min} = $keysize;
}
}
my $preemptive_split = 0;
for my $num_tries (1..2)
{ # try to splice or push into current node,
# else split (current node becomes left, new node is right)
# and try again
$preemptive_split = 0; # split pre-emptively when block is low on space
if ($i < $left_size)
{
my $err_str;
## whisper "splice $key left\n";
$left_ph->HeSplice(\$err_str, $i, 0, $pack_entry);
if (defined($err_str))
{
my $entry_val = $entry->[1];
whisper "splice error is: [$err_str] for key: $key";
whisper " val: $entry_val , height $height\n";
$ins_ok = 0;
}
else
{
# Assumption: keys are about the same size. If the
# current splice succeeded, see if there is enough
# space to fit another key of current size. If not,
# do a pre-emptive split (which is cheaper than
# failing a splice and backing it out)
# NOTE: this is the RDBlock::_spacecheck,
# not bt2::_spacecheck
unless ($left_ph->_spacecheck(length($pack_entry)))
{
greet "preemptive split for splice, key $key";
$preemptive_split = 1;
}
}
}
else # no room to left splice
{
if ($right_size == 0)
{
# normal case -- no split.
## whisper "push $key left\n";
if (defined($left_ph->HPush($pack_entry)))
{
# Do the same key size check for the push that you
# would do for a splice. If there is not enough
# space to fit another key of current size, do a
# pre-emptive split (which is cheaper than failing
# a push and backing it out
# NOTE: this is the RDBlock::_spacecheck,
# not bt2::_spacecheck
unless ($left_ph->_spacecheck(length($pack_entry)))
{
greet "preemptive split for push, key $key";
$preemptive_split = 1;
}
}
else
{
my $entry_val = $entry->[1];
whisper "push out of space ";
whisper "for key: $key";
whisper " val: $entry_val , height $height\n";
$ins_ok = 0;
}
}
else # have a right node already
{
# since we just split the current node in half, look
# to insert the entry in the right side (since the
# offset is greater than the size of the left array) .
# subtract size of left array from offset $i to get
# offset into right array.
$i -= $left_size;
if ($i < $right_size)
{
whisper "splice $key right\n";
splice (@{$right_arr}, $i, 0, $pack_entry);
}
else
{
whisper "push $key right\n";
push ( @{$right_arr}, $pack_entry);
}
} # end have right node
} # end no room to left splice
L_ins_ok:
if ($ins_ok) # best case - key was inserted and we are happy
{
# return if we are not pre-emptively splitting, or if on
# the second pass
if (
(!$preemptive_split &&
(!$self->{maxsize} || ($arrsize < $self->{maxsize})))
|| (2 == $num_tries)
)
{
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
#
# return an undef (no split) on the first pass if insert
# succeeded and we aren't pre-emptively splitting, or
# return the new right (post split) if we split the
# currnode and the insert succeeded on the second pass.
#
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
if (defined($new_right_node))
{
@retval = ($new_right_node);
}
else
{
@retval = (); # insert succeed with no split
}
# if the recursive insert failed but had a successful
# split [err_stack is 'split_ok'] we need to process the
# err_stack correctly for this level.
if (scalar(@err_stack))
{ # recursive error stack
my @e2;
# need to keep original error stack because might not
# return here
push @e2, @err_stack;
# discard the ins_status and the newnode -- we will
# return our own newnode to our parent if necessary
shift @e2;
shift @e2;
if (scalar(@retval))
{
# split was successful at this level, but insert
# failed recursively, so return split ok
unshift @retval, $err_stack[0]; # prepend with err msg
push @retval, @e2; # append rest of err stack
}
else
{
# successfully updated this level, and didn't
# split, but insert failed recursively, so return
# ins_fail
push @retval, @err_stack; # append whole err stack
$retval[0] = 'ins_fail';
$retval[1] = undef;
}
} # end if recursive error stack
return @retval;
}
} # end if ins_ok
# Note: can only split current node on first pass -- if get
# here on second pass we are in trouble.
if (2 == $num_tries)
{
# XXX XXX : should never have a key larger than a
# half-empty block!!
@retval = ('ins_fail', $new_right_node, $key, "key too big!");
push @retval, @err_stack; # add the recursive error stack
return @retval;
# croak "key too big!";
}
# insert exceeded (or will exceed) space, so split the current
# node and return the new right neighbor node to our caller.
$new_right_node = $self->_bsplit($currnode, $height);
# Note: just return the new right node if the insert
# succeeded, but we split preemptively.
if ($ins_ok) # return only if insert has succeeded
{
# insert succeed, and we were splitting pre-emptively
# XXX XXX: set num_tries to 2 to force return, and go
# back to the ins_ok routine above, which handles the
# return statuses correctly.
#
# NOTE: if the split didn't succeed and we had an
# error_stack previously [split ok but insert failed] then
# will just return insert failed, which is bad enough
if (defined($new_right_node) ||
scalar(@err_stack))
{
$num_tries = 2;
goto L_ins_ok;
# return @retval
}
else
{
# the pre-emptive split failed and there was no
# previous error stack, so return split_fail
@retval = ('split_fail', undef, $key,
"pre-emptive split failed");
return @retval;
}
} # end ins_ok after pre-emptive split
unless (defined($new_right_node))
{
# we are hosed. We had to split and ran out of space.
# bsplit/makenode is supposed to be nice and fail
# prematurely in the leaf (height zero) if there are
# insufficient free blocks to split the whole tree. Pray
# that this is the case.
if (0 == $height) # split failure at 0 is an insert failure
{
@retval = ('ins_fail', $new_right_node, $key,
"split out of space");
return @retval;
}
# if we are performing concurrent operations on the tree
# we could run out of space at any point.
# crud. we need to undo the operations that got us to
# this point. We better have transaction support.
@retval = ('no_joy', $new_right_node, $key,
"split out of space");
return @retval;
}
# Note: insert failed on first pass, so we split the node. On
# the second pass we try to insert into either the left or the
# right nodes. Insert should usually succeed because both of
# these nodes are only half full.
($right_arr, $rt_bce, $right_ph) = $self->_getarr($new_right_node);
($left_arr, $lt_bce, $left_ph) = $self->_getarr($currnode);
$left_size = scalar(@{$left_arr});
$right_size = scalar(@{$right_arr});
$ins_ok = 1;
@retval = ();
} # end for num tries
return @retval;
} # end insertR
# estimate an insertion point - improve the linear scan
sub _insert_estimate
{
my ($self, $key, $arrsize, $height, $currarr, $icmp) = @_;
my $offset = 0;
my $retval = 0;
# greet $arrsize, $currarr, scalar(@{$currarr});
# $arrsize = scalar(@{$currarr});
return 0
if ($arrsize < 10);
$arrsize -= 2;
unless ($height == 0)
{
$arrsize--; # handle the sentinel record in non-leaf nodes
$offset++;
}
my @row;
# check the last position first -- speedup for insert of ascending
# sequences, like primary keys. Test if 80% of inserts were to
# end of index
if (($self->{statistics}->{last_was_last})
&& (
($self->{statistics}->{lastkey_count}/$self->{statistics}->{count})
> 0.8 ))
{
@row = $self->_unpack_row($currarr->[$arrsize+$offset]);
if (scalar(@row))
{
unless (&$icmp ($key, $row[0]))
{
# greet "lastkey match!";
return ($arrsize); # insert (append) at end of current array
}
}
}
# TODO: binary search, interpolation search
# XXX: interpolation only for numeric searching, vs insert??
# if ($icmp == $bt2numcmp)
{
use POSIX ; # need some rounding
# An iterative binary search. Note that we aren't looking for
# a match, just a start location for the linear scan in
# insertR.
my $lefty = 0;
my $righty = $arrsize;
$righty--;
# my $iter = 0;
while (1)
{
# $iter++;
last
if ($lefty >= $righty);
my $middle = POSIX::floor(($lefty+$righty)/2);
@row = $self->_unpack_row($currarr->[$middle+$offset]);
last # just kick out if some malformed row...
unless (scalar(@row));
if (&$icmp ($key, $row[0]))
{
# if key < current entry then keep moving left
# (eliminate the right interval)
$righty = $middle - 1;
}
else
{
# if key >= current entry then keep moving right
# (eliminate the left interval). Note that the return
# value for the estimate gets bumped up to the current
# position, because we can start a linear scan from
# this location
$retval = $middle;
$lefty = $middle + 1;
}
} # end while
# greet $key, $retval, $arrsize, $currarr, $iter;
}
return $retval;
}
sub _GetMeta
{
my ($self, $ph) = @_;
my @ggg;
my $row = $ph->_get_meta_row("I"); # "I" for Index
return @ggg
unless (defined($row));
return @{$row};
}
sub _SetMeta
{
my ($self, $ph, $rrow) = @_;
return ($ph->_set_meta_row("I", $rrow)); # "I" for Index
}
sub _spacecheck
{
my ($self, $height) = @_;
# degenerate case: splitting root (height zero) requires two
# additional blocks -- new head plus new sibling
my $maxsp = $height + 2;
my $spaceleft = $self->{maxblockno} - $self->{maxnodeid};
whisper "_spacecheck: need $maxsp blocks, $spaceleft left"
unless ($spaceleft > $maxsp);
return ($spaceleft > $maxsp);
}
# create a new right neighbor for the current node and split the
# contents betweeen the current and new node.
# return the new node
sub _bsplit
{
my ($self, $currnode, $height) = @_;
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
# number of possible nodes created by recursive splits is
# ((self->{height} - height) + 2). If we run out of space in
# during a recursive split this could leave the tree in an
# inconsistent state, so need to check if space is available, else
# split should fail. Since we use bottom up splitting, at height
# 0 check if (self->{height} + 2) blocks are available, else fail
# the insert. With a transactional layer we can be a bit more lax
# since rolling back the transaction would restore intermediate
# split nodes. See spacecheck in makenode
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
my $newnode = $self->_makenode(height => $height);
return undef
unless (defined($newnode)); # makenode ran out of space
my ($currarr, $currbce, $currph) = $self->_getarr($currnode);
my ($nn_arr, $nn_bce, $nn_ph) = $self->_getarr($newnode);
my @curr_meta1 = $self->_GetMeta($currph);
my @nn_meta1 = $self->_GetMeta($nn_ph);
$nn_meta1[A_LEFT] = $currnode;
$nn_meta1[A_RIGHT] = $curr_meta1[A_RIGHT];
# if have a right neighbor
if ($curr_meta1[A_RIGHT] =~ /\d+/)
{
my ($rt_arr, $rt_bce, $rt_ph) =
$self->_getarr($curr_meta1[A_RIGHT]);
my @rt_meta1 = $self->_GetMeta($rt_ph);
$rt_meta1[A_LEFT] = $newnode;
$self->_SetMeta($rt_ph,\@rt_meta1);
}
else
{
if (0 == $height)
{
# unless (defined($newnode->{right}))
{
# new node is rightmost if has no right neighbor
whisper "new rightmost ",$nn_meta1[A_NODEID],"\n";
$self->{rightmost} = $newnode;
if ($currnode eq $self->{leftmost})
{
# if already have leftmost, don't need an
# additional call to set the main metadata
$curr_meta1[A_RIGHTMOST] = $newnode;
}
else
{
# store additional metadata in leftmost
$self->_setMainMeta();
}
}
}
}
$curr_meta1[A_RIGHT] = $newnode;
$self->_SetMeta($currph,\@curr_meta1);
$self->_SetMeta($nn_ph,\@nn_meta1);
# finally, after the big setup, copy half of entries in the
# current node to the new right neighbor
my $arrsize = scalar(@{$currarr});
my $SplitLocation = $arrsize/2;
my $doOpt = 1; # XXX XXX: leave on - ~20% speedup for strict ascending
my $maxPct = 0.15; # 0.09; # .15
if ($doOpt
&& ($arrsize > 60) # optimize for ascending sequences...
&& ($self->{statistics}->{last_was_last})
&& (
($self->{statistics}->{lastkey_count}/$self->{statistics}->{count})
> 0.95 )
# XXX XXX: don't do this is key is large (> 15% maximum)
# && (($self->{statistics}->{keysize}->{max}/$self->{maxkeysize}) < $maxPct)
#&& (($self->{maxkeysize}/$self->{statistics}->{keysize}->{max}) > 5)
)
{
whisper "lopsided split";
# leave the current array really full instead of 1/2 full
$SplitLocation = $arrsize - 3;
}
my @newarr = splice(@{$currarr}, $SplitLocation);
push (@{$nn_arr},@newarr);
return $newnode;
} # end bsplit
sub delete
{
my ($self, $key, $value) = @_;
# Note: value is optional to do deletes with duplicate keys
my @outi = $self->search($key);
return undef
unless (scalar(@outi) > 1);
shift @outi; # key
my $outval = shift @outi;
my $nodeid = shift @outi;
my $offset = shift @outi;
my ($currarr, $curr_bce, $curr_ph) = $self->_getarr($nodeid);
unless (defined($value))
{
# greet $currarr, $offset;
my $stat = (delete ($currarr->[$offset]));
# greet $currarr;
return $outval
if (defined($stat));
return undef;
}
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
# ugly hack for non-unique
# scan index, looking for a matching value...
my $place =
$self->_joinplace("A",
$nodeid,
$offset);
my ($prefix, $currnode);
while (defined($place))
{
my @row = $self->offsetFETCH($place);
last
unless (scalar(@row) > 1);
if ($row[-1] eq $value)
{
whisper "found it!";
($prefix, $currnode, $offset) = $self->_splitplace($place);
($currarr, $curr_bce, $curr_ph) = $self->_getarr($currnode);
my $stat = (delete ($currarr->[$offset]));
return $outval
if (defined($stat));
last;
}
$place = $self->offsetNEXTKEY($place);
} # end while def
return undef;
}
sub search
{
my ($self, $start_key, $f_eq, $f_cmp) = @_;
return $self->_search2($start_key, 0 , $f_eq, $f_cmp);
}
sub _search2
{
my ($self, $start_key, $nearest, $f_eq, $f_cmp) = @_;
# whoami;
my $std_search = (scalar(@_) < 4);
return undef
unless (exists($self->{root}));
my $head = $self->{root};
my $ieq = (defined($f_eq)) ? $f_eq : $self->{equal};
my $icmp = (defined($f_cmp)) ? $f_cmp : $self->{compare};
return $self->_searchR($head, $start_key, $self->{height}, $ieq, $icmp,
$std_search, $nearest);
}
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
# Note that this search works for a partial match because it always
# scans from left to right. We can't just switch the equality
# comparison to a binary search and have it work correctly.
# XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
sub _searchR
{
my ($self, $currnode, $key, $height, $ieq, $icmp,
$std_search, $near) = @_;
my $nearest = $near || 0;
L_starti:
my ($currarr, $curr_bce, $curr_ph) = $self->_getarr($currnode);
my $arrsize = scalar(@{$currarr});
# greet "near = 1", $currnode, $height, $currarr
# if $nearest;
my $i = 0;
my @retval;
if (0 == $height)
{
$i = $self->_insert_estimate($key, $arrsize, $height, $currarr, $icmp)
if ($std_search && ($arrsize > 5));
for (; $i < $arrsize; $i++)
{
my $packval = $currarr->[$i];
# next unless (defined($packval));
# break if can insert key before
my @row = $self->_unpack_row($packval);
# if ($key == $row[0]);
if (&$ieq ($key, $row[0]))
{
# key might be partial, so return row[0]
push @retval, $row[0], $row[1], $currnode, $i;
last;
}
else
{
# NOTE: check if we passed the key!!
unless (&$icmp ($row[0], $key))
# (key > row[0])
{
# whisper "passed the key!";
# greet $key, @row;
# we passed the key
push @retval, $row[0], $row[1], $currnode, $i
if ($nearest);
last;
}
}
} # end for
}
else
{
$i = $self->_insert_estimate($key, $arrsize, $height, $currarr, $icmp)
if ($std_search && ($arrsize > 5));
for (; $i < $arrsize; $i++)
{
# use array->[i=0] as sentinel record
my $packval = $currarr->[$i + 1];
# next unless (defined($packval));
my @r1;
@r1 = $self->_unpack_row($packval)
if (($i + 1) < $arrsize);
if ((($i + 1) == $arrsize) ||
(&$icmp ($key, $r1[0])))
# ($key < $r1[0]))
{
$packval = $currarr->[$i];
# XXX XXX: need to save prevkey versus just decrement
# i+1 if have possible null entries in branch nodes
# unless (defined($packval))
my @r2 = $self->_unpack_row($packval);
# link is array for IOT case
my $link1 = ($self->{use_IOT}) ? $r2[1]->[0] : $r2[1];
# search recursively
return ($self->_searchR($link1, $key, $height-1,
$ieq, $icmp,
$std_search, $nearest));
}
}
}
return @retval
if (scalar(@retval) || !$nearest);
if ($nearest && (0 == $height) && (0 == scalar(@retval)))
{
my @curr_meta1 = $self->_GetMeta($curr_ph);
whisper "search right";
# find right neighbor and keep looking
$currnode = $curr_meta1[A_RIGHT];
goto L_starti
if ((defined($currnode)) && ($currnode =~ /\d+/));
}
return @retval;
} # end searchR
sub btCLEAR
{
my $self = shift;
my $currnode = $self->{root};
return undef
unless ((defined($currnode)) && ($currnode =~ /\d+/));
return $self->_clearR($currnode, $self->{height});
}
sub _clearR
{
my ($self, $currnode, $height) = @_;
whoami $currnode;
my ($currarr, $curr_bce, $curr_ph) = $self->_getarr($currnode);
my $arrsize = scalar(@{$currarr});
my $i = 0;
my $retval;
if (0 == $height)
{
# return $curr_ph->CLEAR();
return splice(@{$currarr}); # use splice so the metadata isn't cleared
}
else
{
for (; $i < $arrsize; $i++)
{
# use array->[i=0] as sentinel record
my @r2 = $self->_unpack_row($currarr->[$i]);
# clear recursively
# link is array for IOT case
my $link1 = ($self->{use_IOT}) ? $r2[1]->[0] : $r2[1];
$retval = ($self->_clearR($link1, $height-1));
}
}
return $retval;
} # end clearR
# the array offset and hash key iterator functions take a "place"
# argument. Place arguments consist of a prefix, a node id, and a
# position. If the prefix is H for hash, then the position is a hash
# key in the pushhash tied to the current node. If the prefix is A
# for array, then the position is the array offset in the array tied
# to the current node.
our $PLACESEP = ":"; # place separator
our $PLACESEPRX = ":"; # place separator Regular eXpression
# private
sub _splitplace
{
# split into 3 parts - prefix, node, position,
# where position is either an array offset or a hash key.
# prefix is A for array, H for hash
# whoami @_;
unless ($_[1] =~ m/$PLACESEPRX/)
{
carp "could not split key: $_[1] "
if warnings::enabled();
return undef; # no separator
}
my @splitval = split(/$PLACESEPRX/,($_[1]), 3);
return @splitval;
}
sub _joinplace
{
my $self = shift;
return (join ($PLACESEP, @_));
}
sub offsetFIRSTKEY
{
my $self = shift;
my $currnode = $self->{leftmost};
return undef
unless ((defined($currnode)) && ($currnode =~ /\d+/));
return $self->offsetNEXTKEY($self->_joinplace("A", $currnode, -1));
}
sub offsetNEXTKEY
{
my ($self, $prevkey) = @_;
# whoami $prevkey;
# ASSERT PREFIX
my ($prefix, $currnode, $offset) = $self->_splitplace($prevkey);
while ((defined($currnode)) && ($currnode =~ /\d+/))
{
my ($currarr, $currbce, $currph) = $self->_getarr($currnode);
my $arrsize = scalar(@{$currarr});
$offset++;
return $self->_joinplace("A", $currnode, $offset)
if ($offset < $arrsize);
my @curr_meta1 = $self->_GetMeta($currph);
$currnode = $curr_meta1[A_RIGHT];
$offset = -1;
}
return undef;
}
# reverse iterator
sub offsetLASTKEY
{
my $self = shift;
my $currnode = $self->{rightmost};
return undef
unless ((defined($currnode)) && ($currnode =~ /\d+/));
my ($currarr, $currbce, $currph) = $self->_getarr($currnode);
my $arrsize = scalar(@{$currarr});
# XXX XXX arrsize - 1 ? or just return currnode:arrsize (no nextkey)?
return $self->offsetPREVKEY($self->_joinplace("A", $currnode, $arrsize));
}
sub offsetPREVKEY
{
my ($self, $nextkey) = @_;
# ASSERT PREFIX
my ($prefix, $currnode, $offset) = $self->_splitplace($nextkey);
while ((defined($currnode)) && ($currnode =~ /\d+/))
{
my ($currarr, $currbce, $currph) = $self->_getarr($currnode);
$offset--;
unless ($offset < scalar(@{$currarr}))
{
whisper "bad offset $offset, node $currnode";
last;
}
return $self->_joinplace("A", $currnode, $offset)
if ($offset > -1);
my @curr_meta1 = $self->_GetMeta($currph);
$currnode = $curr_meta1[A_LEFT];
last
unless ((defined($currnode)) && ($currnode =~ /\d+/));
($currarr, $currbce, $currph) = $self->_getarr($currnode);
$offset = scalar(@{$currarr});
}
return undef;
}
# hkeyFUNCTION : iterator functions using underlying RDBlock data
# entry hash keys, not the RDBArray offsets.
sub hkeyFIRSTKEY
{
my $self = shift;
my $currnode = $self->{leftmost};
return undef
unless ((defined($currnode)) && ($currnode =~ /\d+/));
return $self->hkeyNEXTKEY($self->_joinplace("H", $currnode, -1));
}
sub hkeyNEXTKEY
{
my ($self, $prevkey) = @_;
# whoami $prevkey;
# NOTE: use currph hash key, not offsets!
# ASSERT PREFIX
my ($prefix, $currnode, $hkey) = $self->_splitplace($prevkey);
while ((defined($currnode)) && ($currnode =~ /\d+/))
{
my ($currarr, $currbce, $currph) = $self->_getarr($currnode);
if ($hkey < 0)
{
$hkey = $currph->FIRSTKEY();
}
else
{
$hkey = $currph->NEXTKEY($hkey);
}
return $self->_joinplace("H", $currnode ,$hkey)
if (defined($hkey));
my @curr_meta1 = $self->_GetMeta($currph);
$currnode = $curr_meta1[A_RIGHT];
$hkey = -1;
}
return undef;
}
# reverse iterator
sub hkeyLASTKEY
{
my $self = shift;
my $currnode = $self->{rightmost};
return undef
unless ((defined($currnode)) && ($currnode =~ /\d+/));
return $self->hkeyPREVKEY($self->_joinplace("H", $currnode, -1));
}
sub hkeyPREVKEY
{
my ($self, $nextkey) = @_;
# NOTE: use currph hash key, not offsets!
# ASSERT PREFIX
my ($prefix, $currnode, $hkey) = $self->_splitplace($nextkey);
while ((defined($currnode)) && ($currnode =~ /\d+/))
{
my ($currarr, $currbce, $currph) = $self->_getarr($currnode);
if ($hkey < 0)
{
$hkey = $currph->_lastkey();
}
else
{
$hkey = $currph->_prevkey($hkey);
}
return $self->_joinplace("H", $currnode, $hkey)
if (defined($hkey));
my @curr_meta1 = $self->_GetMeta($currph);
$currnode = $curr_meta1[A_LEFT];
$hkey = -1;
}
return undef;
}
# fetch a btree "row" using the array offset
#
# NOTE: set getplace to return row value in searchR format
sub _fetch_row
{
my ($self, $place, $getplace) = @_;
# ASSERT PREFIX
my ($prefix, $currnode, $position) = $self->_splitplace($place);
while ((defined($currnode)) && ($currnode =~ /\d+/))
{
my ($currarr, $currbce, $currph) = $self->_getarr($currnode);
my @row;
if ($prefix =~ /A/) # ARRAY
{
my $offset = $position;
my $arrsize = scalar(@{$currarr});
return undef
unless ($offset < $arrsize);
@row = $self->_unpack_row($currarr->[$offset]);
}
elsif ($prefix =~ /H/) # HASH
{
my $val = $currph->FETCH($position);
return undef
unless (defined($val));
@row = $self->_unpack_row($val);
}
else
{
# XXX XXX: bad prefix
return undef;
}
# append the currnode and offset if requested to match searchR
# format
push @row, $currnode, $position
if (defined($getplace));
return @row;
}
return undef;
} # end _fetch_row
# NOTE: set getplace to return row value in searchR format
sub offsetFETCH
{
my $self = shift;
return $self->_fetch_row(@_);
}
# fetch a btree "row" using the underlying RDBlock data entry hash key
sub hkeyFETCH
{
my $self = shift;
return $self->_fetch_row(@_);
}
sub HCount
{
my $self = shift;
my $grandtot = 0;
my $currnode = $self->{leftmost};
while ((defined($currnode)) && ($currnode =~ /\d+/))
{
my ($currarr, $currbce, $currph) = $self->_getarr($currnode);
# $grandtot += $currph->HCount();
$grandtot += $currph->FETCHSIZE(); # Note: the RDBlock class isn't a
# true PushHash, so it doesn't have
# an HCount method...
my @curr_meta1 = $self->_GetMeta($currph);
$currnode = $curr_meta1[A_RIGHT];
}
return ($grandtot);
} # end HCount
# build a search handle similar to a DBI statement handle
#
sub SQLPrepare # get a DBI-style statement handle
{
# whoami;
my $self = shift;
my %optional = (ieq => $self->{equal},
icmp => $self->{compare},
BT_Fetch_Fix => 0);
my %args = (%optional,
@_); # start_key, stop_key
my $sth = Genezzo::Index::bt2_search->new(btree => $self, %args);
return $sth;
}
package Genezzo::Index::bt2_search;
use strict;
use warnings;
use Genezzo::Util;
sub _init
{
my $self = shift;
my %args = (@_);
return 0
unless (defined($args{btree}));
$self->{btree} = $args{btree};
# NOTE: start_key..stop_key is an inclusive interval -- the
# interval [1,10] is 1,2,3,4,5,6,7,8,9,10. Should find all
# duplicate values for start and stop keys as well. Use filters
# or adjust the start/stop keys as necessary for queries like:
# "select * from emp where id > 10 and id < 20"
if (exists($args{start_key}))
{
$self->{start_key} = $args{start_key};
}
if (exists($args{stop_key}))
{
$self->{stop_key} = $args{stop_key};
}
$self->{ieq} = $args{ieq};
$self->{icmp} = $args{icmp};
$self->{state} = 0;
$self->{exact_match} = 0;
if ( exists($args{start_key})
&& exists($args{stop_key}))
{
# we are looking for an exact key match if have
# identical start/stop keys
my $ieq = $args{ieq};
$self->{exact_match} = &$ieq($self->{start_key},
$self->{stop_key});
}
$self->{fetch_fix} = $args{BT_Fetch_Fix};
# greet "fetch_fix:" , $args{BT_Fetch_Fix};
return 1;
} # end init
sub new
{
# whoami;
my $invocant = shift;
my $class = ref($invocant) || $invocant ;
my $self = { };
my %args = (@_);
return undef
unless (_init($self,%args));
return bless $self, $class;
} # end new
# SQL-style execute and fetch functions
sub SQLExecute
{
my $self = shift;
$self->{state} = 1;
# XXX: define filters and fetchcols
return (1);
}
# XXX XXX XXX XXX: create a separate dynamic package to hold the fetch
# state, vs keeping the fetch state in the base btree. Then can
# maintain multiple independent SQLFetches open on same btree object.
# combine NEXTKEY and FETCH in a single operation
sub SQLFetch
{
# XXX XXX XXX XXX NOTE: must always supply equality function to
# get filtering ? Why??? Need to fix this API to support startkey/stopkey
# whoami;
my ($self, $f_eq, $k2) = @_;
# greet $f_eq, $k2;
if (0 == $self->{state})
{
# error - not started
return undef;
# NB: States are:
#
# 0 - not started
# 1 - first fetch
# 2 - subsequent fetch (and stopkey not found)
# 3 - subsequent fetch after stopkey discovered
}
while (1)
{
my @row;
if (1 == $self->{state}) # first fetch
{
if (exists($self->{start_key}))
{ # search for start key
# Note: do "nearest" search if start_key != stop_key
@row = $self->{btree}->_search2(
$self->{start_key},
!$self->{exact_match},
$self->{ieq},
$self->{icmp}
);
}
else # scan from first key
{
# whisper "no startkey";
my $bt = $self->{btree};
my $place = $bt->offsetFIRSTKEY();
last # Note: don't fetch if index is empty...
unless (defined($place));
@row = $bt->offsetFETCH($place, 1)
}
$self->{state}++; # advance the state
} # end firstfetch
else
{
# states 2, 3 - subsequent fetches
last
unless (exists($self->{SQLFetchKey})
&& defined($self->{SQLFetchKey}));
my $bt = $self->{btree};
my ($key, $val, $currnode, $offset) = @{$self->{SQLFetchKey}};
my $place =
$bt->offsetNEXTKEY(
$bt->_joinplace("A",
$currnode,
$offset));
last
unless (defined($place));
@row = $bt->offsetFETCH($place, 1);
}
# greet "rw", @row, "gg";
last
unless (scalar(@row) > 1);
# my $ieq = (defined($f_eq)) ? $f_eq : $self->{ieq};
my $ieq = $f_eq;
# XXX XXX XXX XXX XXX XXX : OBSOLETE - do filtering separately
if (defined($ieq)) # all rows must match this function
{
# if we have an equality function, (and a match key), make sure
# the next row matches
$k2 = $row[0]
unless (defined($k2));
last # EOF if no match
unless (&$ieq ($k2, $row[0]));
# if matched, save fetchkey
}
# XXX XXX: could optimize the stop key test by searching to
# find the location, then only testing when get to the
# appropriate leaf node -- maybe an api like
# searchX returns [startkey (nearest), stop_nodeid]?
# 'cos the current stop_key is just a filter...
if (exists($self->{stop_key}))
{
my $stop_key = $self->{stop_key};
my $ieq = $self->{ieq};
my $icmp = $self->{icmp};
if (3 == $self->{state}) # in a stop key region
{
# if in a stop key region and no longer matches
# stopkey then we are done
unless (&$ieq($self->{stop_key}, $row[0]))
{
# whisper "stopped!";
last;
}
}
elsif (2 == $self->{state}) # find first stop key
{
# move to state 3 if found first stop key --
# EOF when find first non-stopkey.
if (&$ieq($self->{stop_key}, $row[0]))
{
# whisper "found stopkey";
$self->{state}++;
}
else
{
# NOTE: check if we passed the key!!
unless (&$icmp ($row[0], $self->{stop_key}))
# (key > row[0])
{
whisper "passed the key";
# greet $self->{stop_key}, @row;
last;
}
}
}
else
{ # XXX XXX XXX
whisper "bad state";
last;
}
} # end if stopkey
my @foo = @row;
$self->{SQLFetchKey} = \@foo;
if ($self->{fetch_fix})
{
# Note: fixes to make btHash SQLFetch like RSTab. Make
# the sqlfetch return a standard rid/rowvalue pair, where
# the rid is the index row rid (not the table rid _trid),
# and the row value is the concatenated index key and
# value as an array. We need to re-arrange the current
# @row into a suitable format.
my @baz = @row;
my $offset = pop @baz; # remove the array offset
my $currnode = pop @baz; # remove the currnode
@row = ();
my $place = $self->_joinplace("A", $currnode, $offset);
push @row, $place;
# add _trid (single value) to index key to make a single
# array "rowvalue"
push @{$baz[0]}, $baz[1];
# push the rowvalue into the row after the rid
push @row, $baz[0];
# greet @row; # row now in key/@val format
}
return @row;
# return splice(@row, 0, 2);
last;
} # end while
delete $self->{SQLFetchKey};
return undef;
}
sub AUTOLOAD
{
my $self = shift;
my $bt = $self->{btree};
our $AUTOLOAD;
my $newfunc = $AUTOLOAD;
$newfunc =~ s/.*:://;
return if $newfunc eq 'DESTROY';
# greet $newfunc;
return ($bt->$newfunc(@_));
}
END {
}
# insert code here
1;
__END__
# Below is stub documentation for your module. You better edit it!
=head1 NAME
Genezzo::Index::bt2 - basic btree
A btree built of row directory blocks.
=head1 SYNOPSIS
use Genezzo::Index::bt?;
my $tt = Genezzo::Index::btree->new();
$tt->insert(1, "hi");
$tt->insert(7, "there");
=head1 DESCRIPTION
This btree algorithm is a bottom-up implementation based upon ideas
from Chapter 16 of "Algorithms in C++ (third edition)", by Robert
Sedgewick, 1998 and Chapter 15, "Access Paths", of "Transaction
Processing: Concepts and Techniques" by Jim Gray and Andreas Reuter,
1993. The pedagogical examples use a fixed number of entries per
node, or fixed-size keys in each block, but this implementation has
significant extensions to support variable numbers of variably-sized
keys in fixed-size disk blocks, with the associated error handling,
plus support for reverse scans.
=head1 FUNCTIONS
This package supports a constructor "new", plus standard b-tree
methods like insert, delete, search.
=head2 "new" constructor
The "new" constructor takes many arguments, but they are all optional.
If none are specified, the constructor will allocate 100 blocks of the
default size for a b-tree. The default assumption is to support
scalar string keys with a scalar string values. The tree will have a
maximum of 50 entries per node.
=over 4
=item maxsize (default 50)
The maximum number of entries in a node. If set to zero, the insert
will pack as many entries as space allows in each node
=item numblocks (default 100)
The constructor will allocate a private buffer cache for the b-tree of
up to the number of blocks specified. If numblocks=0, no cache is
created. In this case, the user must create a subclass to overload
the make_new_block and getarr methods.
=item blocksize (default DEFBLOCKSIZE)
The size of each block in the b-tree
=item key_type (null by default)
The key type is either a single scalar "c" (for char) or "n" (for
number), or a ref to an array of "c" and/or "n" values. If key_type
is specified, bt2 finds or constructs the appropriate compare/equals
and pack/unpack functions, overriding any user-supplied arguments.
If key type is not specified, bt2 processes the insert keys as a scalar
strings.
=item compare, equal (default string comparison -- ignored if key_type argument specified)
Supply methods to compare your key. This package contains
special comparison methods for numeric and multi-column keys, and
their associated packing functions.
=item pack_fn/unpack_fn (default single scalar key and value -- ignored if key_type specified)
"Packing" functions convert key/value pairs to and from a byte
representation which gets stored in the nodes of the b-tree. The
b-tree package supports scalar keys and values by default. It also
contains methods for multi-column keys with a single value.
=item use_IOT (default off)
special flag for Index Organized Tables, which means the "value" can be
an array, not a scalar. This approach requires a couple extra
checks in the branch nodes, since branches contain (key, nodeid)
pairs, and leaves contain (key, array of values). Normally, indexes
only have a scalar value: a nodeid or a rid.
=item unique_key (default off)
Enforce uniqueness (no duplicates) at insertion time
=item use_keycount (default off)
Special case for building non-unique indexes where the "value" is null
because it is already part of the key vector. In this usage, we
construct a unique index (unique_key=1) where the key vector is the
key columns *plus* the table rid, and the value is null. The key
columns might be duplicates, but the addition of the rid guarantees
uniqueness. The fetch is asymmetric: the table rid is returned as
both the last key column and the value.
Q: Why not just have a non-unique index and store the rids as regular
values?
A: This approach clusters related rids, so index scans are more
efficient and deletes are easier. Note that the basic index row
physical storage is unaffected. Only the unpack function needs an
extra argument to describe the number of key columns.
Q: But doesn't the extra comparison for the rid column make inserts
more expensive?
A: Yes, but we're trading off insert performance against index scan
performance. The workload of most database applications is typically
dominated by selects, not inserts.
=back
=head2 functions
=over 4
=item insert
=item delete
=item search
=item btCLEAR
=item hash_key/array_offset iterators: FIRSTKEY, NEXTKEY, FETCH,
plus reverse iterators LASTKEY, PREVKEY.
=item DBI-style search interface: SQLPrepare, Execute, Fetch
=back
=head2 EXPORT
none
=head1 TODO
=over 4
=item hkey/offset functions: should be able to convert between
different "place" formats (Array and Hash prefixes), like
the common fetch routine, or ASSERT that prefix matches.
=item add reverse scan to search/SQLFetch
=item support multicol keys, non-unique keys (via combo of key + rid as unique)
=item support transaction unique constraints -- probably via treat key+rid as
unique, then turn on true unique key, and scan for duplicates?
=item find out why can't do pctfree=0
=item Work on RDBlk_NN support.
=item search with startkey/stopkey support, vs supplying compare/equal methods.
restricting the search api to straight "=","<" comparisons means can
try the estimation function
=item need to handle partial startkey/stopkey comparison in searchR/SQLFetch
for multi-col keys
=item semantics of nulls in multi-col keys -- sort low?
=item simplify _pack_row with splice and a supplied split position, something
like -1 for normal indexes (n-1 key cols, 1 val col, so pop the val)
or "N=?" for index-organized tables (N key cols, M val cols, so splice N)
=item reorganize along the lines of "GiST"
Generalized Search Trees (Paul Aoki, J. Hellerstein, UCB)
=item ecount support?
=back
=head1 AUTHOR
Jeffrey I. Cohen, jcohen@genezzo.com
=head1 SEE ALSO
perl(1).
Copyright (c) 2003, 2004 Jeffrey I Cohen. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Address bug reports and comments to: jcohen@genezzo.com
For more information, please visit the Genezzo homepage
at L<http://www.genezzo.com>
=cut