# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package DBIx::DBSchema::DBD::Pg;
use base qw(DBIx::DBSchema::DBD);

use strict;
use DBD::Pg 1.41;

our $VERSION = '0.20';

# Type-name translations for PostgreSQL: each key is a generic SQL type name
# and each value is the PostgreSQL type name used in its place.  Nothing in
# this file reads the map; it is a package variable (our), presumably consulted
# by DBIx::DBSchema when it generates column definitions -- confirm against
# the base class.
our %typemap = (
  'BLOB'           => 'BYTEA',
  'LONG VARBINARY' => 'BYTEA',
  'TIMESTAMP'      => 'TIMESTAMP WITH TIME ZONE',
);

=head1 NAME

DBIx::DBSchema::DBD::Pg - PostgreSQL native driver for DBIx::DBSchema

=head1 SYNOPSIS

  use DBI;
  use DBIx::DBSchema;

  $dbh = DBI->connect('dbi:Pg:dbname=database', 'user', 'pass');
  $schema = DBIx::DBSchema->new_native($dbh);

=head1 DESCRIPTION

This module implements a PostgreSQL-native driver for DBIx::DBSchema.

=cut

# Name of the database schema this driver assumes by default: always 'public'.
sub default_db_schema {
  return 'public';
}

# columns CLASS, DBH, TABLE
#
# Describes the columns of TABLE using the PostgreSQL system catalogs
# (pg_class, pg_attribute, pg_type and pg_attrdef).  Returns a list with one
# array reference per column, ordered by attribute number, each holding:
#
#   [ name, type, nullable, length, default, local ]
#
# - type is the catalog type name, with 'bpchar' reported as 'char'.
# - nullable is true unless the column is NOT NULL.
# - length is '' unless the type carries a modifier; for numeric columns it is
#   the string "precision,scale".
# - default is '' when there is no default, a plain string for a quoted
#   literal (quotes stripped), or a reference to a scalar holding an SQL
#   expression to be used verbatim.
# - local is always ''.
#
# Dies with the DBI error string if a statement cannot be prepared or run.
sub columns {
  my($proto, $dbh, $table) = @_;

  # The table name is passed as a bind value rather than interpolated, so
  # names containing quote characters cannot break (or alter) the query.
  my $sth = $dbh->prepare(<<'END') or die $dbh->errstr;
    SELECT a.attname, t.typname, a.attlen, a.atttypmod, a.attnotnull,
           a.atthasdef, a.attnum
    FROM pg_class c, pg_attribute a, pg_type t
    WHERE c.relname = ?
      AND a.attnum > 0 AND a.attrelid = c.oid AND a.atttypid = t.oid
    ORDER BY a.attnum
END
  $sth->execute($table) or die $sth->errstr;

  # Statement used to look up column defaults; prepared on first use and then
  # re-executed for every column that has a default.
  my $d_sth;

  map {
    my $row = $_;

    my $type = $row->{'typname'};
    $type = 'char' if $type eq 'bpchar';

    # Variable-length types (attlen -1) with a type modifier store their
    # declared length in atttypmod, offset by 4.
    my $len = '';
    if ( $row->{attlen} == -1 && $row->{atttypmod} != -1
         && $row->{typname} ne 'text' ) {
      $len = $row->{atttypmod} - 4;
      if ( $row->{typname} eq 'numeric' ) {
        # precision lives in the high 16 bits, scale in the low 16 bits
        $len = ($len >> 16). ','. ($len & 0xffff);
      }
    }

    my $default = '';
    if ( $row->{atthasdef} ) {

      # pg_get_expr(adbin) is used instead of the adsrc column, which was
      # removed from pg_attrdef in PostgreSQL 12.
      $d_sth ||= $dbh->prepare(<<'END') or die $dbh->errstr;
        SELECT substring(pg_get_expr(d.adbin, d.adrelid) for 128)
        FROM pg_attrdef d, pg_class c
        WHERE c.relname = ? AND c.oid = d.adrelid AND d.adnum = ?
END
      $d_sth->execute($table, $row->{attnum}) or die $d_sth->errstr;

      # A missing row or NULL expression is treated as "no default" instead of
      # dereferencing an undefined value.
      my $fetched = $d_sth->fetchrow_arrayref;
      if ( $fetched && defined $fetched->[0] ) {
        $default = $fetched->[0];

        if ( _type_needs_quoting($type) ) {
          $default =~ s/::([\w ]+)$//; #save typecast info?
          if ( $default =~ /^'(.*)'$/ ) {
            # quoted literal: keep the bare value; an empty string is passed
            # through as the SQL literal ''
            $default = $1;
            $default = \"''" if $default eq '';
          } else {
            # anything else is an SQL expression, returned by reference
            my $value = $default;
            $default = \$value;
          }
        } elsif ( $default =~ /^[a-z]/i ) { #sloppy, but it'll do
          # non-quoted type whose default starts with a letter: treat it as an
          # SQL expression as well
          my $value = $default;
          $default = \$value;
        }
      }

    }

    [
      $row->{'attname'},
      $type,
      ! $row->{'attnotnull'},
      $len,
      $default,
      ''  #local
    ];

  } @{ $sth->fetchall_arrayref({}) };
}

# primary_key CLASS, DBH, TABLE
#
# Returns the name of a primary key column of TABLE, or '' if none is found.
# The key is located through the index named "<TABLE>_pkey", so a primary key
# whose index has a different name is not detected.  For a multi-column key
# only the first column (by attribute number within the index) is returned.
#
# Dies with the DBI error string if the query cannot be prepared or run.
sub primary_key {
  my($proto, $dbh, $table) = @_;

  # The index name is bound rather than interpolated so that table names
  # containing quote characters are handled correctly.  ORDER BY makes the
  # choice of column deterministic for composite keys.
  my $sth = $dbh->prepare(<<'END') or die $dbh->errstr;
    SELECT a.attname, a.attnum
    FROM pg_class c, pg_attribute a, pg_type t
    WHERE c.relname = ?
      AND a.attnum > 0 AND a.attrelid = c.oid AND a.atttypid = t.oid
    ORDER BY a.attnum
END
  $sth->execute("${table}_pkey") or die $sth->errstr;

  my $row = $sth->fetchrow_hashref or return '';
  return $row->{'attname'};
}

# unique CLASS, DBH, TABLE
#
# Returns a hash reference describing the unique indices of TABLE: each key is
# an index name and each value is an array reference of that index's column
# names.  Indices come from _all_indices (which leaves out *_pkey indices) and
# are kept only when _is_unique reports them as unique.
sub unique {
  my($proto, $dbh, $table) = @_;

  my @unique_names = grep { $proto->_is_unique($dbh, $_) }
                       $proto->_all_indices($dbh, $table);

  my %fields_for;
  foreach my $index_name (@unique_names) {
    $fields_for{$index_name} = [ $proto->_index_fields($dbh, $index_name) ];
  }

  return \%fields_for;
}

# index CLASS, DBH, TABLE
#
# Returns a hash reference describing the non-unique indices of TABLE: each
# key is an index name and each value is an array reference of that index's
# column names.  Indices come from _all_indices (which leaves out *_pkey
# indices) and are kept only when _is_unique reports them as not unique.
sub index {
  my($proto, $dbh, $table) = @_;

  my @plain_names = grep { ! $proto->_is_unique($dbh, $_) }
                      $proto->_all_indices($dbh, $table);

  my %fields_for;
  foreach my $index_name (@plain_names) {
    $fields_for{$index_name} = [ $proto->_index_fields($dbh, $index_name) ];
  }

  return \%fields_for;
}

# _all_indices CLASS, DBH, TABLE
#
# Returns the names of all indices on TABLE as recorded in pg_index, leaving
# out any whose name ends in "_pkey" (those are reported by primary_key).
#
# Dies with the DBI error string if the query cannot be prepared or run.
sub _all_indices {
  my($proto, $dbh, $table) = @_;

  # The table name is bound rather than interpolated so that names containing
  # quote characters are handled correctly.
  my $sth = $dbh->prepare(<<'END') or die $dbh->errstr;
    SELECT c2.relname
    FROM pg_class c, pg_class c2, pg_index i
    WHERE c.relname = ? AND c.oid = i.indrelid AND i.indexrelid = c2.oid
END
  $sth->execute($table) or die $sth->errstr;

  return grep { $_ !~ /_pkey$/ }
           map { $_->{'relname'} }
             @{ $sth->fetchall_arrayref({}) };
}

# _index_fields CLASS, DBH, INDEX
#
# Returns the column names of the index named INDEX, ordered by attribute
# number within the index.
#
# Dies with the DBI error string if the query cannot be prepared or run.
sub _index_fields {
  my($proto, $dbh, $index) = @_;

  # The index name is bound rather than interpolated so that names containing
  # quote characters are handled correctly.
  my $sth = $dbh->prepare(<<'END') or die $dbh->errstr;
    SELECT a.attname, a.attnum
    FROM pg_class c, pg_attribute a, pg_type t
    WHERE c.relname = ?
      AND a.attnum > 0 AND a.attrelid = c.oid AND a.atttypid = t.oid
    ORDER BY a.attnum
END
  $sth->execute($index) or die $sth->errstr;

  return map { $_->{'attname'} } @{ $sth->fetchall_arrayref({}) };
}

# _is_unique CLASS, DBH, INDEX
#
# Returns the pg_index.indisunique flag for the index named INDEX.
#
# Dies with the DBI error string if the query cannot be prepared or run, and
# with 'guru meditation #420' if no such index is found.
sub _is_unique {
  my($proto, $dbh, $index) = @_;

  # The index name is bound rather than interpolated so that names containing
  # quote characters are handled correctly.
  my $sth = $dbh->prepare(<<'END') or die $dbh->errstr;
    SELECT i.indisunique
    FROM pg_index i, pg_class c, pg_am a
    WHERE i.indexrelid = c.oid AND c.relname = ? AND c.relam = a.oid
END
  $sth->execute($index) or die $sth->errstr;

  my $row = $sth->fetchrow_hashref or die 'guru meditation #420';
  return $row->{'indisunique'};
}

#using this
#******** QUERY **********
#SELECT conname,
#  pg_catalog.pg_get_constraintdef(r.oid, true) as condef
#FROM pg_catalog.pg_constraint r
#WHERE r.conrelid = '16457' AND r.contype = 'f' ORDER BY 1;
#**************************

# what's this do?
#********* QUERY **********
#SELECT conname, conrelid::pg_catalog.regclass,
#  pg_catalog.pg_get_constraintdef(c.oid, true) as condef
#FROM pg_catalog.pg_constraint c
#WHERE c.confrelid = '16457' AND c.contype = 'f' ORDER BY 1;
#**************************

# constraints CLASS, DBH, TABLE
#
# Returns a list of hash references, one per foreign key constraint on TABLE
# (looked up among the tables visible in the current search path).  Each hash
# has the keys:
#
#   constraint - constraint name
#   columns    - array ref of the constrained column names
#   table      - name of the referenced table
#   references - array ref of the referenced column names
#   match      - "MATCH <type>" or ''
#   on_delete  - ON DELETE action (e.g. "CASCADE", "SET NULL") or ''
#   on_update  - ON UPDATE action or ''
#
# Dies with the DBI error string if the query cannot be prepared or run, and
# with "unparsable constraint: ..." if a definition returned by
# pg_get_constraintdef does not have the expected form (for example when the
# referenced table name is quoted or schema-qualified).
sub constraints {
  my($proto, $dbh, $table) = @_;

  # The table name is bound rather than interpolated so that names containing
  # quote characters are handled correctly.
  my $sth = $dbh->prepare(<<'END') or die $dbh->errstr;
    SELECT conname, pg_catalog.pg_get_constraintdef(r.oid, true) as condef
      FROM pg_catalog.pg_constraint r
        WHERE r.conrelid = ( SELECT oid FROM pg_class
                               WHERE relname = ?
                                 AND pg_catalog.pg_table_is_visible(oid)
                           )
          AND r.contype = 'f'
END
  # A failed execute used to be ignored here, silently yielding no constraints.
  $sth->execute($table) or die $sth->errstr;

  my @constraints;
  foreach my $row ( @{ $sth->fetchall_arrayref( {} ) } ) {
    my $condef = $row->{condef};
    next unless $condef =~ /^\s*FOREIGN\s+KEY/;

    my($columns, $ref_table, $references, $etc) =
      $condef =~ /^FOREIGN KEY \(([\w\, ]+)\) REFERENCES (\w+)\(([\w\, ]+)\)\s*(.*)$/i
        or die "unparsable constraint: ". $condef;

    # Whatever follows the column lists holds the optional MATCH / ON DELETE /
    # ON UPDATE clauses.
    my $match     = ( $etc =~ /MATCH (\w+)/i ) ? "MATCH $1" : '';
    my $on_delete = ( $etc =~ /ON DELETE ((NO |SET )?\w+)/i ) ? $1 : '';
    my $on_update = ( $etc =~ /ON UPDATE ((NO |SET )?\w+)/i ) ? $1 : '';

    push @constraints, {
      'constraint' => $row->{conname},
      'columns'    => [ split(/,\s*/, $columns) ],
      'table'      => $ref_table,
      'references' => [ split(/,\s*/, $references) ],
      'match'      => $match,
      'on_delete'  => $on_delete,
      'on_update'  => $on_update,
    };
  }

  return @constraints;
}

# add_column_callback CLASS, DBH, TABLE, COLUMN_OBJ
#
# Works out the PostgreSQL-specific adjustments for adding COLUMN_OBJ to the
# existing table TABLE.  Returns a hash reference that may contain:
#
#   sql_after      - array ref of SQL statements (always present, possibly
#                    empty)
#   effective_type - for a type ending in SERIAL, the same prefix followed by
#                    INT (SERIAL -> INT, BIGSERIAL -> BIGINT)
#   effective_null - 'NULL', set when the column is declared NOT NULL; the
#                    NOT NULL flag is then applied by a statement in sql_after
#
# The server version is read from $dbh->{pg_server_version}.  If it contains
# no digit, 7.3 (70300) is assumed and a warning is issued each time a
# version-dependent decision is made.
sub add_column_callback {
  my( $proto, $dbh, $table, $column_obj ) = @_;
  my $name = $column_obj->name;

  my $version = $dbh->{'pg_server_version'};
  my $version_warning = '';
  if ( $version !~ /\d/ ) {
    $version_warning = "WARNING: no pg_server_version!  Assuming >= 7.3\n";
    $version = 70300;
  }
  my $is_73_or_later = ( $version >= 70300 );

  my @sql_after;
  my %result = ( 'sql_after' => \@sql_after );

  # *SERIAL columns: report the plain integer type and emulate the sequence
  if ( $column_obj->type =~ /^(\w*)SERIAL$/i ) {

    $result{'effective_type'} = uc($1).'INT';

    #needs more work for old Pg?

    warn $version_warning if $version_warning;

    # from 7.3 on the sequence name is qualified with the default schema
    my $sequence_ref = "${table}_${name}_seq";
    if ( $is_73_or_later ) {
      my $db_schema = default_db_schema();
      $sequence_ref = "$db_schema.$sequence_ref";
    }
    my $nextval = "nextval('$sequence_ref'::text)";

    push @sql_after, "ALTER TABLE $table ALTER COLUMN $name SET DEFAULT $nextval";
    push @sql_after, "CREATE SEQUENCE ${table}_${name}_seq";
    push @sql_after, "UPDATE $table SET $name = $nextval WHERE $name IS NULL";

  }

  # NOT NULL columns: flag them as NULL here and set NOT NULL afterwards
  unless ( $column_obj->null ) {
    $result{'effective_null'} = 'NULL';

    warn $version_warning if $version_warning;

    my $set_not_null = $is_73_or_later
      ? "ALTER TABLE $table ALTER $name SET NOT NULL"
      : "UPDATE pg_attribute SET attnotnull = TRUE ".
        " WHERE attname = '$name' ".
        " AND attrelid = ( SELECT oid FROM pg_class WHERE relname = '$table' )";

    push @sql_after, $set_not_null;
  }

  return \%result;

}

# alter_column_callback CLASS, DBH, TABLE, OLD_COLUMN, NEW_COLUMN
#
# Works out PostgreSQL-specific SQL for changing OLD_COLUMN into NEW_COLUMN on
# table TABLE.  Returns a hash reference that may contain:
#
#   sql_alter_type - an "ALTER COLUMN ... TYPE ..." clause, set when the type
#                    or length differs and the server is 8.0 or later
#   sql_alter_null - an UPDATE of pg_attribute.attnotnull, set when the
#                    nullability differs and the server is older than 7.3
#
# Side effect: the type of both column objects is rewritten in place to its
# canonical PostgreSQL name when one is known (for example INT -> INT4).
#
# The server version is read from $dbh->{pg_server_version}.  If it contains
# no digit, 7.3 (70300) is assumed and a warning is issued each time a
# version-dependent decision is made.
sub alter_column_callback {
  my( $proto, $dbh, $table, $old_column, $new_column ) = @_;
  my $name = $old_column->name;

  my %canonical = (
    'SMALLINT'         => 'INT2',
    'INT'              => 'INT4',
    'BIGINT'           => 'INT8',
    'SERIAL'           => 'INT4',
    'BIGSERIAL'        => 'INT8',
    'DECIMAL'          => 'NUMERIC',
    'REAL'             => 'FLOAT4',
    'DOUBLE PRECISION' => 'FLOAT8',
    'BLOB'             => 'BYTEA',
    'TIMESTAMP'        => 'TIMESTAMPTZ',
  );

  # canonicalize the type stored in both column objects
  for my $column ( $old_column, $new_column ) {
    my $replacement = $canonical{ uc($column->type) };
    $column->type($replacement) if $replacement;
  }

  my $version = $dbh->{'pg_server_version'};
  my $version_warning = '';
  if ( $version !~ /\d/ ) {
    $version_warning = "WARNING: no pg_server_version!  Assuming >= 7.3\n";
    $version = 70300;
  }

  my %alter;

  #change type
  my $old_type = uc( $old_column->type );
  my $new_type = uc( $new_column->type );
  my $type_differs = ( $canonical{$old_type} || $old_type )
                       ne ( $canonical{$new_type} || $new_type );

  if ( $type_differs || $old_column->length ne $new_column->length ) {

    warn $version_warning if $version_warning;

    if ( $version < 80000 ) {
      warn "WARNING: can't yet change column types for Pg < version 8\n";
    } else {
      # append "(length)" only for a defined, true length
      my $length = $new_column->length;
      my $length_spec = ( defined($length) && $length ) ? "($length)" : '';

      $alter{'sql_alter_type'} =
        'ALTER COLUMN ' . $new_column->name . ' TYPE ' . $new_column->type
        . $length_spec;
    }

  }

  my $was_nullable     = $old_column->null;
  my $will_be_nullable = $new_column->null;

  if ( ! $was_nullable && $will_be_nullable ) {

    # change nullability from NOT NULL to NULL
    warn $version_warning if $version_warning;
    if ( $version < 70300 ) {
      $alter{'sql_alter_null'} =
        "UPDATE pg_attribute SET attnotnull = FALSE
          WHERE attname = '$name'
            AND attrelid = ( SELECT oid FROM pg_class
                               WHERE relname = '$table'
                           )";
    }

  } elsif ( $was_nullable && ! $will_be_nullable ) {

    # change nullability from NULL to NOT NULL...
    # this one could be more complicated, need to set a DEFAULT value and
    # update the table first...
    warn $version_warning if $version_warning;
    if ( $version < 70300 ) {
      $alter{'sql_alter_null'} =
        "UPDATE pg_attribute SET attnotnull = TRUE
           WHERE attname = '$name'
             AND attrelid = ( SELECT oid FROM pg_class
                                WHERE relname = '$table'
                            )";
    }

  }

  return \%alter;

}

# column_value_needs_quoting CLASS, COLUMN_OBJ
#
# Returns true if values for COLUMN_OBJ must be quoted in SQL, judged by the
# column's type (see _type_needs_quoting).
sub column_value_needs_quoting {
  my $col = $_[1];
  return _type_needs_quoting($col->type);
}

# _type_needs_quoting TYPE
#
# Returns true if values of the SQL type named TYPE must be quoted, i.e. if
# TYPE is not one of the recognized numeric types.  Matching is
# case-insensitive and must cover the whole type name.
#
# Recognized numeric types: int, int2/4/8, smallint, integer, bigint,
# numeric/decimal with an optional "(precision)" or "(precision, scale)",
# real, double precision, float, float4/8, float(n), serial, serial4/8 and
# bigserial.
sub _type_needs_quoting {
  my $type = shift;

  # The closing parenthesis of numeric(...) sits outside the optional scale
  # group, so both "numeric(10)" and "numeric(10,2)" are accepted and an
  # unterminated "numeric(10" is not.  float4/float8 are the names reported by
  # the system catalogs for REAL and DOUBLE PRECISION.
  return $type !~ m{^(?:
                      int(?:2|4|8)?
                    | smallint
                    | integer
                    | bigint
                    | (?:numeric|decimal)(?:\(\d+(?:\s*\,\s*\d+)?\))?
                    | real
                    | double\s+precision
                    | float(?:4|8|\(\d+\))?
                    | serial(?:4|8)?
                    | bigserial
                    )$}ix;
}


=head1 AUTHOR

Ivan Kohler <ivan-dbix-dbschema@420.am>

=head1 COPYRIGHT

Copyright (c) 2000 Ivan Kohler
Copyright (c) 2000 Mail Abuse Prevention System LLC
Copyright (c) 2009-2013 Freeside Internet Services, Inc.
All rights reserved.
This program is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=head1 BUGS

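The default expression returned by C<columns> is truncated to 128 characters.

C<primary_key> only finds a primary key whose index is named I<table>_pkey,
and returns a single column name even for a multi-column key.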

=head1 SEE ALSO

L<DBIx::DBSchema>, L<DBIx::DBSchema::DBD>, L<DBI>, L<DBI::DBD>

=cut 

1;