The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl
#
# $Header: /Users/claude/fuzz/lib/Genezzo/Row/RCS/RSTab.pm,v 7.5 2006/10/19 09:04:13 claude Exp claude $
#
# copyright (c) 2003,2004,2005 Jeffrey I Cohen, all rights reserved, worldwide
#
#
package Genezzo::Row::RSTab;
use strict;
use warnings;
use Carp;
use Genezzo::Util;
use Genezzo::PushHash::HPHRowBlk;
use Genezzo::PushHash::PushHash;
use Genezzo::PushHash::PHFixed;
use Data::Dumper;
use Genezzo::BufCa::BCFile;
use Genezzo::SpaceMan::SMFile;
use Genezzo::Row::RSFile;
use warnings::register;

our @ISA = qw(Genezzo::PushHash::HPHRowBlk);

our $GZERR = sub {
    my %args = (@_);

    return 
        unless (exists($args{msg}));

    if (exists($args{self}))
    {
        my $self = $args{self};
        if (defined($self) && exists($self->{GZERR}))
        {
            my $err_cb = $self->{GZERR};
            return &$err_cb(%args);
        }
    }

    my $warn = 0;
    if (exists($args{severity}))
    {
        my $sev = uc($args{severity});
        $sev = 'WARNING'
            if ($sev =~ m/warn/i);

        # don't print 'INFO' prefix
        if ($args{severity} !~ m/info/i)
        {
            printf ("%s: ", $sev);
            $warn = 1;
        }

    }
    # XXX XXX XXX
    print __PACKAGE__, ": ",  $args{msg};
#    print $args{msg};
#    carp $args{msg}
#      if (warnings::enabled() && $warn);
    
};

sub make_fac1 {
    my $tclass = shift;
    my %args = (
                @_);

    if (exists($args{hashref}))
    {    
        carp "cannot supply hashref to factory method - deleting !\n"
            if warnings::enabled();

        delete $args{hashref};
    }

    my %td_hash1 = ();

    my $newfunc          = 
        sub {
#            whoami @_;
            my $tiehash1 = 
                tie %td_hash1, $tclass, %args;

            return $tiehash1;
        };
    return $newfunc;
}

# private
sub _init
{
    #whoami;
    #greet @_;
    my $self      =  shift;
    my %required  =  (
                      tablename => "no tablename !",
                      object_id => "no object id !",
                      tso       => "no tso",
                      bufcache  => "no buffer cache",
                      object_type => "no object type"
                      );
    my %optional  =  (
                      dbh_ctx  => {} # XXX XXX XXX: is this used?
                      );
    
    my %args = (@_);

    return 0
        unless (Validate(\%args, \%required));

    $self->{tablename} = $args{tablename};
    $self->{tso}       = $args{tso};
    $self->{bc}        = $args{bufcache};
    $self->{object_id} = $args{object_id};
    $self->{object_type} = $args{object_type};

    $self->{fac1} = make_fac1('Genezzo::PushHash::PushHash');

    $self->{get_filenum}      = {}; # get the filenum based upon chunkno
    $self->{get_chunkno}      = {}; # get the chunkno based upon filenum
    # preload so 0/0 works for nextkey
    $self->{get_filenum}->{0} = 0; 
    $self->{get_chunkno}->{0} = 0;

    # XXX XXX XXX : self->splitrow - if no split rows then simplify store/fetch


    # get the href info from TSTableAFU...
    my $s2 = $self->{tso}->TSTableAFU (tablename => $args{tablename},
                                       object_id => $args{object_id});

    if (defined($s2))
    {
        my $sth = shift @{$s2};
        if (defined($sth))
        {
            # XXX XXX XXX XXX 
#            greet $sth->SQLFetch();
#            greet $s2;
        }
        else
        {
            greet "no afu for $args{tablename}";
#            greet $s2;
#            greet $href;
        }
    }

    my $href = shift @{$s2};

    foreach my $i (@{ $href->{filesused} })
    {
        # XXX: use magic loadme flag
        return 0
            unless (defined(_make_new_chunk($self, $i)));
    }

    # the "small row" threshold is only a small fraction of the
    # default block size in order to avoid wasting space.  For
    # example, if the current block has 500 bytes of freespace, and
    # the packed row is 1000 bytes, it would be more efficient to
    # split the row over two blocks versus leaving 500 bytes free and
    # allocating a new block.  
    $self->{small_row} = 200; # use HSuck on rows > small_row bytes

    return 1;
}

sub TIEHASH
{ #sub new 
#    greet @_;
    my $invocant = shift;
    my $class    = ref($invocant) || $invocant ; 
    my $self     = $class->SUPER::TIEHASH(@_);

    my %args = (@_);
    return undef
        unless (_init($self,%args));

    if ((exists($args{GZERR}))
        && (defined($args{GZERR}))
        && (length($args{GZERR})))
    {
        # NOTE: don't supply our GZERR here - will get
        # recursive failure...
        $self->{GZERR} = $args{GZERR};
    }

    return bless $self, $class;

} # end new

# private routines

