The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

package Bio::DB::SeqFeature::Store::DBI::Pg;
use DBD::Pg qw(:pg_types);
use MIME::Base64;
# $Id: 14656 2008-04-14 15:05:37Z lstein $

=head1 NAME

Bio::DB::SeqFeature::Store::DBI::Pg -- PostgreSQL implementation of Bio::DB::SeqFeature::Store

=head1 SYNOPSIS

  use Bio::DB::SeqFeature::Store;

  # Open the sequence database
  my $db = Bio::DB::SeqFeature::Store->new(-adaptor => 'DBI::Pg',
                                          -dsn     => 'dbi:Pg:test');

  # get a feature from somewhere
  my $feature = Bio::SeqFeature::Generic->new(...);

  # store it
  $db->store($feature) or die "Couldn't store!";

  # the feature's primary ID is updated to the ID it was assigned
  # in the database...
  my $id = $feature->primary_id;

  # get the feature back out
  my $f  = $db->fetch($id);

  # change the feature and update it
  $db->update($f) or die "Couldn't update!";

  # searching...
  # id
  my @features = $db->fetch_many(@list_of_ids);

  # name
  @features = $db->get_features_by_name('ZK909');

  # alias
  @features = $db->get_features_by_alias('sma-3');

  # type
  @features = $db->get_features_by_type('gene');

  # location
  @features = $db->get_features_by_location(-seq_id=>'Chr1',-start=>4000,-end=>600000);

  # attribute
  @features = $db->get_features_by_attribute({description => 'protein kinase'});

  # the GFF "Note" field
  @result_list = $db->search_notes('kinase');

  # arbitrary combinations of selectors
  @features = $db->features(-name => $name,
                            -type => $types,
                            -seq_id => $seqid,
                            -start  => $start,
                            -end    => $end,
                            -attributes => $attributes);

  # ...using an iterator
  my $iterator = $db->get_seq_stream(-name => $name,
                                     -type => $types,
                                     -seq_id => $seqid,
                                     -start  => $start,
                                     -end    => $end,
                                     -attributes => $attributes);

  while (my $feature = $iterator->next_seq) {
    # do something with the feature
  }

  # ...limiting the search to a particular region
  my $segment  = $db->segment('Chr1',5000=>6000);
  my @features = $segment->features(-type=>['mRNA','match']);

  # getting & storing sequence information
  # Warning: this returns a string, and not a PrimarySeq object
  my $sequence = $db->fetch_sequence('Chr1',5000=>6000);

  # what feature types are defined in the database?
  my @types    = $db->types;

  # create a new feature in the database
  my $feature = $db->new_feature(-primary_tag => 'mRNA',
                                 -seq_id      => 'chr3',
                                 -start      => 10000,
                                 -end        => 11000);


=head1 DESCRIPTION

Bio::DB::SeqFeature::Store::DBI::Pg is the Pg adaptor for
Bio::DB::SeqFeature::Store. You will not create it directly, but
instead use Bio::DB::SeqFeature::Store-E<gt>new() to do so.

See L<Bio::DB::SeqFeature::Store> for complete usage instructions.

=head2 Using the Pg adaptor

Before you can use the adaptor, you must use the Pgadmin tool to
create a database and establish a user account with write
permission. In order to use "fast" loading, the user account must have
"file" privileges.

To establish a connection to the database, call
Bio::DB::SeqFeature::Store-E<gt>new(-adaptor=E<gt>'DBI::Pg',@more_args). The
additional arguments are as follows:

  Argument name       Description
  -------------       -----------

 -dsn              The database name. You can abbreviate
                   "dbi:Pg:foo" as "foo" if you wish.

 -user             Username for authentication.

 -pass             Password for authentication.

 -namespace        Creates a SCHEMA for the tables. This allows you
                   to have several virtual databases in the same
                   physical database.

 -temp             Boolean flag. If true, a temporary database
                   will be created and destroyed as soon as
                   the Store object goes out of scope. (synonym -temporary)

 -autoindex        Boolean flag. If true, features in the database will be
                   reindexed every time they change. This is the default.

 -tmpdir           Directory in which to place temporary files during "fast" loading.
                   Defaults to File::Spec->tmpdir(). (synonyms -dump_dir, -dumpdir, -tmp)

 -dbi_options      A hashref to pass to DBI->connect's 4th argument, the "attributes."
                   (synonyms -options, -dbi_attr)

 -write            Pass true to open database for writing or updating.

If successful, a new instance of
Bio::DB::SeqFeature::Store::DBI::Pg will be returned.

In addition to the standard methods supported by all well-behaved
Bio::DB::SeqFeature::Store databases, the following
adaptor-specific methods are provided. These are described in the next
sections.

=cut

use strict;

use base 'Bio::DB::SeqFeature::Store::DBI::mysql';
use Bio::DB::SeqFeature::Store::DBI::Iterator;
use DBI;
use Memoize;
use Cwd 'abs_path';
use Bio::DB::GFF::Util::Rearrange 'rearrange';
use File::Copy;
use File::Spec;
use constant DEBUG=>0;

