The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

######################################################################
## File: $Id: DBI.pm 10837 2008-02-26 17:31:14Z spadkins $
######################################################################

use App;
use App::Repository;

package App::Repository::DBI;
$VERSION = (q$Revision: 10837 $ =~ /(\d[\d\.]*)/)[0];  # VERSION numbers generated by svn

@ISA = ( "App::Repository" );

use Data::Dumper;

use strict;

=head1 NAME

App::Repository::DBI - a repository which relies on a DBI interface to a relational database (no caching)

=head1 SYNOPSIS

   use App::Repository::DBI;

   (see man page for App::Repository for additional methods)

   $rep = App::Repository::DBI->new();        # looks for %ENV, then config file
   $rep = App::Repository::DBI->new("mysql","mydb","user001","pass001");
   $rep = App::Repository::DBI->new("mysql","mydb","user001","pass001","port=3307");
   $rep = App::Repository::DBI->new("mysql","mydb","user001","pass001","port=3307","user001");

   $ok = $rep->_connect();         # initialize repository (will happen automatically in constructor)
   $ok = $rep->_disconnect();      # cleanup repository (will happen automatically in destructor)
   $rep->_is_connected();          # returns 1 if connected (ready for use), 0 if not
   $errmsg = $rep->error();       # returns the error string for prev op ("" if no error)
   $numrows = $rep->numrows();    # returns the number of rows affected by prev op
   print $rep->error(), "\n" if (!$rep->_connect());

   $value  = $rep->get ($table, $key,     $col,   \%options);
   $value  = $rep->get ($table, \%params, $col,   \%options);
   @row    = $rep->get ($table, $key,     \@cols, \%options);
   @row    = $rep->get ($table, \%params, \@cols, \%options);

   $nrows = $rep->set($table, $key,     $col,   $value,    \%options);
   $nrows = $rep->set($table, \%params, $col,   $value,    \%options);

   $row    = $rep->get_row ($table, $key,     \@cols, \%options);
   $row    = $rep->get_row ($table, \%params, \@cols, \%options);

   $nrows = $rep->set_row($table, $key,     \@cols, $row, \%options);
   $nrows = $rep->set_row($table, \%params, \@cols, $row, \%options);
   $nrows = $rep->set_row($table, undef,    \@cols, $row, \%options);

   $colvalues = $rep->get_column ($table, \%params, $col, \%options);

   $rows = $rep->get_rows ($table, \%params, \@cols, \%options);
   $rows = $rep->get_rows ($table, \%params, $col,   \%options);
   $rows = $rep->get_rows ($table, \@keys,   \@cols, \%options);

   $nrows = $rep->set_rows($table, \%params, \@cols, $rows, \%options);
   $nrows = $rep->set_rows($table, undef,    \@cols, $rows, \%options);
   $nrows = $rep->set_rows($table, \@keys,   \@cols, $rows, \%options);

   $values = $rep->get_values ($table, $key,     \@cols, \%options);
   $values = $rep->get_values ($table, \%params, \@cols, \%options);
   $values = $rep->get_values ($table, $key,     undef,  \%options);
   $values = $rep->get_values ($table, \%params, undef,  \%options);

   $values_list = $rep->get_values_list ($table, $key,     \@cols, \%options);
   $values_list = $rep->get_values_list ($table, \%params, \@cols, \%options);
   $values_list = $rep->get_values_list ($table, $key,     undef,  \%options);
   $values_list = $rep->get_values_list ($table, \%params, undef,  \%options);

   $nrows = $rep->set_values ($table, $key,     \@cols, $values, \%options);
   $nrows = $rep->set_values ($table, $key,     undef,  $values, \%options);
   $nrows = $rep->set_values ($table, undef,    \@cols, $values, \%options);
   $nrows = $rep->set_values ($table, undef,    undef,  $values, \%options);
   $nrows = $rep->set_values ($table, \%params, \@cols, $values, \%options);
   $nrows = $rep->set_values ($table, \%params, undef,  $values, \%options);

=cut

######################################################################
# ATTRIBUTES
######################################################################

# CONNECTION ATTRIBUTES
# $self->{dbdriver}   # standard DBI driver name ("mysql", "Oracle", etc.)
# $self->{dbname}     # the name of the database
# $self->{dbuser}     # database user name
# $self->{dbpass}     # database password
# $self->{dbschema}   # name of the schema within the database
# $self->{dbioptions} # additional dbi options to tack onto the dsn
# $self->{dbh}        # open DBI database handle

######################################################################
# INHERITED ATTRIBUTES
######################################################################

# BASIC
# $self->{name}       # name of this repository (often "db")
# $self->{conf}       # hash of config file data

# CURRENT STATE
# $self->{error}      # most recent error generated from this module
# $self->{numrows}

# METADATA - Database Types
# $self->{types}
# $self->{type}{$type}{name}
# $self->{type}{$type}{num}
# $self->{type}{$type}{type}
# $self->{type}{$type}{column_size}
# $self->{type}{$type}{literal_prefix}
# $self->{type}{$type}{literal_suffix}
# $self->{type}{$type}{unsigned_attribute}
# $self->{type}{$type}{auto_unique_value}
# $self->{type}{$type}{quoted}

# METADATA - Tables and Columns
# $self->{table_names}
# $self->{table}{$table}{readonly}
# $self->{table}{$table}{columns}
# $self->{table}{$table}{column}{$column}
# $self->{table}{$table}{column}{$column}{name}
# $self->{table}{$table}{column}{$column}{type_name}
# $self->{table}{$table}{column}{$column}{type}
# $self->{table}{$table}{column}{$column}{notnull}
# $self->{table}{$table}{column}{$column}{quoted}

=head1 DESCRIPTION

The App::Repository::DBI class encapsulates all access to the database,
changing SQL statements into get(), save(), and delete() methods.

=cut

#############################################################################
# PUBLIC METHODS
#############################################################################

=head1 Public Methods

=cut

#############################################################################
# _connect()
#############################################################################

=head2 _connect()

    * Signature: $repository->_connect();
    * Param:     void
    * Return:    void
    * Throws:    App::Exception::Repository
    * Since:     0.50

    Sample Usage: 

    $repository->_connect();

Connects to the repository.  Most repositories have some connection
initialization that takes time and therefore should be done once.
Then many operations may be executed against the repository.
Finally the connection to the repository is closed (_disconnect()).

The default implementation of _connect() does nothing.
It is intended to be overridden in the subclass (if necessary).

=cut

sub _connect {
    &App::sub_entry if ($App::trace);
    my $self = shift;

    if (!defined $self->{dbh}) {
        my $dsn = $self->_dsn();
        my $attr = $self->_attr();

        while (1) {
            eval {
                $self->{dbh} = DBI->connect($dsn, $self->{dbuser}, $self->{dbpass}, $attr);
            };
            if ($@) {
                delete $self->{dbh};
                my $retryable_connection_error_regex = $self->retryable_connection_error_regex();
                if ($@ =~ /$retryable_connection_error_regex/i) {
                    $self->{context}->log({level=>1},"DBI Exception (retrying) in _connect(): $@");
                    sleep(1);
                }
                else {
                    $self->{context}->log({level=>1},"DBI Exception (fail) in _connect(): $@");
                    die $@;
                }
            }
            else {
                last;
            }
        }
        die "Can't connect to database" if (!$self->{dbh});
        delete $self->{in_transaction};
    }

    &App::sub_exit(defined $self->{dbh}) if ($App::trace);
    return(defined $self->{dbh});
}

sub retryable_connection_error_regex {
    return "Lost connection|server has gone away";
}

sub retryable_modify_error_regex {
    return "Lost connection|server has gone away|Deadlock found";
}

# likely overridden at the subclass level
sub _dsn {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my ($dbdriver, $dbname, $dbuser, $dbpass, $dbschema);

    my $dsn = $self->{dbdsn};
    if (!$dsn) {
        my $dbdriver   = $self->{dbdriver} || $self->{dbdriver};
        my $dbname     = $self->{dbname};
        my $dbuser     = $self->{dbuser};
        my $dbpass     = $self->{dbpass};
        my $dbschema   = $self->{dbschema};

        die "ERROR: missing DBI driver and/or db name [$dbdriver,$dbname] in configuration.\n"
            if (!$dbdriver || !$dbname);

        $dsn = "dbi:${dbdriver}:database=${dbname}";
    }

    &App::sub_exit($dsn) if ($App::trace);
    return($dsn);
}

# likely overridden at the subclass level
sub _attr {
    &App::sub_entry if ($App::trace);
    my $attr = {
        PrintError         => 0,
        AutoCommit         => 1,
        RaiseError         => 1,
        #ShowErrorStatement => 1,  # this doesn't seem to include the right SQL statement. very confusing.
    };
    &App::sub_exit($attr) if ($App::trace);
    return($attr);
}

sub _dbh {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    &App::sub_exit($self->{dbh}) if ($App::trace);
    return($self->{dbh});
}

#############################################################################
# _disconnect()
#############################################################################

=head2 _disconnect()

    * Signature: $repository->_disconnect();
    * Param:     void
    * Return:    void
    * Throws:    App::Exception::Repository
    * Since:     0.50

    Sample Usage: 

    $repository->_disconnect();

Disconnects from the repository.

The default implementation of _disconnect() does nothing.
It is intended to be overridden in the subclass (if necessary).

All implementations of _disconnect() by a subclass must be sensitive to
whether the object is actually currently connected to the repository.
Thus, _disconnect() should be callable without negative consequences
even when the repository is already disconnected.

=cut

sub _disconnect {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    if (defined $self->{dbh} && !($self->{preconnected})) {
        my $dbh = $self->{dbh};
        $dbh->disconnect;
        delete $self->{dbh};
        delete $self->{in_transaction};
    }
    &App::sub_exit(1) if ($App::trace);
    1;
}

sub _disconnect_client_only {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    if ($self->{dbh}) {
        $self->{dbh}{InactiveDestroy} = 1;
        delete $self->{dbh};
        delete $self->{in_transaction};
    }
    &App::sub_exit(1) if ($App::trace);
    1;
}

sub _shutdown_unshareable_resources {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->_disconnect_client_only();
    &App::sub_exit() if ($App::trace);
}

sub _is_connected {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $retval = ((defined $self->{dbh}) ? 1 : 0);
    &App::sub_exit($retval) if ($App::trace);
    return ($retval);
}

#############################################################################
# PRIVATE METHODS
#############################################################################

=head1 Private Methods

=cut

######################################################################
# INITIALIZATION
######################################################################

sub _init2 {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my ($name);

    $name = $self->{name};
    if (defined $self->{dbh}) {
        $self->{preconnected} = 1;
    }
    else {
        my $options = $self->{context}{options} || {};

        my $config_from_options = 1;
        my $config_from_ext_options = 0;
        foreach my $var qw(dbdsn dbdriver dbhost dbport dbsocket dbname dbuser dbpass dbschema dbioptions) {
            if ($self->{$var}) {
                $config_from_options = 0;
            }
            if ($options->{"${name}.${var}"}) {
                $config_from_ext_options = 1;
            }
        }

        if ($config_from_options) {
            if ($config_from_ext_options) {
                foreach my $var qw(dbdsn dbdriver dbhost dbport dbsocket dbname dbuser dbpass dbschema dbioptions) {
                    if (defined $options->{"${name}.${var}"}) {
                        $self->{$var} = $options->{"${name}.${var}"};
                    }
                }
            }
            else {
                foreach my $var qw(dbdsn dbdriver dbhost dbport dbsocket dbname dbuser dbpass dbschema dbioptions) {
                    if (defined $options->{$var}) {
                        $self->{$var} = $options->{$var};
                    }
                }
            }
        }
    }
    &App::sub_exit() if ($App::trace);
}

sub _get_row {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $cols, $options) = @_;

    # we only need the first row
    $options = {} if (!$options);
    if (! $options->{endrow}) {
        $options->{endrow} = $options->{startrow} || 1;
    }

    my ($sql, $dbh, $row);
    if ($self->{table}{$table}{rawaccess}) {
        $sql = $self->_mk_select_sql($table, $params, $cols, $options);
    }
    else {
        $sql = $self->_mk_select_joined_sql($table, $params, $cols, $options);
    }
    $self->{sql} = $sql;

    $dbh = $self->{dbh};
    if (!$dbh) {
        $self->_connect();
        $dbh = $self->{dbh};
    }

    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my ($timer, $elapsed_time);
    if ($debug_sql) {
        $timer = $self->_get_timer();
        print $App::DEBUG_FILE "DEBUG_SQL: _get_row()\n";
        print $App::DEBUG_FILE $sql;
    }
    if ($context_options->{explain_sql}) {
        $self->explain_sql($sql);
    }
    ### TODO: make this similar to the _connect code, using a regex named retryable_select_error_regex
    while (1) {
        eval {
            $row = $dbh->selectrow_arrayref($sql);
        };
        if ($@) {
            $row = undef;
            if ($@ =~ /Lost connection/ || $@ =~ /server has gone away/) {
                $self->{context}->log({level=>1},"DBI Exception (retrying) in _get_row(): $@");
                $self->_disconnect();
                sleep(1);
                $self->_connect();
                $dbh = $self->{dbh};
            }
            else {
                $self->{context}->log({level=>1},"DBI Exception (fail) in _get_rows(): $@$sql");
                die $@;
            }
        }
        else {
            last;
        }
    }
    if ($debug_sql) {
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: nrows [", (defined $row ? 1 : 0), "] ($elapsed_time sec) $DBI::errstr\n";
        if ($debug_sql >= 2) {
            print $App::DEBUG_FILE "DEBUG_SQL: [", ($row ? join("|",map { defined $_ ? $_ : "undef" } @$row) : ""), "]\n";
        }
        print $App::DEBUG_FILE "\n";
    }

    &App::sub_exit($row) if ($App::trace);
    return($row);
}