sub _make_new_chunk # override the hph method
{
#    whoami @_;

    my $self = shift;

    # XXX: rstab specific
    my $loadme = shift;

    my $harr   = $self->{ "Genezzo::PushHash::hph" . ":H_ARRAY"  };
    my $hcount = scalar(@{$harr});

    my $tso  = $self->{tso};

    my $fileinfo;
    if (defined ($loadme))
    {
        $fileinfo = $tso->TSFileInfo (fileno => $loadme);
    }
    else
    {
        # if we have a current file number then get the next one, else
        # don't advance
        my $neednext   = ($hcount > 0);
        my $currfileno = $tso->TS_get_fileno (nextfile => $neednext);
        
        $fileinfo = $tso->TSFileInfo (fileno => $currfileno);
    }
    return (undef)
        unless (defined($fileinfo));

    my ($filename, $filesize, $fileblks, $filenumber) = @{$fileinfo};

    my %realhash = ();

    my %args = (
                hashref   => \%realhash ,
                factory   => $self->{fac1}, 
                tablename => $self->{tablename},
                object_id => $self->{object_id},
                object_type => $self->{object_type},
                filename  => $filename,
                numbytes  => $filesize,
                numblocks => $fileblks,
                filenumber => $filenumber,
                bufcache  => $self->{bc},
                tso       => $self->{tso}
                );

    if (exists($self->{GZERR}) &&
        defined($self->{GZERR}))
    {
        $args{GZERR} = $self->{GZERR};
    }

#    if ($MAXCOUNT > 1)
 #   {
  #      return undef
   #         unless ($hcount < $MAXCOUNT);
    #}
    my $tiehash = 
        $self->{ "Genezzo::PushHash::hph" . ":PushHash_Factory"  }->(%args);

    unless (defined ($tiehash))
    {
        carp "factory could not allocate pushhash"
            if warnings::enabled();
        return undef; # factory out of hashes
    }

    # can only work with pushhash - else die
    croak "not a pushhash"
        unless ($tiehash->isa("Genezzo::PushHash::PushHash"));

    return undef # push failed
        unless( (push @{$harr}, $tiehash) > $hcount);

    # get file number as defined by initial bc file registration
    my $absolute_fileno = 
        $self->{bc}->FileReg(FileName => $filename, 
                             FileNumber => $filenumber);

    $self->{get_filenum}->{($hcount+1)}      = $absolute_fileno;
    $self->{get_chunkno}->{$absolute_fileno} = ($hcount + 1);

    {
        return undef
            unless
                $tso->TSTableUseFile (
                                      tablename  => $self->{tablename},
                                      object_id  => $self->{object_id},
                                      filenumber => $absolute_fileno,
#                                      href => $href
                                      );
        
#            push (@{$href->{filesused}}, $absolute_fileno);
    }
    
    # NOTE: treat array as 1-based (versus 0 based)
    return $harr->[$hcount];
}

sub _splitrid # override the hph method
{
    my $self = $_[0]; # no shift

    # split into 2 parts - chunkno and sliceno
    unless ($_[1] =~ m/$Genezzo::PushHash::hph::RIDSEPRX/)
    {
        my $msg = "could not split key: $_[1] \n";
        my %earg = (self => $self, msg => $msg,
                    severity => 'warn');
        
        &$GZERR(%earg)
            if (defined($GZERR));

        return undef; # no rid separator
    }
    my @splitval = split(/$Genezzo::PushHash::hph::RIDSEPRX/,($_[1]), 2);

    # given an input rid based upon the absolute file number, convert
    # to chunk number
    my $absfileno = $splitval[0];
#    whisper "chunkno  $_[0]->{get_chunkno}->{$absfileno}";

    $splitval[0] = $_[0]->{get_chunkno}->{$absfileno};

    return @splitval;
}

sub _joinrid # override the hph method
{
    my $self = shift;

    my @args = @_;

    my $chunkno = $args[0];

#    whisper "fileno $self->{get_filenum}->{$chunkno}";

    # convert the internal chunknumber to the absolute file number
    $args[0] = $self->{get_filenum}->{$chunkno};

    return (join ($Genezzo::PushHash::hph::RIDSEP, @args));
}

# Constraint Methods

# get/set the constraint list
sub _constraint_check
{
#    whoami;
    my $self = shift;

#    greet $self->{tablename};

    $self->{constraint_list} = shift if @_ ;

    return $self->{constraint_list};
}

# check constraint when inserting new row
sub check_insert
{
    my $self = shift;

    return 0
        unless (exists($self->{constraint_list}));

    my $cons_list = $self->{constraint_list};

    return 0
        unless (defined($cons_list) &&
                exists($cons_list->{check_insert}));

    my $cci_check = $cons_list->{check_insert};
    
    my $val;
    # be very paranoid - filter might be invalid perl
    eval {$val = &$cci_check(@_) };

    # XXX XXX: figure out name of defective constraint
    if ($@)
    {
        whisper "check constraint blew up: $@";
        greet  $cci_check;
        my $msg = "bad check constraint: $@\n";
        my %earg = (self => $self, msg => $msg,
                    severity => 'warn');
        
        &$GZERR(%earg)
            if (defined($GZERR));

        return 1; #### undef;
    }

    return ($val);
} # end check insert

# insert new values into index when inserting new row
sub index_insert
{
    my $self = shift;

    return 0
        unless (exists($self->{constraint_list}));

    my $cons_list = $self->{constraint_list};

    return 0
        unless (defined($cons_list) &&
                exists($cons_list->{index_insert}));

    my $cci = $cons_list->{index_insert};

    return (&$cci(@_));
} # end index insert

