The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Apache2::Translation::BDB;

use 5.008008;
use strict;

use File::Spec;
use BerkeleyDB;
use Storable ();
use Class::Member::HASH -CLASS_MEMBERS=>qw/bdbenv readonly extra_db
					   _db1 _db2 _txn parent_txn
					   _connected root/;
our @CLASS_MEMBERS;

use Apache2::Translation::_base;
use base 'Apache2::Translation::_base';

use warnings;
no warnings qw(uninitialized);
undef $^W;

our $VERSION = '0.05';

# _db1 maps id=>[block, order, action, id, key, uri]
# _db2 is a secondary index of (key,uri). It is associated with _db1
# extra_db contains the timestamp and the id-sequence

our $last_created_provider;

sub new {
  my $parent=shift;
  my $class=ref($parent) || $parent;
  my $I=bless {}=>$class;
  my $x=0;
  my %o=map {($x=!$x) ? lc($_) : $_} @_;

  if( ref($parent) ) {         # inherit first
    foreach my $m (@CLASS_MEMBERS) {
      $I->$m=$parent->$m;
    }
  }

  # then override with named parameters
  foreach my $m (@CLASS_MEMBERS) {
    $I->$m=$o{$m} if( exists $o{$m} );
  }

  unless( ref($I->bdbenv) ) {
    if( defined $I->bdbenv ) {
      die "ERROR: bdbenv must be set to a directory name\n"
	unless( ref($I->bdbenv) or length $I->bdbenv );

      my $dir=$I->bdbenv;
      $dir=File::Spec->catdir( $I->root, $dir )
	if( length $I->root and !File::Spec->file_name_is_absolute($dir) );
      -d $dir or mkdir $dir;
    } else {
      if( $last_created_provider ) {
	$I->bdbenv=$last_created_provider;
      } else {
	die "ERROR: bdbenv must be set to a directory name\n";
      }
    }
  }

  return $last_created_provider=$I;
}

sub connect {
  my ($I)=@_;

  return $I if( $I->_connected );

  my $env=$I->bdbenv;
  if( ref($env) ) {
    if( $I->bdbenv->isa(__PACKAGE__) ) {
      $I->bdbenv->connect;
      $I->bdbenv=$env=$I->bdbenv->bdbenv;
    }
  } else {
    my $dir=$I->bdbenv;
    $dir=File::Spec->catdir( $I->root, $dir )
      if( length $I->root and !File::Spec->file_name_is_absolute($dir) );
    $I->bdbenv=$env=BerkeleyDB::Env->new
      (
       -Home  => $dir,
       -Flags => DB_CREATE| DB_INIT_MPOOL | DB_INIT_LOCK | DB_INIT_TXN |
                 DB_INIT_LOG,
       -LockDetect => DB_LOCK_MINWRITE,
       #-ErrFile=>\*STDERR,
       #-Verbose=>10,
      ) or die "ERROR: Cannot create BDB environment $dir: $BerkeleyDB::Error\n";
  }

  my $txn;
  $txn=$env->txn_begin($I->parent_txn ? $I->parent_txn : undef, 0)
    unless($I->readonly);
  my $name1=ref($I).'-id.db';
  $I->_db1=BerkeleyDB::Btree->new
    (
     -Filename     => $name1,
     -Flags        => ($I->readonly ? DB_RDONLY | DB_AUTO_COMMIT
                                    : DB_CREATE),
     -Env          => $env,
     -Txn          => $txn,
    );
  my $name2=ref($I).'-keyuri.db';
  $I->_db2=BerkeleyDB::Btree->new
    (
     -Filename     => $name2,
     -Flags        => ($I->readonly ? DB_RDONLY | DB_AUTO_COMMIT
                                    : DB_CREATE),
     -Env          => $env,
     -Txn          => $txn,
     -Property     => DB_DUP,
    );
  my $name3=ref($I).'-extra.db';
  $I->extra_db=BerkeleyDB::Btree->new
    (
     -Filename     => $name3,
     -Flags        => ($I->readonly ? DB_RDONLY | DB_AUTO_COMMIT
                                    : DB_CREATE),
     -Env          => $env,
     -Txn          => $txn,
    );

  unless($I->readonly) {
    if( $I->_db1 and $I->_db2 and $I->extra_db ) {
      $txn->txn_commit
	and die "ERROR: Cannot commit DB_CREATE: $BerkeleyDB::Error\n";
    } else {
      $txn->txn_abort;
      die "ERROR: Cannot open database: $BerkeleyDB::Error\n";
    }
  }

  $I->begin($I->parent_txn);
  $I->_db1->associate
    ($I->_db2, sub {
       my ($pkey, $pdata)=@_;
       $_[2]=join( "\t", @{decode($pdata)}[KEY,URI] );
       return 0;
     }) if($I->_db1 and $I->_db2);
  $I->commit;

  $I->_connected=1;

  return $I;
}