sub _get_rows {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $cols, $options) = @_;

    my ($sql, $rows, $startrow, $endrow);
    my $table_def = $self->get_table_def($table);
    #print $App::DEBUG_FILE "DBI._get_rows : table=[$table] rawaccess=[$table_def->{rawaccess}]\n";
    #if ($self->{table}{$table}{rawaccess}) {
    if ($table_def->{rawaccess}) {
        $sql = $self->_mk_select_sql($table, $params, $cols, $options);
    }
    else {
        $sql = $self->_mk_select_joined_sql($table, $params, $cols, $options);
    }
    $self->{sql} = $sql;
    die "empty SQL query for table [$table] (does table exist?)" if (!$sql);

    $self->_connect() if (!$self->{dbh});

    $options  = {} if (!$options);
    $startrow = $options->{startrow} || 0;
    $endrow   = $options->{endrow} || 0;

    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my ($timer, $elapsed_time);
    if ($debug_sql) {
        $timer = $self->_get_timer();
        print $App::DEBUG_FILE "DEBUG_SQL: _get_rows()\n";
        print $App::DEBUG_FILE $sql;
    }
    if ($context_options->{explain_sql}) {
        $self->explain_sql($sql);
    }
    while (1) {
        eval {
            $rows = $self->_selectrange_arrayref($sql, $startrow, $endrow);
        };
        if ($@) {
            $rows = [];
            if ($@ =~ /Lost connection/ || $@ =~ /server has gone away/) {
                $self->{context}->log({level=>1},"DBI Exception (retrying) in _get_rows(): $@");
                $self->_disconnect();
                sleep(1);
                $self->_connect();
            }
            else {
                $self->{context}->log({level=>1},"DBI Exception (fail) in _get_rows(): $@$sql");
                die $@;
            }
        }
        else {
            last;
        }
    }
    if ($debug_sql) {
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: nrows [", (defined $rows ? ($#$rows+1) : 0), "] ($elapsed_time sec) $DBI::errstr\n";
        if ($debug_sql >= 2) {
            foreach my $row (@$rows) {
                print $App::DEBUG_FILE "DEBUG_SQL: [", join("|",map { defined $_ ? $_ : "undef"} @$row), "]\n";
            }
        }
        print $App::DEBUG_FILE "\n";
    }

    &App::sub_exit($rows) if ($App::trace);
    return($rows);
}

sub _get_default_columns {
    &App::sub_entry if ($App::trace);
    my ($self, $table) = @_;
    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});
    my $table_def = $self->{table}{$table};
    my $columns = $table_def->{default_columns} || "";
    if (ref($columns) eq "ARRAY") {
        # do nothing
    }
    elsif ($columns eq "configured") {
        $columns = $table_def->{columns};
    }
    elsif (!$columns || $columns eq "physical") {
        $columns = $table_def->{phys_columns};
    }
    if (!$columns || ref($columns) ne "ARRAY") {
        my $table_def = $self->{table}{$table};
        my $repname = $table_def->{repository};
        my $realtable = $table_def->{table} || $table;
        if (defined $repname && $repname ne $self->{name}) {
            my $rep = $self->{context}->repository($repname);
            $columns = $rep->_get_default_columns($realtable);
        }
        elsif (defined $realtable && $realtable ne $table) {
            $columns = $self->_get_default_columns($realtable);
        }
    }
    if (!$columns || ref($columns) ne "ARRAY") {
        $columns = [];
    }
    &App::sub_exit($columns) if ($App::trace);
    return($columns);
}

# modified from the DBD::_::db::selectall_arrayref in DBI.pm
sub _selectrange_arrayref {
    &App::sub_entry if ($App::trace);
    my ($self, $stmt, $startrow, $endrow, $attr, @bind) = @_;
    my $dbh = $self->{dbh};
    return [] if (!$dbh);

    my $sth = (ref $stmt) ? $stmt : $dbh->prepare($stmt, $attr);
    if ($sth) {
        $sth->execute(@bind) || return;
        my $slice = $attr->{Slice}; # typically undef, else hash or array ref
        if (!$slice and $slice=$attr->{Columns}) {
            if (ref $slice eq 'ARRAY') { # map col idx to perl array idx
                $slice = [ @{$attr->{Columns}} ];       # take a copy
                for (@$slice) { $_-- }
            }
        }
        my $retval = $self->_fetchrange_arrayref($sth, $startrow, $endrow, $slice);
        &App::sub_exit($retval) if ($App::trace);
        return($retval);
    }
    else {
        &App::sub_exit() if ($App::trace);
        return();
    }
}

# modified from the DBD::_::st::fetchall_arrayref in DBI.pm
sub _fetchrange_arrayref {
    &App::sub_entry if ($App::trace);
    my ($self, $sth, $startrow, $endrow, $slice) = @_;
    $slice = [] if (! defined $slice);
    $startrow = 0 if (!defined $startrow);
    $endrow = 0 if (!defined $endrow);
    my $mode = ref $slice;
    my @rows;
    my $row;
    my ($rownum);
    if ($mode eq 'ARRAY') {
        # we copy the array here because fetch (currently) always
        # returns the same array ref. XXX
        if (@$slice) {
            $rownum = 0;
            while ($row = $sth->fetch) {
                $rownum++;
                last if ($endrow > 0 && $rownum > $endrow);
                push @rows, [ @{$row}[ @$slice] ] if ($rownum >= $startrow);
            }
            $sth->finish if ($endrow > 0 && $rownum > $endrow);
        }
        else {
            # return $sth->_fetchall_arrayref;
            $rownum = 0;
            while ($row = $sth->fetch) {
                $rownum++;
                last if ($endrow > 0 && $rownum > $endrow);
                push @rows, [ @$row ] if ($rownum >= $startrow);
            }
            $sth->finish if ($endrow > 0 && $rownum > $endrow);
        }
    }
    elsif ($mode eq 'HASH') {
        if (keys %$slice) {
            my @o_keys = keys %$slice;
            my @i_keys = map { lc } keys %$slice;
            $rownum = 0;
            while ($row = $sth->fetchrow_hashref('NAME_lc')) {
                my %hash;
                @hash{@o_keys} = @{$row}{@i_keys};
                $rownum++;
                last if ($endrow > 0 && $rownum > $endrow);
                push @rows, \%hash if ($rownum >= $startrow);
            }
            $sth->finish if ($endrow > 0 && $rownum > $endrow);
        }
        else {
            # XXX assumes new ref each fetchhash
            while ($row = $sth->fetchrow_hashref()) {
                $rownum++;
                last if ($endrow > 0 && $rownum > $endrow);
                push @rows, $row if ($rownum >= $startrow);
            }
            $sth->finish if ($endrow > 0 && $rownum > $endrow);
        }
    }
    else { Carp::croak("fetchall_arrayref($mode) invalid") }
    &App::sub_exit(\@rows) if ($App::trace);
    return \@rows;
}

######################################################################
# SQL CREATE METHODS (new methods not defined in App::Repository)
######################################################################

sub _mk_where_clause {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $options) = @_;
    my ($where, $column, $param, $value, $colnum, $repop, $sqlop, $column_def, $quoted);
    my ($tabledef, $tabcols, $alias, $dbexpr);

    my $dbh = $self->{dbh};

    $tabledef = $self->{table}{$table};
    $alias    = $tabledef->{alias};
    $tabcols  = $tabledef->{column};
    my %sqlop = (
        "contains"     => "like",
        "matches"      => "like",
        "not_contains" => "not like",
        "not_matches"  => "not like",
        "eq"           => "=",
        "ne"           => "!=",
        "le"           => "<=",
        "lt"           => "<",
        "ge"           => ">=",
        "gt"           => ">",
        "in"           => "in",
        "not_in"       => "not in",
    );
    my %repop = (
        "=~" => "contains",
        "~"  => "contains",
        "!~" => "not_contains",
        "==" => "eq",
        "="  => "eq",
        "!"  => "ne",
        "!=" => "ne",
        "<=" => "le",
        "<"  => "lt",
        ">=" => "ge",
        ">"  => "gt",
        "=/" => "regexp",
        "/"  => "regexp",
        "!/" => "not_regexp",
    );

    $where = "";
    $params = {} if (!$params);
    my $param_order = $params->{"_order"};
    if (!defined $param_order && ref($params) eq "HASH") {
        $param_order = [ (keys %$params) ];
    }
    if (defined $param_order && $#$param_order > -1) {
        my ($include_null, $inferred_op, @where);
        for ($colnum = 0; $colnum <= $#$param_order; $colnum++) {
            $param = $param_order->[$colnum];
            $column = $param;
            $sqlop = "=";
            $repop = "";
            $inferred_op = 1;
            # check if $column contains an embedded operation, i.e. "name.eq", "name.contains"
            if ($param =~ /^(.*)\.([^.]+)$/) {
                $repop = $2;
                $inferred_op = 0;
                if ($sqlop{$repop}) {
                    $column = $1;
                    $sqlop = $sqlop{$repop};
                }
            }
            $value = $params->{$param};
            if (!$repop && $value && $value =~ s/^(=~|~|!~|==|=|!=|!|<=|<|>=|>)//) {
                $repop = $repop{$1};
                $sqlop = $sqlop{$repop};
                $inferred_op = 0 if ($1 eq "==");
            }
            if (!$repop && $value && $value =~ /[\*\?]/) {
                $repop = "matches";
                $sqlop = $sqlop{$repop};
            }

            if ($repop eq "verbatim") {
                push(@where, "$params->{$param}");
                next;
            }

            $column_def = $tabcols->{$column};

            if (!defined $column_def) {
                if ($param =~ /^begin_(.*)/) {
                    $column = $1;
                    $sqlop = ">=";
                    $inferred_op = 0;
                }
                elsif ($param =~ /^end_(.*)/) {
                    $column = $1;
                    $sqlop = "<=";
                    $inferred_op = 0;
                }
                $column_def = $tabcols->{$column};
            }

            next if (!defined $column_def);  # skip if the column is unknown

            if (! defined $value) {
                # $value = "?";   # TODO: make this work with the "contains/matches" operators
                if (!$sqlop || $sqlop eq "=") {
                    push(@where, "$column is null");
                }
                elsif ($sqlop eq "!=") {
                    push(@where, "$column is not null");
                }
            }
            else {
                next if ($inferred_op && $value eq "ALL");

                if (ref($value) eq "ARRAY") {
                    $value = join(",", @$value);
                }

                if ($value =~ s/^@\[(.*)\]$/$1/) {  # new @[] expressions replace !expr!
                    $quoted = 0;
                }
                elsif ($value =~ s/^@\{(.*)\}$/$1/) {  # replaced !expr!, but @{x} is interp'd by perl so deprecate!
                    $quoted = 0;
                }
                elsif ($value =~ s/^!expr!//) { # deprecated (ugh!)
                    $quoted = 0;
                }
                elsif ($value =~ /,/ && ! $tabledef->{param}{$param}{no_auto_in_param}) {
                    $quoted = (defined $column_def->{quoted}) ? ($column_def->{quoted}) : ($value !~ /^-?[0-9.,]+$/);
                }
                else {
                    $quoted = (defined $column_def->{quoted}) ? ($column_def->{quoted}) : ($value !~ /^-?[0-9.]+$/);
                }

                next if ($inferred_op && !$quoted && $value eq "");

                $include_null = 0;

                if ($repop eq "contains" || $repop eq "not_contains") {
                    $value = $dbh->quote("%" . $value . "%");
                }
                elsif ($repop eq "matches" || $repop eq "not_matches") {
                    $value = $dbh->quote($value);
                    $value =~ s/_/\\_/g;
                    $value =~ s/\*/%/g;
                    $value =~ s/\?/_/g;
                }
                elsif ($sqlop eq "in" || ($inferred_op && $sqlop eq "=")) {
                    if (! defined $value || $value eq "NULL") {
                        $sqlop = "is";
                        $value = "null";
                    }
                    else {
                        if ($value =~ s/NULL,//g || $value =~ s/,NULL//) {
                            $include_null = 1;
                        }
                        if ($quoted) {
                            $value = $dbh->quote($value);
                            if ($value =~ /,/ && ! $tabledef->{param}{$param}{no_auto_in_param}) {
                                $value =~ s/,/','/g;
                                $value = "($value)";
                                $sqlop = "in";
                            }
                            else {
                                $sqlop = "=";
                            }
                        }
                        else {
                            if ($value =~ /,/ && ! $tabledef->{param}{$param}{no_auto_in_param}) {
                                $value = "($value)";
                                $sqlop = "in";
                            }
                            else {
                                $sqlop = "=";
                            }
                        }
                    }
                }
                elsif ($sqlop eq "not in" || ($inferred_op && $sqlop eq "!=")) {
                    if (! defined $value || $value eq "NULL") {
                        $sqlop = "is not";
                        $value = "null";
                    }
                    else {
                        if ($value =~ s/NULL,//g || $value =~ s/,NULL//) {
                            $include_null = 1;
                        }
                        if ($quoted) {
                            $value = $dbh->quote($value);
                            if ($value =~ /,/ && ! $tabledef->{param}{$param}{no_auto_in_param}) {
                                $value =~ s/,/','/g;
                                $value = "($value)";
                                $sqlop = "not in";
                            }
                            else {
                                $sqlop = "!=";
                            }
                        }
                        else {
                            if ($value =~ /,/ && ! $tabledef->{param}{$param}{no_auto_in_param}) {
                                $value = "($value)";
                                $sqlop = "not in";
                            }
                            else {
                                $sqlop = "!=";
                            }
                        }
                    }
                }
                elsif ($quoted) {
                    $value = $dbh->quote($value);
                }
                $dbexpr = $column_def->{dbexpr};
                if ($dbexpr && $dbexpr ne "$alias.$column") {
                    $column = $dbexpr;
                    $column =~ s/$alias.//g;
                }
                if ($include_null) {
                    if ($sqlop eq "not in" || $sqlop eq "!=") {
                        push(@where, "($column $sqlop $value and $column is not null)");
                    }
                    else {
                        push(@where, "($column $sqlop $value or $column is null)");
                    }
                }
                else {
                    push(@where, "$column $sqlop $value");
                }
            }
        }
        if ($#where > -1) {
            $where = "where " . join("\n  and ", @where) . "\n";
        }
    }
    &App::sub_exit($where) if ($App::trace);
    $where;
}