sub HSuck
{
#    whoami;
    my $self = shift;
    my %args = (
                @_);

    # CONSTRAINT

#    greet %args;
    return undef
        unless (defined($args{value}));

    my $val = $args{value};
    my $off = (defined($args{offset})) ? 
        $args{offset} : scalar(@{$val}); # offsets are 1 based, not 0
    my $next = $args{next};
    my $headless = $args{headless}; # set if not a true row head piece

    $next = ':' . $next
        if (defined($next));

    my ($place, $frag, $blk_place, $newoff);

    my $firsttry = 1;

  L_whileoff:
    while (defined($off))
    {
        if ($firsttry)
        { # try to fit row piece in current block
            $blk_place = $self->_get_current_block();
        }
        else
        {
            $blk_place = $self->_make_new_block();
        }

        last
            unless (defined($blk_place));

        my ($blktie, $blocknum, $bceref, $href_tie) = 
            $self->_get_block_and_bce($blk_place);

        # Note: we don't pack the value here like 
        # HPush/STORE -- packing is performed at the block level.
        # XXX XXX: need a way to specify a packing method for the
        # underlying HSuck
        ($place, $newoff, $frag) = $blktie->HSuck (
                                                   value  => $val,
                                                   next   => $next,
                                                   offset => $off,
                                                   headless => $headless
                                                   );

        unless (defined($place))
        {
            # allow failure on first try - current block might be too
            # small to hold any part of split row, but new block
            # should always be able to take a piece.

            if ($firsttry)
            { 
                $firsttry = 0;
                next  L_whileoff; # try again in new block
            }

            # XXX XXX: should store list of inserts so can delete
            # pieces if run out of space
            return undef;
        }
        $firsttry = 0;

        if (defined($frag))
        {
            $next = $frag . ":";
        }
        else
        {
            $next = ":";
        }
        # build next pointer as fully qualified rid -- set block_place
        # final sliceno to $place
        my $sloty = '(0$)';         # XXX : replace trailing zero
        $next .=  $blk_place;
        $next =~ s/$sloty/$place/;

        # offset is now new offset
        $off = $newoff;

    } # end while

    my @outi;

    if (defined($blk_place) && defined($place))
    {
        my $slotyy = '(0$)';            # XXX : replace trailing zero
        $next =  $blk_place;
        $next =~ s/$slotyy/$place/;
    }
    else
    {
        # XXX XXX: is this bad?  Probably not.  
        greet "bad:", $firsttry, $blk_place, $place, $off, $next;
    }

    push @outi, $next;
    if (defined($off))
    {
        push @outi, $off;
    }
    if (defined($frag))
    {
        push @outi, $frag; # column was fragmented
    }

    return @outi;
} # end HSuck

# HPush public method (not part of standard hash)
sub HPush
{
    my ($self, $value) = @_;

    my $toobig  = 0;
    my $maxsize = 2 * $self->{small_row}; 

    # XXX XXX: need a "mutating" constraint to support default values
    # for null columns, numeric precision, column uppercasing, etc.

    # CONSTRAINT - "check constraints" first

    if ($self->check_insert($value, undef, $self->{tablename}))
    {
        # Note: $place is undefined, because we haven't pushed
        # yet.  But might need it for check constraints that
        # require the rid (very very improbable - should probably
        # be illegal)

        {
            # Check constraint FAILED
            return undef;
        }
    }

    my $packstr = PackRowCheck($value, $maxsize);

    if (defined($packstr))
    {
        # fit maybe
    }
    else
    {
        $toobig = 1;
    }
    my $place = $self->_localHPush($packstr, $value, $toobig);

    return undef
        unless (defined($place));

    # CONSTRAINT - index_inserts, like primary/unique key.  Need to do
    # after the push because need a rid for the index.  maybe
    # restructure so don't have to push huge rows that would violate
    # constraint.  bt->insert_maybe with localhpush callback to
    # generate rid
    #
    # val TBD callback needs a place_ref so subsequent cons_lists can
    # get correct place, and so HPush can return place to caller
    #
    # cons_list ($value, null, val_TBD_cb, place_ref)

    if ($self->index_insert($value, $place))
    {
        {
            whisper "undo insert!!";
            # and don't specify the constraint check.  Why?
            # Because if the insert failed with a duplicate key,
            # then delete will remove it from the index.
            $self->_localDELETE($place);
            return undef;
        }
    }

    return $place;
}

# count estimation
sub FirstCount 
{
#    whoami;
    my $self = shift;

    my $key = $self->FIRSTKEY();

    my @outi;
    push @outi, $key;

    return $self->NextCount(@outi); 

} # FirstCount

# count estimation
sub NextCount
{
#    whoami;
    my ($self, $prevkey, $esttot, $sum, $sumsq, $chunkcount, $totchunk) = @_;

    return undef
        unless (defined($prevkey));

    my ($chunkno, $prevsliceno) = $self->_splitrid($prevkey);
    my $chunk;

    unless (defined($esttot))
    {
#        greet "first first";

        $prevsliceno = ();
        ($esttot, $sum, $sumsq, $chunkcount, $totchunk) = (0,0,0,0,0);
        $chunkno = $self->_First_Chunkno();

        while (defined($chunkno))
        {
            $chunk = $self->_get_a_chunk($chunkno);
            my @foo = $chunk->FirstCount();

#            greet @foo;
            #                        XXX XXX: why not defined?
            if ((scalar(@foo) > 4) && defined($foo[-1]))
            {
                $totchunk += $foo[-1];
            }
            $chunkno = $self->_Next_Chunkno($chunkno);
        }
        $chunkno = $self->_First_Chunkno();
    }

    my @outi;
#    push @outi, $prevkey, $esttot, $sum, $sumsq, $chunkcnt, $totchunk;

    my $quitLoop = 1; # XXX XXX 
    my $loopCnt  = 0;

    while (defined($chunkno))
    {
        $chunk = $self->_get_a_chunk($chunkno);
        $loopCnt++;

        if (defined ($prevsliceno))
        {
            @outi = $chunk->NextCount($prevsliceno, 
                                      $esttot, $sum, $sumsq,
                                      $chunkcount, $totchunk);
        }
        else
        {
            my @oldouti = @outi;

            @outi = $chunk->FirstCount();
            if (scalar(@outi))
            {
                $outi[2]  += $sum;
                $outi[3]  += $sumsq;
                $outi[4]  += $chunkcount;
                $outi[5]  = $totchunk;

#    $esttot = $sum * ($totchunk/$chunkcount)
#        if (($sum > 0) && ($chunkcount > 0) && ($totchunk > 0));
                
                $outi[1]  = $outi[2] * ($totchunk/$outi[4])
                    if (($outi[2] > 0) && ($outi[4] > 0) 
                        && ($totchunk > 0));

    # current sum + (current avg * remaining chunks)
#           $outi[1] = $outi[2] + (($outi[2]/$outi[4])*($outi[5]-$outi[4]))


            }
            else
            {
                @outi = @oldouti;
            }
        }

        last
            if ($quitLoop && scalar(@outi) && defined($outi[0]));

        $prevsliceno = ();
        $chunkno = $self->_Next_Chunkno($chunkno);

        # XXX XXX: add logic here
        $quitLoop = 1
            if $loopCnt > 10;
    } # end while chunkno

    return @outi
        unless (scalar(@outi) && defined($chunkno) && defined($outi[0]));

    my $sliceno = shift @outi;
    unshift @outi, $self->_joinrid($chunkno, $sliceno);

    return @outi;
} # nextcount


