package Bio::ConnectDots::DB;
use vars qw(@ISA @AUTO_ATTRIBUTES @OTHER_ATTRIBUTES %SYNONYMS);
use strict;
use DBI;
use File::Path;
use Class::AutoClass;
use Class::AutoClass::Args;
use Bio::ConnectDots::DotSet;
use Bio::ConnectDots::ConnectorSet;
@ISA = qw(Class::AutoClass);
@AUTO_ATTRIBUTES=qw(dsn dbh dbd database host port user password
read_only read_only_schema
_needs_disconnect _db_cursor _exists
load_name load_save load_chunksize load_cid_base
_ext_directory _load_fh _load_count _load_chunk sql_log
);
@OTHER_ATTRIBUTES=qw(ext_directory);
%SYNONYMS=(server=>'host');
Class::AutoClass::declare(__PACKAGE__);
# use 'double quotations to get case-sensitivity in label
# use 'not null' wherever possible to help query optimizier use indexes better
# denormalized connector to cut down the number of joins in big queries
my %SCHEMA=
(connectorset=>
qq(connectorset_id SERIAL,
"name" VARCHAR(255) NOT NULL,
"file_name" TEXT,
"version" VARCHAR(255) NOT NULL,
"source_date" VARCHAR(255),
"source_version" VARCHAR(255),
"download_date" VARCHAR(255),
"ftp" TEXT,
"ftp_files" TEXT,
"comment" TEXT,
PRIMARY KEY("connectorset_id"),UNIQUE("name","version")),
dotset=>
qq(dotset_id SERIAL,
"name" VARCHAR(255) NOT NULL,
PRIMARY KEY(dotset_id),UNIQUE("name")),
connectdotset=>
qq(connectdotset_id SERIAL,
connectorset_id INT NOT NULL,
dotset_id INT NOT NULL,
label_id INT NOT NULL,
PRIMARY KEY(connectdotset_id)),
label=>
qq(label_id SERIAL,
"label" VARCHAR(255) NOT NULL,
"source_label" VARCHAR(255),
"description" TEXT,
PRIMARY KEY(label_id),UNIQUE("label")),
connectortable=>
qq(connectortable_id SERIAL,
"name" VARCHAR(255) NOT NULL,
PRIMARY KEY(connectortable_id),UNIQUE("name")),
connectortableset=>
qq(connectortable_id INT NOT NULL,
connectorset_id INT NOT NULL,
"alias" VARCHAR(255) NOT NULL,
UNIQUE(connectortable_id,"alias")),
dottable=>
qq(dottable_id SERIAL,
"name" VARCHAR(255) NOT NULL,
PRIMARY KEY(dottable_id),UNIQUE("name")),
dottableset=>
qq(dottable_id INT NOT NULL,
dotset_id INT NOT NULL,
label_id INT NOT NULL,
cs_id INT NOT NULL,
"alias" VARCHAR(255) NOT NULL,
UNIQUE(dottable_id,"alias")),
connectdot=>
qq(connector_id INT NOT NULL,
connectorset_id INT NOT NULL,
dot_id INT NOT NULL,
label_id INT NOT NULL,
"id" TEXT NOT NULL),
dot=>
qq(dot_id SERIAL,
dotset_id INT NOT NULL,
"id" TEXT NOT NULL,
PRIMARY KEY(dot_id),UNIQUE("id",dotset_id)),
cdload=>
qq(connector_id INT NOT NULL,
connectorset_id INT NOT NULL,
dotset_id INT NOT NULL,
label_id INT NOT NULL,
"id" TEXT NOT NULL),
);
my %INDICIES = (
connectdot=>
['connectorset_id,connector_id,label_id',
'connectorset_id,dot_id,label_id',
'connectorset_id,label_id',
'"id"']
);
my @INDEX_NAMES;
my @TABLES=keys %SCHEMA;
# maximum number of rows loaded in one 'load infile' operation
my $LOAD_CHUNKSIZE=150000;
sub _init_self {
my($self,$class,$args)=@_;
return unless $class eq __PACKAGE__; # to prevent subclasses from re-running this
$self->_connect;
return unless $self->is_connected;
$self->_manage_schema($args);
if(!$self->ext_directory) {
$self->ext_directory("/usr/tmp/$ENV{USER}") if $ENV{USER};
}
$self->load_chunksize or $self->load_chunksize($LOAD_CHUNKSIZE);
}
sub is_connected {
$_[0]->dbh;
}
sub connect {
my($self,@args)=@_;
my $args=new Bio::ISB::AutoArgs(@args);
$self->Class::AutoClass::set_attributes([qw(dbh dsn dbd host server user password)],$args);
$self->_connect;
}
sub _connect {
my($self)=@_;
return $self->dbh if $self->dbh; # if dbh set, then already connected
my $dbd=lc($self->dbd)||'Pg';
$self->throw("-dbd must be 'Pg' at present") if $dbd && $dbd ne 'Pg';
my $dsn=$self->dsn;
if ($dsn) { # parse off the dbd, database, host elements
$dsn = "DBI:$dsn" unless $dsn=~ /^dbi/i;
} else {
my $database=$self->database;
my $host=$self->host;
my $port=$self->port;
return undef unless $database;
$dsn="DBI:$dbd:dbname=$database;";
$dsn .= "host=$host;" if $host;
$dsn .= "port=$port;" if $port;
}
# Try to establish connection with data source.
my $user=$self->user;
my $password = $self->password;
my $dbh = DBI->connect($dsn,$user,$password,
{AutoCommit=>1, ChopBlanks=>1, PrintError=>0, Warn=>0,});
$self->dsn($dsn);
$self->dbh($dbh);
$self->_needs_disconnect(1);
$self->throw("DBI::connect failed for dsn=$dsn, username=$user: ".DBI->errstr) unless $dbh;
return $dbh;
}
sub _manage_schema {
my($self,$args)=@_;
# grab schema modification parameters
my $read_only_schema=$self->read_only_schema || $self->read_only;
my $drop=$args->drop;
my $create=$args->create;
$self->throw("Schema changes not allowed by -read_only or -read_only_schema setting") if ($drop||$create) && $read_only_schema;
$self->drop if $drop;
$self->create if $create || !($self->exists && !defined $create);
}
# returns 1 if all tables exist, -1 if some exist, 0 if none exist
# note that Perl treats -1 as 'true'
sub exists {
my($self,$doit)=@_;
return $self->_exists if !$doit && defined $self->_exists;
$self->throw("Cannot check schema: database is not connected") unless $self->is_connected;
my $dbh=$self->dbh;
my $tables=$dbh->selectall_arrayref(qq(select tablename from pg_tables where schemaname='public'));
my $count;
for my $table (@TABLES) {
$count++ if grep {$table eq $_->[0]} @$tables;
}
my $exists;
$exists=0 if $count==0;
$exists=1 if $count==@TABLES;
$exists=-1 if $count>0 && $count!=@TABLES;
$self->_exists($exists);
}
sub drop {
my $self=shift;
$self->throw("Cannot drop database: database is not connected") unless $self->is_connected;
my @sql;
foreach my $tbl (@TABLES) {
push ( @sql, qq(DROP TABLE $tbl) ) if table_exist($tbl);
}
foreach my $indx (@INDEX_NAMES) {
push(@sql, qq(DROP INDEX $indx));
}
$self->do_sql(@sql);
$self->exists('DOIT'); # make sure schema was really dropped
}
### Returns true (1) if table exists in database, 0 otherwise
sub table_exist {
my ($self, $table_name)=@_;
$self->throw("Cannot create database: database is not connected") unless $self->is_connected;
$table_name = lc($table_name);
my $query = "SELECT tablename FROM pg_tables WHERE tablename='$table_name'";
my $dbh=$self->dbh;
my $rslt = $dbh->selectrow_arrayref($query);
return $rslt ? 1 : 0;
}
sub create {
my $self=shift;
$self->throw("Cannot create database: database is not connected") unless $self->is_connected;
$self->drop if $self->exists;
my @sql;
while(my($table,$schema)=each %SCHEMA) {
push(@sql,qq(CREATE TABLE $table ($schema)));
if ($INDICIES{$table}) {
my $num=0;
foreach my $tbl_index (@{ $INDICIES{$table} }) {
my $index_name = $table .'_index_'. ($num+1);
push( @INDEX_NAMES, $index_name );
$INDICIES{$table}->[$num] eq 'id'?
push( @sql, qq(CREATE INDEX $index_name ON $table USING BTREE ($INDICIES{$table}->[$num])) ) :
push( @sql, qq(CREATE INDEX $index_name ON $table ($INDICIES{$table}->[$num])) );
$num++;
}
}
}
$self->do_sql(@sql);
$self->exists('DOIT'); # make sure schema was really created
}
sub analyze {
my $self=shift;
$self->throw("Cannot analyze database: database is not connected") unless $self->is_connected;
my @sql=map {qq(ANALYZE $_)} @TABLES;
$self->do_sql(@sql);
}
# load dots and connectdots
sub load_init {
my($self,$load_name,$load_save,$load_chunksize)=@_;
my $max=$self->dbh->selectrow_array
(qq(select max(connector_id) from connectdot)) || 0;
$self->set
(load_name=>$load_name,
load_save=>$load_save,
load_chunksize=>$load_chunksize||$LOAD_CHUNKSIZE,
load_cid_base=>$max,
_load_fh=>undef,_load_count=>0,_load_chunk=>0);
}
sub load_row {
my($self,$connector_id,$connectorset_id,$id,$dotset_id,$label_id)=@_;
my($ext_directory,$load_name,$load_fh,$load_count,$load_chunk)=
$self->get(qw(ext_directory load_name _load_fh _load_count _load_chunk));
my $load_file="$ext_directory/load.$load_name.$load_chunk";
if (!defined $load_fh) {
open($load_fh, "> $load_file") || $self->throw("Cannot open load file $load_file: $!");
$self->_load_fh($load_fh);
} elsif ($load_count>=$self->load_chunksize) {
close $load_fh;
$self->load($load_file);
$load_chunk++;
$load_count=0;
my $load_file="$ext_directory/load.$load_name.$load_chunk"; # bug found by YW 04-01-15
open($load_fh, "> $load_file") || $self->throw("Cannot open load file $load_file: $!");
$self->set(_load_fh=>$load_fh,_load_chunk=>$load_chunk);
}
$connector_id+=$self->load_cid_base;
$id=$self->escape($id); # escape special chars
print $load_fh join("\t",$connector_id,$connectorset_id,$dotset_id,$label_id,$id),"\n";
$self->_load_count($load_count+1);
}
sub load_finish {
my($self)=@_;
my($ext_directory,$load_name,$load_fh,$load_count,$load_chunk)=
$self->get(qw(ext_directory load_name _load_fh _load_count _load_chunk));
if (defined $load_fh) {
close $load_fh;
my $load_file="$ext_directory/load.$load_name.$load_chunk";
$self->load($load_file,'last');
}
}
sub load {
my($self,$load_file,$last)=@_;
my $dbh=$self->dbh;
my @sql;
push(@sql,
qq(set enable_hashjoin to off),
qq(set enable_mergejoin to off));
push(@sql, # load data
qq(COPY cdload (connector_id,connectorset_id,dotset_id,label_id,id) FROM '$load_file'));
push(@sql, qq(SELECT cdload.connector_id,cdload.connectorset_id,cdload.dotset_id,dot.dot_id,cdload.label_id,cdload.id
INTO TABLE cdload_dot
FROM cdload LEFT JOIN dot ON cdload.id=dot.id));
push(@sql,qq(INSERT INTO dot (dotset_id,id) SELECT DISTINCT dotset_id,id FROM cdload_dot WHERE dot_id IS NULL));
push(@sql,qq(INSERT INTO connectdot (connector_id,connectorset_id,dot_id,label_id,id)
SELECT connector_id,connectorset_id,dot_id,label_id,id FROM cdload_dot WHERE dot_id IS NOT NULL));
push(@sql,qq(INSERT INTO connectdot (connector_id,connectorset_id,dot_id,label_id,id)
SELECT cdload_dot.connector_id,cdload_dot.connectorset_id,dot.dot_id,cdload_dot.label_id,cdload_dot.id
FROM cdload_dot,dot
WHERE cdload_dot.dot_id IS NULL AND cdload_dot.id=dot.id));
push(@sql,qq(DROP TABLE cdload));
push(@sql,qq(CREATE TABLE cdload ($SCHEMA{'cdload'})));
push(@sql,qq(DROP TABLE cdload_dot));
push(@sql,qq(ANALYZE));
$self->do_sql(@sql);
$self->do_sql(qq(set enable_hashjoin to on));
$self->do_sql(qq(set enable_mergejoin to on));
unlink($load_file) unless $self->load_save eq 'all' || ($last && $self->load_save eq $last) ;
}
sub ext_directory {
my $self=shift;
if (@_) {
my $ext_directory=shift;
mkpath([$ext_directory]) if $ext_directory;
return $self->_ext_directory($ext_directory);
}
$self->_ext_directory;
}
sub create_table_sql {
my($self,$name,$sql,$indexed_columns,$sql_columns)=@_;
$name = lc($name); # Postgres has inconsistent support for capitalization of table names
my @sql;
push (@sql, "DROP TABLE $name") if $self->table_exist($name);
push (@sql, "CREATE TABLE $name AS $sql");
my $num=0;
foreach (@$indexed_columns) {
my $index_name = $name ."_index_".$_ . $num ;
push( @INDEX_NAMES, $index_name );
push( @sql, qq(CREATE INDEX $index_name ON $name ($_)) );
$num++;
}
push (@sql, "ANALYZE $name");
$self->do_sql(@sql);
}
sub create_file_sql {
my($self,$file,$sql)=@_;
unlink($file);
# print "$sql ",`date`;
my $dbh=$self->dbh;
$dbh->do($sql) || $self->throw($dbh->errstr);
}
sub do_sql {
my $self=shift;
my @sql=_flatten(@_);
$self->throw("Cannot run SQL: database is not connected") unless $self->is_connected;
my $dbh=$self->dbh;
for my $sql (@sql) {
if($self->sql_log) {
my $file = $self->sql_log;
open (LOG, ">>$file") or $self->throw("Can not open SQL log file: $file");
print LOG "#", `date`;
print LOG "$sql\n\n";
close(LOG);
}
$dbh->do($sql) || do { print "### SQL: $sql\n"; $self->throw($dbh->errstr); }
}
}
sub quote {
my($self,$value)=@_;
$self->dbh->quote($value);
}
sub quote_dot {
my($self,$value)=@_;
$self->dbh->quote($value);
}
sub escape {
my($self,$field)=@_;
my $q_field=$self->dbh->quote($field);
$q_field=~s/^\'|\'$//g;
$q_field;
}
sub _flatten {map {'ARRAY' eq ref $_? @$_: $_} @_;}
1;
__END__
=head1 NAME
Bio::ConnectDots::DB -- Database adapter for 'connect-the-dots'
=head1 SYNOPSIS
use Bio::ConnectDots::DB;
my $db=new Bio::ConnectDots::DB
(-database=>'test',-host=>'socks',-user=>'ngoodman',-password=>'secret');
=head1 DESCRIPTION
This class manages database connections and encapsulates all database
access for 'connect-the-dots'.
=head1 AUTHOR - David Burdick, Nat Goodman
Email dburdick@systemsbiology.org, natg@shore.net
=head1 COPYRIGHT
Copyright (c) 2005 Institute for Systems Biology (ISB). All Rights Reserved.
This module is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=head1 APPENDIX
The rest of the documentation describes the methods.
=head2 Constructors
Title : new
Usage : $db=new Bio::ConnectDots::DB
(-database=>'test',-host=>'socks',-user=>'ngoodman',-password=>'secret');
Function: Connects to database
Args : -database => name of PostgreSQL database to use
-host => hostname of PostgreSQL database server
-server => synonym for host
-user => name of PostgreSQL user
-password => password of PostgreSQL user
-ext_directory => directory for temporary files used for loading and fetching data
default /usr/tmp/<user>, eg, /usr/tmp/ngoodman
-load_save => controls whether load files are saved after use. Helpful
for debugging
default - files not saved
'all' -- files are saved
'last' -- only last file is saved
-load_chunksize => number of Dots loaded at a time. Tuning parameter.
default 100000
Returns : Bio::ConnectDots::DB object
=head2 Methods to manage database
Title : exists
Usage : print "Database exists" if $db->exists
Function: Tells whether the 'connect-the-dots' database exists
Returns : boolean
Title : drop
Usage : $db->drop;
Function: Drop all 'connect-the-dots' tables
Returns : Nothing
Note : Only drops the built-in tables, not the ones created by queries
Title : create
Usage : $db->create;
Function: Create all 'connect-the-dots' tables
Returns : Nothing
Title : analyze
Usage : $db->analyze;
Function: Run ANALYZE TABLE on all built-in 'connect-the-dots' tables
Returns : Nothing
=cut