sub _mk_select_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $cols, $options) = @_;
    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});

    $params = $self->_key_to_params($table,$params) if (!$params || !ref($params));  # $params is undef/scalar => $key
    $cols = [$cols] if (!ref($cols));
    $options = {} if (!$options);

    my ($sql, $order_by, $direction, $col, $colnum, $dir);
    $order_by = $options->{order_by} || $options->{ordercols} || [];  # {ordercols} is deprecated
    $order_by = [$order_by] if (!ref($order_by));
    $direction = $options->{direction} || $options->{directions};     # {directions} is deprecated
    my $modifier = $options->{distinct} ? " distinct" : "";

    $sql = "select$modifier\n   " . join(",\n   ", @$cols) . "\nfrom $table\n";
    $sql .= $self->_mk_where_clause($table, $params);

    if (defined $order_by && $#$order_by > -1) {
        for ($colnum = 0; $colnum <= $#$order_by; $colnum++) {
            $col = $order_by->[$colnum];
            if ($col =~ /^(.+)\.asc$/) {
                $col = $1;
                $dir = " asc";
            }
            elsif ($col =~ /^(.+)\.desc$/) {
                $col = $1;
                $dir = " desc";
            }
            else {
                $dir = "";
                if ($direction && ref($direction) eq "HASH" && defined $direction->{$col}) {
                    if ($direction->{$col} =~ /^[au]/i) {  # asc, up, etc.
                        $dir = " asc";
                    }
                    elsif ($direction->{$col} =~ /^d/i) {
                        $dir = " desc";
                    }
                }
            }
            $sql .= ($colnum == 0) ? "order by\n   $col$dir" : ",\n   $col$dir";
        }
        $sql .= "\n";
    }
    my $suffix = $self->_mk_select_sql_suffix($table, $options);
    $sql .= $suffix if ($suffix);
    
    &App::sub_exit($sql) if ($App::trace);
    return($sql);
}