sub _localHPush
{
    my ($self, $packstr, $value, $toobig) = @_;

    # NOTE: space management busted here -- pushing a huge row 
    # (packstr >= blocksize) bounces off the new block, which then
    # allocates a new file.  Avoid the issue by only pushing small
    # strings
    if (!($toobig) && 
        (length($packstr) < $self->{small_row}))
    {
        my $place = $self->SUPER::HPush($packstr);

        return $place
            if (defined($place));
    }

    # if string was too big, or push failed, try HSuck
    my @stat = $self->HSuck (value => $value);

    # null stat for failure, or extra cols indicates partial pack --
    # should see a single column for complete pack.
    return undef
        unless (scalar(@stat) == 1);

    return $stat[0]; # rid for first rowpiece
}

sub STORE
{
    my ($self, $place, $value) = @_;

    # CONSTRAINT
    my $cons_list = $self->{constraint_list};
    my ($cci_index, $ccu, $ccd);
    my @cc_op_list; # delete/insert operations

    if (defined($cons_list))
    {
        $ccu = $cons_list->{update};
        $ccd = $cons_list->{delete};
        $cci_index = $cons_list->{index_insert};
    }

    # CONSTRAINT - do "check constraints" before update
    if ($self->check_insert($value, $place, $self->{tablename}))
    {
        # Check constraint FAILED
        return undef;
    }

    my $oldvalue;

    if ($place !~ m/^PUSH$/)
    {
        if (defined($cci_index))
        {
            # clear out the insert callback -- ccu will generate an
            # op_list of delete/inserts if necessary.
            $cci_index = (); 
        }

        if (defined($ccu))
        {
            $oldvalue = $self->FETCH($place);

            if (&$ccu($value, $oldvalue, $place, \@cc_op_list))
            {
                whisper "updated key - delete from index";
                greet @cc_op_list;
                greet $place, $value, $oldvalue;
#                greet &$ccd($oldvalue, $place);
            }
            else
            {
                whisper "keys match - index not updated";
                $oldvalue  = (); # keys match - don't keep old value 
            } 
        }
    } # end !push

    # 2 cases: 
    # either PUSHing a new key *or* 
    # the old keys didn't match so we deleted them from the index.
    #
    # NOTE that we do the "index_insert" before the update, because the
    # rid exists already.  

    my $maxj;

    if (defined($cci_index))
    {
        return undef
            if (&$cci_index($value, $place));
    }
    elsif (scalar(@cc_op_list))
    {
        my $maxi = scalar(@cc_op_list);
     
        for my $i (0..($maxi - 1))
        {
            my $opv = $cc_op_list[$i];

            greet $opv;
            my $cnam = $opv->[0];
            my $del1 = $opv->[1];
            my $ins1 = $opv->[2];

            if (defined($del1) && defined($oldvalue))
            {
                greet &$del1($oldvalue, $place);
            }

            if (defined($ins1))
            {
                if (&$ins1($value, $place))
                {
                    # Index insert FAILED - revert indexes to old state
                    $maxj = $i;
                    if (defined($oldvalue)) # had an old key
                    {
                        whisper " attempt to restore old value";
                        &$ins1($oldvalue, $place);
                    }
                    return undef 
                        unless ($i); # we're done if only a single index
                    last;
                }

            }

        }
    }

    if (defined($maxj))
    {
        for my $j (0..($maxj - 1))
        {
            my $opv = $cc_op_list[$j];

            greet $opv;
            my $cnam = $opv->[0];
            my $del1 = $opv->[1];
            my $ins1 = $opv->[2];

            if (defined($del1))
            {
                # delete the new value
                greet &$del1($value, $place);
            }

            if (defined($ins1)  && defined($oldvalue))
            {
                # restore the old value if possible
                if (&$ins1($oldvalue, $place))
                {
                    greet "really screwed up, sorry!";
                    my $msg = "Serious error during update!!\n";
                    my %earg = (self => $self, msg => $msg,
                                severity => 'warn');
        
                    &$GZERR(%earg)
                        if (defined($GZERR));

                    return undef;
                }
            }
        } # end for
        return undef;
    } # end if maxj
            

    my $stat = $self->_localStore($place, $value);
    
    if (!defined($stat))
    {
        if (defined($cci_index))
        {
            # localStore FAILED - revert indexes to old state

            whisper "delete new key from index";
            &$ccd($value, $place);
            if (defined($oldvalue))
            {
                whisper " attempt to restore old value";
                &$cci_index($oldvalue, $place);
            }
        }
        elsif (scalar(@cc_op_list))
        {
            my $maxi = scalar(@cc_op_list);
     
            for my $i (0..($maxi - 1))
            {
                my $opv = $cc_op_list[$i];

                greet $opv;
                my $cnam = $opv->[0];
                my $del1 = $opv->[1];
                my $ins1 = $opv->[2];

                if (defined($del1))
                {
                    greet &$del1($value, $place);
                }

                if (defined($ins1) && defined($oldvalue))
                {
                    if (&$ins1($oldvalue, $place))
                    {
                        whisper "oh gosh!";
                        my $msg = "Really serious error during update!!\n";
                        my %earg = (self => $self, msg => $msg,
                                    severity => 'warn');
                        
                        &$GZERR(%earg)
                            if (defined($GZERR));
                    }
                }
            } # end for
        }
    } # end if not defined stat

    return ($stat);
}

