The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package UR::DataSource::Pg;
use strict;
use warnings;

require UR;
our $VERSION = "0.46"; # UR $VERSION;

# Register this package as a UR class: it inherits from UR::DataSource::RDBMS
# and is flagged abstract.
# NOTE(review): presumably concrete PostgreSQL data sources subclass this
# rather than using it directly -- confirm against the UR documentation.
UR::Object::Type->define(
    class_name => 'UR::DataSource::Pg',
    is => ['UR::DataSource::RDBMS'],
    is_abstract => 1,
);

# RDBMS API

# Name of the database driver used for this data source.
sub driver {
    return 'Pg';
}

#sub server {
#    my $self = shift->_singleton_object();
#    $self->_init_database;
#    return $self->_database_file_path;
#}

# The owner is whatever login the singleton data source object reports.
sub owner {
    my $self = shift;
    my $singleton = $self->_singleton_object;
    return $singleton->login;
}

#sub login {
#    undef
#}
#
#sub auth {
#    undef
#}

# Default escape string for LIKE clauses: two backslash characters.
sub _default_sql_like_escape_string {
    return q{\\\\};
}

# Wrap an escape string in an E'...' literal for use in SQL.
#   $escape - the raw escape string
# Returns the string E'<escape>'.
sub _format_sql_like_escape_string {
    my ($class, $escape) = @_;
    return sprintf(q{E'%s'}, $escape);
}

# This data source reports that savepoints are supported.
sub can_savepoint {
    return 1;
}

# Create a savepoint on the default database handle.
#   $sp_name - name of the savepoint
# Returns whatever the handle's pg_savepoint() returns.
sub set_savepoint {
    my $self    = shift;
    my $sp_name = shift;

    my $handle = $self->get_default_handle;
    return $handle->pg_savepoint($sp_name);
}

# Roll the default database handle back to a named savepoint.
#   $sp_name - name of the savepoint
# Returns whatever the handle's pg_rollback_to() returns.
sub rollback_to_savepoint {
    my $self    = shift;
    my $sp_name = shift;

    my $handle = $self->get_default_handle;
    return $handle->pg_rollback_to($sp_name);
}


# _init_created_dbh is an alias for init_created_handle.
*_init_created_dbh = \&init_created_handle;

# Configure a freshly created database handle.
#   $dbh - the handle to configure; may be undef
# Sets the handle's LongTruncOk attribute to 0 and returns the handle.
# Returns nothing when no handle was given.
sub init_created_handle {
    my $self = shift;
    my $dbh  = shift;

    if (defined $dbh) {
        $dbh->{LongTruncOk} = 0;
        return $dbh;
    }

    return;
}

# Decide whether a table should be skipped.
#   $table_name - name of the table to check
# Returns true when the name starts with "pg_" or "sql_", false otherwise.
sub _ignore_table {
    my ($self, $table_name) = @_;
    return !!( $table_name =~ /^(?:pg_|sql_)/ );
}


# Fetch the next value from a database sequence.
#   $sequence_name - name of the sequence to advance
# Returns the value produced by nextval().
# Dies if the database handle reports an error after running the query.
sub _get_next_value_from_sequence {
    my ($self, $sequence_name) = @_;

    # we may need to change how this db handle is gotten
    my $dbh = $self->get_default_handle;

    # The sequence name is passed to nextval() as a string literal, so let the
    # driver quote it instead of interpolating it raw; a name containing a
    # quote character would otherwise produce broken SQL.
    my $quoted_name = $dbh->quote($sequence_name);
    my ($new_id) = $dbh->selectrow_array("SELECT nextval($quoted_name)");

    if ($dbh->err) {
        die "Failed to prepare SQL to generate a column id from sequence: $sequence_name.\n" . $dbh->errstr . "\n";
    }

    return $new_id;
}

# PostgreSQL's serial datatype creates, by default, a sequence named
# tablename_columnname_seq; build that name here.
#   $table_name  - table the column belongs to
#   $column_name - the column
sub _get_sequence_name_for_table_and_column {
    my $self = shift;
    my ($table_name, $column_name) = @_;

    return join('_', $table_name, $column_name, 'seq');
}


# Always reports no bitmap indexes: returns a reference to a new, empty array.
sub get_bitmap_index_details_from_data_dictionary {
    # FIXME Postgres has bitmap indexes, but we don't support them yet.  See the Oracle
    # datasource module for details about how to get it working
    my @no_details = ();
    return [ @no_details ];
}