use constant MAX_INT =>  2_147_483_647;
use constant MIN_INT => -2_147_483_648;
use constant MAX_BIN =>  1_000_000_000;  # size of largest feature = 1 Gb
use constant MIN_BIN =>  1000;           # smallest bin we'll make - on a 100 Mb chromosome, there'll be 100,000 of these

# object initialization
# NOTE: most of this code can be refactored and inherited from DBI or DBI::mysql adapter
#
# Parses the constructor arguments, obtains a database handle (either the one
# passed in as -dsn or a new DBI connection) and records the connection
# settings on the object.
#
# NOTE(review): this copy of the sub looks truncated. The variables $user,
# $pass, $dbi_options, $writeable, $is_temporary, $dump_dir, $namespace,
# $schema, $autoindex and $create are used below but are not declared in the
# visible "my (...)" list, the rearrange() key list stops after 'DSN', and
# several closing braces plus the tail of the sub are absent. Restore from
# the upstream source before relying on it.
sub init {
  my $self          = shift;
  my ($dsn,
     ) = rearrange(['DSN',

  # Server-side prepares are switched off unless the caller supplies DBI options.
  $dbi_options  ||= {pg_server_prepare => 0};
  # A temporary store or a dump directory implies the store is opened writeable.
  $writeable    = 1 if $is_temporary or $dump_dir;

  $dsn or $self->throw("Usage: ".__PACKAGE__."->init(-dsn => \$dbh || \$dsn)");

  my $dbh;
  # -dsn may be a ready-made handle (any reference is used as-is) or a DSN string.
  if (ref $dsn) {
    $dbh = $dsn;
  } else {
    # A bare database name is expanded to a full "dbi:Pg:" DSN.
    $dsn = "dbi:Pg:$dsn" unless $dsn =~ /^dbi:/;
    $dbh = DBI->connect($dsn,$user,$pass,$dbi_options) or $self->throw($DBI::errstr);
  $dbh->do('set client_min_messages=warning') if $dbh;
  # Keep the connection parameters on the object.
  # presumably so the connection can be re-created later -- TODO confirm against callers
  $self->{'original_arguments'} = {
      'dsn' => $dsn,
      'user' => $user,
      'pass' => $pass,
      'dbh_options' => $dbi_options,
  $self->{dbh}       = $dbh;
  $self->{dbh}->{InactiveDestroy} = 1;
  $self->{is_temp}   = $is_temporary;
  $self->{writeable} = $writeable;
  # -namespace wins over -schema; 'public' is the fallback.
  $self->{namespace} = $namespace || $schema || 'public';

  $self->autoindex($autoindex)                   if defined $autoindex;
  $self->dumpdir($dump_dir)                      if $dump_dir;
  # NOTE(review): the bodies of both branches below are missing in this copy.
  if ($self->is_temp) {
    # warn "creating a temp database isn't supported";
  } elsif ($create) {

# Return the SQL fragments used to create every table managed by this adaptor.
#
# Returns : hashref mapping table name => definition text. Each definition
#           begins with the parenthesised column list so that it can be
#           appended directly to "CREATE TABLE <name> ", and is followed by
#           the CREATE INDEX statements belonging to that table.
# Note    : "end" and "offset" are quoted because they are reserved words
#           in PostgreSQL.
sub table_definitions {
  my $self = shift;
  my %definition_for = (
    feature => <<END,
(
  id       serial primary key,
  typeid   int      not null,
  seqid    int,
  start    int,
  "end"      int,
  strand   int      default 0,
  tier     int,
  bin      int,
  indexed  int default 1,
  object     bytea not null
);
  CREATE INDEX feature_stuff ON feature(seqid,tier,bin,typeid);
  CREATE INDEX feature_typeid ON feature(typeid);
END

    locationlist => <<END,
(
  id         serial primary key,
  seqname    text   not null
); CREATE INDEX locationlist_seqname ON locationlist(seqname);
END

    typelist => <<END,
(
  id       serial primary key,
  tag      text  not null
); CREATE INDEX typelist_tab ON typelist(tag);
END

    name => <<END,
(
  id           int       not null,
  name         text  not null,
  display_name int       default 0
);
  CREATE INDEX name_id ON name( id );
  CREATE INDEX name_name ON name( name );
  CREATE INDEX name_lcname ON name( lower(name) );
  CREATE INDEX name_lcname_varchar_patt_ops ON name USING BTREE (lower(name) varchar_pattern_ops);
END

    attribute => <<END,
(
  id               int       not null,
  attribute_id     int   not null,
  attribute_value  text
);
  CREATE INDEX attribute_id ON attribute(id);
  CREATE INDEX attribute_id_val ON attribute(attribute_id,SUBSTR(attribute_value, 1, 10));
END

    attributelist => <<END,
(
  id       serial primary key,
  tag      text  not null
);
  CREATE INDEX attributelist_tag ON attributelist(tag);
END

    parent2child => <<END,
(
  id               int       not null,
  child            int       not null
);
  CREATE UNIQUE INDEX parent2child_id_child ON parent2child(id,child);
END

    meta => <<END,
(
  name      text primary key,
  value     text not null
)
END

    sequence => <<END,
(
  id       int not null,
  "offset"   int not null,
  sequence text,
  primary key(id,"offset")
)
END

    interval_stats => <<END,
(
   typeid            int not null,
   seqid             int not null,
   bin               int not null,
   cum_count         int not null
);
 CREATE UNIQUE INDEX interval_stats_id ON interval_stats(typeid,seqid,bin);
END
  );
  return \%definition_for;
}

# Get or set the schema used for the connection's search_path.
#
# Args    : optional schema name
# Returns : the schema name stored on the object
# Note    : with a true argument the search_path is switched to that schema;
#           in every other case (including a plain "get" call) the
#           search_path is set to "public".
sub schema {
  my ($self, $schema) = @_;
  $self->{'schema'} = $schema if defined($schema);
  if ($schema) {
    $self->dbh->do("SET search_path TO " . $self->{'schema'} );
  } else {
    $self->dbh->do("SET search_path TO public");
  }
  return $self->{'schema'};
}

# wipe database clean and reinstall schema
#
# Args : $erase - when true, every table except "meta" is dropped before
#        being re-created, and subfeatures are flagged as indexed afterwards.
# Tables that already exist in the current namespace are left alone.
sub _init_database {
  my $self = shift;
  my $erase = shift;

  my $dbh       = $self->dbh;
  my $namespace = $self->namespace;
  my $tables    = $self->table_definitions;
  my $temporary = $self->is_temp ? 'TEMPORARY' : '';

  # Bind values are used for the lookup: interpolating '$self->namespace'
  # into a string yields the stringified object followed by the literal
  # text "->namespace", so the existence test could never match.
  my $exists_sql = 'SELECT * FROM pg_tables WHERE tablename = ? AND schemaname = ?';

  foreach my $name (keys %$tables) {
    next if $name eq 'meta';      # don't get rid of meta data!
    my $table = $self->_qualify($name);
    $dbh->do("DROP TABLE IF EXISTS $table") if $erase;
    my @table_exists = $dbh->selectrow_array($exists_sql, undef, $table, $namespace);
    if (!@table_exists) {
      my $query = "CREATE $temporary TABLE $table $tables->{$name}";
      $dbh->do($query) or $self->throw($dbh->errstr);
    }
  }
  $self->subfeatures_are_indexed(1) if $erase;
}

# Create the "meta" table in the current namespace if it does not exist yet.
# Does nothing when the store is not writeable.
sub maybe_create_meta {
  my $self = shift;
  return unless $self->writeable;
  my $namespace    = $self->namespace;
  my $table        = $self->_qualify('meta');
  my $tables       = $self->table_definitions;
  my $temporary    = $self->is_temp ? 'TEMPORARY' : '';
  # The namespace is passed as a bind value rather than interpolated.
  my @table_exists = $self->dbh->selectrow_array(
    "SELECT * FROM pg_tables WHERE tablename = 'meta' AND schemaname = ?",
    undef, $namespace);
  $self->dbh->do("CREATE $temporary TABLE $table $tables->{meta}")
      unless @table_exists;
}

# check if the namespace/schema exists, if not create it
#
# The "public" schema is never checked. When the schema had to be created
# and the store is temporary, the object is flagged so that
# remove_namespace() may drop the schema again.
sub _check_for_namespace {
  my $self = shift;
  my $namespace = $self->namespace;
  return if $namespace eq 'public';
  my $dbh  = $self->dbh;
  my @schema_exists = $dbh->selectrow_array(
    'SELECT * FROM pg_namespace WHERE nspname = ?', undef, $namespace);
  if (!@schema_exists) {
    my $query = "CREATE SCHEMA $namespace";
    $dbh->do($query) or $self->throw($dbh->errstr);

    # if temp parameter is set and schema created for this process then enable removal in remove_namespace()
    if ($self->is_temp) {
      $self->{delete_schema} = 1;
    }
  }
  return;
}


# Overriding the inherited mysql _qualify: table names need no qualification
# for PostgreSQL because the search_path selects the schema.
#
# Args    : table name
# Returns : the same table name, unchanged
sub _qualify {
  my $self = shift;
  my $table_name = shift;
  return $table_name;
}


# when is_temp is set and the schema did not exist beforehand then we are able to remove it
#
# Drops the schema only if the delete_schema flag was set on this object
# (see _check_for_namespace); throws if the DROP fails.
sub remove_namespace {
  my $self = shift;
  if ($self->{delete_schema}) {
    my $namespace = $self->namespace;
    $self->dbh->do("DROP SCHEMA $namespace") or $self->throw($self->dbh->errstr);
  }
  return;
}


#### Overriding the inherited mysql function _prepare
#
# Select the proper search_path, then prepare (and cache) a statement.
#
# Args    : SQL text
# Returns : a DBI statement handle
# Throws  : if the statement cannot be prepared
sub _prepare {
  my $self   = shift;
  my $query  = shift;
  my $dbh    = $self->dbh;
  my $schema = $self->{namespace};

  if ($schema) {
    # Prefer a schema set explicitly through schema(); otherwise fall back to
    # the namespace. Concatenating an undefined $self->{'schema'} would send
    # an incomplete "SET search_path TO " statement.
    my $search_path = defined $self->{'schema'} ? $self->{'schema'} : $schema;
    $dbh->do("SET search_path TO " . $search_path);
  } else {
    $dbh->do("SET search_path TO public");
  }

  # if_active = 3: an active cached handle is dropped from the cache and a
  # fresh one is prepared instead of being reused.
  my $sth = $dbh->prepare_cached($query, {}, 3)
    or $self->throw($dbh->errstr);
  return $sth;
}

# Finish a "fast" bulk load: stream every dump file written during the load
# into its table with COPY ... FROM STDIN, then clear the bulk-update state.
#
# A ".bak" copy of each dump file is kept and the dump file itself is not
# removed.
sub _finish_bulk_update {
  my $self = shift;
  my $dbh  = $self->dbh;
  for my $table ('feature',$self->index_tables) {
    my $fh   = $self->dump_filehandle($table);
    my $path = $self->dump_path($table);
    # Flush buffered rows to disk before the file is read back.
    close $fh;
    my $qualified_table = $self->_qualify($table);
    copy($path, "$path.bak");
    # Get stuff from file into STDIN so we don't have to be superuser
    open my $FH, '<', $path or $self->throw("Could not read file '$path': $!");
    print STDERR "Loading file $path\n";
    $dbh->do("COPY $qualified_table FROM STDIN CSV QUOTE '''' DELIMITER '\t'") or $self->throw($dbh->errstr);
    while (my $line = <$FH>) {
      $dbh->pg_putline($line);
    }
    $dbh->pg_endcopy() or $self->throw($dbh->errstr);
    close $FH;
    #unlink $path;
  }
  delete $self->{bulk_update_in_progress};
  delete $self->{filehandles};
}

# Add subparts to a feature. Both the feature and all subparts must already
# be in the database.
#
# Args    : parent (object or primary ID), list of children (objects or IDs)
# Returns : number of parent/child links written
# Note    : all links are written inside one transaction; on any failure the
#           transaction is rolled back and a warning is issued.
sub _add_SeqFeature {
  my $self     = shift;

  # special purpose method for case when we are doing a bulk update
  return $self->_dump_add_SeqFeature(@_) if $self->{bulk_update_in_progress};

  my $parent   = shift;
  my @children = @_;

  my $dbh = $self->dbh;
  local $dbh->{RaiseError} = 1;

  my $child_table = $self->_parent2child_table();
  my $count = 0;

  # Delete-then-insert stands in for the REPLACE statement of the mysql adaptor.
  my $querydel = "DELETE FROM $child_table WHERE id = ? AND child = ?";
  my $query = "INSERT INTO $child_table (id,child) VALUES (?,?)";
  my $sthdel = $self->_prepare($querydel);
  my $sth = $self->_prepare($query);

  my $parent_id = (ref $parent ? $parent->primary_id : $parent)
    or $self->throw("$parent should have a primary_id");

  $self->begin_work or $self->throw($dbh->errstr);
  eval {
    for my $child (@children) {
      my $child_id = ref $child ? $child->primary_id : $child;
      defined $child_id or die "no primary ID known for $child";
      $sthdel->execute($parent_id, $child_id);
      $sth->execute($parent_id, $child_id);
      $count++;
    }
  };

  if ($@) {
    warn "Transaction aborted because $@";
    $self->rollback;
  }
  else {
    $self->commit;
  }
  return $count;
}

# Overridden because "offset" is a reserved word in postgresql and has to be
# quoted.
# Get the primary sequence between start and end.
#
# Args    : $seqid, optional $start and $end (1-based, inclusive)
# Returns : the sequence as a plain string; when $start > $end the reverse
#           complement of the region is returned
sub _fetch_sequence {
  my $self = shift;
  my ($seqid,$start,$end) = @_;

  # backward compatibility to the old days when I liked reverse complementing
  # dna by specifying $start > $end
  my $reversed;
  if (defined $start && defined $end && $start > $end) {
    $reversed++;
    ($start,$end) = ($end,$start);
  }
  # switch to zero-based coordinates
  $start-- if defined $start;
  $end--   if defined $end;

  my $offset1 = $self->_offset_boundary($seqid,$start || 'left');
  my $offset2 = $self->_offset_boundary($seqid,$end   || 'right');
  my $sequence_table = $self->_sequence_table;
  my $locationlist_table = $self->_locationlist_table;

  my $sth     = $self->_prepare(<<END);
SELECT sequence,"offset"
   FROM $sequence_table as s,$locationlist_table as ll
   WHERE s.id=ll.id
     AND ll.seqname= ?
     AND "offset" >= ?
     AND "offset" <= ?
   ORDER BY "offset"
END

  my $seq = '';
  $sth->execute($seqid,$offset1,$offset2) or $self->throw($sth->errstr);

  while (my($frag,$offset) = $sth->fetchrow_array) {
    # trim the part of the first fragment that lies before $start
    substr($frag,0,$start-$offset) = '' if defined $start && $start > $offset;
    $seq .= $frag;
  }
  # trim whatever extends past $end
  my $first = defined $start ? $start : 0;
  substr($seq,$end-$first+1) = '' if defined $end && $end-$first+1 < length($seq);
  if ($reversed) {
    $seq = reverse $seq;
    $seq =~ tr/gatcGATC/ctagCTAG/;
  }
  return $seq;
}

# Find the offset of a stored sequence chunk for $seqid.
#
# Args    : $seqid, $position - 'left' (smallest offset), 'right' (largest
#           offset) or an integer (largest offset that is <= the position)
# Returns : the offset, or undef when no row matches
sub _offset_boundary {
  my $self = shift;
  my ($seqid,$position) = @_;

  my $sequence_table     = $self->_sequence_table;
  my $locationlist_table = $self->_locationlist_table;

  # The sequence table stores the locationlist id, so the two tables are
  # joined on id before filtering by sequence name.
  my $from_where = "FROM $sequence_table as s,$locationlist_table as ll WHERE s.id=ll.id AND ll.seqname=?";

  my $sql;
  $sql =  $position eq 'left'  ? "SELECT min(\"offset\") $from_where"
         :$position eq 'right' ? "SELECT max(\"offset\") $from_where"
         :"SELECT max(\"offset\") $from_where AND \"offset\"<=?";

  my $sth = $self->_prepare($sql);
  my @args = $position =~ /^-?\d+$/ ? ($seqid,$position) : ($seqid);
  $sth->execute(@args) or $self->throw($sth->errstr);
  my $boundary = $sth->fetchall_arrayref->[0][0];
  return $boundary;
}

# Build the SQL fragments for a search by feature name.
#
# Args    : $name (may contain * and ? wildcards), $allow_aliases (boolean),
#           $join (column expression the name table's id is joined to)
# Returns : ($from, $where, $group, @bind_args); the group part is empty
# Note    : names are compared in lower case; unless aliases are allowed only
#           rows with display_name > 0 match.
sub _name_sql {
  my $self = shift;
  my ($name,$allow_aliases,$join) = @_;
  my $name_table   = $self->_name_table;

  my $from  = "$name_table as n";
  my ($match,$string) = $self->_match_sql($name);

  my $where = "n.id=$join AND lower(n.name) $match";
  $where   .= " AND n.display_name>0" unless $allow_aliases;
  return ($from,$where,'',$string);
}

# Search attribute values for the words of a search string.
#
# Args    : $search_string (whitespace separated words), $attribute_names
#           (arrayref of attribute tags to search), optional $limit
# Returns : list of [$name,$value,$score,$type,$id] sorted by descending
#           score; empty list when there is nothing to search for
# Note    : the joins follow the column layout given in table_definitions:
#           name.id and attribute.id hold the feature id,
#           attribute.attribute_id refers to attributelist.id and the type
#           table's typeid refers to typelist.id.
sub _search_attributes {
  my $self = shift;
  my ($search_string,$attribute_names,$limit) = @_;
  my @words               = map {quotemeta($_)} split /\s+/,$search_string;
  my @tags                = @$attribute_names;

  # Without words or tags the OR-lists below would be empty (invalid SQL) and
  # the score computation would divide by zero.
  return unless @words && @tags;

  my $name_table          = $self->_name_table;
  my $attribute_table     = $self->_attribute_table;
  my $attributelist_table = $self->_attributelist_table;
  my $type_table          = $self->_type_table;
  my $typelist_table      = $self->_typelist_table;

  my $tag_sql = join ' OR ',("al.tag=?") x @tags;

  my $perl_regexp = join '|',@words;

  my @wild_card_words = map { "%$_%" } @words;
  my $sql_regexp = join ' OR ',("a.attribute_value SIMILAR TO ?")  x @words;
  my $sql = <<END;
SELECT name,attribute_value,tl.tag,n.id
  FROM $name_table as n,$attribute_table as a,$attributelist_table as al,$type_table as t,$typelist_table as tl
  WHERE n.id=a.id
    AND al.id=a.attribute_id
    AND n.id=t.id
    AND t.typeid=tl.id
    AND n.display_name=1
    AND ($tag_sql)
    AND ($sql_regexp)
END
  $sql .= "LIMIT $limit" if defined $limit;
  $self->_print_query($sql,@tags,@wild_card_words) if DEBUG || $self->debug;
  my $sth = $self->_prepare($sql);
  $sth->execute(@tags,@wild_card_words) or $self->throw($sth->errstr);

  my @results;
  while (my($name,$value,$type,$id) = $sth->fetchrow_array) {
    my (@hits) = $value =~ /$perl_regexp/ig;
    my @words_in_row = split /\b/,$value;
    # score: hits relative to the number of search words and the row length
    my $score  = @words_in_row ? int(@hits*100/@words/@words_in_row) : 0;
    push @results,[$name,$value,$score,$type,$id];
  }
  @results = sort {$b->[2]<=>$a->[2]} @results;
  return @results;
}

# overridden here because the mysql adapter uses
# a non-standard query hint
#
# Build the SQL fragments for a search by attribute.
#
# Args    : $attributes (passed on to _make_attribute_where and
#           _make_attribute_group), $join (column expression the attribute
#           table's id is joined to)
# Returns : ($from, $where, $group, @bind_args)
sub _attributes_sql {
  my $self = shift;
  my ($attributes,$join) = @_;

  my ($wf,@bind_args)       = $self->_make_attribute_where('a','al',$attributes);
  my ($group_by,@group_args)= $self->_make_attribute_group('a',$attributes);

  my $attribute_table       = $self->_attribute_table;
  my $attributelist_table   = $self->_attributelist_table;

  my $from = "$attribute_table as a, $attributelist_table as al";

  my $where = <<END;
  a.id=$join
  AND   a.attribute_id=al.id
  AND ($wf)
END

  my $group = $group_by;

  my @args  = (@bind_args,@group_args);
  return ($from,$where,$group,@args);
}

# Translate a feature name into an SQL comparison and its bind value.
#
# Args    : $name - may contain the wildcards * (any run of characters) and
#           ? (a single character); a backslash in front of a wildcard makes
#           it literal
# Returns : ($match, $string) where $match is "LIKE ?" for wildcard names or
#           "= lower(?)" otherwise, and $string is the lower-cased bind value
sub _match_sql {
  my $self = shift;
  my $name = shift;

  my ($match,$string);
  if ($name =~ /(?:^|[^\\])[*?]/) {
    # Look-behinds are used rather than consuming the preceding character, so
    # that adjacent wildcards such as "??" or "**" are all translated.
    $name =~ s/(?<!\\)([%_])/\\$1/g;   # protect characters special to LIKE
    $name =~ s/(?<!\\)\*/%/g;
    $name =~ s/(?<!\\)\?/_/g;
    $match = "LIKE ?";
  } else {
    $match = "= lower(?)";
  }
  $string = lc($name);
  return ($match,$string);
}

# overridden because of differences between LIKE behavior in mysql and postgres
# as well as case-sensitivity of matches
#
# Build the SQL fragments for a search by feature type.
#
# Args    : $types - a single type or an arrayref of types; each is either a
#           Bio::DB::GFF::Typename object or a "primary_tag[:source_tag]"
#           string. $type_table - alias of the table holding the typeid column
# Returns : ($from, $where, $group, @bind_args); the group part is empty
# Note    : a type without source matches every source of that primary tag.
sub _types_sql {
  my $self  = shift;
  my ($types,$type_table) = @_;

  my @types = ref $types eq 'ARRAY' ?  @$types : $types;

  my $typelist      = $self->_typelist_table;
  my $from = "$typelist AS tl";

  my (@matches,@args);

  for my $type (@types) {
    my ($primary_tag,$source_tag);

    if (ref $type && $type->isa('Bio::DB::GFF::Typename')) {
      $primary_tag = $type->method;
      $source_tag  = $type->source;
    } else {
      ($primary_tag,$source_tag) = split ':',$type,2;
    }

    if ($source_tag) {
      push @matches,"lower(tl.tag)=lower(?)";
      push @args,"$primary_tag:$source_tag";
    } else {
      push @matches,"tl.tag ILIKE ?";
      push @args,"$primary_tag:%";
    }
  }
  my $matches = join ' OR ',@matches;

  my $where = <<END;
   tl.id=$type_table.typeid
   AND   ($matches)
END

  return ($from,$where,'',@args);
}

# overridden because mysql adapter uses the non-standard REPLACE syntax
#
# Get or set a named setting kept in the meta table.
#
# Args    : $variable_name, optional $value
# Returns : the value of the setting
# Note    : a value is stored only when it is defined and the store is
#           writeable; otherwise the setting is read, using the in-memory
#           cache when the name is already cached.
sub setting {
  my $self = shift;
  my ($variable_name,$value) = @_;
  my $meta  = $self->_meta_table;

  if (defined $value && $self->writeable) {
    # delete-then-insert replaces any previous value
    my $querydel = "DELETE FROM $meta WHERE name = ?";
    my $query    = "INSERT INTO $meta (name,value) VALUES (?,?)";
    my $sthdel   = $self->_prepare($querydel);
    my $sth      = $self->_prepare($query);
    $sthdel->execute($variable_name);
    $sth->execute($variable_name,$value) or $self->throw($sth->errstr);
    return $self->{settings_cache}{$variable_name} = $value;
  }
  else {
    return $self->{settings_cache}{$variable_name} if exists $self->{settings_cache}{$variable_name};
    my $query = "SELECT value FROM $meta as m WHERE m.name=?";
    my $sth = $self->_prepare($query);
    unless ($sth->execute($variable_name)) {
      # add the active search_path to the error to help diagnose schema problems
      my $errstr = $sth->errstr;
      $sth = $self->_prepare("SHOW search_path");
      $sth->execute();
      $errstr .= "With search_path " . $sth->fetchrow_arrayref->[0] . "\n";
      $self->throw($errstr);
    }

    my ($value) = $sth->fetchrow_array;
    return $self->{settings_cache}{$variable_name} = $value;
  }
}

# overridden because of use of REPLACE in mysql adapter
# Replace Bio::SeqFeatureI into database.
#
# Args : $object (the feature), optional $index_flag (true when location
#        information is to be stored with the feature)
# Note : a feature that already has a primary ID is deleted and re-inserted
#        under that ID; otherwise a new row is inserted and the generated ID
#        is stored in the object. The frozen object is base64 encoded.
sub replace {
  my $self       = shift;
  my $object     = shift;
  my $index_flag = shift || undef;

  # ?? shouldn't need to do this
  # $self->_load_class($object);
  my $id = $object->primary_id;
  my $features = $self->_feature_table;

  my $query = "INSERT INTO $features (id,object,indexed,seqid,start,\"end\",strand,tier,bin,typeid) VALUES (?,?,?,?,?,?,?,?,?,?)";
  my $query_noid = "INSERT INTO $features (object,indexed,seqid,start,\"end\",strand,tier,bin,typeid) VALUES (?,?,?,?,?,?,?,?,?)";
  my $querydel = "DELETE FROM $features WHERE id = ?";

  my $sthdel = $self->_prepare($querydel);
  my $sth = $self->_prepare($query);
  my $sth_noid = $self->_prepare($query_noid);

  my @location = $index_flag ? $self->_get_location_and_bin($object) : (undef)x6;

  my $primary_tag = $object->primary_tag;
  my $source_tag  = $object->source_tag || '';
  $primary_tag    .= ":$source_tag";
  my $typeid   = $self->_typeid($primary_tag,1);

  my $frozen = encode_base64($self->freeze($object), '');

  if ($id) {
    $sthdel->execute($id);
    $sth->execute($id,$frozen,$index_flag||0,@location,$typeid) or $self->throw($sth->errstr);
  } else {
    # report the error of the handle that actually failed
    $sth_noid->execute($frozen,$index_flag||0,@location,$typeid) or $self->throw($sth_noid->errstr);
  }

  my $dbh = $self->dbh;

  $object->primary_id($dbh->last_insert_id(undef, undef, undef, undef, {sequence=>$features."_id_seq"})) unless defined $id;

  $self->flag_for_indexing($dbh->last_insert_id(undef, undef, undef, undef, {sequence=>$features."_id_seq"})) if $self->{bulk_update_in_progress};
}

=head2 types

 Title   : types
 Usage   : @type_list = $db->types
 Function: Get all the types in the database
 Returns : array of Bio::DB::GFF::Typename objects
 Args    : none
 Status  : public

=cut

# overridden because "offset" is reserved in postgres
# Insert a bit of DNA or protein into the database
#
# Args : $seqid (sequence name), $seq (the chunk), $offset (position of the
#        chunk within the sequence)
# Note : an existing chunk at the same id/offset is deleted first.
sub _insert_sequence {
  my $self = shift;
  my ($seqid,$seq,$offset) = @_;
  my $id = $self->_locationid($seqid);
  my $seqtable = $self->_sequence_table;
  my $sthdel = $self->_prepare("DELETE FROM $seqtable WHERE id = ? AND \"offset\" = ?");
  my $sth = $self->_prepare(<<END);
INSERT INTO $seqtable (id,"offset",sequence) VALUES (?,?,?)
END
  $sthdel->execute($id,$offset);
  $sth->execute($id,$offset,$seq) or $self->throw($sth->errstr);
}

# overridden because of mysql adapter's use of REPLACE
# This subroutine flags the given primary ID for later reindexing
#
# Args : $id - primary ID of the feature
# Note : any existing flag for the ID is deleted before the insert, so an ID
#        is listed at most once.
sub flag_for_indexing {
  my $self = shift;
  my $id   = shift;
  my $needs_updating = $self->_update_table;

  my $querydel = "DELETE FROM $needs_updating WHERE id = ?";
  my $query = "INSERT INTO $needs_updating VALUES (?)";
  my $sthdel = $self->_prepare($querydel);
  my $sth = $self->_prepare($query);

  $sthdel->execute($id);
  $sth->execute($id) or $self->throw($self->dbh->errstr);
}

# overridden because of the different ways that mysql and postgres
# handle id sequences
#
# Look up the id of a name in one of the list tables.
#
# Args    : $table, $namefield (column holding the name), $name,
#           $add_if_missing (boolean)
# Returns : the id; nothing when the name is unknown and $add_if_missing is
#           false. A missing name is otherwise inserted and the id taken
#           from the table's "<table>_id_seq" sequence.
sub _genericid {
  my $self = shift;
  my ($table,$namefield,$name,$add_if_missing) = @_;
  my $qualified_table = $self->_qualify($table);
  my $sth = $self->_prepare(<<END);
SELECT id FROM $qualified_table WHERE $namefield=?
END
  $sth->execute($name) or die $sth->errstr;
  my ($id) = $sth->fetchrow_array;
  return $id if defined $id;
  return     unless $add_if_missing;

  $sth = $self->_prepare(<<END);
INSERT INTO $qualified_table ($namefield) VALUES (?)
END
  $sth->execute($name) or die $sth->errstr;
  my $dbh = $self->dbh;
  return $dbh->last_insert_id(undef, undef, undef, undef, {sequence=>$qualified_table."_id_seq"});
}

# overridden because of differences in binding between mysql and postgres adapters
# given a statement handler that is expected to return rows of (id,object)
# unthaw each object and return a list of 'em
#
# Args    : an executed statement handle
# Returns : list of thawed objects, one per row
sub _sth2objs {
  my $self = shift;
  my $sth  = shift;
  my @result;
  my ($id, $o);
  $sth->bind_col(1, \$id);
  # the object column is bytea and must be bound with that type
  $sth->bind_col(2, \$o, { pg_type => PG_BYTEA});
  while ($sth->fetch) {
    # objects are stored base64 encoded
    my $obj = $self->thaw(decode_base64($o) ,$id);
    push @result,$obj;
  }
  return @result;
}

# given a statement handler that is expected to return rows of (id,object)
# unthaw the object of the next row and return it
#
# Args    : an executed statement handle
# Returns : the thawed object; nothing when no row is left or the object
#           column is empty
sub _sth2obj {
  my $self = shift;
  my $sth  = shift;
  my ($id,$o) = $sth->fetchrow_array;
  return unless $o;
  # objects are stored base64 encoded
  my $obj = $self->thaw(decode_base64($o) ,$id);
  return $obj;
}

# SQL Fragment generators

# overridden because of base64 encoding needed here
# special-purpose store for bulk loading - write to a file rather than to the db
#
# Args : $indexed (boolean), followed by the feature objects to dump.
# Each feature is written to the 'feature' dump file as one tab-separated
# line whose column order matches the feature table.
#
# NOTE(review): this copy of the sub looks truncated. $count is initialised
# but never incremented or returned, the closing brace of the for loop is
# absent and the body of the final "unless" is missing. Restore from the
# upstream source before relying on it.
sub _dump_store {
  my $self    = shift;
  my $indexed = shift;

  my $count = 0;
  my $store_fh = $self->dump_filehandle('feature');
  my $dbh      = $self->dbh;

  my $autoindex = $self->autoindex;

  for my $obj (@_) {
    my $id       = $self->next_id;
    # location columns stay undefined for features that are not indexed
    my ($seqid,$start,$end,$strand,$tier,$bin) = $indexed ? $self->_get_location_and_bin($obj) : (undef)x6;
    # the type is stored as "primary_tag:source_tag"
    my $primary_tag = $obj->primary_tag;
    my $source_tag  = $obj->source_tag || '';
    $primary_tag    .= ":$source_tag";
    my $typeid   = $self->_typeid($primary_tag,1);

		# base64 encode the frozen object without inserting line breaks
		my $frozen_object = encode_base64($self->freeze($obj), '');
                # TODO: Fix this, why does frozen object start with quote but not end with one
    print $store_fh join("\t",$id,$typeid,$seqid,$start,$end,$strand,$tier,$bin,$indexed,$frozen_object),"\n";
    $self->_update_indexes($obj) if $indexed && $autoindex;

  # remember whether we have ever stored a non-indexed feature
  unless ($indexed or $self->{indexed_flag}++) {

sub _enable_keys { return; }    # deliberately does nothing in this adaptor
sub _disable_keys { return; }   # deliberately does nothing in this adaptor

# Create the interval_stats table in the current namespace unless it
# already exists. Throws if the table cannot be created.
sub _add_interval_stats_table {
    my $self = shift;
    my $tables          = $self->table_definitions;
    my $interval_stats  = $self->_interval_stats_table;
    ##check to see if it exists yet; if it does, just return because
    ##there is a drop from in the next step
    my $dbh = $self->dbh;
    # table and schema names are passed as bind values rather than interpolated
    my @table_exists = $dbh->selectrow_array(
        'SELECT * FROM pg_tables WHERE tablename = ? AND schemaname = ?',
        undef, $interval_stats, $self->namespace);
    if (!@table_exists) {
        my $query = "CREATE TABLE $interval_stats $tables->{interval_stats}";
        $dbh->do($query) or $self->throw($dbh->errstr);
    }
}

# Return the SQL that lists type, sequence, zero-based start and end of
# every indexed feature, ordered by type, sequence and start.
# "end" is quoted because it is a reserved word in PostgreSQL.
sub _fetch_indexed_features_sql {
    my $self     = shift;
    my $features = $self->_feature_table;
    return <<END;
SELECT typeid,seqid,start-1,"end"
  FROM $features as f
 WHERE f.indexed=1
  ORDER BY typeid,seqid,start
END
}