# if setfwdptr (set Forwarding pointer), then only storing an empty
# header with pointer to the first rowpiece.
sub _localStore
{
#    whoami;
    my ($self, $place, $value) = @_;

    my $toobig  = 0;
    my $oldsize = 0;
    my @estat   = $self->_exists2($place); # HPHRowBlk method

    if (   (scalar(@estat) > 2) 
        && Genezzo::Block::RDBlock::_isheadrow($estat[0])
        && Genezzo::Block::RDBlock::_istailrow($estat[0])
           )
    {
        # if the row already exists as a single, contiguous buffer,
        # set maxsize for PackRowCheck to determine if can update in
        # place.
        $oldsize = $estat[2];
    }
    my $maxsize = 2 * $self->{small_row}; 

#    greet "max: $maxsize, old: $oldsize";

    $maxsize = $oldsize
        if ($oldsize > $maxsize);

    my $packstr = PackRowCheck($value, $maxsize);

#    greet "packstr: ", length($packstr) if (defined($packstr));

    unless ($oldsize)
    {
        # keep maxsize a bit small unless space is already allocated
        $maxsize = $self->{small_row}; 
    }

    if (defined($packstr))
    {
#        greet "fit maybe";
    }
    else
    {
#        greet "toobig";
        $toobig = 1;
    }

    if ($place =~ m/^PUSH$/)
    {
        $place = $self->_localHPush($packstr, $value, $toobig);
        return undef 
            unless (defined($place));
        return $value;
    }

    # XXX XXX: race condition here -- would need to lock row to ensure
    # that rowstat doesn't change.  Probably need a new STORE with the
    # specification to only update if both head and tail, and the
    # _exists2 is merely a hint.

    # estat = ($rowstat, $rowposn, $rowlen)
    @estat = $self->_exists2($place); # HPHRowBlk method

    if (   (scalar(@estat) < 3) # no such row or bad rid (so fail on STORE)
        # or the row fits in a block   
        ||
           (   (scalar(@estat) > 2) 
            && Genezzo::Block::RDBlock::_isheadrow($estat[0])
            && Genezzo::Block::RDBlock::_istailrow($estat[0])
            && !($toobig)                    # toobig check
            && $estat[2] >= length($packstr) # see if still fits   
           )
        )
    {
        if (!($toobig)
            && (length($packstr) <= $maxsize)) # avoid long rows
        {
#            greet "should fit";
            my $stat = $self->SUPER::STORE($place, $packstr);
            return $value
                if (defined($stat));
            return undef;
        }
    }

    # set up arguments for HSuck.  HSuck new value as "headless",
    # since it gets stored as a continuation of the first piece of the
    # old row
    my %nargs = (value    => $value,
                 headless => 1); 

    # XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX 
    # if the original row is split over multiple blocks, then push the
    # new value and replace the first piece of the original row with a
    # forwarding pointer to the new value
    #
    # It's not an optimal or efficient replacement strategy, but it is
    # straightforward.  Need to figure out how to:
    #   1. re-use freed space from old row
    #   2. figure out how to update individual columns more
    #      efficiently.  One possible variation is do current replacement
    #      strategy starting at column N, i.e., if updating column N,
    #      truncate the row from column N onward and stick the forwarding
    #      pointer at col N.
    #
    # XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX 

    my $split_row = !(Genezzo::Block::RDBlock::_istailrow($estat[0]));

    # suck new value first, so we still have the old row if it fails...
    
    # sukk = (newplace, offset, frag flag)
    my @sukk = $self->HSuck (%nargs);
    # XXX XXX: HSuck could fail here - that would be bad...
    greet "sukk:", @sukk;

    return undef # XXX XXX: need to test this...
        unless (scalar(@sukk) && (defined($sukk[0])));

    # if the old value is split over multiple blocks
    if ($split_row)
    {
        whisper "long row";

        # use localFetchDelete to delete the existing row pieces, but
        # keep the first piece and mark it as complete (both head and
        # tail bits get set).  Note: First piece *must* be large
        # enough to hold the forwarding pointer.

        my $val = $self->_localFetchDelete($place, 1, 1); # delete trailing 
                                                          # pieces, 
                                                          # but keep head 
        # now only the first piece is left, 
#        greet "oldval: ", $val;
    }

    {
        # create a forwarding pointer to the row.  It needs to an
        # empty col1 fragment that points to col1 in the new row.
        my @fakerow;
        push @fakerow, ""; # blank col1
        push @fakerow, "F:".$sukk[0];

        my $packfake = PackRow(\@fakerow);
        my $sstat = $self->SUPER::STORE($place, $packfake);

        # XXX XXX: STORE could fail here too - that would be bad

        # XXX: don't call localstore recursively...

####     my $sstat = $self->_localStore($place, \@fakerow, 1);
#        greet "stat:", $sstat;
#        greet $self->_exists2($sukk[0]); # HPHRowBlk method
#        greet "e1:", @estat;

        # clear the tail flag of the first piece, since it now points to
        # the new value
        $estat[0] &= ~($Genezzo::Block::RDBlock::RowStats{tail});
        @estat = $self->_exists2($place, $estat[0]); # HPHRowBlk method
#        greet "e2:", @estat;
    }
    return $value;
}

#sub FETCH    
#{ 
#    my ($self, $place) = @_;
#    
#    my $value = ($self->SUPER::FETCH($place));
#
#    return (undef)
#        unless (defined($value));
#
#    my @outarr = UnPackRow($value);
#    
#    return (\@outarr);
#
#}
sub FETCH    
{ 
    my ($self, $place) = @_;

    # CONSTRAINT
    
    return $self->_localFetchDelete($place);
}