sub _mk_select_joined_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $cols, $options) = @_;

    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});
    my $dbh = $self->{dbh};

    my $table_def = $self->get_table_def($table, $options);
    die "Table $table not defined" if (!$table_def);

    if (!defined $params || $params eq "") {
        $params = {};
    }
    elsif (!ref($params)) {
        $params = $self->_key_to_params($table,$params);  # $params is undef/scalar => $key
    }
    $cols = [$cols] if (!ref($cols));
    $options = {} if (!$options);

    my ($order_by, $direction, $param_order, $param, $col, $dir);
    $order_by = $options->{order_by} || $options->{ordercols} || [];  # {ordercols} is deprecated
    $order_by = [$order_by] if (!ref($order_by));
    $direction = $options->{direction} || $options->{directions};     # {directions} is deprecated
    my $modifier = $options->{distinct} ? " distinct" : "";

    my ($where_condition, @join_conditions, @criteria_conditions, $repop, $sqlop, $value);

    # ADD ANY DEFAULT PARAMS
    my $paramdefs = $table_def->{param};
    my $params_modified = 0;
    if ($paramdefs) {
        # make a copy.
        # This is necessary if {default} is supplied (see the few lines above)
        # or if the {method} (see way below) is called to modify the $params.
        my %params = %$params;
        $params = \%params;

        foreach $param (keys %$paramdefs) {
            if (! exists $params->{$param} && $paramdefs->{$param}{default}) {
                $params{$param} = $paramdefs->{$param}{default};
                $params_modified = 1;
            }
            elsif (exists $params->{$param} && $paramdefs->{$param}{method}) {
                my $method = $paramdefs->{$param}{method};
                my $param_clause = $self->$method($params, $param, $table);
                if ($param_clause) {
                    push(@criteria_conditions, $param_clause);
                }
                $params_modified = 1;
            }
        }
    }

    # DETERMINE THE ORDER TO PROCESS THE PARAMS
    $param_order = $params->{"_order"};
    if (!defined $param_order) {
        $param_order = [ (keys %$params) ];
    }
    elsif ($params_modified) {
        # TODO: go into a merge between added params and predetermined ordered params
    }

    my $startrow    = $options->{startrow}    || 0;
    my $endrow      = $options->{endrow}      || 0;
    my $auto_extend = $options->{auto_extend} || 0;
    my $keycolidx   = $options->{keycolidx};
    my $writeref    = $options->{writeref};
    my $reptyperef  = $options->{reptyperef};
    my $group_by    = $options->{group_by} || $options->{summarykeys};

    my ($tablealiases, $tablealiashref);

    $tablealiases   = $table_def->{tablealiases};
    $tablealiashref = $table_def->{tablealias};

    ############################################################
    # Record indexes of all requested columns
    ############################################################
    my ($idx, $column, %columnidx, $column_def, @write, @reptype);

    for ($idx = 0; $idx <= $#$cols; $idx++) {
        $column = $cols->[$idx];
        if (! defined $columnidx{$column}) {
            $columnidx{$column} = $idx;
        }
        $write[$idx] = 1;            # assume every field writable
        $reptype[$idx] = "string";   # assume every field a string (most general type)
        $column_def = $table_def->{column}{$column};
        if ((!defined $column_def || !%$column_def) && $column =~ /[^_a-zA-Z0-9]/) {
            my $alias = $column;
            $alias =~ s/[^_a-zA-Z0-9.]+/_/g;
            $alias =~ s/^([0-9])/_$1/;
            $column_def = {
                dbexpr => $column,
                alias => $alias,
            };
            $table_def->{column}{$column} = $column_def;
        }
    }

    ############################################################
    # ensure that the primary key and sort keys are included
    ############################################################
    my ($dbexpr, $columnalias, $columntype, $colidx, $quoted);
    my (%dbexpr, @select_phrase, $group_reqd, @group_dbexpr, %reqd_tables);
    my (@keycolidx, $primary_key, $primary_table);
    my ($is_summary, %is_summary_key, $summaryexpr, @group_summarykeys);

    $is_summary = (defined $group_by && $#$group_by >= 0);

    $primary_table = "";
    if ($is_summary) {
        foreach $column (@$group_by) {         # primary key is list of summary keys
            $is_summary_key{$column} = 1;
            $colidx = $columnidx{$column};
            if (! defined $colidx && $auto_extend) {
                push(@$cols, $column);            # add the column to the list
                $colidx = $#$cols;
                $columnidx{$column} = $colidx;
            }
            if (defined $colidx) {
                push(@keycolidx, $colidx);
                $write[$colidx] = 0;              # keys aren't editable
            }
        }
    }
    else {  # non-summary (detail) table rows
        $primary_key = $table_def->{primary_key}; # primary key is in the metadata
        if ($primary_key) {
            $primary_key = [$primary_key] if (!ref($primary_key));
            foreach $column (@$primary_key) {
                $colidx = $columnidx{$column};
                if (! defined $colidx && $auto_extend) {
                    push(@$cols, $column);         # add the column to the list
                    $colidx = $#$cols;
                    $columnidx{$column} = $colidx;
                }
                if (defined $colidx) {
                    push(@keycolidx, $colidx);
                    $write[$colidx] = 0;        # keys aren't editable
                }
    
                $dbexpr = $table_def->{column}{$column}{dbexpr}; # take note of table the key is on
                if ($dbexpr && $dbexpr =~ /^([a-zA-Z][a-zA-Z0-9_]*)\.[a-zA-Z_][a-zA-Z_0-9]*$/) {
                    $primary_table = $1;
                }
            }
        }
    }

    if ($auto_extend) {
        if (defined $order_by && ref($order_by) eq "ARRAY") {
            foreach $column (@$order_by) {          # foreach sort key
                if ($column && ! defined $columnidx{$column} && $auto_extend) {
                    push(@$cols, $column);     # add the column to the list
                    $columnidx{$column} = $#$cols;
                }
            }
        }
    }

    for ($idx = 0; $idx <= $#$cols; $idx++) {
        $column = $cols->[$idx];
        $column_def = $table_def->{column}{$column};
        if (!defined $column_def) {
            push(@select_phrase, "NULL u$idx");
            next;
        }

        $columnalias   = $column_def->{alias};
        $dbexpr        = $column_def->{dbexpr};
        $reptype[$idx] = $column_def->{type};

        # if the field is not defined, or it is not a simple field on the primary table...
        if (!defined $dbexpr || $dbexpr !~ /^$primary_table\.[a-zA-Z_][a-zA-Z0-9_]*$/) {
            $write[$idx] = 0;    # consider it *not* writable
        }

        ############################################################
        # allow param substitutions in dbexpr
        ############################################################
        if ($dbexpr =~ /{/) {
            $dbexpr = $self->substitute($dbexpr, $params);
        }

        ############################################################
        # accumulate select expressions and their aliases
        ############################################################
        if ($is_summary) {
            if ($is_summary_key{$column}) {
                if (defined $dbexpr) {
                    push(@select_phrase, "$dbexpr $columnalias");
                    push(@group_summarykeys, $columnalias);
                }
            }
            else {
                $summaryexpr = $column_def->{summary};
                if (!defined $summaryexpr || $summaryexpr eq "") {
                    if ($dbexpr && $dbexpr =~ /\b(count|sum|avg|min|max)\(/) {
                        $summaryexpr = $dbexpr;
                    }
                    else {
                        $columntype = $column_def->{type};
                        if ($columntype eq "integer" || $columntype eq "number") {
                            $summaryexpr = "avg(\$)";
                        }
                        else {
                            $summaryexpr = "count(distinct(\$))";
                        }
                    }
                }
                if (defined $dbexpr) {
                    $summaryexpr =~ s#\$#$dbexpr#g;   # substitute the dbexpr into the summaryexpr
                }
                else {
                    $summaryexpr = "NULL";
                }
                push(@select_phrase, "$summaryexpr $columnalias") if ($summaryexpr);
            }
        }
        else {
            push(@select_phrase, (defined $dbexpr) ? "$dbexpr $columnalias" : "NULL $columnalias");
        }

        ############################################################
        # get the expression from the config info
        ############################################################

        if (!defined $dbexpr || $dbexpr eq "") {
            $dbexpr{$column} = "NULL";
        }
        else {
            ############################################################
            # save selected columns for reference
            ############################################################
            $dbexpr{$column} = $dbexpr;

            ############################################################
            # accumulate group-by columns and whether grouping reqd
            ############################################################
            if (($dbexpr =~ /sum *\(/i) ||
                ($dbexpr =~ /min *\(/i) ||
                ($dbexpr =~ /max *\(/i) ||
                ($dbexpr =~ /avg *\(/i) ||
                ($dbexpr =~ /std *\(/i) ||
                ($dbexpr =~ /stddev *\(/i) || # Oracle extension (supported by MySQL)
                ($dbexpr =~ /count *\(/i)) {
                $group_reqd = 1;
            }
            else {
                push(@group_dbexpr, $columnalias);
            }

            ############################################################
            # For each table, mentioned in the select expression...
            ############################################################
            $self->_require_tables($dbexpr, \%reqd_tables, $tablealiashref, 1);
        }
    }

    ############################################################
    # copy data out if a reference is given
    ############################################################
    if (defined $keycolidx && ref($keycolidx) eq "ARRAY") {
        @$keycolidx = @keycolidx;
    }
    if (defined $writeref && ref($writeref) eq "ARRAY") {
        @$writeref = @write;
    }
    if (defined $reptyperef && ref($reptyperef) eq "ARRAY") {
        @$reptyperef = @reptype;
    }

    ############################################################
    # create order-by columns
    ############################################################
    my (@order_by_dbexpr, $order_by_dbexpr);
    if (defined $order_by && ref($order_by) eq "ARRAY") {
        my ($dir);

        for ($idx = 0; $idx <= $#$order_by; $idx++) {
            $column = $order_by->[$idx];
            $dir = "";
            if ($column =~ /^(.+)\.asc$/) {
                $column = $1;
                $dir = " asc";
            }
            elsif ($column =~ /^(.+)\.desc$/) {
                $column = $1;
                $dir = " desc";
            }
            $column_def = $table_def->{column}{$column};
            next if (!defined $column_def);

            $order_by_dbexpr = $dbexpr{$column};
            if (!$order_by_dbexpr) {
                $order_by_dbexpr = $column_def->{dbexpr};
                $dbexpr{$column} = $order_by_dbexpr;
                $self->_require_tables($order_by_dbexpr, \%reqd_tables, $tablealiashref, 1);
            }

            $columnalias = $column_def->{alias};
            if (defined $columnidx{$column} && $columnalias) {
                $order_by_dbexpr = $columnalias;
            }

            if ($order_by_dbexpr) {
                if ($dir) {
                    $order_by_dbexpr .= $dir;
                }
                else {
                    if ($direction && ref($direction) eq "HASH" && defined $direction->{$column}) {
                        if ($direction->{$column} =~ /^[au]/i) {
                            $order_by_dbexpr .= " asc";
                        }
                        elsif ($direction->{$column} =~ /^d/i) {
                            $order_by_dbexpr .= " desc";
                        }
                    }
                }
                push(@order_by_dbexpr, $order_by_dbexpr);
            }
        }
    }

    ############################################################
    # create initial where conditions for the selected rows
    ############################################################

    #print $App::DEBUG_FILE $self->{context}->dump(), "\n";

    my %sqlop = (
        "contains"     => "like",
        "matches"      => "like",
        "not_contains" => "not like",
        "not_matches"  => "not like",
        "eq"           => "=",
        "ne"           => "!=",
        "le"           => "<=",
        "lt"           => "<",
        "ge"           => ">=",
        "gt"           => ">",
        "in"           => "in",
        "not_in"       => "not in",
    );
    my %repop = (
        "=~" => "contains",
        "~"  => "contains",
        "!~" => "not_contains",
        "==" => "eq",
        "="  => "eq",
        "!"  => "ne",
        "!=" => "ne",
        "<=" => "le",
        "<"  => "lt",
        ">=" => "ge",
        ">"  => "gt",
    );

    my ($include_null, $inferred_op);
    for ($idx = 0; $idx <= $#$param_order; $idx++) {

        $param = $param_order->[$idx];
        next if (!defined $param || $param eq "");

        $column = $param;

        #if ($param eq "_key") {
        #    # o TODO: enable multi-field primary keys (this assumes one-field only)
        #    # o TODO: enable non-integer primary key fields (this assumes integer, no quotes)
        #    $column = $table_def->{primary_key};  # assumes one column primary key
        #    $dbexpr = $table_def->{column}{$column}{dbexpr};
        #    if ($value =~ /,/) {
        #        $where_condition = "$dbexpr in ($value)";  # assumes one column, non-quoted primary key
        #    }
        #    else {
        #        $where_condition = "$dbexpr = $value";     # assumes one column, non-quoted primary key
        #    }
        #    push(@criteria_conditions, $where_condition);
        #    next;
        #}

        $sqlop = "=";
        $repop = "";
        $inferred_op = 1;
        $value = $params->{$param};
        # check if $param contains an embedded operation, i.e. "name.eq", "name.contains"
        if ($param =~ /^(.*)\.([^.]+)$/) {
            $repop = $2;
            $inferred_op = 0;
            if ($sqlop{$repop}) {
                $column = $1;
                $sqlop = $sqlop{$repop};
            }
        }
        if (!$repop && $value && $value =~ s/^(=~|~|!~|==|=|!=|!|<=|<|>=|>)//) {
            $repop = $repop{$1};
            $sqlop = $sqlop{$repop};
            $inferred_op = 0 if ($1 eq "==");
        }
        if (!$repop && $value && $value =~ /[\*\?]/) {
            $repop = "matches";
            $sqlop = $sqlop{$repop};
        }

        if ($repop eq "verbatim") {
            push(@criteria_conditions, $params->{$param});
            next;
        }

        $column_def = $table_def->{column}{$column};

        if (!defined $column_def) {
            if ($param =~ /^begin_(.*)/) {
                $column = $1;
                $sqlop = ">=";
                $inferred_op = 0;
            }
            elsif ($param =~ /^end_(.*)/) {
                $column = $1;
                $sqlop = "<=";
                $inferred_op = 0;
            }
            $column_def = $table_def->{column}{$column};
        }

        if (defined $column_def) {  # skip if the column is unknown
            $include_null = 0;

            if (! defined $value) {
                # $value = "?";   # TODO: make this work with the "contains/matches" operators
                if (!$sqlop || $sqlop eq "=") {
                    $sqlop = "is";
                }
                elsif ($sqlop eq "!=") {
                    $sqlop = "is not";
                }
                else {
                    next;
                }
                $value = "null";
            }
            else {
                next if (defined $table_def->{param}{$param}{all_value} &&
                         $value eq $table_def->{param}{$param}{all_value});

                next if ($inferred_op && $value eq "ALL");

                if (ref($value) eq "ARRAY") {
                    $value = join(",", @$value);
                }

                if ($value =~ s/^@\[(.*)\]$/$1/) {  # new @[] expressions replace !expr!
                    $quoted = 0;
                }
                elsif ($value =~ s/^@\{(.*)\}$/$1/) {  # new @{} don't work.. perl interpolates... deprecate.
                    $quoted = 0;
                }
                elsif ($value =~ s/^!expr!//) { # deprecated (ugh!)
                    $quoted = 0;
                }
                elsif ($value =~ /,/ && ! $table_def->{param}{$param}{no_auto_in_param}) {
                    $quoted = (defined $column_def->{quoted}) ? ($column_def->{quoted}) : ($value !~ /^-?[0-9.,]+$/);
                }
                else {
                    $quoted = (defined $column_def->{quoted}) ? ($column_def->{quoted}) : ($value !~ /^-?[0-9.]+$/);
                }

                next if ($inferred_op && !$quoted && $value eq "");

                if ($repop eq "contains" || $repop eq "not_contains") {
                    $value = $dbh->quote("%" . $value . "%");
                }
                elsif ($repop eq "matches" || $repop eq "not_matches") {
                    $value = $dbh->quote($value);
                    $value =~ s/_/\\_/g;
                    $value =~ s/\*/%/g;
                    $value =~ s/\?/_/g;
                }
                elsif ($sqlop eq "in" || ($inferred_op && $sqlop eq "=")) {

                    if (! defined $value || $value eq "NULL") {
                        $sqlop = "is";
                        $value = "null";
                    }
                    else {
                        if ($value =~ s/NULL,//g || $value =~ s/,NULL//) {
                            $include_null = 1;
                        }
                        if ($quoted) {
                            $value = $dbh->quote($value);
                            if ($value =~ /,/ && ! $table_def->{param}{$param}{no_auto_in_param}) {
                                $value =~ s/,/','/g;
                                $value = "($value)";
                                $sqlop = "in";
                            }
                            else {
                                $sqlop = "=";
                            }
                        }
                        else {
                            if ($value =~ /,/ && ! $table_def->{param}{$param}{no_auto_in_param}) {
                                $value = "($value)";
                                $sqlop = "in";
                            }
                            else {
                                $sqlop = "=";
                            }
                        }
                    }
                }
                elsif ($sqlop eq "not in" || ($inferred_op && $sqlop eq "!=")) {

                    if (! defined $value || $value eq "NULL") {
                        $sqlop = "is not";
                        $value = "null";
                    }
                    else {
                        if ($value =~ s/NULL,//g || $value =~ s/,NULL//) {
                            $include_null = 1;
                        }
                        if ($quoted) {
                            $value = $dbh->quote($value);
                            if ($value =~ /,/ && ! $table_def->{param}{$param}{no_auto_in_param}) {
                                $value =~ s/,/','/g;
                                $value = "($value)";
                                $sqlop = "not in";
                            }
                            else {
                                $sqlop = "!=";
                            }
                        }
                        else {
                            if ($value =~ /,/ && ! $table_def->{param}{$param}{no_auto_in_param}) {
                                $value = "($value)";
                                $sqlop = "not in";
                            }
                            else {
                                $sqlop = "!=";
                            }
                        }
                    }
                }
                elsif ($quoted) {
                    $value = $dbh->quote($value);
                }
            }

            $dbexpr = $column_def->{dbexpr};
            if (defined $dbexpr && $dbexpr ne "") {
                $self->_require_tables($dbexpr, \%reqd_tables, $tablealiashref, 2);
                if ($include_null) {
                    if ($sqlop eq "not in" || $sqlop eq "!=") {
                        push(@criteria_conditions, "($dbexpr $sqlop $value and $dbexpr is not null)");
                    }
                    else {
                        push(@criteria_conditions, "($dbexpr $sqlop $value or $dbexpr is null)");
                    }
                }
                else {
                    push(@criteria_conditions, "$dbexpr $sqlop $value");
                }
            }
        }
        elsif ($paramdefs && $paramdefs->{$param}) {
            if ($paramdefs->{$param}{criteria}) {
                push(@criteria_conditions, $self->substitute($paramdefs->{$param}{criteria}, $params));
            }
        }
        else {
            # skip. not a known column or a known param of any other type.
        }
    }

#    THIS IS DEAD CODE.
#    I NEED TO FIGURE OUT WHAT IT USED TO DO SO I CAN FIGURE WHETHER I NEED
#    TO REWRITE IT AND REINSTATE IT IN THE CURRENT CODE BASE.
#    {
#        my ($paramsql_alias_table, %param_used, @params_to_be_used);
#        my ($cond1, $cond2, $expr, $p1, $p2, $p1val, $p2val, @pval);
#        my ($crit_lines);
#
#        $paramsql_alias_table = $self->{table}{aliases}{$table}{parametersql};
#        $paramsql_alias_table = $table if (!$dep_alias_table);
#        $crit_lines = $self->{table}{criterialines}{$dep_alias_table}
#
#        CRIT: foreach $expr (@crit_lines) {
#            @params_to_be_used = ();
#            if ($expr =~ /^ *([^ ].*[^ ]) *\? *([^ ].*[^ ]) *$/) {
#                $cond1 = $1;
#                $expr = $2;
#
#                if ($cond1 =~ /^#(.+)/) {
#                    $p = $1;
#                    @pval = $query->param($p);
#                    next if ($#pval <= 0);
#                }
#                elsif ($cond1 =~ /^([a-zA-Z0-9]+) *== *\*([a-zA-Z0-9]+) *$/) {
#                    $p1 = $1;
#                    $p2 = $2;
#                    next CRIT if (defined $param_used{$p1} || defined $param_used{$p2});
#                    $p1val = $query->param($p1);
#                    $p2val = $query->param($p2);
#                    next CRIT if (!defined $p1val || !defined $p2val || $p1val ne $p2val);
#                    push(@params_to_be_used, $p2);
#                }
#            }
#
#            $cond2 = $expr;
#            while ($cond2 =~ s/{([a-zA-Z0-9]+)}//) {
#                $p = $1;
#                @pval = $query->param($p);
#                next CRIT if (!defined @pval || $#pval < 0 || $pval[0] eq "");
#                next CRIT if (defined $param_used{$p});
#                push(@params_to_be_used, $p);
#                if ($expr =~ /'{$p}'/) {
#                    $p1val = "'" . join("','",@pval) . "'";
#                    $expr =~ s/'{$p}'/$p1val/;
#                }
#                else {
#                    $p1val = join(",",@pval);
#                    $expr =~ s/{$p}/$p1val/;
#                }
#            }
#            foreach (@params_to_be_used) {
#                $param_used{$_} = 1;
#            }
#            push(@criteria_conditions, $expr);
#            $self->_require_tables($expr, \%reqd_tables, $table_aliases, 2);
#        }
#    }

    ############################################################
    # put tables in table list in the standard order
    # and build the join criteria
    ############################################################

    my ($dbtable, $tablealias, @from_tables, $tableref);
    my (@outer_join_clauses);

    foreach $tablealias (@$tablealiases) {
        #print $App::DEBUG_FILE "checking table $tablealias\n";
        if ($reqd_tables{$tablealias}) {
            $dbtable = $tablealiashref->{$tablealias}{table};
            $tableref = ($dbtable) ? "$dbtable $tablealias" : $tablealias;
            $where_condition = $tablealiashref->{$tablealias}{joincriteria};
            if ($where_condition =~ /\{.*\}/) {
                $where_condition = $self->substitute($where_condition);
            }
            if ($tablealiashref->{$tablealias}{cardinality_zero}) {
                push(@outer_join_clauses, "left join $tableref on $where_condition") if ($where_condition);
                #print $App::DEBUG_FILE "   $tablealias is [$dbtable] as [$tableref] where [$where_condition] (outer)\n";
            }
            else {
                push(@join_conditions, split(/ +and +/,$where_condition)) if ($where_condition);
                if ($tablealiashref->{$tablealias}{hint}) {
                    $tableref .= " $tablealiashref->{$tablealias}{hint}";
                }
                push(@from_tables, $tableref);
                #print $App::DEBUG_FILE "   $tablealias is [$dbtable] as [$tableref] where [$where_condition]\n";
            }
        }
    }
    if ($#from_tables == -1 && $#$tablealiases > -1) {
        $tablealias = $tablealiases->[0];
        $table = $tablealiashref->{$tablealias}{table};
        $tableref = ($table) ? "$table $tablealias" : $tablealias;
        if ($tablealiashref->{$tablealias}{hint}) {
            $tableref .= " $tablealiashref->{$tablealias}{hint}";
        }
        push(@from_tables, $tableref);
    }

    ############################################################
    # create the SQL statement
    ############################################################

    my ($sql, $conjunction);

    if ($#select_phrase >= 0) {
        $sql = "select$modifier\n   " .
                        join(",\n   ",@select_phrase) . "\n" .
                 "from\n   " .
                        join(",\n   ",@from_tables) . "\n";
    }

    if ($#outer_join_clauses >= 0) {
        $sql .= join("\n",@outer_join_clauses) . "\n";
    }

    if ($#join_conditions >= 0) {
        $sql .= "where " . join("\n  and ",@join_conditions) . "\n";
    }
    $conjunction = "AND";
    $conjunction = $params->{"_conjunction"} if (defined $params);
    $conjunction = "AND" if (!defined $conjunction);
    $conjunction = uc($conjunction);
    if ($#criteria_conditions >= 0) {
        $sql .= ($#join_conditions == -1 ? "where " : "  and ");
        if ($conjunction eq "NOT_AND") {
            $sql .= "not (" . join("\n  and ",@criteria_conditions) . ")\n";
        }
        elsif ($conjunction eq "NOT_OR") {
            $sql .= "not (" . join("\n  or ",@criteria_conditions) . ")\n";
        }
        elsif ($conjunction eq "OR") {
            $sql .= "(" . join("\n  or ",@criteria_conditions) . ")\n";
        }
        else {
            $sql .= join("\n  and ",@criteria_conditions) . "\n";
        }
    }
    if ($#group_summarykeys >= 0) {
        $sql .= "group by\n   " . join(",\n   ",@group_summarykeys) . "\n";
    }
    elsif ($group_reqd && $#group_dbexpr >= 0) {
        $sql .= "group by\n   " . join(",\n   ",@group_dbexpr) . "\n";
    }
    if ($#order_by_dbexpr >= 0) {
        $sql .= "order by\n   " . join(",\n   ",@order_by_dbexpr) . "\n";
    }

    my $suffix = $self->_mk_select_sql_suffix($table, $options);
    $sql .= $suffix if ($suffix);

    ############################################################
    # return the SQL statement
    ############################################################
    &App::sub_exit($sql) if ($App::trace);
    return($sql);
}

sub _mk_select_sql_suffix {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $options) = @_;
    &App::sub_exit("") if ($App::trace);
    return("");
}

sub _require_tables {
    &App::sub_entry if ($App::trace >= 3);
    my ($self, $dbexpr, $reqd_tables, $relationship_defs, $require_type) = @_;
    #print $App::DEBUG_FILE "_require_tables($dbexpr,...,...,$require_type)\n";
    my ($relationship, $relationship2, @relationship, %tableseen, $dependencies);
    while ($dbexpr =~ s/([a-zA-Z_][a-zA-Z_0-9]*)\.[a-zA-Z_][a-zA-Z_0-9]*//) {
        if (defined $relationship_defs->{$1} && !$tableseen{$1}) {
            push(@relationship, $1);
            $tableseen{$1} = 1;
        }
        while ($relationship = pop(@relationship)) {
            if (! defined $reqd_tables->{$relationship}) {
                $reqd_tables->{$relationship} = $require_type;
                #print $App::DEBUG_FILE "table required: $relationship => $require_type\n";
                $dependencies = $relationship_defs->{$relationship}{dependencies};
                push(@relationship, @$dependencies)
                   if (defined $dependencies && ref($dependencies) eq "ARRAY");
            }
            elsif ($reqd_tables->{$relationship} < $require_type) {
                $reqd_tables->{$relationship} = $require_type;
                #print $App::DEBUG_FILE "table required: $relationship => $require_type\n";
            }
        }
    }
    &App::sub_exit() if ($App::trace >= 3);
}

# $insert_sql = $rep->_mk_insert_row_sql ($table, \@cols, \@row);
sub _mk_insert_row_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $cols, $row) = @_;

    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});
    my $dbh = $self->{dbh};

    my ($sql, $values, $col, $value, $colnum, $quoted);
    #print $App::DEBUG_FILE "_mk_insert_row_sql($table,\n   [",
    #    join(",",@$cols), "],\n   [",
    #    join(",",@$row), "])\n";

    if ($#$cols == -1) {
        $self->{error} = "Database->_mk_insert_row_sql(): no columns specified";
        return();
    }
    my $tabcols = $self->{table}{$table}{column};

    $sql = "insert into $table\n";
    $values = "values\n";
    for ($colnum = 0; $colnum <= $#$cols; $colnum++) {
        $col = $cols->[$colnum];
        if (!defined $row || $#$row == -1) {
            $value = "?";
        }
        else {
            $value = $row->[$colnum];
            if (!defined $value) {
                $value = "NULL";
            }
            else {
                $quoted = (defined $tabcols->{$col}{quoted}) ? ($tabcols->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
                if ($quoted) {
                    $value = $dbh->quote($value);
                }
            }
        }
        $sql .= ($colnum == 0) ? "  ($col" : ",\n   $col";
        if ($tabcols->{$col}{dbexpr_update}) {
            $value = sprintf($tabcols->{$col}{dbexpr_update}, $value);
        }
        $values .= ($colnum == 0) ? "  ($value" : ",\n   $value";
    }
    $sql .= ")\n";
    $values .= ")\n";
    $sql .= $values;
    &App::sub_exit($sql) if ($App::trace);
    $sql;
}

# $insert_sql = $rep->_mk_insert_sql ($table, \@cols, \@row, \%options);
sub _mk_insert_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $cols, $row, $options) = @_;

    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});
    my $dbh = $self->{dbh};

    if (!ref($cols)) {
        $cols = [ $cols ];
        $row  = [ $row ];
    }
    elsif ($#$cols == -1) {
        die "Database->_mk_insert_sql(): no columns specified";
    }

    my ($col, $value, $colidx, $quoted);
    my $tabcols = $self->{table}{$table}{column};
    my $by_expression = ($options && $options->{by_expression}) ? 1 : 0;
    my @values = ();

    for ($colidx = 0; $colidx <= $#$cols; $colidx++) {
        $col = $cols->[$colidx];
        if (!defined $row || $#$row == -1) {
            push(@values, "?");
        }
        else {
            $value = $row->[$colidx];
            if (!defined $value) {
                push(@values, "NULL");
            }
            elsif ($by_expression) {
                push(@values, $value);
            }
            else {
                $quoted = (defined $tabcols->{$col}{quoted}) ? ($tabcols->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
                if ($quoted) {
                    $value = $dbh->quote($value);
                }
                if ($tabcols->{$col}{dbexpr_update}) {
                    $value = sprintf($tabcols->{$col}{dbexpr_update}, $value);
                }
                push(@values, $value);
            }
        }
    }
    my $sql = "insert into $table\n  (" . join(",\n   ",@$cols) . ")\nvalues\n  (" . join(@values) . ")\n";
    &App::sub_exit($sql) if ($App::trace);
    $sql;
}

# $update_sql = $rep->_mk_update_sql($table, \%params,    \@cols, \@row, \%options);
# $update_sql = $rep->_mk_update_sql($table, \@keycolidx, \@cols, \@row, \%options);
# $update_sql = $rep->_mk_update_sql($table, \@paramcols, \@cols, \@row, \%options);
# $update_sql = $rep->_mk_update_sql($table, $key,        \@cols, \@row, \%options);
# $update_sql = $rep->_mk_update_sql($table, undef,       \@cols, \@row, \%options);
sub _mk_update_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $cols, $row, $options) = @_;
    die "Database->_mk_update_sql(): no columns specified" if (!$cols || $#$cols == -1);

    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});
    my $dbh = $self->{dbh};

    my $tabcols = $self->{table}{$table}{column};
    my $by_expression = $options->{by_expression};

    my (@noupdate, %noupdate, @set, $where);

    $noupdate[$#$cols] = 0;   # pre-extend the array
    %noupdate = ();
    $where = "";

    my ($colidx, $col, $value, $quoted);
    if (!defined $params) {
        $params = $self->{table}{$table}{primary_key};
        die "_mk_update_sql() can't update with undef params because {table}{$table}{primary_key} not defined"
            if (!defined $params);
        $params = [ $params ] if (!ref($params));
    }

    if (!ref($params)) {  # update by key!
        $value = $params;
        $col = $self->{table}{$table}{primary_key};
        die "_mk_update_sql() can't update with key because {table}{$table}{primary_key} not defined"
            if (!defined $col);
        if (!ref($col)) {
            $quoted = (defined $tabcols->{$col}{quoted}) ? ($tabcols->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
            if ($quoted && !$by_expression) {
                $value = $dbh->quote($value);
            }
            $where = "where $col = $value\n";
            $noupdate{$col} = 1;
        }
        else {
            $params = $col;   # it wasn't a column, but an array of them
            my @where = ();
            my @values = $self->_key_to_values($value);
            for ($colidx = 0; $colidx <= $#$params; $colidx++) {
                $col = $params->[$colidx];
                $value = $values[$colidx];
                $quoted = (defined $tabcols->{$col}{quoted}) ? ($tabcols->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
                if ($quoted && !$by_expression) {
                    $value = $dbh->quote($value);
                }
                push(@where, "$col = $value");
                $noupdate{$col} = 1;
            }
            $where = "where " . join("\n  and ",@where) . "\n";
        }
    }
    elsif (ref($params) eq "HASH") {
        $where = $self->_mk_where_clause($table, $params);
    }
    elsif (ref($params) eq "ARRAY") {
        die "_mk_update_sql() can't update with no indexes/columns in params" if ($#$params == -1);
        my @where = ();
        if ($params->[0] =~ /^[0-9]+$/) {  # an array of indexes
            my $keycolidx = $params;  # @$params represents a set of array indices
            for (my $i = 0; $i <= $#$keycolidx; $i++) {
                $colidx = $keycolidx->[$i];
                $col = $cols->[$colidx];
                if (!defined $row || $#$row == -1) {
                    $value = "?";
                }
                else {
                    $value = $row->[$colidx];
                    if (!defined $value) {
                        $value = "NULL";
                    }
                    else {
                        $quoted = (defined $tabcols->{$col}{quoted})?($tabcols->{$col}{quoted}):($value !~ /^-?[0-9.]+$/);
                        if ($quoted) {
                            $value = $dbh->quote($value);
                        }
                    }
                }
                push(@where, "$col = $value");
                $noupdate[$colidx] = 1;
            }
        }
        else {   # an array of column names
        }
        $where = "where " . join("\n  and ",@where) . "\n" if ($#where > -1);
    }
    else {
        die "_mk_update_sql() unrecognized params type";
    }

    # Now determine what to "set"
    my $ref_row = ref($row);
    if (!$ref_row) {
        for ($colidx = 0; $colidx <= $#$cols; $colidx++) {
            next if ($noupdate[$colidx]);
            $col = $cols->[$colidx];
            next if ($noupdate{$col});
            push(@set, "$col = ?");
        }
    }
    else {
        my $is_array = ($ref_row eq "ARRAY");
        for ($colidx = 0; $colidx <= $#$cols; $colidx++) {
            next if ($noupdate[$colidx]);
            $col = $cols->[$colidx];
            next if ($noupdate{$col});
            $value = $is_array ? $row->[$colidx] : $row->{$col};
            if (!defined $value) {
                push(@set, "$col = NULL");
            }
            else {
                $quoted = (defined $tabcols->{$col}{quoted}) ? ($tabcols->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
                if ($quoted && !$by_expression) {
                    $value = $dbh->quote($value);
                }
                if ($tabcols->{$col}{dbexpr_update}) {
                    $value = sprintf($tabcols->{$col}{dbexpr_update}, $value, $value, $value, $value, $value);
                }
                push(@set, "$col = $value");
            }
        }
    }

    my $sql = "update $table set\n   " . join(",\n   ",@set) . "\n" . $where;
    &App::sub_exit($sql) if ($App::trace);
    $sql;
}

# $delete_sql = $rep->_mk_delete_sql($table, \%params,                   \%options);
# $delete_sql = $rep->_mk_delete_sql($table, \%params,    undef,  undef, \%options);
# $delete_sql = $rep->_mk_delete_sql($table, \@keycolidx, \@cols, \@row, \%options);
# $delete_sql = $rep->_mk_delete_sql($table, \@paramcols, \@cols, \@row, \%options);
# $delete_sql = $rep->_mk_delete_sql($table, $key,                       \%options);
# $delete_sql = $rep->_mk_delete_sql($table, $key,        undef,  undef, \%options);
# $delete_sql = $rep->_mk_delete_sql($table, undef,       \@cols, \@row, \%options);
sub _mk_delete_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $cols, $row, $options) = @_;

    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});
    my $dbh = $self->{dbh};

    my $tabcols = $self->{table}{$table}{column};
    my $by_expression = $options->{by_expression};

    my $where = "";

    my ($colidx, $col, $value, $quoted);
    if (!defined $params) {
        if ($cols && $row) {
            $params = $self->{table}{$table}{primary_key};
            die "_mk_delete_sql() can't delete with undef params because {table}{$table}{primary_key} not defined"
                if (!defined $params);
            $params = [ $params ] if (!ref($params));
        }
        else {
            $params = {};
        }
    }

    if (!ref($params)) {  # delete by key!
        $value = $params;
        $col = $self->{table}{$table}{primary_key};
        die "_mk_delete_sql() can't delete with key because {table}{$table}{primary_key} not defined"
            if (!defined $col);
        if (!ref($col)) {
            $quoted = (defined $tabcols->{$col}{quoted}) ? ($tabcols->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
            if ($quoted && !$by_expression) {
                $value = $dbh->quote($value);
            }
            $where = "where $col = $value\n";
        }
        else {
            $params = $col;   # it wasn't a column, but an array of them
            my @where = ();
            my @values = $self->_key_to_values($value);
            for ($colidx = 0; $colidx <= $#$params; $colidx++) {
                $col = $params->[$colidx];
                $value = $values[$colidx];
                $quoted = (defined $tabcols->{$col}{quoted}) ? ($tabcols->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
                if ($quoted && !$by_expression) {
                    $value = $dbh->quote($value);
                }
                push(@where, "$col = $value");
            }
            $where = "where " . join("\n  and ",@where) . "\n";
        }
    }
    elsif (ref($params) eq "HASH") {
        $where = $self->_mk_where_clause($table, $params);
    }
    elsif (ref($params) eq "ARRAY") {
        die "_mk_delete_sql() can't delete with no indexes/columns in params" if ($#$params == -1);
        my @where = ();
        if ($params->[0] =~ /^[0-9]+$/) {  # an array of indexes
            my $keycolidx = $params;  # @$params represents a set of array indices
            for (my $i = 0; $i <= $#$keycolidx; $i++) {
                $colidx = $keycolidx->[$i];
                $col = $cols->[$colidx];
                if (!defined $row || $#$row == -1) {
                    $value = "?";
                }
                else {
                    $value = $row->[$colidx];
                    if (!defined $value) {
                        $value = "NULL";
                    }
                    else {
                        $quoted = (defined $tabcols->{$col}{quoted})?($tabcols->{$col}{quoted}):($value !~ /^-?[0-9.]+$/);
                        if ($quoted) {
                            $value = $dbh->quote($value);
                        }
                    }
                }
                push(@where, "$col = $value");
            }
        }
        else {   # an array of column names
        }
        $where = "where " . join("\n  and ",@where) . "\n" if ($#where > -1);
    }
    else {
        die "_mk_delete_sql() unrecognized params type";
    }

    my $sql = "delete from $table\n$where";
    &App::sub_exit($sql) if ($App::trace);
    $sql;
}

# $delete_sql = $rep->_mk_delete_row_sql ($table, \@cols, \@row, \@keycolidx);
sub _mk_delete_row_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $cols, $row, $keycolidx) = @_;

    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});
    my $dbh = $self->{dbh};

    my ($sql, $where, @colused, $col, $value, $colnum, $i, $nonkeycolnum, $quoted);
    if ($#$cols == -1) {
        $self->{error} = "Database->_mk_delete_row_sql(): no columns specified";
        return();
    }
    my $tabcols = $self->{table}{$table}{column};

    $colused[$#$cols] = 0;   # pre-extend the array

    $sql = "delete from $table\n";

    if (defined $keycolidx && $#$keycolidx > -1) {
        for ($i = 0; $i <= $#$keycolidx; $i++) {
            $colnum = $keycolidx->[$i];
            $col = $cols->[$colnum];
            if (!defined $row || $#$row == -1) {
                $value = "?";
            }
            else {
                $value = $row->[$colnum];
                if (!defined $value) {
                    $value = "NULL";
                }
                else {
                    $quoted = (defined $tabcols->{$col}{quoted}) ? ($tabcols->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
                    if ($quoted) {
                        $value = $dbh->quote($value);
                    }
                }
            }
            $where .= ($i == 0) ? "where $col = $value" : "\n  and $col = $value";
            $colused[$colnum] = 1;
        }
        $where .= "\n";
    }

    $sql .= $where;
    &App::sub_exit($sql) if ($App::trace);
    $sql;
}

# $delete_sql = $rep->_mk_delete_rows_sql($table, \@params, \%paramvalues);
sub _mk_delete_rows_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $paramvalues) = @_;
    $self->_load_table_metadata($table) if (!defined $self->{table}{$table}{loaded});
    my ($sql);

    $sql = "delete from $table\n";
    $sql .= $self->_mk_where_clause($table, $params);
    &App::sub_exit($sql) if ($App::trace);
    $sql;
}

######################################################################
# SIMPLE SQL OPERATIONS
######################################################################

# $row = $rep->select_row ($table, \@cols, \@params, \%paramvalues);

# this is a new version that uses bind variables instead of relying on my quoting rules
# unfortunately, it doesn't work yet

sub _select_row {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $cols, $params, $paramvalues) = @_;
    my ($dbh, $sql, $param, @params, %paramvalues, @paramvalues);

    $self->{error} = "";

    if (defined $params) {
        @params = @$params;
    }
    else {
        @params = (keys %$paramvalues);
    }
    foreach $param (@params) {
        push(@paramvalues, $paramvalues->{$param});
    }

    if ($self->{table}{$table}{rawaccess}) {
        $sql = $self->_mk_select_sql($table, $cols, \@params, \%paramvalues, undef, 1, 1);
    }
    else {
        $sql = $self->_mk_select_rows_sql($table, $cols, \@params, \%paramvalues, undef, 1, 1);
    }
    $self->{sql} = $sql;

    my $rows = $self->_selectrange_arrayref($sql, 1, 1, undef, @paramvalues);
    if (!$rows || $#$rows == -1) {
        &App::sub_exit([]) if ($App::trace);
        return [];
    }
    &App::sub_exit($rows->[0]) if ($App::trace);
    return ($rows->[0]);
}

# NOTE: everything after the first line is optional
# @rows = $rep->_select_rows($table, \@cols,
#               \@params, \%paramvalues, \@order_by,
#               $startrow, $endrow,
#               \@sortdircol, \@keycolidx, \@writeable, \@columntype, \@summarykeys);
# TODO: get the $startrow/$endrow working when one/both/neither work in the SQL portion
# TODO: rethink $startrow/$endrow vs. $numrows/$skiprows

# this is a new version that uses bind variables instead of relying on my quoting rules
# unfortunately, it doesn't work yet

sub _select_rows {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $cols, $params, $paramvalues, $order_by, $startrow, $endrow,
        $sortdircol, $keycolidx, $writeable, $columntype, $group_by) = @_;
    my ($sql, $param, @params, %paramvalues, @paramvalues);

    $self->{error} = "";

    if (defined $params) {
        @params = @$params;
    }
    else {
        @params = (keys %$paramvalues);
    }
    foreach $param (@params) {
        push(@paramvalues, $paramvalues->{$param});
    }

    if ($self->{table}{$table}{rawaccess}) {
        $sql = $self->_mk_select_sql($table, $cols, \@params, \%paramvalues, $order_by,
            $startrow, $endrow, $sortdircol, $keycolidx, $writeable, $columntype, $group_by);
    }
    else {
        $sql = $self->_mk_select_rows_sql($table, $cols, \@params, \%paramvalues, $order_by,
            $startrow, $endrow, $sortdircol, $keycolidx, $writeable, $columntype, $group_by);
    }
    $self->{sql} = $sql;
    my $retval = $self->_selectrange_arrayref($sql, $startrow, $endrow, undef, @paramvalues);
    &App::sub_exit($retval) if ($App::trace);
    $retval;
}

