The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

package OOPS::pg;

@ISA = qw(OOPS::DBO);

require OOPS;
use OOPS::DBO;
use strict;
use warnings;
use Carp qw(confess);
use DBD::Pg qw(:pg_types);

BEGIN {
	Filter::Util::Call::filter_add(\&OOPS::SelfFilter::filter)
		unless $OOPS::SelfFilter::defeat;
}

sub tmode
{
	my ($dbo, $dbh) = @_;
	$dbh = $dbo->{dbh} unless $dbh;

	# READ COMMITTED is the default
	# my $tmode2 = $dbo->{counterdbh}->prepare('SET TRANSACTION ISOLATION LEVEL READ COMMITTED') || die;
	# $tmode2->execute() || die $tmode2->errstr;
	unless ($dbo->{readonly}) {
		my $tmode = $dbh->prepare('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE') || die;
		$tmode->execute() || die;
	}
}

#
# Error code that indicates deadlock or clashing transactions.
#
sub deadlock_rx
{
	return (
		qr{ERROR:  could not serialize access due to concurrent update},			  	# pg 8.1
		qr{ERROR:  could not serialize access due to read/write dependencies among transactions}, 	# pg 9.1
		qr{ERROR:  deadlock detected},
		qr{ERROR:  duplicate key value violates unique constraint},     				# pg 9.1
		qr{ERROR:  duplicate key violates unique constraint},						# pg 8.1
	);
}

sub nodata_rx
{
	return qr/ERROR:  relation "\S+object" does not exist/;
}

sub initialize
{
	my ($dbo) = @_;

	# $dbo->tmode;

	$dbo->{counterdbh} = OOPS::dbiconnect(undef, %$dbo);

	$dbo->{id_pool_start} = 0;
	$dbo->{id_pool_end} = 0;
}

#
# Postgres SERIALIZABLE doesn't really work: adding a new
# row that would have been returned by query in another process
# is allowed.  Setting this to 1 forces the object record
# to be updated every time the object contents change.
#
sub do_forcesave { 1 };

sub tabledefs
{
	my $x = <<'END';

	CREATE TABLE TP_object (
		id		BIGINT,
		loadgroup	BIGINT, 
		class 		BYTEA, 			# ref($object)
		otype		CHAR(1),		# 'S'calar/ref, 'A'rray, 'H'ash
		virtual		CHAR(1),		# load virutal ('V' or '0')
		reftarg		CHAR(1),		# reference target ('T' or '0')
		rfe		CHAR(1),		# reserved for future expansion
		alen		INT,			# array length
		refs		INT, 			# references
		counter		SMALLINT,
		gcgeneration	INT DEFAULT 1,
		PRIMARY KEY (id));

	CREATE INDEX TP_group_index ON TP_object (loadgroup);

	CREATE TABLE TP_attribute (
		id		BIGINT NOT NULL, 
		pkey		BYTEA,
		pval		BYTEA,
		ptype		VARCHAR(1),		# type '0'-normal or 'R'eference 'B'ig
		PRIMARY KEY (id, pkey));

	CREATE INDEX TP_value_index ON TP_attribute (pval);

	CREATE TABLE TP_big (
		id		BIGINT NOT NULL, 
		pkey		BYTEA,
		pval		BYTEA,
		PRIMARY KEY (id, pkey));

	CREATE TABLE TP_counters (
		name		VARCHAR(128),
		cval		BIGINT,
		PRIMARY KEY	(name));

END
	$x =~ s/#.*//mg;
	return $x;
}

sub table_list
{
	return (qw(TP_object TP_attribute TP_big TP_counters));
}

sub db_initial_values
{
	require OOPS::Setup;
	return <<END;
	INSERT INTO TP_counters values ('objectid', $OOPS::last_reserved_oid + 1);
END
}

sub initial_query_set
{
	return <<END;
		allocate_id:
			UPDATE TP_counters
			SET cval = cval + ?
			WHERE name = 'objectid'
		get_id:
			SELECT cval 
			FROM TP_counters
			WHERE name = 'objectid'
		bigload: 2
			SELECT pval FROM TP_big 
			WHERE id = ? AND pkey = ?
		savebig: 2 3
			INSERT INTO TP_big 
			VALUES (?, ?, ?)
		updatebig: 1 3
			UPDATE TP_big
			SET pval = ?
			WHERE id = ? AND pkey = ?
		lock_object:
			SELECT loadgroup 
			FROM TP_object
			WHERE id = ? FOR UPDATE 
		lock_attribute:
			SELECT ptype
			FROM TP_attribute
			WHERE id = ? AND pkey = ? FOR UPDATE
END
}