# if doDelete is set then perform recursive delete on chained rows
sub _localFetchDelete
{ 
    my ($self, $place, $doDelete, $keepHead) = @_;

    # XXX : should be able to do FETCH directly if know have
    # information that no split rows...
    # XXX XXX: maybe ok to piggy-back fetch2 into normal FETCH?

    # fetcha = (value, rowstat)
    my @fetcha = $self->_fetch2($place); # HPHRowBlk method

    # XXX: check status?
    my $deleteStatus;
    if (defined($doDelete))
    {    
#        greet "a", $keepHead, $place, @fetcha;
        unless (defined($keepHead))
        {
            $deleteStatus = $self->SUPER::DELETE($place);
        }
    }

    return undef
        unless (   (scalar(@fetcha) > 1)
                && defined($fetcha[0]) # value
                && defined($fetcha[1]) # rowstat
                && Genezzo::Block::RDBlock::_isheadrow($fetcha[1]));
    
    my @rowpiece = 
        UnPackRow($fetcha[0], 
                  $Genezzo::Util::UNPACK_TEMPL_ARR); # first row piece 
    
    # Note: just return if row was not split.  Avoid the extra push in
    # the while loop
    return (\@rowpiece)
        if (Genezzo::Block::RDBlock::_istailrow($fetcha[1]));

    if (defined($doDelete) && defined($keepHead))
    {
        # keep the head, but define it as a completed piece
#        greet "del, keephead", @fetcha;
        my @estat = $self->_exists2($place, 
                                    (
                                     $fetcha[1] |
                                     ($Genezzo::Block::RDBlock::RowStats{tail})
                                     )
                                    ); # HPHRowBlk method
        # Note: fetcha[1] ("rowstat") is checked to terminate while loop, 
        # so don't update it here!
#        greet @estat;
    }

    my $gotFrag = 0;
    my @outarr; # current rowpiece loaded into outarr in while loop

    # Fetch the remaining row pieces, and re-assemble the row.  If the
    # piece isn't the tail (end) of the row, the last column is a
    # "next pointer", a pointer to the next piece, with a flag which
    # indicates whether the last column (the real last column, not the
    # aforementioned next pointer) was split.

    my $piececount = 0;

  L_rowpiece:
    while (1)
    {
        if ($gotFrag)
        { # column was fragmented - merge the next column piece 
            my $h1 = shift @rowpiece;
            $outarr[-1] .= $h1; # append remainder to end of last column
        }
        
        # append next set of columns to existing row
        push @outarr, @rowpiece;

#        greet "count:", $piececount, @outarr;

        last L_rowpiece # done when last piece of row is fetched
            if (Genezzo::Block::RDBlock::_istailrow($fetcha[1]));

        my $nextp = pop @outarr; # last column was pointer to next piece,
                                 # so remove it from output

        # check next pointer to see if column was fragmented (split)
        my ($frag, $pieceplace) = split(':', $nextp);

        # XXX XXX: clean this up - centralize knowledge of frag flag somewhere
        $gotFrag = (defined($frag)) && ($frag =~ m/F/);

        # get the next piece
        @fetcha   = $self->_fetch2($pieceplace);

        $piececount++;

        unless (   (scalar(@fetcha) > 1)
                && defined($fetcha[0]) 
                && defined($fetcha[1]) 
               )
        { # ERROR: remainder of row not found
            if  (scalar(@outarr))
            {
 #               greet "b", $pieceplace, @fetcha;
 #               greet @outarr;
                my $tname = $self->{tablename};
                whisper "table $tname: malformed row $place at $pieceplace";

                my $msg = "table $tname: malformed row $place at " .
                    "$pieceplace\n";
                my %earg = (self => $self, msg => $msg,
                            severity => 'warn');
        
                &$GZERR(%earg)
                    if (defined($GZERR));
            }
            return undef;
        }
        if (defined($doDelete))
        {
            # XXX: should check delete status here
#            greet "c", $pieceplace, @fetcha;
            whisper "delete piece $pieceplace";
            $deleteStatus = $self->SUPER::DELETE($pieceplace);
#            greet $deleteStatus;
        }

        @rowpiece = UnPackRow($fetcha[0], $Genezzo::Util::UNPACK_TEMPL_ARR); 
    } # end while l_rowpiece

    return (\@outarr);
} # end localFetchDelete  

sub DELETE
{
#    whoami;
    my ($self, $place) = @_;

    # CONSTRAINT
    my $cons_list = $self->{constraint_list};

    return $self->_localDELETE($place, $cons_list);
}
sub _localDELETE
{
#    whoami;
    my ($self, $place, $cons_list) = @_;

    # CONSTRAINT

    # XXX XXX XXX XXX: need to support foreign key constraint, also
    # DELETE CASCADE

    my $ccd;

    if (defined($cons_list))
    {
        $ccd = $cons_list->{delete};
    }

    my @estat = $self->_exists2($place); # HPHRowBlk method

    if (   (scalar(@estat) < 3) # no such row or bad rid
        # or the row fits in a block   
        || (   (scalar(@estat) > 2) 
            && Genezzo::Block::RDBlock::_isheadrow($estat[0])
            && Genezzo::Block::RDBlock::_istailrow($estat[0])
           )
        )
    {
        my $val = $self->SUPER::DELETE($place);

        return undef
            unless (defined($val));

        if (defined($ccd))
        {
            my @outarr = UnPackRow($val, $Genezzo::Util::UNPACK_TEMPL_ARR);
            &$ccd(\@outarr, $place); # delete from index
            return (\@outarr);
        }
        else
        {
            return (UnPackRow($val, $Genezzo::Util::UNPACK_TEMPL_ARR));
        }
    }

    # fetch the pieces and delete them
    if (defined($ccd))
    {
        my @outarr =  $self->_localFetchDelete($place, 1);
        &$ccd(\@outarr, $place);
        return (\@outarr);
    }
    else
    {
        return $self->_localFetchDelete($place, 1);
    }

} # end DELETE