# $ok = $rep->_insert_row($table, \@cols, \@row);
sub _insert_row {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $cols, $row, $options) = @_;
    $self->{error} = "";
    my $sql = $self->_mk_insert_row_sql($table, $cols, undef, $options);
    $self->{sql} = $sql;
    my $dbh = $self->{dbh};
    my $retval = 0;

    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my ($timer, $elapsed_time);
    my $loglevel = 1;
    if ($debug_sql) {
        $timer = $self->_get_timer();
        print $App::DEBUG_FILE "DEBUG_SQL: insert()\n";
        print $App::DEBUG_FILE "DEBUG_SQL: bind vars [", join("|",map { defined $_ ? $_ : "undef" } @$row), "]\n";
        print $App::DEBUG_FILE $sql;
    }
    if ($context_options->{explain_sql}) {
        $self->explain_sql($sql);
    }
    if (defined $dbh) {
        eval {
            ### TODO: make this work with regex for retry
            $retval = $dbh->do($sql, undef, @$row);
            $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
        };
        if ($@) {  # Log the error message with the SQL and rethrow the exception
            my $bind_values = join("|", map { defined $_ ? $_ : "undef" } @$row);
            $loglevel = 3 if ($@ =~ /duplicate/i);
            $self->{context}->log({level=>$loglevel}, "DBI Exception (fail) in _insert_row(): $@BIND VALUES: [$bind_values]\nSQL: $sql");
            die $@;
        }
    }
    if ($debug_sql) {
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] ($elapsed_time sec) $DBI::errstr\n";
        print $App::DEBUG_FILE "\n";
    }

    &App::sub_exit($retval) if ($App::trace);
    $retval;
}