*encode=\&Storable::nfreeze;
*decode=\&Storable::thaw;

*start=\&connect;
sub stop {}
sub can_notes {1}

sub fetch {
  my ($I, $key, $uri, $with_notes)=@_;

  $key.="\t".$uri;
  my (@l, $v, $c, $stat);
  $c=$I->_db2->db_cursor;
  if( $with_notes ) {
    for( $stat=$c->c_get($key, $v, DB_SET);
	 $stat==0;
	 $stat=$c->c_get($key, $v, DB_NEXT_DUP) ) {
      push @l, [@{decode($v)}[BLOCK,ORDER,ACTION,ID,NOTE]];
    }
  } else {
    for( $stat=$c->c_get($key, $v, DB_SET);
	 $stat==0;
	 $stat=$c->c_get($key, $v, DB_NEXT_DUP) ) {
      push @l, [@{decode($v)}[BLOCK,ORDER,ACTION,ID]];
    }
  }
  return sort {$a->[BLOCK] <=> $b->[BLOCK] or $a->[ORDER] <=> $b->[ORDER]} @l;
}

sub list_keys {
  my $I=$_[0];

  my (%h, $k, $v, $c, $stat);
  $c=$I->_db2->db_cursor;
  for( $stat=$c->c_get($k, $v, DB_FIRST);
       $stat==0;
       $stat=$c->c_get($k, $v, DB_NEXT_NODUP) ) {
    undef $h{(split /\t/, $k)[0]};
  }

  return map {[$_]} sort keys %h;
}

sub list_keys_and_uris {
  my ($I,$key)=@_;
  $key='' unless( defined $key );

  my (@l, $k, $v, $c, $stat);
  $c=$I->_db2->db_cursor;

  for( $stat=$c->c_get($k, $v, DB_FIRST);
       $stat==0;
       $stat=$c->c_get($k, $v, DB_NEXT_NODUP) ) {
    my @v=split /\t/, $k;
    push @l, [@v] if( !length($key) or $key eq $v[0] );
  }
  return sort {$a->[0] cmp $b->[0] or $a->[1] cmp $b->[1]} @l;
}

sub begin {
  my ($I)=@_;

  $I->_txn=$I->bdbenv->txn_begin
    ( $I->parent_txn ? $I->parent_txn : undef, DB_TXN_NOSYNC );
  die "ERROR: Cannot create transaction: $BerkeleyDB::Error\n"
    unless( defined $I->_txn );
  $I->_txn->Txn($I->_db1, $I->_db2, $I->extra_db);
}

sub commit {
  my ($I)=@_;

  my $rc=!$I->_txn->txn_commit;
  undef $I->_txn;
  $I->bdbenv->txn_checkpoint(0,0);
  return $rc;
}

sub rollback {
  my ($I)=@_;

  return unless( $I->_txn );
  my $rc=!$I->_txn->txn_abort;
  undef $I->_txn;
  return $rc;
}

sub update {
  my $I=shift;
  my $old=shift;
  my $new=shift;

  my ($v, $c, $stat, $rc);
  $c=$I->_db1->db_cursor;

  if( ($rc=$c->c_get($old->[oID], $v, DB_SET))==0 ) {
    my $el=decode($v);
    if( $el->[BLOCK]==$old->[oBLOCK] and $el->[ORDER]==$old->[oORDER] ) {
      @{$el}[BLOCK,ORDER,ACTION,KEY,URI,NOTE]=
	@{$new}[nBLOCK,nORDER,nACTION,nKEY,nURI,nNOTE];
      $rc=$c->c_put($old->[oID],
		    encode($el),
		    DB_CURRENT);
      die "__RETRY__\n" if( $rc==DB_LOCK_DEADLOCK );
      return $rc==0 ? 1 : 0;
    }
  } elsif( $rc==DB_LOCK_DEADLOCK ) {
    die "__RETRY__\n" if( $rc==DB_LOCK_DEADLOCK );
  }
  return "0 but true";
}