sub SQLPrepare # get a DBI-style statement handle
{
    my $self = shift;
    my %args = @_;
    $args{pushhash} = $self;
    $args{tablename} = $self->{tablename};

    if ((exists($self->{GZERR}))
        && (defined($self->{GZERR})))
    {
        $args{GZERR} = $self->{GZERR};
    }

    my $sth = Genezzo::Row::SQL_RSTab->new(%args);

    return $sth;
}

package Genezzo::Row::SQL_RSTab;
use strict;
use warnings;
use Genezzo::Util;

sub _init
{
    my $self = shift;
    my %args = (@_);

    return 0
        unless (defined($args{pushhash}));
    $self->{pushhash} = $args{pushhash};
    $self->{tablename} = $args{tablename};
    if (defined($args{alias}))
    {
        $self->{tablename} = $args{alias};
    }

    if (defined($args{filter}))
    {
        $self->{SQLFilter} = $args{filter}; 
        my $ff = $args{filter}; 
#        greet $ff;

        my $cons_list = $self->{pushhash}->_constraint_check();

        if (defined($cons_list))
        {
            my @both_keys = Genezzo::Util::GetIndexKeys($ff);
            
            if ((scalar(@both_keys) > 1) 
                && (exists($cons_list->{SQLPrepare})))
            {
#                greet @both_keys; # keys in table colidx order

                my $get_search = $cons_list->{SQLPrepare};

                # prepare an index search if have startkey/stopkey

                my $searchhandle = &$get_search(@both_keys);

                $self->{IndexSth} = $searchhandle
                    if (defined($searchhandle));
            }
        }
    }
    $self->{rownum} = 0;

    return 1;
}

sub new
{
 #   whoami;
    my $invocant = shift;
    my $class = ref($invocant) || $invocant ; 
    my $self = { };

    my %args = (@_);

    if ((exists($args{GZERR}))
        && (defined($args{GZERR}))
        && (length($args{GZERR})))
    {
        # NOTE: don't supply our GZERR here - will get
        # recursive failure...
        $self->{GZERR} = $args{GZERR};
    }

    return undef
        unless (_init($self,%args));

    return bless $self, $class;

} # end new


# SQL-style execute and fetch functions
sub SQLExecute
{
    my ($self, $filter) = @_;

#    $self->{SQLFilter} = $filter; # check this
    if (exists($self->{IndexSth}))
    {
        greet "index execute";

        $self->{SQLFetchKey} = 1;
        return $self->{IndexSth}->SQLExecute();
    }

    $self->{SQLFetchKey} = $self->FIRSTKEY();

    # XXX: define filters and fetchcols
    return (1);
}

# XXX XXX XXX XXX:  create a separate dynamic package to
# hold the fetch state, vs keeping the fetch state in the base
# pushhash.  Then can maintain multiple independent SQLFetches open
# on same RSTab object.

# combine NEXTKEY and FETCH in a single operation
sub SQLFetch
{
    my ($self, $key) = @_;
    my $fullfilter = $self->{SQLFilter};
    my $filter = (defined($fullfilter)) ? $fullfilter->{filter} : undef;

    # use explicit key if necessary
#    $self->{SQLFetchKey} = $key
#        if (defined($key));

    while (defined($self->{SQLFetchKey}))
    {
        my $currkey;

        if (exists($self->{IndexSth}))
        {
#            greet "index fetch";

            my @idx_row = $self->{IndexSth}->SQLFetch();

#            greet @idx_row;

            unless (scalar(@idx_row) > 1)
            {
                $self->{SQLFetchKey} = undef;
                return undef;
            }

            pop @idx_row; # remove extra search cols
            pop @idx_row;
            
            $currkey = pop @idx_row; # val is rowid for table
        }
        else
        {
            $currkey = $self->{SQLFetchKey};
        }

        my $outarr  = $self->FETCH($currkey);
        my $tablename = $self->{tablename};
        my $get_col_alias = {$tablename => $outarr};

        # save the value of the key because we pre-advance to the next one
        $self->{SQLFetchKey} = $self->NEXTKEY($currkey)
            unless (exists($self->{IndexSth}));        

        # Note: always return the rid
        return ($currkey, $outarr)
            unless (defined($filter));

        # filter is defined
        my $val;

        my $rownum = $self->{rownum} + 1;
        # be very paranoid - filter might be invalid perl
        eval {$val = &$filter($self, $currkey, $outarr, 
                              $get_col_alias, $rownum) };
        if ($@)
        {
            whisper "filter blew up: $@";
            greet   $fullfilter;

            my $msg = "bad filter: $@\n" ;
#            $msg .= Dumper($fullfilter)
#               if (defined($fullfilter));
            my %earg = (self => $self, msg => $msg,
                        severity => 'warn');
            
            &$GZERR(%earg)
                if (defined($GZERR));

            return undef;
        }
        unless (!$val)
        {
            $self->{rownum} += 1;
            return ($currkey, $outarr);
        }

    }

    return undef;
}

sub AUTOLOAD 
{
    my $self = shift;
    my $ph = $self->{pushhash};

    our $AUTOLOAD;
    my $newfunc = $AUTOLOAD;
    $newfunc =~ s/.*:://;
    return if $newfunc eq 'DESTROY';

#    greet $newfunc;
    return ($ph->$newfunc(@_));
}

END {

}

1;

__END__
# Below is stub documentation for your module. You better edit it!

=head1 NAME

Genezzo::Row::RSTab.pm - Row Source TABle tied hash class. 

