@@ -1,3 +1,29 @@
+2014-12-15: Release version 2.72
+
+ * Work with DBD::SQLite's latest lock errors (dormando <dormando@rydia.net>)
+
+ * remove update_host_property (Eric Wong <e@80x24.org>)
+
+ * remove users of unreachable_fids table (Eric Wong <e@80x24.org>)
+
+ * monitor: batch MySQL device table updates (Eric Wong <normalperson@yhbt.net>)
+
+ * monitor: defer DB updates until all HTTP requests are done (Eric Wong <normalperson@yhbt.net>)
+
+ * connection/poolable: defer expiry of timed out connections (Eric Wong <e@80x24.org>)
+
+ * connection/poolable: disable watch_write before retrying write (Eric Wong <normalperson@yhbt.net>)
+
+ * connection/poolable: do not write before event_write (Eric Wong <normalperson@yhbt.net>)
+
+ * add conn_pool_size configuration option (Eric Wong <normalperson@yhbt.net>)
+
+ * enable TCP keepalives for iostat watcher sockets (Eric Wong <normalperson@yhbt.net>)
+
+ * host: add "readonly" state to override device "alive" state (Eric Wong <normalperson@yhbt.net>)
+
+ * add LICENSE file to distro (dormando <dormando@rydia.net>)
+
2013-08-18: Release version 2.70
* This release features a very large rewrite to the Monitor worker to run
@@ -0,0 +1 @@
+License granted to use/distribute under the same terms as Perl itself.
@@ -1,4 +1,5 @@
CHANGES
+LICENSE
examples/testapp/testapp-perlbal.conf
examples/testapp/testapp.psgi
examples/testapp/README
@@ -4,7 +4,7 @@
"Brad Fitzpatrick <brad@danga.com>"
],
"dynamic_config" : 1,
- "generated_by" : "ExtUtils::MakeMaker version 6.62, CPAN::Meta::Converter version 2.112621",
+ "generated_by" : "ExtUtils::MakeMaker version 6.76, CPAN::Meta::Converter version 2.132510",
"license" : [
"unknown"
],
@@ -29,28 +29,28 @@
"prereqs" : {
"build" : {
"requires" : {
- "ExtUtils::MakeMaker" : 0
+ "ExtUtils::MakeMaker" : "0"
}
},
"configure" : {
"requires" : {
- "ExtUtils::MakeMaker" : 0
+ "ExtUtils::MakeMaker" : "0"
}
},
"runtime" : {
"requires" : {
- "DBI" : 0,
+ "DBI" : "0",
"Danga::Socket" : "1.56",
- "Getopt::Long" : 0,
- "IO::AIO" : 0,
- "MogileFS::Client" : 0,
- "Net::Netmask" : 0,
+ "Getopt::Long" : "0",
+ "IO::AIO" : "0",
+ "MogileFS::Client" : "0",
+ "Net::Netmask" : "0",
"Perlbal" : "1.79",
- "Symbol" : 0,
+ "Symbol" : "0",
"Sys::Syscall" : "0.22",
- "Sys::Syslog" : 0,
+ "Sys::Syslog" : "0",
"Test::More" : "0.88",
- "fields" : 0
+ "fields" : "0"
}
}
},
@@ -65,5 +65,5 @@
},
"x_MailingList" : "http://groups.google.com/group/mogile"
},
- "version" : "2.70"
+ "version" : "2.72"
}
@@ -7,7 +7,7 @@ build_requires:
configure_requires:
ExtUtils::MakeMaker: 0
dynamic_config: 1
-generated_by: 'ExtUtils::MakeMaker version 6.62, CPAN::Meta::Converter version 2.112621'
+generated_by: 'ExtUtils::MakeMaker version 6.76, CPAN::Meta::Converter version 2.132510'
license: unknown
meta-spec:
url: http://module-build.sourceforge.net/META-spec-v1.4.html
@@ -37,8 +37,8 @@ requires:
Test::More: 0.88
fields: 0
resources:
+ MailingList: http://groups.google.com/group/mogile
bugtracker: http://code.google.com/p/mogilefs/issues/list
homepage: http://www.mogilefs.org
repository: git://github.com/mogilefs/MogileFS-Server.git
- x_MailingList: http://groups.google.com/group/mogile
-version: 2.70
+version: 2.72
@@ -68,6 +68,7 @@ our (
$max_disk_age,
$node_timeout, # time in seconds to wait for storage node responses
$conn_timeout, # time in seconds to wait for connection to storage node
+ $conn_pool_size, # size of the HTTP connection pool
$pidfile,
$repl_use_get_port,
$local_network,
@@ -99,6 +100,7 @@ sub load_config {
'default_mindevcount=i' => \$cmdline{default_mindevcount},
'node_timeout=i' => \$cmdline{node_timeout},
'conn_timeout=i' => \$cmdline{conn_timeout},
+ 'conn_pool_size=i' => \$cmdline{conn_pool_size},
'max_handles=i' => \$cmdline{max_handles},
'pidfile=s' => \$cmdline{pidfile},
'no_schema_check' => \$cmdline{no_schema_check},
@@ -168,6 +170,7 @@ sub load_config {
choose_value( 'default_mindevcount', 2 );
$node_timeout = choose_value( 'node_timeout', 2 );
$conn_timeout = choose_value( 'conn_timeout', 2 );
+ $conn_pool_size = choose_value( 'conn_pool_size', 20 );
choose_value( 'rebalance_ignore_missing', 0 );
$repl_use_get_port = choose_value( 'repl_use_get_port', 0 );
@@ -13,6 +13,7 @@ use fields (
'mfs_expire_cb', # Danga::Socket::Timer callback
'mfs_requests', # number of requests made on this object
'mfs_err', # used to propagate an error to start()
+ 'mfs_writeq', # arrayref if connecting, undef otherwise
);
use Socket qw(SO_KEEPALIVE);
use Time::HiRes;
@@ -27,6 +28,9 @@ sub new {
$self->{mfs_hostport} = [ $ip, $port ];
$self->{mfs_requests} = 0;
+ # newly-created socket, we buffer writes until event_write is triggered
+ $self->{mfs_writeq} = [];
+
return $self;
}
@@ -53,6 +57,38 @@ sub mark_idle {
$self->{mfs_requests}++;
}
+# Send $arg, or park it if the socket is still connecting.  As long as
+# mfs_writeq holds an array reference nothing is passed on to the parent
+# class; the data waits there until event_write is triggered.
+sub write {
+    my ($self, $arg) = @_;
+    my $pending = $self->{mfs_writeq};
+
+    # no queue any more: take the regular write path
+    return $self->SUPER::write($arg) unless ref($pending) eq "ARRAY";
+
+    # still connecting: buffer explicitly for *BSD and do not attempt a
+    # real write() before event_write has been triggered
+    push @{$pending}, $arg;
+    $self->watch_write(1);    # request the event_write callback
+    return 0;                 # match Danga::Socket::write return value
+}
+
+# Writability callback; Danga::Socket triggers it once the socket can be
+# written to.
+sub event_write {
+    my ($self) = @_;
+
+    # Removing the queue from the object means later write() calls no
+    # longer buffer and go to SUPER::write instead.
+    my $pending = delete $self->{mfs_writeq};
+
+    # queue already consumed earlier: plain parent-class handling
+    return $self->SUPER::event_write() unless $pending;
+
+    # Replay the writes buffered during the non-blocking connect();
+    # this is needed on *BSD but unnecessary (but harmless) on Linux.
+    $self->watch_write(0);    # ->write will re-enable if needed
+    foreach my $chunk (@{$pending}) {
+        $self->write($chunk);
+    }
+}
+
# the request running on this connection is retryable if this socket
# has ever been marked idle. The connection pool can never be 100%
# reliable for detecting dead sockets, and all HTTP requests made by
@@ -69,6 +105,7 @@ sub set_timeout {
my ($self, $timeout_key) = @_;
my $mfs_pool = $self->{mfs_pool};
+ $self->SetPostLoopCallback(undef);
if ($timeout_key) {
my $timeout;
@@ -108,7 +145,7 @@ sub expired {
if ($now >= $expire) {
my $expire_cb = delete $self->{mfs_expire_cb};
if ($expire_cb && $self->sock) {
- $expire_cb->($now);
+ $self->SetPostLoopCallback(sub { $expire_cb->($now); 1 });
}
return 1;
}
@@ -122,12 +122,12 @@ sub dstate {
}
+# true only when the device's host reports alive AND the device state
+# itself allows deletes
sub can_delete_from {
- return $_[0]->dstate->can_delete_from;
+ return $_[0]->host->alive && $_[0]->dstate->can_delete_from;
}
# this method is for Monitor, other workers should use should_read_from
+# readable when the host answers true to should_read_from (rather than
+# only alive) and the device state allows reads
sub can_read_from {
- return $_[0]->host->alive && $_[0]->dstate->can_read_from;
+ return $_[0]->host->should_read_from && $_[0]->dstate->can_read_from;
}
# this is the only method a worker should call for checking for readability
@@ -151,12 +151,6 @@ sub enqueue_for_replication {
Mgd::get_store()->enqueue_for_replication($self->id, $from_devid, $in);
}
-sub mark_unreachable {
- my $self = shift;
- # update database table
- Mgd::get_store()->mark_fidid_unreachable($self->id);
-}
-
sub delete {
my $fid = shift;
my $sto = Mgd::get_store();
@@ -36,7 +36,7 @@ sub new_from_args {
+# class method: true when $state is non-empty and exactly one of the
+# host states listed in the pattern below
sub valid_state {
my ($class, $state) = @_;
- return $state && $state =~ /\A(?:alive|dead|down)\z/;
+ return $state && $state =~ /\A(?:alive|dead|down|readonly)\z/;
}
# Instance methods:
@@ -77,6 +77,14 @@ sub alive {
return $_[0]->status eq 'alive';
}
+# true when this host's status is 'readonly'
+sub readonly {
+    my ($self) = @_;
+    return $self->status eq 'readonly';
+}
+
+# a host may be read from when it is either alive or readonly
+sub should_read_from {
+    my ($self) = @_;
+    return $self->alive || $self->readonly;
+}
+
sub observed_reachable {
my $self = shift;
return $self->{observed_state} && $self->{observed_state} eq 'reachable';
@@ -154,8 +162,11 @@ sub http {
# FIXME - make these customizable
sub _init_pools {
return if $http_pool;
+    # the pool's total_capacity option is read from the "conn_pool_size"
+    # config value and handed to the ConnectionPool constructor
+ my $opts = {
+ total_capacity => MogileFS->config("conn_pool_size"),
+ };
- $http_pool = MogileFS::ConnectionPool->new("MogileFS::Connection::HTTP");
+ $http_pool = MogileFS::ConnectionPool->new("MogileFS::Connection::HTTP", $opts);
}
1;
@@ -97,6 +97,7 @@ sub got_disconnect {
# Support class that does the communication with individual hosts.
package MogileFS::IOStatWatch::Client;
+use Socket qw(SO_KEEPALIVE);
use strict;
use warnings;
@@ -117,6 +118,7 @@ sub new {
);
return unless $sock;
+ $sock->sockopt(SO_KEEPALIVE, 1);
$self = fields::new($self) unless ref $self;
$self->SUPER::new($sock);
$self->watch_write(1);
@@ -2,7 +2,7 @@ package MogileFS::Server;
use strict;
use warnings;
use vars qw($VERSION);
-$VERSION = "2.70";
+$VERSION = "2.72";
=head1 NAME
@@ -396,6 +396,13 @@ sub upgrade_modify_device_size {
}
}
+# Schema upgrade step: redefine host.status as an ENUM that includes
+# 'readonly', unless the reported column type already mentions it.
+sub upgrade_add_host_readonly {
+    my $self = shift;
+    my $status_type = $self->column_type("host", "status");
+    unless ($status_type =~ /\breadonly\b/) {
+        $self->dowell("ALTER TABLE host MODIFY COLUMN status ENUM('alive', 'dead', 'down', 'readonly')");
+    }
+}
+
sub pre_daemonize_checks {
my $self = shift;
# Jay Buffington, from the mailing lists, writes:
@@ -437,6 +444,31 @@ sub get_keys_like_operator {
return $bool ? "LIKE /*! BINARY */" : "LIKE";
}
+# Batched variant of update_device_usage: writes many device rows with one
+# UPDATE per chunk, using a CASE expression per column.
+#
+#   $updates - arrayref of hashrefs, each carrying devid, mb_total,
+#              mb_used and mb_asof
+#   $cb      - code ref, invoked once before any SQL is sent and again
+#              after every statement
+#
+# croaks when an entry lacks devid or one of the three values.  The
+# caller's array is left untouched.
+sub update_device_usages {
+    my ($self, $updates, $cb) = @_;
+    require Carp;    # make sure croak is reachable without a file-level import
+    $cb->();
+    my $chunk = 10000; # in case we hit max_allowed_packet size(!)
+    my @pending = @$updates;    # consume a copy, not the caller's list
+    while (@pending) {
+        my @cur = splice(@pending, 0, $chunk);
+
+        # collect the ids first; they are needed for the WHERE clause
+        my @devids;
+        foreach my $upd (@cur) {
+            my $devid = $upd->{devid};
+            defined($devid) or Carp::croak("devid not set\n");
+            push @devids, $devid;
+        }
+
+        my @set;
+        foreach my $fld (qw(mb_total mb_used mb_asof)) {
+            my $s = "$fld = CASE devid\n";
+            foreach my $upd (@cur) {
+                my $devid = $upd->{devid};
+                my $val = $upd->{$fld};
+                defined($val) or Carp::croak("$fld not defined for $devid\n");
+                $s .= "WHEN $devid THEN $val\n";
+            }
+            $s .= "ELSE $fld END";
+            push @set, $s;
+        }
+
+        # Limit the statement to the devices in this chunk instead of
+        # addressing every row of the table; the ELSE branches stay in
+        # place so a row can never be changed by accident.
+        my $sql = "UPDATE device SET ". join(",\n", @set) .
+                  "\nWHERE devid IN (" . join(", ", @devids) . ")";
+        $self->dowell($sql);
+        $cb->();
+    }
+}
+
1;
__END__
@@ -292,6 +292,13 @@ sub upgrade_add_device_drain {
}
}
+# Schema upgrade step: unless column_constraint() for host.status already
+# mentions 'readonly', issue an ALTER TABLE whose CHECK list includes it.
+# NOTE(review): MODIFY COLUMN is MySQL's ALTER TABLE spelling; PostgreSQL
+# documents ALTER COLUMN / ADD CONSTRAINT / DROP CONSTRAINT instead.
+# Verify this statement against a real PostgreSQL server before relying
+# on the upgrade.
+sub upgrade_add_host_readonly {
+ my $self = shift;
+ unless ($self->column_constraint("host", "status") =~ /\breadonly\b/) {
+ $self->dowell("ALTER TABLE host MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly'))");
+ }
+}
+
sub upgrade_modify_server_settings_value {
my $self = shift;
unless ($self->column_type("server_settings", "value" =~ /text/i)) {
@@ -694,20 +701,6 @@ sub create_device {
return 1;
}
-sub mark_fidid_unreachable {
- my ($self, $fidid) = @_;
- my $dbh = $self->dbh;
-
- eval {
- $self->insert_or_update(
- insert => "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, ".$self->unix_timestamp.")",
- insert_vals => [ $fidid ],
- update => "UPDATE unreachable_fids SET lastupdate = ".$self->unix_timestamp." WHERE field = ?",
- update_vals => [ $fidid ],
- );
- };
-}
-
sub replace_into_file {
my $self = shift;
my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
@@ -174,6 +174,7 @@ sub was_duplicate_error {
my $errstr = $dbh->errstr;
return 1 if $errstr =~ /(?:is|are) not unique/i;
return 1 if $errstr =~ /must be unique/i;
+ return 1 if $errstr =~ /UNIQUE constraint failed/i;
return 0;
}
@@ -363,6 +364,7 @@ sub upgrade_add_device_drain {
sub upgrade_modify_server_settings_value { 1 }
sub upgrade_add_file_to_queue_arg { 1 }
sub upgrade_modify_device_size { 1 }
+sub upgrade_add_host_readonly { 1 }
sub BLOB_BIND_TYPE { SQL_BLOB }
@@ -20,7 +20,8 @@ use List::Util qw(shuffle);
# also adds a TEXT 'arg' column to file_to_queue for passing arguments
# 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
# 15: adds checksum table, adds 'hashtype' column to 'class' table
-use constant SCHEMA_VERSION => 15;
+# 16: adds 'readonly' state to enum in host table
+use constant SCHEMA_VERSION => 16;
sub new {
my ($class) = @_;
@@ -1218,14 +1219,26 @@ sub update_device {
sub update_device_usage {
my $self = shift;
+    # mb_asof is now taken from the caller's arguments and bound as a
+    # placeholder; the previous SQL embedded $self->unix_timestamp instead
- my %arg = $self->_valid_params([qw(mb_total mb_used devid)], @_);
+ my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);
eval {
- $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
- " WHERE devid = ?", undef, $arg{mb_total}, $arg{mb_used}, $arg{devid});
+ $self->dbh->do("UPDATE device SET ".
+ "mb_total = ?, mb_used = ?, mb_asof = ?" .
+ " WHERE devid = ?",
+ undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof},
+ $arg{devid});
};
$self->condthrow;
}
+# Generic implementation: push every entry of @$updates through
+# update_device_usage one at a time, calling $cb after each row.
+# MySQL has an optimized version.
+sub update_device_usages {
+    my ($self, $updates, $cb) = @_;
+    for my $row (@{$updates}) {
+        $self->update_device_usage(%{$row});
+        $cb->();
+    }
+}
+
# This is unimplemented at the moment as we must verify:
# - no file_on rows exist
# - nothing in file_to_queue is going to attempt to use it
@@ -1239,13 +1252,6 @@ sub delete_device {
die "Unimplemented; needs further testing";
}
-sub mark_fidid_unreachable {
- my ($self, $fidid) = @_;
- die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
- $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
- undef, $fidid);
-}
-
sub set_device_weight {
my ($self, $devid, $weight) = @_;
eval {
@@ -1665,14 +1671,6 @@ sub update_host {
return 1;
}
-sub update_host_property {
- my ($self, $hostid, $col, $val) = @_;
- $self->conddup(sub {
- $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
- });
- return 1;
-}
-
# return ne hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
@@ -20,6 +20,7 @@ use fields (
'db_monitor_ran', # We announce "monitor_just_ran" every time the
# device checks are run, but only if the DB has
# been checked inbetween.
+ 'devs_to_update' # device table update queue
);
use Danga::Socket 1.56;
@@ -105,6 +106,7 @@ sub usage_refresh {
# Fetch the freshlist list of entries, to avoid excessive writes.
$self->{updateable_devices} = { map { $_->{devid} => $_ }
Mgd::get_store()->get_all_devices };
+ $self->{devs_to_update} = [];
} else {
$self->{updateable_devices} = undef;
}
@@ -133,11 +135,6 @@ sub usage_refresh {
sub usage_refresh_done {
my ($self) = @_;
- if ($self->{updateable_devices}) {
- Mgd::get_store()->release_lock('mgfs:device_update');
- $self->{updateable_devices} = undef;
- }
-
$self->{devutil}->{prev} = $self->{devutil}->{tmp};
# Set the IOWatcher hosts (once old monitor code has been disabled)
@@ -179,6 +176,14 @@ sub usage_refresh_done {
$self->send_to_parent(":monitor_just_ran");
}
}
+
+ if ($self->{updateable_devices}) {
+ my $sto = Mgd::get_store();
+ my $updates = delete $self->{devs_to_update};
+ $sto->update_device_usages($updates, sub { $self->still_alive });
+ $sto->release_lock('mgfs:device_update');
+ $self->{updateable_devices} = undef;
+ }
}
sub work {
@@ -410,10 +415,13 @@ sub check_usage_response {
if ($self->{updateable_devices}) {
my $devrow = $self->{updateable_devices}->{$devid};
my $last = ($devrow && $devrow->{mb_asof}) ? $devrow->{mb_asof} : 0;
- if ($last + UPDATE_DB_EVERY < time()) {
- Mgd::get_store()->update_device_usage(mb_total => int($total / 1024),
- mb_used => int($used / 1024),
- devid => $devid);
+ my $now = time();
+ if ($last + UPDATE_DB_EVERY < $now) {
+ my %upd = (mb_total => int($total / 1024),
+ mb_used => int($used / 1024),
+ mb_asof => $now,
+ devid => $devid);
+ push @{$self->{devs_to_update}}, \%upd;
}
}
return 1;
@@ -62,8 +62,13 @@ observed_state => 'writeable'});
ok(!$dev->can_read_from, "can_read_from for device fails when host is $s");
ok(!$dev->should_read_from, "device should not be readable when host is $s");
}
+ $host->{status} = "readonly";
+ ok($dev->can_read_from, "device is readable from again");
+ ok(! $dev->should_get_new_files, "device should not get new files");
+
$host->{status} = "alive";
ok($dev->can_read_from, "device is readable from again");
+ ok($dev->should_get_new_files, "device should get new files again");
}
# first ensure device status is respected
@@ -32,11 +32,18 @@ my $host_args = {
http_get_port => $http_get->sockport,
};
my $host = MogileFS::Host->new_from_args($host_args);
+
+# required, defaults to 20 in normal server
+MogileFS::Config->set_config("conn_pool_size", 13);
+
MogileFS::Host->_init_pools;
+
my $idle_pool = $MogileFS::Host::http_pool->{idle};
is("MogileFS::Host", ref($host), "host created");
MogileFS::Config->set_config("node_timeout", 1);
+# got value first, expected value second, so a failure labels them correctly
+is($MogileFS::Host::http_pool->{total_capacity}, 13, "conn_pool_size took effect");
+
# hit the http_get_port
{
my $resp;