# $nrows = $rep->_insert_rows ($table, \@cols, \@rows);
sub _insert_rows {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $cols, $rows, $options) = @_;
    $self->{error} = "";
    my ($sql, $retval);
   
    my $dbh = $self->{dbh};
    return 0 if (!defined $dbh);

    my $nrows = 0;
    my $ok = 1;
    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my $explain_sql = $context_options->{explain_sql};
    my ($timer, $elapsed_time);
    my $loglevel = 1;
    if ($debug_sql) {
        $timer = $self->_get_timer();
    }
    if (ref($rows) eq "ARRAY") {
        $sql = $self->_mk_insert_row_sql($table, $cols);
        foreach my $row (@$rows) {
            if ($debug_sql) {
                print $App::DEBUG_FILE "DEBUG_SQL: _insert_rows()\n";
                print $App::DEBUG_FILE "DEBUG_SQL: bind vars [", join("|",map { defined $_ ? $_ : "undef" } @$row), "]\n";
                print $App::DEBUG_FILE $sql;
            }
            if ($explain_sql) {
                $self->explain_sql($sql);
            }
            if (defined $dbh) {
                eval {
                    ### TODO: make this work with regex for retry
                    $retval = $dbh->do($sql, undef, @$row);
                    $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
                };
                if ($@) {  # Log the error message with the SQL and rethrow the exception
                    $loglevel = ($@ =~ /duplicate/i) ? 3 : 1;
                    my $bind_values = join("|", map { defined $_ ? $_ : "undef" } @$row);
                    $self->{context}->log({level=>$loglevel}, "DBI Exception (fail) in _insert_rows() [ARRAY]: $@BIND VALUES: [$bind_values]\nSQL: $sql");
                    die $@;
                }
            }
            if ($debug_sql) {
                print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] $DBI::errstr\n";
                print $App::DEBUG_FILE "\n";
            }
    
            if ($retval) {
                $nrows ++;
            }
            else {
                $self->{numrows} = $nrows;
                $ok = 0;
                last;
            }
        }
    }
    else {
        my $fh = $rows;                # assume it is a file handle
        $rows = [];                    # we will be refilling this buffer
        my %options = ( %$options );   # make a copy so it can be modified
        $options->{maxrows} = 100;
        $sql = $self->_mk_insert_row_sql($table, $cols);
        while (1) {
            $rows = $self->_read_rows_from_file($fh, $cols, \%options);
            last if ($#$rows == -1);
            foreach my $row (@$rows) {
                if ($debug_sql) {
                    print $App::DEBUG_FILE "DEBUG_SQL: _insert_rows()\n";
                    print $App::DEBUG_FILE "DEBUG_SQL: bind vars [", join("|",map { defined $_ ? $_ : "undef" } @$row), "]\n";
                    print $App::DEBUG_FILE $sql;
                }
                if ($context_options->{explain_sql}) {
                    $self->explain_sql($sql);
                }
                if (defined $dbh) {
                    eval {
                        ### TODO: make this work with regex for retry
                        $retval = $dbh->do($sql, undef, @$row);
                        $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
                    };
                    if ($@) {  # Log the error message with the SQL and rethrow the exception
                        $loglevel = ($@ =~ /duplicate/i) ? 3 : 1;
                        my $bind_values = join("|", map { defined $_ ? $_ : "undef" } @$row);
                        $self->{context}->log({level=>$loglevel}, "DBI Exception (fail) in _insert_rows() [FILE]: $@BIND VALUES: [$bind_values]\nSQL: $sql");
                        die $@;
                    }
                }
                if ($debug_sql) {
                    print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] $DBI::errstr\n";
                    print $App::DEBUG_FILE "\n";
                }
        
                if ($retval) {
                    $nrows ++;
                }
                else {
                    $self->{numrows} = $nrows;
                    $ok = 0;
                }
            }
        }
    }
    if ($debug_sql) {
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: nrows [$nrows] ($elapsed_time sec)\n";
    }
    $self->{sql} = $sql;
    $self->{numrows} = $nrows;
    &App::sub_exit($nrows) if ($App::trace);
    return($nrows);
}

sub _delete {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $cols, $row, $options) = @_;
    $self->{error} = "";
    my $sql = $self->_mk_delete_sql($table, $params, $cols, $row, $options);
    $self->{sql} = $sql;

    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my ($timer, $elapsed_time);
    if ($debug_sql) {
        $timer = $self->_get_timer();
        print $App::DEBUG_FILE "DEBUG_SQL: _delete()\n";
        print $App::DEBUG_FILE $sql;
    }
    if ($context_options->{explain_sql}) {
        $self->explain_sql($sql);
    }
    my $retval = 0;
    my $dbh = $self->{dbh};
    if (defined $dbh) {
        eval {
            ### TODO: make this work with regex for retry
            $retval = $dbh->do($sql);
            $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
        };
        if ($@) {  # Log the error message with the SQL and rethrow the exception
            $self->{context}->log({level=>1},"DBI Exception (fail) in _delete(): $@SQL: $sql");
            die $@;
        }
    }
    if ($debug_sql) {
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] ($elapsed_time sec) $DBI::errstr\n";
        print $App::DEBUG_FILE "\n";
    }

    &App::sub_exit($retval) if ($App::trace);
    return($retval);
}

# $nrows = $rep->_update($table, \%params,    \@cols, \@row, \%options);
# $nrows = $rep->_update($table, \@keycolidx, \@cols, \@row, \%options);
# $nrows = $rep->_update($table, \@paramcols, \@cols, \@row, \%options);
# $nrows = $rep->_update($table, $key,        \@cols, \@row, \%options);
# $nrows = $rep->_update($table, undef,       \@cols, \@row, \%options);
sub _update {
    &App::sub_entry if ($App::trace);
    my ($self, $table, $params, $cols, $row, $options) = @_;
    $self->{error} = "";
    my $sql = $self->_mk_update_sql($table, $params, $cols, $row, $options);
    $self->{sql} = $sql;

    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my ($timer, $elapsed_time);
    if ($debug_sql) {
        $timer = $self->_get_timer();
        print $App::DEBUG_FILE "DEBUG_SQL: _update()\n";
        print $App::DEBUG_FILE $sql;
    }
    if ($context_options->{explain_sql}) {
        $self->explain_sql($sql);
    }
    my $retval = 0;
    my $dbh = $self->{dbh};
    if (defined $dbh) {
        eval {
            ### TODO: make this work with regex for retry
            $retval = $dbh->do($sql);
            $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
        };
        if ($@) {  # Log the error message with the SQL and rethrow the exception
            $self->{context}->log({level=>1},"DBI Exception (fail) in _update(): $@SQL: $sql");
            die $@;
        }
    }
    if ($debug_sql) {
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] ($elapsed_time sec) $DBI::errstr\n";
        print $App::DEBUG_FILE "\n";
    }

    &App::sub_exit($retval) if ($App::trace);
    return($retval);
}

