package MogileFS::Store::MySQL;
use strict;
use warnings;
use DBI 1.44;
use DBD::mysql;
use MogileFS::Util qw(throw);
use base 'MogileFS::Store';
# --------------------------------------------------------------------------
# Package methods we override
# --------------------------------------------------------------------------
sub dsn_of_dbhost {
my ($class, $dbname, $host, $port) = @_;
return "DBI:mysql:$dbname;host=$host" . ($port ? ";port=$port" : "");
}
sub dsn_of_root {
my ($class, $dbname, $host, $port) = @_;
return $class->dsn_of_dbhost('mysql', $host, $port);
}
# --------------------------------------------------------------------------
# Store-related things we override
# --------------------------------------------------------------------------
sub init {
my $self = shift;
$self->SUPER::init;
$self->{lock_depth} = 0;
$self->{slave_next_check} = 0;
}
sub post_dbi_connect {
my $self = shift;
$self->SUPER::post_dbi_connect;
$self->{lock_depth} = 0;
}
sub was_deadlock_error {
my $self = shift;
my $dbh = $self->dbh;
return 0 unless $dbh->err;
# 1205 is "lock wait timeout", but we should bomb out if we've
# alerady hung for that long.
return 1 if ($dbh->err == 1213);
}
sub was_duplicate_error {
my $self = shift;
my $dbh = $self->dbh;
return 0 unless $dbh->err;
return 1 if $dbh->err == 1062 || $dbh->errstr =~ /duplicate/i;
}
sub table_exists {
my ($self, $table) = @_;
return eval {
my $sth = $self->dbh->prepare("DESCRIBE $table");
$sth->execute;
my $rec = $sth->fetchrow_hashref;
return $rec ? 1 : 0;
};
}
sub can_replace { 1 }
sub can_insertignore { 1 }
sub can_insert_multi { 1 }
sub unix_timestamp { "UNIX_TIMESTAMP()" }
sub filter_create_sql {
my ($self, $sql) = @_;
return $sql unless $self->fid_type eq "BIGINT";
$sql =~ s!\bfid\s+INT\b!fid BIGINT!i;
return $sql;
}
sub can_do_slaves { 1 }
sub check_slave {
my $self = shift;
return 0 unless $self->{slave};
my $next_check = \$self->{slave}->{next_check};
if ($$next_check > time()) {
return 1;
}
#my $slave_status = eval { $self->{slave}->dbh->selectrow_hashref("SHOW SLAVE STATUS") };
#warn "Error thrown: '$@' while trying to get slave status." if $@;
# TODO: Check show slave status *unless* a server setting is present to
# tell us to ignore it (like in a multi-DC setup).
eval { $self->{slave}->dbh };
if ($@) {
warn "Error while checking slave: $@";
return 0;
}
# call time() again here because SQL blocks.
$$next_check = time() + 5;
return 1;
}
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout
sub get_lock {
my ($self, $lockname, $timeout) = @_;
die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth};
my $lock = $self->dbh->selectrow_array("SELECT GET_LOCK(?, ?)", undef, $lockname, $timeout);
if ($lock) {
$self->{lock_depth} = 1;
$self->{last_lock} = $lockname;
}
return $lock;
}
# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
sub release_lock {
my ($self, $lockname) = @_;
my $rv = $self->dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname);
$self->{lock_depth} = 0;
return $rv;
}
sub lock_queue {
my ($self, $type) = @_;
my $lock = $self->get_lock('mfsd:' . $type, 30);
return $lock ? 1 : 0;
}
sub unlock_queue {
my ($self, $type) = @_;
my $lock = $self->release_lock('mfsd:' . $type);
return $lock ? 1 : 0;
}
# clears everything from the fsck_log table
# return 1 on success. die otherwise.
# Under MySQL 4.1+ this is actually fast.
sub clear_fsck_log {
my $self = shift;
$self->dbh->do("TRUNCATE TABLE fsck_log");
return 1;
}
# --------------------------------------------------------------------------
# Functions specific to Store::MySQL subclass. Not in parent.
# --------------------------------------------------------------------------
sub fid_type {
my $self = shift;
return $self->{_fid_type} if $self->{_fid_type};
# let people force bigint mode with environment.
if ($ENV{MOG_FIDSIZE} && $ENV{MOG_FIDSIZE} eq "big") {
return $self->{_fid_type} = "BIGINT";
}
# else, check a maybe-existing table and see if we're in bigint
# mode already.
my $dbh = $self->dbh;
my @create = eval { $dbh->selectrow_array("SHOW CREATE TABLE file") };
if (@create && $create[0] eq 'file') {
if ($create[1] =~ /\bfid\b.+\bbigint\b/i) {
return $self->{_fid_type} = "BIGINT";
} else {
return $self->{_fid_type} = "INT";
}
}
# Used to default to 32bit ints, but this always bites people
# a few years down the road. So default to 64bit.
return $self->{_fid_type} = "BIGINT";
}
sub column_type {
my ($self, $table, $col) = @_;
my $sth = $self->dbh->prepare("DESCRIBE $table");
$sth->execute;
while (my $rec = $sth->fetchrow_hashref) {
if ($rec->{Field} eq $col) {
$sth->finish;
return $rec->{Type};
}
}
return undef;
}
# --------------------------------------------------------------------------
# Test suite things we override
# --------------------------------------------------------------------------
sub new_temp {
my $self = shift;
my %args = @_;
my $dbname = $args{dbname} || "tmp_mogiletest";
my $host = $args{dbhost} || 'localhost';
my $port = $args{dbport} || 3306;
my $user = $args{dbuser} || 'root';
my $pass = $args{dbpass} || '';
my $rootuser = $args{dbrootuser} || $args{dbuser} || 'root';
my $rootpass = $args{dbrootpass} || $args{dbpass} || '';
my $sto =
MogileFS::Store->new_from_dsn_user_pass("DBI:mysql:database=$dbname;host=$host;port=$port",
$rootuser, $rootpass);
my $dbh = $sto->dbh;
_create_mysql_db($dbh, $dbname);
# allow MyISAM in the test suite.
$ENV{USE_UNSAFE_MYSQL} = 1 unless defined $ENV{USE_UNSAFE_MYSQL};
my @args = ("$FindBin::Bin/../mogdbsetup", "--yes",
"--dbname=$dbname", "--type=MySQL",
"--dbhost=$host", "--dbport=$port",
"--dbrootuser=$rootuser",
"--dbuser=$user", );
push @args, "--dbpass=$pass" unless $pass eq '';
push @args, "--dbrootpass=$rootpass" unless $rootpass eq '';
system(@args)
and die "Failed to run mogdbsetup (".join(' ',map { "'".$_."'" } @args).").";
if($user ne $rootuser) {
$sto = MogileFS::Store->new_from_dsn_user_pass(
"DBI:mysql:database=$dbname;host=$host;port=$port",
$user, $pass);
$dbh = $sto->dbh;
}
$dbh->do("use $dbname");
return $sto;
}
sub _create_mysql_db {
my $dbh = shift;
my $dbname = shift;
_drop_mysql_db($dbh, $dbname);
$dbh->do("CREATE DATABASE $dbname");
}
sub _drop_mysql_db {
my $dbh = shift;
my $dbname = shift;
$dbh->do("DROP DATABASE IF EXISTS $dbname");
}
# --------------------------------------------------------------------------
# Database creation time things we override
# --------------------------------------------------------------------------
sub create_table {
my $self = shift;
my ($table) = @_;
my $dbh = $self->dbh;
my $errmsg =
"InnoDB backend is unavailable for use, force creation of tables " .
"by setting USE_UNSAFE_MYSQL=1 in your environment and run this " .
"command again.";
unless ($ENV{USE_UNSAFE_MYSQL}) {
my $engines = eval { $dbh->selectall_hashref("SHOW ENGINES", "Engine"); };
if ($@ && $dbh->err == 1064) {
# syntax error? for MySQL 4.0.x.
# who cares. we'll catch it below on the double-check.
} else {
die $errmsg
unless ($engines->{InnoDB} and
$engines->{InnoDB}->{Support} =~ m/^(YES|DEFAULT)$/i);
}
}
my $existed = $self->table_exists($table);
$self->SUPER::create_table(@_);
return if $ENV{USE_UNSAFE_MYSQL};
# don't alter an existing table up to InnoDB from MyISAM...
# could be costly. but on new tables, no problem...
unless ($existed) {
$dbh->do("ALTER TABLE $table ENGINE=InnoDB");
warn "DBI reported an error of: '" . $dbh->errstr . "' when trying to " .
"alter table type of $table to InnoDB\n" if $dbh->err;
}
# but in any case, let's see if it's already InnoDB or not.
my $table_status = $dbh->selectrow_hashref("SHOW TABLE STATUS LIKE '$table'");
# if not, either die or warn.
unless (($table_status->{Engine} || $table_status->{Type} || "") eq "InnoDB") {
if ($existed) {
warn "WARNING: MySQL table that isn't InnoDB: $table\n";
} else {
die "MySQL didn't change table type to InnoDB as requested.\n\n$errmsg"
}
}
}
# --------------------------------------------------------------------------
# Data-access things we override
# --------------------------------------------------------------------------
# update the device count for a given fidid
sub update_devcount_atomic {
my ($self, $fidid) = @_;
my $lockname = "mgfs:fid:$fidid";
my $lock = eval { $self->get_lock($lockname, 10) };
# Check to make sure the lock didn't timeout, then we want to bail.
return 0 if defined $lock && $lock == 0;
# Checking $@ is pointless for the time because we just want to plow ahead
# even if the get_lock trapped a recursion and threw a fatal error.
$self->update_devcount($fidid);
# Don't release the lock if we never got it.
$self->release_lock($lockname) if $lock;
return 1;
}
sub upgrade_add_host_getport {
my $self = shift;
# see if they have the get port, else update it
unless ($self->column_type("host", "http_get_port")) {
$self->dowell("ALTER TABLE host ADD COLUMN http_get_port MEDIUMINT UNSIGNED AFTER http_port");
}
}
sub upgrade_add_host_altip {
my $self = shift;
unless ($self->column_type("host", "altip")) {
$self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15) AFTER hostip");
$self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18) AFTER altip");
$self->dowell("ALTER TABLE host ADD UNIQUE altip (altip)");
}
}
sub upgrade_add_device_asof {
my $self = shift;
unless ($self->column_type("device", "mb_asof")) {
$self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT(10) UNSIGNED AFTER mb_used");
}
}
sub upgrade_add_device_weight {
my $self = shift;
unless ($self->column_type("device", "weight")) {
$self->dowell("ALTER TABLE device ADD COLUMN weight MEDIUMINT DEFAULT 100 AFTER status");
}
}
sub upgrade_add_device_readonly {
my $self = shift;
unless ($self->column_type("device", "status") =~ /readonly/) {
$self->dowell("ALTER TABLE device MODIFY COLUMN status ENUM('alive', 'dead', 'down', 'readonly')");
}
}
sub upgrade_add_device_drain {
my $self = shift;
unless ($self->column_type("device", "status") =~ /drain/) {
$self->dowell("ALTER TABLE device MODIFY COLUMN status ENUM('alive', 'dead', 'down', 'readonly', 'drain')");
}
}
sub upgrade_modify_server_settings_value {
my $self = shift;
unless ($self->column_type("server_settings", "value") =~ /text/i) {
$self->dowell("ALTER TABLE server_settings MODIFY COLUMN value TEXT");
}
}
sub upgrade_add_file_to_queue_arg {
my $self = shift;
unless ($self->column_type("file_to_queue", "arg")) {
$self->dowell("ALTER TABLE file_to_queue ADD COLUMN arg TEXT");
}
}
sub upgrade_modify_device_size {
my $self = shift;
for my $col ('mb_total', 'mb_used') {
if ($self->column_type("device", $col) =~ m/mediumint/i) {
$self->dowell("ALTER TABLE device MODIFY COLUMN $col INT UNSIGNED");
}
}
}
sub upgrade_add_host_readonly {
my $self = shift;
unless ($self->column_type("host", "status") =~ /\breadonly\b/) {
$self->dowell("ALTER TABLE host MODIFY COLUMN status ENUM('alive', 'dead', 'down', 'readonly')");
}
}
sub pre_daemonize_checks {
my $self = shift;
# Jay Buffington, from the mailing lists, writes:
# > > Is your DBI version at least 1.43? The Makefile.PL of DBD::mysql shows
# > > that code for last_insert_it is compiled in only if DBD::mysql is built
# > > with DBI 1.43 or newer.
#> Yes, I have 1.53.
#> jay@webdev:~$ perl -MDBI -le 'print $DBI::VERSION'
#> 1.53
#>
#> BUT I just re-installed 2.9006 while researching this and my test
#> script started working. I just reran the mogile server test suite and
#> all test passed!
#>
#> Problem solved!
#>
#> The original DBD::mysql 2.9006 was installed from a RPM. I bet that
#> it was built against a DBI older than 1.43, so it didn't support
#> LAST_INSERT_ID.
# So...
# since we don't know what version of DBI their DBD::mysql was built against,
# let's just test that last_insert_id works.
my $id = eval {
$self->register_tempfile(dmid => 99,
key => "_server_startup_test");
};
unless ($id) {
die "MySQL self-tests failed. Your DBD::mysql might've been built against an old DBI version.\n";
}
return $self->SUPER::pre_daemonize_checks();
}
sub get_keys_like_operator {
my $bool = MogileFS::Config->server_setting_cached('case_sensitive_list_keys');
return $bool ? "LIKE /*! BINARY */" : "LIKE";
}
sub update_device_usages {
my ($self, $updates, $cb) = @_;
$cb->();
my $chunk = 10000; # in case we hit max_allowed_packet size(!)
while (scalar @$updates) {
my @cur = splice(@$updates, 0, $chunk);
my @set;
foreach my $fld (qw(mb_total mb_used mb_asof)) {
my $s = "$fld = CASE devid\n";
foreach my $upd (@cur) {
my $devid = $upd->{devid};
defined($devid) or croak("devid not set\n");
my $val = $upd->{$fld};
defined($val) or croak("$fld not defined for $devid\n");
$s .= "WHEN $devid THEN $val\n";
}
$s .= "ELSE $fld END";
push @set, $s;
}
my $sql = "UPDATE device SET ". join(",\n", @set);
$self->dowell($sql);
$cb->();
}
}
1;
__END__
=head1 NAME
MogileFS::Store::MySQL - MySQL data storage for MogileFS
=head1 SEE ALSO
L<MogileFS::Store>