# Look up the valid unique and primary-key indexes on a table in the
# PostgreSQL system catalogs.
#   $owner_name - matched against pg_namespace.nspname (the schema name)
#   $table_name - matched against pg_class.relname
# Returns a hashref mapping each index name to an arrayref of the index's
# column names, in index column order.  Returns undef when no handle or
# statement could be obtained, when the query fails to execute, or when no
# matching index rows are found.
sub get_unique_index_details_from_data_dictionary {
    my($self, $owner_name, $table_name) = @_;

    # ORDER BY makes the column order within each index deterministic;
    # without it the row order is up to the database.
    my $sql = qq(
        SELECT c_index.relname, a.attname
        FROM pg_catalog.pg_class c_table
        JOIN pg_catalog.pg_index i ON i.indrelid = c_table.oid
        JOIN pg_catalog.pg_class c_index ON c_index.oid = i.indexrelid
        JOIN pg_catalog.pg_attribute a ON a.attrelid = c_index.oid
        JOIN pg_catalog.pg_namespace n ON c_table.relnamespace = n.oid
        WHERE c_table.relname = ? AND n.nspname = ?
          and (i.indisunique = 't' or i.indisprimary = 't')
          and i.indisvalid = 't'
        ORDER BY c_index.relname, a.attnum
    );

    my $dbh = $self->get_default_handle();
    return undef unless $dbh;

    my $sth = $dbh->prepare($sql);
    return undef unless $sth;

    # Do not try to fetch from a statement that failed to execute.
    $sth->execute($table_name, $owner_name)
        or return undef;

    # Read rows positionally so the result does not depend on how the handle
    # is configured to case hash keys.
    my $ret;
    while (my ($index_name, $column_name) = $sth->fetchrow_array()) {
        push @{ $ret->{$index_name} }, $column_name;
    }

    return $ret;
}

# PostgreSQL column types and the UR type each maps to.  Every value is a
# two-element arrayref whose second slot is undef here.
# NOTE(review): the meaning of the second slot is not visible in this file --
# confirm against the parent class.
my %ur_data_type_for_vendor_data_type = (
    'SMALLINT'                    => [ 'Integer',  undef ],
    'BIGINT'                      => [ 'Integer',  undef ],
    'SERIAL'                      => [ 'Integer',  undef ],
    'TEXT'                        => [ 'Text',     undef ],
    'BYTEA'                       => [ 'Blob',     undef ],
    'CHARACTER VARYING'           => [ 'Text',     undef ],
    'TIMESTAMP WITHOUT TIME ZONE' => [ 'DateTime', undef ],
    'NUMERIC'                     => [ 'Number',   undef ],
    'DOUBLE PRECISION'            => [ 'Number',   undef ],
);

# Translate a database column type into its UR type entry.
#   $type - the vendor type name; it is passed through normalize_vendor_type()
#           before the lookup
# Returns the entry from the table above when there is one, otherwise
# whatever the parent class returns for the normalized type.
sub ur_data_type_for_data_source_data_type {
    my ($class, $type) = @_;

    my $normalized = $class->normalize_vendor_type($type);

    my $mapped = $ur_data_type_for_vendor_data_type{$normalized};
    return $mapped if defined $mapped;

    my $inherited = $class->SUPER::ur_data_type_for_data_source_data_type($normalized);
    return $inherited;
}

# Key/value list mapping UR types to PostgreSQL column types: the BOOLEAN and
# XML pairs, followed by everything the parent class returns.
# NOTE(review): the parent's pairs come last, so if a caller loads this list
# into a hash, a parent entry for BOOLEAN or XML would replace the one given
# here -- confirm against the parent class and its callers.
sub _vendor_data_type_for_ur_data_type {
    my $self = shift;

    return (
        BOOLEAN => 'BOOLEAN',
        XML     => 'XML',
        $self->SUPER::_vendor_data_type_for_ur_data_type(),
    );
}

# Bind every BLOB column on a statement handle as PostgreSQL bytea.
#   $sth            - the statement handle to adjust
#   $column_objects - arrayref of column metadata objects; entries may be undef
# The entry at index N is bound as parameter N+1.
sub _alter_sth_for_selecting_blob_columns {
    my ($self, $sth, $column_objects) = @_;

    my $position = 0;
    for my $column (@$column_objects) {
        $position++;

        # No metaDB info for this one
        next unless defined $column;
        next unless uc($column->data_type) eq 'BLOB';

        # DBD::Pg is loaded only once a BLOB column is actually seen.
        require DBD::Pg;
        $sth->bind_param($position, undef, { pg_type => DBD::Pg::PG_BYTEA() });
    }
}