# $ok = $rep->_delete_row ($table, \@cols, \@row, \@keycolidx);
sub _delete_row {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->{error} = "";
    my $sql = $self->_mk_delete_row_sql(@_);
    $self->{sql} = $sql;

    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my ($timer, $elapsed_time);
    if ($debug_sql) {
        $timer = $self->_get_timer();
        print $App::DEBUG_FILE "DEBUG_SQL: _delete_row()\n";
        print $App::DEBUG_FILE $sql;
    }
    if ($context_options->{explain_sql}) {
        $self->explain_sql($sql);
    }
    my $retval = 0;
    my $dbh = $self->{dbh};
    if (defined $dbh) {
        eval {
            ### TODO: make this work with regex for retry
            $retval = $dbh->do($sql);
            $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
        };
        if ($@) {  # Log the error message with the SQL and rethrow the exception
            $self->{context}->log({level=>1},"DBI Exception (fail) in _delete_row(): $@SQL: $sql");
            die $@;
        }
    }
    if ($debug_sql) {
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] ($elapsed_time sec) $DBI::errstr\n";
        print $App::DEBUG_FILE "\n";
    }

    &App::sub_exit($retval) if ($App::trace);
    $retval;
}

# $ok = $rep->_delete_rows($table, \@params, \%paramvalues);
sub _delete_rows {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->{error} = "";
    my $sql = $self->_mk_delete_rows_sql(@_);
    $self->{sql} = $sql;

    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my ($timer, $elapsed_time);
    if ($debug_sql) {
        $timer = $self->_get_timer();
        print $App::DEBUG_FILE "DEBUG_SQL: _delete_rows()\n";
        print $App::DEBUG_FILE $sql;
    }
    if ($context_options->{explain_sql}) {
        $self->explain_sql($sql);
    }
    my $retval = 0;
    my $dbh = $self->{dbh};
    if (defined $dbh) {
        eval {
            ### TODO: make this work with regex for retry
            $retval = $dbh->do($sql);
            $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
        };
        if ($@) {  # Log the error message with the SQL and rethrow the exception
            $self->{context}->log({level=>1},"DBI Exception (fail) in _delete_rows(): $@SQL: $sql");
            die $@;
        }
    }
    $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
    if ($debug_sql) {
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval] ($elapsed_time sec) $DBI::errstr\n";
        print $App::DEBUG_FILE "\n";
    }

    &App::sub_exit($retval) if ($App::trace);
    $retval;
}

sub _do {
    &App::sub_entry if ($App::trace);
    my ($self, $sql) = @_;
    $self->{error} = "";
    $self->{sql} = $sql;
    my $dbh = $self->{dbh};
    my $retval = 0;

    my $context_options = $self->{context}{options};
    my $debug_sql = $context_options->{debug_sql};
    my ($timer, $elapsed_time);
    if ($debug_sql) {
        $timer = $self->_get_timer();
        print $App::DEBUG_FILE "DEBUG_SQL: _do()\n";
        print $App::DEBUG_FILE $sql;
        print $App::DEBUG_FILE "\n" if ($sql !~ /\n$/);
    }
    if ($context_options->{explain_sql}) {
        $self->explain_sql($sql);
    }
    if (defined $dbh) {
        $self->{sql} = $sql;
        my $continue = 1;
        my $tries = 1;
        while ($continue) {
            eval {
                if ($sql =~ /^select/i) {
                    $retval = $dbh->selectall_arrayref($sql);
                }
                else {
                    $retval = $dbh->do($sql);
                    $retval = 0 if ($retval == 0); # turn "0E0" into plain old "0"
                }
            };
            if ($@) {  # Log the error message with the SQL and rethrow the exception
                my $retryable_modify_error_regex = $self->retryable_modify_error_regex();
                if ($@ =~ /$retryable_modify_error_regex/i) {
                    if ($tries >= 3) {
                        $self->{context}->log({level=>1},"DBI Exception (fail) (tries=$tries) in _do(): $@$sql");
                        die $@;
                    }
                    $self->{context}->log({level=>1},"DBI Exception (retry) (tries=$tries) in _do(): $@$sql");
                    $tries++;
                    sleep(1);
                }
                else {
                    $self->{context}->log({level=>1},"DBI Exception (fail) in _do(): $@$sql");
                    die $@;
                }
            }
            else {
                $continue = 0;
            }
        }
    }
    if ($debug_sql) {
        my $nrows = 0;
        if ($retval) {
            if (ref($retval)) {
                $nrows = $#$retval + 1;
            }
            else {
                $nrows = $retval;
            }
        }
        $elapsed_time = $self->_read_timer($timer);
        print $App::DEBUG_FILE "DEBUG_SQL: nrows [$nrows] ($elapsed_time sec) $DBI::errstr\n";
        if ($debug_sql >= 2 && ref($retval)) {
            foreach my $row (@$retval) {
                print $App::DEBUG_FILE "DEBUG_SQL: [", join("|",map { defined $_ ? $_ : "undef"} @$row), "]\n";
            }
        }
        print $App::DEBUG_FILE "\n";
    }

    &App::sub_exit($retval) if ($App::trace);
    $retval;
}

#############################################################################
# begin_work()
#############################################################################

=head2 begin_work()

    * Signature: $rep->begin_work();
    * Param:     void
    * Return:    void
    * Throws:    App::Exception::Repository
    * Since:     0.01

    Sample Usage: 

    $rep->begin_work();

=cut

sub begin_work {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->_connect();
    if (!$self->{in_transaction}) {
        $self->{dbh}->begin_work();
        $self->{in_transaction} = 1;
    }
    &App::sub_exit() if ($App::trace);
}

#############################################################################
# commit()
#############################################################################

=head2 commit()

    * Signature: $rep->commit();
    * Param:     void
    * Return:    void
    * Throws:    App::Exception::Repository
    * Since:     0.01

    Sample Usage: 

    $rep->commit();

=cut

sub commit {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    if ($self->{in_transaction}) {
        $self->{dbh}->commit();
        delete $self->{in_transaction};
    }
    &App::sub_exit() if ($App::trace);
}

#############################################################################
# rollback()
#############################################################################

=head2 rollback()

    * Signature: $rep->rollback();
    * Param:     void
    * Return:    void
    * Throws:    App::Exception::Repository
    * Since:     0.01

    Sample Usage: 

    $rep->rollback();

=cut

sub rollback {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    if ($self->{in_transaction}) {
        $self->{dbh}->rollback();
        delete $self->{in_transaction};
    }
    &App::sub_exit() if ($App::trace);
}

#############################################################################
# call_procedure()
#############################################################################

=head2 call_procedure()

    * Signature: $rep->call_procedure($call_str);
    * Signature: $rep->call_procedure($call_str, $return_type, $param_types, @params);
    * Signature: $result  = $rep->call_procedure($call_str, $return_type);
    * Signature: $result  = $rep->call_procedure($call_str, $return_type, $param_types, @params);
    * Signature: @results = $rep->call_procedure($call_str, $return_type);
    * Signature: @results = $rep->call_procedure($call_str, $return_type, $param_types, @params);
    * Signature: $rows    = $rep->call_procedure($call_str, $return_type);
    * Signature: $rows    = $rep->call_procedure($call_str, $return_type, $param_types, @params);
    * Param:     void
    * Return:    $result   string    (if $return_type is "SCALAR")
    * Return:    @results  ARRAY     (if $return_type is "LIST")
    * Return:    $row      ARRAY     (if $return_type is "ROW")
    * Return:    $rows     ARRAY     (if $return_type is "ROWS")
    * Throws:    App::Exception::Repository
    * Since:     0.01

There is no standard way to call stored procedures in the DBI.
This is an attempt to provide access to them.

    MySQL: Sample Usage
    1. As of DBD-mysql-3.0008 and MySQL 5.1.12, INOUT and OUT parameters are not supported
    2. In order to receive values back from a stored procedure in MySQL,
       you need to have applied the "dbd-mysql-multi-statements.patch" patch.
       https://rt.cpan.org/Public/Bug/Display.html?id=12322
       https://rt.cpan.org/Ticket/Attachment/167152/53763/dbd-mysql-multi-statements.patch
       This supports the "SCALAR" return type (and maybe "LIST" and "ROW"), but
       a stored procedure can still not return multiple rows ("ROWS"). (I think.)
       You DSN needs to have "mysql_multi_results=1" set to activate the ability to
       get rows back from a stored procedure.

    $rep->call_procedure("call sp_doit('prod',5)");
    $val           = $rep->call_procedure("call sp_doit_return_val('prod',5)", "SCALAR");
    ($val1, $val2) = $rep->call_procedure("call sp_doit_return_vals('prod',5)", "LIST");
    $row           = $rep->call_procedure("call sp_doit_return_vals('prod',5)", "ROW");

=cut