=head1 SYNOPSIS

 use Genezzo::Row::RSTab;

 # see Tablespace.pm -- implementation and usage is tightly tied
 # to genezzo engine...

 # make a factory for rsfile
 my $fac2 = make_fac2('Genezzo::Row::RSFile');
        
 my %args = (
             factory   => $fac2,
             # need tablename, bufcache, etc...
             tablename => ...
             tso       => ...
             bufcache  => ...
                    );

  my %td_hash;
  $tie_val = 
    tie %td_hash, 'Genezzo::Row::RSTab', %args;

 # pushhash style 
 my @rowarr = ("this is a test", "and this is too");
 my $newkey = $tie_val->HPush(\@rowarr);

 @rowarr = ("update this entry", "and this is too");
 $tied_hash{$newkey} = \@rowarr;

 my $getcount = $tie_val->HCount();

=head1 DESCRIPTION

RSTab is a hierarchical pushhash (see L<Genezzo::PushHash::hph>) class
that stores perl arrays as rows in a table, writing them into a block
(byte buffer) via B<Genezzo::Row::RSFile> and B<Genezzo::Block::RDBlock>.

=head1 ARGUMENTS

=over 4

=item tablename
(Required) - the name of the table

=item tso
(Required) - tablespace object from B<Genezzo::Tablespace>

=item bufcache
(Required) - buffer cache object from B<Genezzo::BufCa::BCFile>


=back


=head1 CONCEPTS

Logically, a table is made of rows, and rows are vectors of columns.
Physically (at least from an OS implementation viewpoint), a table is
made up of blocks stored in files.  The RSTab hierarchical pushhash
(hph) uses an RSFile factory, though it could be constructed as an hph
of arbitrary depth.  The basic HPush mechanism takes an array,
flattens it into a string, and pushes the string into one of the
underlying blocks.

While the RSTab api is primarily intended as a row-based interface, it
has some extensions to directly manipulate the underlying blocks.
These extensions are useful for building specialized index mechanisms
(see L<Genezzo::Index>) like B-trees, or for supporting rows that span
multiple blocks.

=head2 Basic PushHash

You can use RSTab as a persistent hash of arrays of scalars if you
like.  The arrays and scalars can be of arbitrary length (as long as
they fit in your datafiles).  

=head2 SQL DBI-style interface

RSTab is designed to efficiently support prepare/execute/fetch
operations against tables.  What distinguishes this API from a
standard hash is that the "prepare" operation generates a custom,
stateful iterator that understands filters and range selection.  A
filter is simply a predicate which is applied to every row -- rows
which pass are returned to the caller, and rows which fail are
"filtered out".  Range selection is somewhat similar, with the notion
of start and stop keys -- the iterator only returns the rows which are
restricted to a certain range of values.  In general, range selection
is driven off a separate indexing mechanism that positions the fetch
to specifically retrieve the range in an efficient manner, versus
fetching all rows and filtering rows outside the range.

=head2 HPHRowBlk - Row and Block operations

HPHRowBlk is a special pushhash subclass with certain direct block
manipulation methods.  One very useful function is HSuck, which
provides support for rows that span multiple blocks.  While the
standard HPush fails if a row exceeds the space in a single block, the
HSuck api lets the underlying blocks consume the rows in pieces --
each block "sucks up" as much of the row as it can.  The RSTab HPush
is re-implemented on top of HSuck to support large rows.

=head2 Counting, Estimation, Approximation

RSTab has some support for count estimation, inspired by some of Peter
Haas' work (Sequential Sampling Procedures for Query Size Estimation,
ACM SIGMOD 1992, Online Aggregation (with J. Hellerstein and H. Wang),
ACM SIGMOD 1997 Ripple Joins for Online Aggregation (with
J. Hellerstein) ACM SIGMOD 1999).  It could use support for confidence
intervals, so drop me a line if you understand Central Limit Theorem,
Hoeffding and Chebyshev inequalites.  Knowledge of change-points and
time-series is also a plus.

=head1 FUNCTIONS

RSTab support all standard hph hierarchical pushhash operations, with
the extension that it manipulates arrays of scalars, not individual
scalars.

=head2 EXPORT

=head1 LIMITATIONS

various

=head1 TODO

=over 4

=item rownum filter support to move to separate package

=item $href: remove - need a dict function to return allfileused via tso

=item HSuck: need a way to specify packing method 

=item HSuck: fix trailing zero replacement

=item NextCount: fix quitloop

=item localPush/Store: qualify length packstr as percentage of blocksize (1/3?)

=item localStore: race condition on rowstat 

=item localFetchDelete: frag flag info, delete status.  Could express
this function as a generalized "RowSplice" (as distinct from
RDBlkA::HSplice, which is a block splice operator).  Would need be
able to splice based upon column number/array offset, as well as
substring byte offset -- the inverse functionality of PackRow2/HSuck

=item DBI - support Bind and projection (returning only certain
specified columns, versus all columns)

=item _init: change to use TSTableAFU support versus href->{filesused}

=item need support for constraints that "mutate" supplied values,
      e.g. manipulate numeric precision or supply default values for 
      columns.  Also need support for foreign keys in delete.

=back

=head1 AUTHOR

Jeffrey I. Cohen, jcohen@genezzo.com

=head1 SEE ALSO

L<Genezzo::PushHash::HPHRowBlk>,
L<Genezzo::PushHash::hph>,
L<Genezzo::PushHash::PushHash>,
L<Genezzo::Tablespace>,
L<Genezzo::Row::RSFile>,
L<Genezzo::Row::RSBlock>,
L<Genezzo::Block::RDBlock>,
L<Genezzo::BufCa::BCFile>,
L<Genezzo::BufCa::BufCaElt>,
L<perl(1)>.

Copyright (c) 2003, 2004, 2005 Jeffrey I Cohen.  All rights reserved.

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA

Address bug reports and comments to: jcohen@genezzo.com

For more information, please visit the Genezzo homepage 
at L<http://www.genezzo.com>

=cut