The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package DBI::Filesystem::DBD::Pg;

use strict;
use warnings;
use base 'DBI::Filesystem';
use DBD::Pg 'PG_BYTEA';

# this provides a DSN used during automatic testing by ./Build test
sub test_dsn {
    return "dbi:Pg:dbname=postgres";
}

# called after database handle is first created to do extra preparation on it
sub _dbh_init {
    my ($self,$dbh) = @_;
    $dbh->do('set client_min_messages to WARNING');
}

sub _metadata_table_def {
    return <<END;
create table metadata (
    inode        serial       primary key,
    mode         integer      not null,
    uid          integer      not null,
    gid          integer      not null,
    rdev         integer      default 0,
    links        integer      default 0,
    inuse        integer      default 0,
    size         bigint       default 0,
    mtime        timestamp,
    ctime        timestamp,
    atime        timestamp
)
END
}

sub _path_table_def {
    return <<END;
create table path (
    inode        integer      not null,
    name         varchar(255) not null,
    parent       integer
);
    create unique index ipath on path (parent,name)
END
}

sub _extents_table_def {
    return <<END;
create table extents (
    inode        integer,
    block        integer,
    contents     bytea
);
    create unique index iblock on extents (inode,block)
END
}

sub _xattr_table_def {
    my $self = shift;
    return <<END;
create table xattr (
    inode integer,
    name  varchar(255),
    value varchar(65536)
    );
    create unique index ixattr on xattr (inode,name)
END
}

sub _get_unix_timestamp_sql {
    my $self  = shift;
    my $field = shift;
    return "extract(epoch from $field)";
}

sub _now_sql {
    return "'now'";
}

sub _update_utime_sql {
    return "update metadata set atime=to_timestamp(?),mtime=to_timestamp(?) where inode=?";
}

sub _write_blocks {
    my $self = shift;
    my ($inode,$blocks,$blksize) = @_;

    my $dbh = $self->dbh;
    my ($size) = $dbh->selectrow_array("select size from metadata where inode=$inode");
    my $hwm      = $size;  # high water mark ;-)

    eval {
	$dbh->begin_work;
	my $insert = $dbh->prepare_cached(<<END) or die $dbh->errstr;
insert into extents (inode,block,contents) values (?,?,?)
END
;
	$insert->bind_param(3,undef,{pg_type=>PG_BYTEA});

	my $update = $dbh->prepare_cached(<<END) or die $dbh->errstr;
update extents set contents=? where inode=? and block=?
END
;
	$update->bind_param(1,undef,{pg_type=>PG_BYTEA});


	for my $block (keys %$blocks) {
	    my $data = $blocks->{$block};
	    $update->execute($data,$inode,$block);
	    $insert->execute($inode,$block,$data) unless $update->rows;
	    my $a   = $block * $blksize + length($data);
	    $hwm    = $a if $a > $hwm;
	}
	$insert->finish;
	$update->finish;
	my $now = $self->_now_sql;
	$dbh->do("update metadata set size=$hwm,mtime=$now where inode=$inode");
	$dbh->commit();
    };

    if ($@) {
	my $msg = $@;
	eval{$dbh->rollback()};
	warn $msg;
	die "write failed with $msg";
    }

    1;
}

sub last_inserted_inode {
    my $self = shift;
    my $dbh  = shift;
    return $dbh->last_insert_id(undef,undef,'metadata','inode');
}

sub set_schema_version {
    my $self = shift;
    my $version = shift;
    my $dbh = $self->dbh;
    eval {
	$dbh->begin_work;
	$dbh->do("delete from sqlfs_vars where name='schema_version'");
	$dbh->do("insert into sqlfs_vars (name,value) values ('schema_version','$version')");
	$dbh->commit;
    };
    if ($@) {
	warn $@;
	eval {$dbh->rollback()};
    }
}

sub setxattr {
    my $self = shift;
    my ($path,$xname,$xval,$xflags) = @_;
    if (!$xflags) {
	my $inode = $self->path2inode($path);
	my $dbh = $self->dbh;
	eval {
	    my $name = $dbh->quote($xname);
	    $dbh->begin_work;
	    $dbh->do("delete from xattr where name=$name and inode=$inode");
	    my $sth = $dbh->prepare_cached("insert into xattr(inode,name,value) values (?,?,?)");
	    $sth->execute($inode,$xname,$xval);
	    $sth->finish;
	    $dbh->commit();
	};
	if ($@) {
	    my $msg = $@;
	    eval {$dbh->rollback()};
	    die "could not update attribute because: $msg";
	}
	return 0;
    }

    # we get here when we have a definite insert/update operation to perform
    return $self->SUPER::setxattr(@_);
}

sub _update_schema_from_1_to_2 {
    my $self = shift;
    my $dbh  = $self->dbh;
    $dbh->do('alter table metadata rename column length to size');
    $dbh->do($self->_variables_table_def);
}
1;