#
# bind_param only has to happen once.
#
sub query
{
	my ($dbo, $q, %args) = @_;

	my $query;
	my $dbh;
	my $sth;
	my $fresh = 0;

	$dbo->query_debug('pg', $q, %args);

	if (($sth = $dbo->{cached_queries}{$q})) {
		# great
		if ($sth->{Active}) {
			print "Query $q was still active\n" if $OOPS::debug_queries;
			delete $dbo->{cached_queries}{$q};
			delete $dbo->{bind_done}{$q};
			return query($dbo, $q, %args);
		}
		if ($dbo->{binary_q_list}{$q} && ! $dbo->{bind_done}{$q}) {
			$fresh = 1;
		}
	} elsif (($query = $dbo->{queries}{$q})) {
		1 while $query =~ s/DBO:CAST:PG2INT\(($pmatch)\)/CAST($1 AS integer)/s;
		1 while $query =~ s/DBO:CAST:PGBYTEA2INT\(($pmatch)\)/CAST(encode($1, 'escape') AS integer)/s;
		1 while $query =~ s/DBO:CAST:PG2BYTEA\(($pmatch)\)/decode(CAST($1 AS text), 'escape')/s;
		die if $query =~ /\bDBO:[A-Z]+:PG/;
		$query = $dbo->clean_query($query);
		$dbh = $args{dbh} || $dbo->{dbh};
		$sth = $dbh->prepare($query) || die $dbh->errstr;
		$dbo->{cached_queries}{$q} = $sth;
		$fresh = 1;
	} else {
		confess "no query <$q>";
	}

	if ($dbo->{binary_q_list}{$q} && ! $dbo->{binary_params}{$q}) {
		$dbo->{binary_params}{$q} = [];
		for my $i (grep($_ > 0, split(' ', $dbo->{binary_q_list}{$q}))) {
			$dbo->{binary_params}{$q}[$i] = 1;
		}
	}


	my $debug_x = ++$dbo->{invoke_count}{$q};
	print $dbo->{binary_q_list}{$q} ? "BINARY: $q - $debug_x/$fresh\n" : "NOT B: $q - $debug_x/$fresh\n"
		if $OOPS::debug_dbd;

	if (exists $args{execute}) {
		my @a = defined($args{execute})
			? (ref($args{execute})
				? @{$args{execute}}
				: $args{execute})
			: ();

		my $e;
		if ($dbo->{binary_params}{$q} && $fresh) {
			for (my $i = 0; $i <= $#a; $i++) {
				if ($dbo->{binary_params}{$q}[$i+1]) {
					$sth->bind_param($i+1, $a[$i], 
						{ pg_type => PG_BYTEA });
				printf "Bind-param %s #%d - binary\n", $q, $i+1 if $OOPS::debug_dbd;
				} else {
					$sth->bind_param($i+1, $a[$i]);
				}
			}
			$dbo->{bind_done}{$q} = 1;
			$sth->execute() or $e = "Could Not Execute '$query' with '@a':" . ($sth->errstr);
		} else {
			$sth->execute(@a) or $e = "could not execute '$query' with '@a':".$sth->errstr;
		}
		if ($e) {
			$e =~ s/\n/\\n /g; # debug
			confess($e);
		}
	} elsif ($dbo->{binary_params}{$q} && $fresh) {
		print "Using wrapper...\n" if $OOPS::debug_dbd;
		return OOPS::pg::sth->new($sth, $q, $dbo->{binary_params}{$q}, \$dbo->{bind_done}{$q});
	}

	return $sth;
}

sub lock_object
{
	my ($dbo, $id) = @_;
	my $q = $dbo->query('lock_object', execute => [ $id ]);
	(undef) = $q->fetchrow_array;
	$q->finish()
}

sub lock_attribute
{
	my ($dbo, $id, $pkey) = @_;
	my $q = $dbo->query('lock_attribute', execute => [ $id, $pkey ]);
	(undef) = $q->fetchrow_array;
	$q->finish()
}

sub allocate_id
{
	my $dbo = shift;
	my $id;
	if ($dbo->{id_pool_start} && $dbo->{id_pool_start} < $dbo->{id_pool_end}) {
		$id = $dbo->{id_pool_start}++;
		print "in allocate_id, allocating $id from pool\n" if $OOPS::debug_object_id;
	} else {
		my $allocate_idQ = $dbo->query('allocate_id', dbh => $dbo->{counterdbh}, execute => $OOPS::id_alloc_size);
		my $get_idQ = $dbo->query('get_id', dbh => $dbo->{counterdbh}, execute => []);
		(($id) = $get_idQ->fetchrow_array) || die $get_idQ->errstr;
		$get_idQ->finish;
		$dbo->{id_pool_start} = $id+1;
		$dbo->{id_pool_end} = $id+$OOPS::id_alloc_size;
		$dbo->{counterdbh}->commit || die $dbo->{counterdbh}->errstr;
		print "in allocate_id, new pool: $dbo->{id_pool_start} to $dbo->{id_pool_end}\n" if $OOPS::debug_object_id;
		print "in allocate_id, allocated $id from before pool\n" if $OOPS::debug_object_id;
	}
	return $id;
}

sub post_new_object
{
	my $dbo = shift;
	return $_[0];
}

sub disconnect
{
	my $dbo = shift;
	$dbo->{counterdbh}->disconnect() if $dbo->{counterdbh};
	delete $dbo->{counterdbh};
	$dbo->SUPER::disconnect();
}

package OOPS::pg::sth;

use strict;
use warnings;
use Carp qw(confess);
use DBD::Pg qw(:pg_types);

sub new
{
	my ($pkg, $sth, $q, $binary_params, $doneref) = @_;
	return bless [ $sth, $q, $binary_params, $doneref];
}

sub execute
{
	my ($self, @values) = @_;
	my ($sth, $q, $binary_params, $doneref) = @$self;
	$$doneref = 2;

	for (my $i = 0; $i <= $#values; $i++) {
		die if ref $values[$i];
		if ($binary_params->[$i+1]) {
			$sth->bind_param($i+1, $values[$i], 
				{ pg_type => PG_BYTEA });
			printf "Bind-param %s #%d - binary\n", $q, $i+1 if $OOPS::debug_dbd;
		} else {
			$sth->bind_param($i+1, $values[$i]);
		}
	}
	@$self = ($sth);
	$sth->execute();
}

sub AUTOLOAD
{
	my $self = shift;
	our $AUTOLOAD;
	my $a = $AUTOLOAD;
	$a =~ s/.*:://;
	my $method = $self->[0]->can($a) || $self->[0]->can($AUTOLOAD) || confess "cannot find method $a for $self->[0]";
	&$method($self->[0], @_);
}

1;