# to_char() format strings used when a date/time value is compared as text.
my $DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS';
my $TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.US';

# Work out how the two sides of a comparison should be wrapped in SQL.
#   $left_type, $right_type - type classes of the two operands
#   $operator               - the comparison operator
#   $sql_clause             - accepted but not used here
# Returns a two-element list of sprintf-style templates (left, right).  Both
# are '%s' unless the operator is 'like', exactly one side is text, and the
# other side is a Timestamp or DateTime; that other side then gets a
# to_char() wrapper.  Any other non-text type is handed to the parent class.
sub cast_for_data_conversion {
    my ($class, $left_type, $right_type, $operator, $sql_clause) = @_;

    my @templates = ('%s', '%s');

    # compatible types
    return @templates if $left_type->isa($right_type);
    return @templates if $right_type->isa($left_type);

    # So far, the only casting is to support using 'like' and one or both are strings
    return @templates if $operator ne 'like';

    my $left_is_text = $left_type->isa('UR::Value::Text');
    return @templates
        unless ($left_is_text || $right_type->isa('UR::Value::Text'));

    # Figure out which one is the non-string
    my ($other_type, $slot);
    if ($left_is_text) {
        ($other_type, $slot) = ($right_type, 1);
    }
    else {
        ($other_type, $slot) = ($left_type, 0);
    }

    # Timestamp is tested before DateTime, as in the original ordering.
    if ($other_type->isa('UR::Value::Timestamp')) {
        $templates[$slot] = "to_char(%s, '" . $TIMESTAMP_FORMAT . "')";
    }
    elsif ($other_type->isa('UR::Value::DateTime')) {
        $templates[$slot] = "to_char(%s, '" . $DATE_FORMAT . "')";
    }
    else {
        # NOTE(review): $sql_clause is not forwarded to the parent -- confirm
        # that is intended.
        @templates = $class->SUPER::cast_for_data_conversion($left_type, $right_type, $operator);
    }

    return @templates;
}

# Build the ORDER BY fragment for one column.
#   $column_name   - the column being sorted on
#   $query_plan    - asked whether this column sorts descending
#   $property_meta - asked whether the property is text
# Returns the column name, followed by ' COLLATE "C"' for text properties and
# ' DESC' for descending columns.
sub _resolve_order_by_clause_for_column {
    my ($self, $column_name, $query_plan, $property_meta) = @_;

    # Tell the DB to sort the same order as Perl's cmp
    my $collation = $property_meta->is_text ? q( COLLATE "C") : '';

    my $direction = $query_plan->order_by_column_is_descending($column_name)
                  ? q( DESC)
                  : '';

    return $column_name . $collation . $direction;
}

# Make sure the schema that a table name refers to exists, creating it if not.
#   $table_name - table name, possibly schema-qualified; the schema part is
#                 obtained from _extract_schema_and_table_name()
#   $dbh        - optional database handle; defaults to the default handle
# Does nothing when the table name carries no schema.  Croaks if the
# CREATE SCHEMA statement fails.
sub _assure_schema_exists_for_table {
    my($self, $table_name, $dbh) = @_;

    $dbh ||= $self->get_default_handle;

    my ($schema_name, undef) = $self->_extract_schema_and_table_name($table_name);
    if ($schema_name) {
        my $exists = $dbh->selectrow_array("SELECT schema_name FROM information_schema.schemata WHERE schema_name = ?;",
            undef, $schema_name);
        unless ($exists) {
            # This file never loads Carp itself; load it here so the error
            # path does not depend on another module having done so.
            require Carp;
            $dbh->do("CREATE SCHEMA $schema_name")
                or Carp::croak("Could not create schema $schema_name: " . $dbh->errstr);
        }
    }
}

1;

=pod

=head1 NAME

UR::DataSource::Pg - PostgreSQL specific subclass of UR::DataSource::RDBMS

=head1 DESCRIPTION

This module provides the PostgreSQL-specific methods necessary for interacting with
PostgreSQL databases.

=head1 SEE ALSO

L<UR::DataSource>, L<UR::DataSource::RDBMS>

=cut