sub insert {
  my ($I, $new)=@_;

  # fetch a new id
  my ($k, $v, $c, $id, $rc);
  $c=$I->extra_db->db_cursor;
  if( ($rc=$c->c_get($k='id_seq', $v, DB_SET))==0 ) {
    $rc=$c->c_put( $k, $id=$v+1, DB_CURRENT );
  } else {
    die "__RETRY__\n" if( $rc==DB_LOCK_DEADLOCK );
    $rc=$I->extra_db->db_put( $k, $id=1 );
  }
  die "__RETRY__\n" if( $rc==DB_LOCK_DEADLOCK );

  my $el=[];
  @{$el}[BLOCK,ORDER,ACTION,KEY,URI,NOTE,ID]=
    (@{$new}[nBLOCK,nORDER,nACTION,nKEY,nURI,nNOTE], $id);

  $rc=$I->_db1->db_put( $id, encode($el) );
  die "__RETRY__\n" if( $rc==DB_LOCK_DEADLOCK );
  return $rc==0 ? 1 : 0;
}

sub delete {
  my ($I, $old)=@_;

  my ($v, $c, $stat, $rc);
  $c=$I->_db1->db_cursor;

  if( ($rc=$c->c_get($old->[oID], $v, DB_SET))==0 ) {
    my $el=decode($v);
    if( $el->[BLOCK]==$old->[oBLOCK] and $el->[ORDER]==$old->[oORDER] ) {
      $rc=$c->c_del;
      die "__RETRY__\n" if( $rc==DB_LOCK_DEADLOCK );
      return $rc==0 ? 1 : 0;
    }
  } elsif( $rc==DB_LOCK_DEADLOCK ) {
    die "__RETRY__\n";
  }
  return "0 but true";
}

sub clear {
  my ($I)=@_;

  my ($k, $v, $c, $stat, $count, $rc);

  $count=0;
  $c=$I->_db1->db_cursor;
  while( ($rc=$c->c_get($k, $v, DB_NEXT))==0 ) {
    $rc=$c->c_del;
    die "__RETRY__\n" if( $rc==DB_LOCK_DEADLOCK );
    $count+=($rc==0 ? 1 : 0);
  }
  die "__RETRY__\n" if( $rc==DB_LOCK_DEADLOCK );
  die "__RETRY__\n" if( $I->extra_db->db_del("\ttmstmp")==DB_LOCK_DEADLOCK );

  return $count;
}

sub iterator {
  my $c=$_[0]->_db2->db_cursor;
  my @blocklist;

  return sub {
    unless( @blocklist ) {
      my ($key, $k, $v, $rc);
      if( ($rc=$c->c_get($key, $v, DB_NEXT))==0 ) {
	my $new=[];
	@{$new}[nBLOCK,nORDER,nACTION,nKEY,nURI,nNOTE,nID]=
	  @{decode($v)}[BLOCK,ORDER,ACTION,KEY,URI,NOTE,ID];
	push @blocklist, $new;
	while( ($rc=$c->c_get($k=$key, $v, DB_NEXT_DUP))==0 ) {
	  $new=[];
	  @{$new}[nBLOCK,nORDER,nACTION,nKEY,nURI,nNOTE,nID]=
	    @{decode($v)}[BLOCK,ORDER,ACTION,KEY,URI,NOTE,ID];
	  push @blocklist, $new;
	}
	@blocklist=sort {$a->[nBLOCK]<=>$b->[nBLOCK] or
			 $a->[nORDER]<=>$b->[nORDER]} @blocklist;
      } elsif( $rc==DB_LOCK_DEADLOCK ) {
	die "__RETRY__\n";
      } else {
	undef $c;
	return;
      }
    }
    return shift @blocklist;
  };
}

sub extra {
  my ($I, $k, $v)=@_;

  my ($res);
  die "__RETRY__\n" if( $I->extra_db->db_get($k, $res)==DB_LOCK_DEADLOCK );
  if( @_>2 ) {
    die "__RETRY__\n" if( $I->extra_db->db_put($k, $v)==DB_LOCK_DEADLOCK );
  }

  return $res;
}

sub timestamp {
  my ($I, $stamp)=@_;

  no warnings qw(numeric);
  return $I->extra( "\ttmstmp", $stamp )+0;
}

1;
__END__