sub call_procedure {
    &App::sub_entry if ($App::trace);
    my ($self, $call_str, $return_type, $param_options, @params) = @_;
    my $dbh = $self->{dbh};
    my $sth = $dbh->prepare($call_str);
    my ($i, $param_option, $param_direction, $param_length, $param_type);
    for ($i = 0; $i <= $#params; $i++) {
        $param_option = $param_options->[$i];
        if (!ref($param_option)) {
            $param_direction = $param_option || "IN";
            $param_length    = 100;
            $param_type      = undef;
        }
        else {
            $param_direction = $param_option->{direction} || "IN";
            $param_length    = $param_option->{length} || 100;
            $param_type      = $param_option;
        }
        if ($param_direction eq "OUT") {
            $sth->bind_param_inout($i+1, \$_[$i+4], $param_length);
        }
        elsif ($param_direction eq "INOUT") {
            $sth->bind_param_inout($i+1, \$_[$i+4], $param_length);
        }
        else {
            $sth->bind_param($i+1, $params[$i], $param_type);
        }
    }
    $sth->execute();
    my (@values);
    my $rows = [];
    if (defined $return_type) {
        while (@values = $sth->fetchrow_array()) {
            push(@$rows, [@values]);
        }
        if ($return_type eq "LIST") {
            @values = @{$rows->[0]} if ($#$rows > -1);
        }
        elsif ($return_type eq "SCALAR") {
            @values = ($rows->[0][0]) if ($#$rows > -1 && $#{$rows->[0]} > -1);
        }
        elsif ($return_type eq "ROW") {
            @values = ( $rows->[0] ) if ($#$rows > -1);
        }
        elsif ($return_type eq "ROWS") {
            @values = ( $rows );
        }
    }
    $sth->finish(); 
    if ($return_type eq "LIST") {
        &App::sub_exit(@values) if ($App::trace);
        return(@values);
    }
    else {
        &App::sub_exit($values[0]) if ($App::trace);
        return($values[0]);
    }
}

sub explain_sql {
    &App::sub_entry if ($App::trace);
    my ($self, $sql) = @_;
    # to be overridden in each Repository class
    &App::sub_exit() if ($App::trace);
}

######################################################################
# METADATA REPOSITORY METHODS (implements methods from App::Repository)
######################################################################

# REMOVE ALL DEPENDENCE ON DBIx::Compat
# (ok. I want to, but I'm not ready to rewrite ListFields.)
use DBIx::Compat;

sub _load_rep_metadata_from_source {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my ($dbdriver, $dbh);
    $dbdriver = $self->{dbdriver};
    $dbh = $self->{dbh};

    #####################################################
    # TABLE DATA
    #####################################################

    my ($table, @tables, $func);

    # if we are not hiding the physical tables, go get them
    if (! $self->{hide_physical}) {

        # get a list of the physical tables from the database
        # in MySQL 4.0.13, the table names are surrounded by backticks (!?!)
        # so for safe measure, get rid of all quotes
        # Also, get rid of prepended schema names.
        @tables = grep(s/^[^.]+\.//, grep(s/['"`]//g, $dbh->tables(undef, undef, undef, "TABLE")));

        # REMOVE ALL DEPENDENCE ON DBIx::Compat
        # if the DBI method doesn't work, try the DBIx method...
        # if ($#tables == -1) {
        #     $func = DBIx::Compat::GetItem($dbdriver, "ListTables");
        #     @tables = &{$func}($dbh);
        # }

        # go through the list of native tables from the database
        foreach $table (@tables) {

            # if it has never been defined, then define it
            if (!defined $self->{table}{$table}) {
                $self->{table}{$table} = {
                    "name" => $table,
                };
            }

            # if it has not been added to the list and it is not explicitly hidden, add to list
            if (!defined $self->{table}{$table}{idx} && ! $self->{table}{$table}{hide}) {
                push(@{$self->{tables}}, $table);                  # add to list
                $self->{table}{$table}{idx} = $#{$self->{tables}}; # take note of the index
            }
        }
    }

    #########################################################
    # TYPE DATA
    # note: these are native database types, whereas a Repository "type" is a standard
    #########################################################

    my ($ntype_attribute_idx, @ntype_attribute_values);
    ($ntype_attribute_idx, @ntype_attribute_values) = @{$dbh->type_info_all};

    # Contents of $type_attribute_idx for MySQL:
    # $ntype_attribute_idx = {
    #     "TYPE_NAME"          =>  0,
    #     "DATA_TYPE"          =>  1,
    #     "COLUMN_SIZE"        =>  2,
    #     "LITERAL_PREFIX"     =>  3,
    #     "LITERAL_SUFFIX"     =>  4,
    #     "CREATE_PARAMS"      =>  5,
    #     "NULLABLE"           =>  6,
    #     "CASE_SENSITIVE"     =>  7,
    #     "SEARCHABLE"         =>  8,
    #     "UNSIGNED_ATTRIBUTE" =>  9,
    #     "FIXED_PREC_SCALE"   => 10,
    #     "AUTO_UNIQUE_VALUE"  => 11,
    #     "LOCAL_TYPE_NAME"    => 12,
    #     "MINIMUM_SCALE"      => 13,
    #     "MAXIMUM_SCALE"      => 14,
    #     "NUM_PREC_RADIX"     => 15,
    #     "mysql_native_type"  => 16,
    #     "mysql_is_num"       => 17,
    # };

    # Contents of @ntype_attribute_values for MySQL:
    # TYPE_NAME   DATA_TYPE COLUMN_SIZE PRE SUF CREATEPARAMETERS NUL CASE SRCH UNS FIX AUTO LTYPE MINS MAXS RDX
    # varchar            12         255 '   '   max length         1    0    1   0   0    0 0        0    0   0
    # decimal             3          15         precision,scale    1    0    1   0   0    0 0        0    6   2
    # tinyint            -6           3                            1    0    1   0   0    0 0        0    0  10
    # smallint            5           5                            1    0    1   0   0    0 0        0    0  10
    # integer             4          10                            1    0    1   0   0    0 0        0    0  10
    # float               7           7                            1    0    0   0   0    0 0        0    2   2
    # double              8          15                            1    0    1   0   0    0 0        0    4   2
    # timestamp          11          14 '   '                      0    0    1   0   0    0 0        0    0   0
    # bigint             -5          20                            1    0    1   0   0    0 0        0    0  10
    # middleint           4           8                            1    0    1   0   0    0 0        0    0  10
    # date                9          10 '   '                      1    0    1   0   0    0 0        0    0   0
    # time               10           6 '   '                      1    0    1   0   0    0 0        0    0   0
    # datetime           11          21 '   '                      1    0    1   0   0    0 0        0    0   0
    # year                5           4                            1    0    1   0   0    0 0        0    0   0
    # date                9          10 '   '                      1    0    1   0   0    0 0        0    0   0
    # enum               12         255 '   '                      1    0    1   0   0    0 0        0    0   0
    # set                12         255 '   '                      1    0    1   0   0    0 0        0    0   0
    # blob               -1       65535 '   '                      1    0    1   0   0    0 0        0    0   0
    # tinyblob           -1         255 '   '                      1    0    1   0   0    0 0        0    0   0
    # mediumblob         -1    16777215 '   '                      1    0    1   0   0    0 0        0    0   0
    # longblob           -1  2147483647 '   '                      1    0    1   0   0    0 0        0    0   0
    # char                1         255 '   '   max length         1    0    1   0   0    0 0        0    0   0
    # decimal             2          15         precision,scale    1    0    1   0   0    0 0        0    6   2
    # tinyint unsigned   -6           3                            1    0    1   1   0    0 0        0    0  10
    # smallint unsigned   5           5                            1    0    1   1   0    0 0        0    0  10
    # middleint unsigned  4           8                            1    0    1   1   0    0 0        0    0  10
    # int unsigned        4          10                            1    0    1   1   0    0 0        0    0  10
    # int                 4          10                            1    0    1   0   0    0 0        0    0  10
    # integer unsigned    4          10                            1    0    1   1   0    0 0        0    0  10
    # bigint unsigned    -5          20                            1    0    1   1   0    0 0        0    0  10
    # text               -1       65535 '   '                      1    0    1   0   0    0 0        0    0   0
    # mediumtext         -1    16777215 '   '                      1    0    1   0   0    0 0        0    0   0

    my ($ntype_name, @ntype_names, $ntype_num, $ntype_attribute_values, $ntype_def);
    my ($ntype_name_idx, $ntype_num_idx, $column_size_idx, $literal_prefix_idx, $literal_suffix_idx);
    my ($unsigned_attribute_idx, $auto_unique_value_idx, $column);

    $ntype_name_idx         = $ntype_attribute_idx->{"TYPE_NAME"};
    $ntype_num_idx          = $ntype_attribute_idx->{"DATA_TYPE"};
    $column_size_idx        = $ntype_attribute_idx->{"COLUMN_SIZE"};
    $literal_prefix_idx     = $ntype_attribute_idx->{"LITERAL_PREFIX"};
    $literal_suffix_idx     = $ntype_attribute_idx->{"LITERAL_SUFFIX"};
    $unsigned_attribute_idx = $ntype_attribute_idx->{"UNSIGNED_ATTRIBUTE"};
    $auto_unique_value_idx  = $ntype_attribute_idx->{"AUTO_UNIQUE_VALUE"};

    # go through the list of native type info from the DBI handle
    foreach $ntype_attribute_values (@ntype_attribute_values) {

        $ntype_name = $ntype_attribute_values->[$ntype_name_idx];
        $ntype_num = $ntype_attribute_values->[$ntype_num_idx];
        $ntype_def = {};
        push(@ntype_names, $ntype_name);

        $self->{native}{type}{$ntype_name} = $ntype_def;
        if (!defined $self->{native}{type}{$ntype_num}) {
            $self->{native}{type}{$ntype_num} = $ntype_def;
        }

        # save all the info worth saving in a native type definition
        $ntype_def->{name}               = $ntype_name;  # a real type name
        $ntype_def->{num}                = $ntype_num;  # an internal data type number
        $ntype_def->{column_size}        = $ntype_attribute_values->[$column_size_idx];
        $ntype_def->{literal_prefix}     = $ntype_attribute_values->[$literal_prefix_idx];
        $ntype_def->{literal_suffix}     = $ntype_attribute_values->[$literal_suffix_idx];
        $ntype_def->{unsigned_attribute} = $ntype_attribute_values->[$unsigned_attribute_idx];
        $ntype_def->{auto_unique_value}  = $ntype_attribute_values->[$auto_unique_value_idx];
        $ntype_def->{literal_prefix}     = "" if (! defined $ntype_def->{literal_prefix});
        $ntype_def->{literal_suffix}     = "" if (! defined $ntype_def->{literal_suffix});

        $ntype_def->{quoted} = ($ntype_def->{literal_prefix} ne "" || $ntype_def->{literal_suffix} ne "");

        # translate a native type into a repository type

        if ($ntype_name =~ /char/ || $ntype_name eq "enum" || $ntype_name eq "set") {
            $ntype_def->{type} = "string";
        }
        elsif ($ntype_name =~ /text/) {
            $ntype_def->{type} = "text";
        }
        elsif ($ntype_name =~ /int/ || $ntype_name eq "year") {
            $ntype_def->{type} = "integer";
        }
        elsif ($ntype_name =~ /decimal/ || $ntype_name =~ /float/ || $ntype_name =~ /double/) {
            $ntype_def->{type} = "float";
        }
        elsif ($ntype_name =~ /datetime/ || $ntype_name =~ /timestamp/) {
            $ntype_def->{type} = "datetime";
        }
        elsif ($ntype_name =~ /time/) {
            $ntype_def->{type} = "time";
        }
        elsif ($ntype_name =~ /date/) {
            $ntype_def->{type} = "date";
        }
        elsif ($ntype_name =~ /blob/ || $ntype_name =~ /binary/) {
            $ntype_def->{type} = "binary";
        }
    }

    $self->{native}{types} = \@ntype_names;

    #########################################################
    # DATABASE ATTRIBUTES
    #########################################################
    # REMOVE ALL DEPENDENCE ON DBIx::Compat
    # $self->{native}{support_join}           = DBIx::Compat::GetItem($dbdriver, "SupportJoin");
    # $self->{native}{inner_join_syntax}      = DBIx::Compat::GetItem($dbdriver, "SupportSQLJoin");
    # $self->{native}{inner_join_only2tables} = DBIx::Compat::GetItem($dbdriver, "SQLJoinOnly2Tabs");
    # $self->{native}{have_types}             = DBIx::Compat::GetItem($dbdriver, "HaveTypes");
    # $self->{native}{null_operator}          = DBIx::Compat::GetItem($dbdriver, "NullOperator");
    # $self->{native}{need_null_in_create}    = DBIx::Compat::GetItem($dbdriver, "NeedNullInCreate");
    # $self->{native}{empty_is_null}          = DBIx::Compat::GetItem($dbdriver, "EmptyIsNull");

    &App::sub_exit() if ($App::trace);
}

sub _load_table_metadata_from_source {
    &App::sub_entry if ($App::trace);
    my ($self, $table) = @_;

    return if (! $table);

    my ($dbdriver, $dbh, $sth, $native_table, $table_def);
    my (@tables, $column, $func, $tablealias);

    $dbdriver = $self->{dbdriver};
    $dbh = $self->{dbh};
    $table_def = $self->{table}{$table};
    return if (!defined $table_def);

    $native_table = $table;     # assume the table name is a physical one
    $native_table = $table_def->{native_table} if ($table_def->{native_table});

    $table_def->{name} = $table;

    $tablealias = $table_def->{alias};
    if (! defined $tablealias) {
        $tablealias = "t" . $self->serial("table");
        $table_def->{alias} = $tablealias;
    }

    $table_def->{tablealiases} = [ $tablealias ]
        if (!defined $table_def->{tablealiases});
    $table_def->{tablealias} = {}
        if (!defined $table_def->{tablealias});
    $table_def->{tablealias}{$tablealias} = {}
        if (!defined $table_def->{tablealias}{$tablealias});
    $table_def->{tablealias}{$tablealias}{table} = $table
        if (!defined $table_def->{tablealias}{$tablealias}{table});

    #########################################################
    # COLUMN DATA
    #########################################################
    my ($colnum, $data_types, $columns, $column_def, $phys_columns);
    my ($native_type_num, $native_type_def, $phys_table);

    # REMOVE ALL DEPENDENCE ON DBIx::Compat
    # (ok. I want to, but I'm not ready to rewrite ListFields.)
    $func = DBIx::Compat::GetItem($dbdriver, "ListFields");
    eval {
        $sth  = &{$func}($dbh, $table);
    };
    if (!$@) {
        $table_def->{phys_table} = $table;
        $phys_columns = $sth->{NAME};    # array of fieldnames
        $data_types   = $sth->{TYPE};    # array of fieldtypes

        $columns = $table_def->{columns};
        if (! defined $columns) {
            $columns = [];
            $table_def->{columns} = $columns;
        }

        # if we got a list of columns for the table from the database
        if (defined $phys_columns && ref($phys_columns) eq "ARRAY") {

            $table_def->{phys_columns} = [ @$phys_columns ];

            for ($colnum = 0; $colnum <= $#$phys_columns; $colnum++) {
                $column = $phys_columns->[$colnum];

                $column_def = $table_def->{column}{$column};
                if (!defined $column_def) {
                    $column_def = {};
                    $table_def->{column}{$column} = $column_def;
                }
                next if ($column_def->{hide});

                $native_type_num = $data_types->[$colnum];
                $native_type_def = $self->{native}{type}{$native_type_num};

                if (! $self->{hide_physical} && ! defined $column_def->{idx}) {
                    push(@$columns, $column);
                    $column_def->{idx} = $#$columns;
                }

                $column_def->{name}   = $column;
                $column_def->{type}   = $native_type_def->{type};
                $column_def->{quoted} = $native_type_def->{quoted} ? 1 : 0;
                $column_def->{alias}  = "cn$colnum" if (!defined $column_def->{alias});
                $column_def->{dbexpr} = $table_def->{alias} . "." . $column
                    if (!defined $column_def->{dbexpr});
            }
        }
    }
    #else {
    #    die $@;
    #}

    ######################################################################
    # primary key
    ######################################################################

    if (!$self->{primary_key} || !$self->{alternate_key}) {
        $self->_load_table_key_metadata($table);
    }

    ######################################################################
    # tables that are related via tablealiases can be "import"-ed
    # this copies all of the column definitions from the imported table to this table
    # TODO: allow for role modifiers in related tables
    # TODO: rethink "import=1" to "multiplicity=1"
    # TODO: think about chained imports
    # TODO: think about import on demand rather than in advance
    ######################################################################
    my ($tablealiases, $alias, $alias_def, $related_table, $related_table_def);
    my ($tablealias_defs, $tablealias_def, $idx);

    $tablealiases = $table_def->{tablealiases};
    if (defined $tablealiases && ref($tablealiases) eq "ARRAY") {
        foreach $alias (@$tablealiases) {
            $alias_def = $table_def->{tablealias}{$alias};
            if ($alias_def->{import}) {
                $related_table = $alias_def->{table};
                if (! $self->{table}{$related_table}{loaded}) {
                    $self->_load_table_metadata($related_table);
                }
                $related_table_def = $self->{table}{$related_table};
                foreach $column (@{$related_table_def->{columns}}) {
                    if (! defined $table_def->{column}{$column} &&
                          defined $related_table_def->{column}{$column}) {
                        $table_def->{column}{$column} = $related_table_def->{column}{$column};
                    }
                }
            }
        }
    }

    # for each tablealias named in the configuration, give it a number up front
    $tablealias_defs = $table_def->{tablealias};
    for ($idx = 0; $idx <= $#$tablealiases; $idx++) {
        $tablealias = $tablealiases->[$idx];
        $tablealias_defs->{$tablealias}{idx} = $idx;
    }

    # for each tablealias in the hash (random order), add them to the end
    foreach $tablealias (keys %$tablealias_defs) {
        $tablealias_def = $tablealias_defs->{$tablealias};

        # table has not been added to the list and it's not explicitly "hidden", so add it
        if (!defined $tablealias_def->{idx}) {
            push(@$tablealiases, $tablealias);
            $tablealias_def->{idx} = $#$tablealiases;
        }
    }

    #if ($App::DEBUG >= 2 && $self->{context}->dbg(2)) {
    #    print $App::DEBUG_FILE "Table Metadata: $table\n";
    #    my $d = Data::Dumper->new([ $table_def ], [ "table_def" ]);
    #    $d->Indent(1);
    #    print $App::DEBUG_FILE $d->Dump();
    #}
    &App::sub_exit() if ($App::trace);
}

sub _load_table_key_metadata {
    &App::sub_entry if ($App::trace);
    my ($self, $table) = @_;

    return if (! $table);
    my $table_def = $self->{table}{$table};
    return if (! $table_def);
    my $dbh = $self->{dbh};

    # if not defined at all, try to get it from the database
    if (! defined $table_def->{primary_key}) {
        eval {
            $table_def->{primary_key} = [ $dbh->primary_key($self->{dbcatalog}, $self->{dbschema}, $table) ];
        };
    }
    &App::sub_exit() if ($App::trace);
}

1;