The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
CHANGES -0 +26
LICENSE -0 +1
MANIFEST -0 +1
META.json -12 +12
META.yml -3 +3
lib/MogileFS/Config.pm -0 +3
lib/MogileFS/Connection/Poolable.pm -1 +38
lib/MogileFS/Device.pm -2 +2
lib/MogileFS/FID.pm -6 +0
lib/MogileFS/Host.pm -2 +13
lib/MogileFS/IOStatWatcher.pm -0 +2
lib/MogileFS/Server.pm -1 +1
lib/MogileFS/Store/MySQL.pm -0 +32
lib/MogileFS/Store/Postgres.pm -14 +7
lib/MogileFS/Store/SQLite.pm -0 +2
lib/MogileFS/Store.pm -19 +17
lib/MogileFS/Worker/Monitor.pm -9 +17
t/02-host-device.t -0 +5
t/http.t -0 +7
19 files changed (This is a version diff) -69 +189
@@ -1,3 +1,29 @@
+2014-12-15: Release version 2.72
+
+   * Work with DBD::SQLite's latest lock errors (dormando <dormando@rydia.net>)
+
+   * remove update_host_property (Eric Wong <e@80x24.org>)
+
+   * remove users of unreachable_fids table (Eric Wong <e@80x24.org>)
+
+   * monitor: batch MySQL device table updates (Eric Wong <normalperson@yhbt.net>)
+
+   * monitor: defer DB updates until all HTTP requests are done (Eric Wong <normalperson@yhbt.net>)
+
+   * connection/poolable: defer expiry of timed out connections (Eric Wong <e@80x24.org>)
+
+   * connection/poolable: disable watch_write before retrying write (Eric Wong <normalperson@yhbt.net>)
+
+   * connection/poolable: do not write before event_write (Eric Wong <normalperson@yhbt.net>)
+
+   * add conn_pool_size configuration option (Eric Wong <normalperson@yhbt.net>)
+
+   * enable TCP keepalives for iostat watcher sockets (Eric Wong <normalperson@yhbt.net>)
+
+   * host: add "readonly" state to override device "alive" state (Eric Wong <normalperson@yhbt.net>)
+
+   * add LICENSE file to distro (dormando <dormando@rydia.net>)
+
 2013-08-18: Release version 2.70
 
    * This release features a very large rewrite to the Monitor worker to run
@@ -0,0 +1 @@
+License granted to use/distribute under the same terms as Perl itself.
@@ -1,4 +1,5 @@
 CHANGES
+LICENSE
 examples/testapp/testapp-perlbal.conf
 examples/testapp/testapp.psgi
 examples/testapp/README
@@ -4,7 +4,7 @@
       "Brad Fitzpatrick <brad@danga.com>"
    ],
    "dynamic_config" : 1,
-   "generated_by" : "ExtUtils::MakeMaker version 6.62, CPAN::Meta::Converter version 2.112621",
+   "generated_by" : "ExtUtils::MakeMaker version 6.76, CPAN::Meta::Converter version 2.132510",
    "license" : [
       "unknown"
    ],
@@ -29,28 +29,28 @@
    "prereqs" : {
       "build" : {
          "requires" : {
-            "ExtUtils::MakeMaker" : 0
+            "ExtUtils::MakeMaker" : "0"
          }
       },
       "configure" : {
          "requires" : {
-            "ExtUtils::MakeMaker" : 0
+            "ExtUtils::MakeMaker" : "0"
          }
       },
       "runtime" : {
          "requires" : {
-            "DBI" : 0,
+            "DBI" : "0",
             "Danga::Socket" : "1.56",
-            "Getopt::Long" : 0,
-            "IO::AIO" : 0,
-            "MogileFS::Client" : 0,
-            "Net::Netmask" : 0,
+            "Getopt::Long" : "0",
+            "IO::AIO" : "0",
+            "MogileFS::Client" : "0",
+            "Net::Netmask" : "0",
             "Perlbal" : "1.79",
-            "Symbol" : 0,
+            "Symbol" : "0",
             "Sys::Syscall" : "0.22",
-            "Sys::Syslog" : 0,
+            "Sys::Syslog" : "0",
             "Test::More" : "0.88",
-            "fields" : 0
+            "fields" : "0"
          }
       }
    },
@@ -65,5 +65,5 @@
       },
       "x_MailingList" : "http://groups.google.com/group/mogile"
    },
-   "version" : "2.70"
+   "version" : "2.72"
 }
@@ -7,7 +7,7 @@ build_requires:
 configure_requires:
   ExtUtils::MakeMaker: 0
 dynamic_config: 1
-generated_by: 'ExtUtils::MakeMaker version 6.62, CPAN::Meta::Converter version 2.112621'
+generated_by: 'ExtUtils::MakeMaker version 6.76, CPAN::Meta::Converter version 2.132510'
 license: unknown
 meta-spec:
   url: http://module-build.sourceforge.net/META-spec-v1.4.html
@@ -37,8 +37,8 @@ requires:
   Test::More: 0.88
   fields: 0
 resources:
+  MailingList: http://groups.google.com/group/mogile
   bugtracker: http://code.google.com/p/mogilefs/issues/list
   homepage: http://www.mogilefs.org
   repository: git://github.com/mogilefs/MogileFS-Server.git
-  x_MailingList: http://groups.google.com/group/mogile
-version: 2.70
+version: 2.72
@@ -68,6 +68,7 @@ our (
     $max_disk_age,
     $node_timeout,          # time in seconds to wait for storage node responses
     $conn_timeout,          # time in seconds to wait for connection to storage node
+    $conn_pool_size,        # size of the HTTP connection pool
     $pidfile,
     $repl_use_get_port,
     $local_network,
@@ -99,6 +100,7 @@ sub load_config {
                              'default_mindevcount=i' => \$cmdline{default_mindevcount},
                              'node_timeout=i' => \$cmdline{node_timeout},
                              'conn_timeout=i' => \$cmdline{conn_timeout},
+                             'conn_pool_size=i' => \$cmdline{conn_pool_size},
                              'max_handles=i'  => \$cmdline{max_handles},
                              'pidfile=s'      => \$cmdline{pidfile},
                              'no_schema_check' => \$cmdline{no_schema_check},
@@ -168,6 +170,7 @@ sub load_config {
     choose_value( 'default_mindevcount', 2 );
     $node_timeout   = choose_value( 'node_timeout', 2 );
     $conn_timeout   = choose_value( 'conn_timeout', 2 );
+    $conn_pool_size = choose_value( 'conn_pool_size', 20 );
 
     choose_value( 'rebalance_ignore_missing', 0 );
     $repl_use_get_port = choose_value( 'repl_use_get_port', 0 );
@@ -13,6 +13,7 @@ use fields (
     'mfs_expire_cb',  # Danga::Socket::Timer callback
     'mfs_requests',   # number of requests made on this object
     'mfs_err',        # used to propagate an error to start()
+    'mfs_writeq',     # arrayref if connecting, undef otherwise
 );
 use Socket qw(SO_KEEPALIVE);
 use Time::HiRes;
@@ -27,6 +28,9 @@ sub new {
     $self->{mfs_hostport} = [ $ip, $port ];
     $self->{mfs_requests} = 0;
 
+    # newly-created socket, we buffer writes until event_write is triggered
+    $self->{mfs_writeq} = [];
+
     return $self;
 }
 
@@ -53,6 +57,38 @@ sub mark_idle {
     $self->{mfs_requests}++;
 }
 
+sub write {
+    my ($self, $arg) = @_;
+    my $writeq = $self->{mfs_writeq};
+
+    if (ref($writeq) eq "ARRAY") {
+        # if we're still connecting, we must buffer explicitly for *BSD
+        # and not attempt a real write() until event_write is triggered
+        push @$writeq, $arg;
+        $self->watch_write(1); # enable event_write triggering
+        0; # match Danga::Socket::write return value
+    } else {
+        $self->SUPER::write($arg);
+    }
+}
+
+# Danga::Socket will trigger this when a socket is writable
+sub event_write {
+    my ($self) = @_;
+
+    # we may have buffered writes in mfs_writeq during non-blocking connect(),
+    # this is needed on *BSD but unnecessary (but harmless) on Linux.
+    my $writeq = delete $self->{mfs_writeq};
+    if ($writeq) {
+        $self->watch_write(0); # ->write will re-enable if needed
+        foreach my $queued (@$writeq) {
+            $self->write($queued);
+        }
+    } else {
+        $self->SUPER::event_write();
+    }
+}
+
 # the request running on this connection is retryable if this socket
 # has ever been marked idle.  The connection pool can never be 100%
 # reliable for detecting dead sockets, and all HTTP requests made by
@@ -69,6 +105,7 @@ sub set_timeout {
     my ($self, $timeout_key) = @_;
     my $mfs_pool = $self->{mfs_pool};
 
+    $self->SetPostLoopCallback(undef);
     if ($timeout_key) {
         my $timeout;
 
@@ -108,7 +145,7 @@ sub expired {
     if ($now >= $expire) {
         my $expire_cb = delete $self->{mfs_expire_cb};
         if ($expire_cb && $self->sock) {
-            $expire_cb->($now);
+            $self->SetPostLoopCallback(sub { $expire_cb->($now); 1 });
         }
         return 1;
     }
@@ -122,12 +122,12 @@ sub dstate {
 }
 
 sub can_delete_from {
-    return $_[0]->dstate->can_delete_from;
+    return $_[0]->host->alive && $_[0]->dstate->can_delete_from;
 }
 
 # this method is for Monitor, other workers should use should_read_from
 sub can_read_from {
-    return $_[0]->host->alive && $_[0]->dstate->can_read_from;
+    return $_[0]->host->should_read_from && $_[0]->dstate->can_read_from;
 }
 
 # this is the only method a worker should call for checking for readability
@@ -151,12 +151,6 @@ sub enqueue_for_replication {
     Mgd::get_store()->enqueue_for_replication($self->id, $from_devid, $in);
 }
 
-sub mark_unreachable {
-    my $self = shift;
-    # update database table
-    Mgd::get_store()->mark_fidid_unreachable($self->id);
-}
-
 sub delete {
     my $fid = shift;
     my $sto = Mgd::get_store();
@@ -36,7 +36,7 @@ sub new_from_args {
 
 sub valid_state {
     my ($class, $state) = @_;
-    return $state && $state =~ /\A(?:alive|dead|down)\z/;
+    return $state && $state =~ /\A(?:alive|dead|down|readonly)\z/;
 }
 
 # Instance methods:
@@ -77,6 +77,14 @@ sub alive {
     return $_[0]->status eq 'alive';
 }
 
+sub readonly {
+    return $_[0]->status eq 'readonly';
+}
+
+sub should_read_from {
+    return $_[0]->alive || $_[0]->readonly;
+}
+
 sub observed_reachable {
     my $self = shift;
     return $self->{observed_state} && $self->{observed_state} eq 'reachable';
@@ -154,8 +162,11 @@ sub http {
 # FIXME - make these customizable
 sub _init_pools {
     return if $http_pool;
+    my $opts = {
+        total_capacity => MogileFS->config("conn_pool_size"),
+    };
 
-    $http_pool = MogileFS::ConnectionPool->new("MogileFS::Connection::HTTP");
+    $http_pool = MogileFS::ConnectionPool->new("MogileFS::Connection::HTTP", $opts);
 }
 
 1;
@@ -97,6 +97,7 @@ sub got_disconnect {
 
 # Support class that does the communication with individual hosts.
 package MogileFS::IOStatWatch::Client;
+use Socket qw(SO_KEEPALIVE);
 
 use strict;
 use warnings;
@@ -117,6 +118,7 @@ sub new {
                                      );
     return unless $sock;
 
+    $sock->sockopt(SO_KEEPALIVE, 1);
     $self = fields::new($self) unless ref $self;
     $self->SUPER::new($sock);
     $self->watch_write(1);
@@ -2,7 +2,7 @@ package MogileFS::Server;
 use strict;
 use warnings;
 use vars qw($VERSION);
-$VERSION = "2.70";
+$VERSION = "2.72";
 
 =head1 NAME
 
@@ -396,6 +396,13 @@ sub upgrade_modify_device_size {
     }
 }
 
+sub upgrade_add_host_readonly {
+    my $self = shift;
+    unless ($self->column_type("host", "status") =~ /\breadonly\b/) {
+        $self->dowell("ALTER TABLE host MODIFY COLUMN status ENUM('alive', 'dead', 'down', 'readonly')");
+    }
+}
+
 sub pre_daemonize_checks {
     my $self = shift;
     # Jay Buffington, from the mailing lists, writes:
@@ -437,6 +444,31 @@ sub get_keys_like_operator {
     return $bool ? "LIKE /*! BINARY */" : "LIKE";
 }
 
+sub update_device_usages {
+    my ($self, $updates, $cb) = @_;
+    $cb->();
+    my $chunk = 10000; # in case we hit max_allowed_packet size(!)
+    while (scalar @$updates) {
+        my @cur = splice(@$updates, 0, $chunk);
+        my @set;
+        foreach my $fld (qw(mb_total mb_used mb_asof)) {
+            my $s = "$fld = CASE devid\n";
+            foreach my $upd (@cur) {
+                my $devid = $upd->{devid};
+                defined($devid) or croak("devid not set\n");
+                my $val = $upd->{$fld};
+                defined($val) or croak("$fld not defined for $devid\n");
+                $s .= "WHEN $devid THEN $val\n";
+            }
+            $s .= "ELSE $fld END";
+            push @set, $s;
+        }
+        my $sql = "UPDATE device SET ". join(",\n", @set);
+        $self->dowell($sql);
+        $cb->();
+    }
+}
+
 1;
 
 __END__
@@ -292,6 +292,13 @@ sub upgrade_add_device_drain {
     }
 }
 
+sub upgrade_add_host_readonly {
+    my $self = shift;
+    unless ($self->column_constraint("host", "status") =~ /\breadonly\b/) {
+        $self->dowell("ALTER TABLE host MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly'))");
+    }
+}
+
 sub upgrade_modify_server_settings_value {
     my $self = shift;
     unless ($self->column_type("server_settings", "value" =~ /text/i)) {
@@ -694,20 +701,6 @@ sub create_device {
     return 1;
 }
 
-sub mark_fidid_unreachable {
-    my ($self, $fidid) = @_;
-    my $dbh = $self->dbh;
-
-    eval {
-        $self->insert_or_update(
-            insert => "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, ".$self->unix_timestamp.")",
-            insert_vals => [ $fidid ],
-            update => "UPDATE unreachable_fids SET lastupdate = ".$self->unix_timestamp." WHERE field = ?",
-            update_vals => [ $fidid ],
-        );
-    };
-}
-
 sub replace_into_file {
     my $self = shift;
     my %arg  = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
@@ -174,6 +174,7 @@ sub was_duplicate_error {
     my $errstr = $dbh->errstr;
     return 1 if $errstr =~ /(?:is|are) not unique/i;
     return 1 if $errstr =~ /must be unique/i;
+    return 1 if $errstr =~ /UNIQUE constraint failed/i;
     return 0;
 }
 
@@ -363,6 +364,7 @@ sub upgrade_add_device_drain {
 sub upgrade_modify_server_settings_value { 1 }
 sub upgrade_add_file_to_queue_arg { 1 }
 sub upgrade_modify_device_size { 1 }
+sub upgrade_add_host_readonly { 1 }
 
 sub BLOB_BIND_TYPE { SQL_BLOB }
 
@@ -20,7 +20,8 @@ use List::Util qw(shuffle);
 #     also adds a TEXT 'arg' column to file_to_queue for passing arguments
 # 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
 # 15: adds checksum table, adds 'hashtype' column to 'class' table
-use constant SCHEMA_VERSION => 15;
+# 16: adds 'readonly' state to enum in host table
+use constant SCHEMA_VERSION => 16;
 
 sub new {
     my ($class) = @_;
@@ -1218,14 +1219,26 @@ sub update_device {
 
 sub update_device_usage {
     my $self = shift;
-    my %arg  = $self->_valid_params([qw(mb_total mb_used devid)], @_);
+    my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);
     eval {
-        $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
-                       " WHERE devid = ?", undef, $arg{mb_total}, $arg{mb_used}, $arg{devid});
+        $self->dbh->do("UPDATE device SET ".
+                       "mb_total = ?, mb_used = ?, mb_asof = ?" .
+                       " WHERE devid = ?",
+                       undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof},
+                       $arg{devid});
     };
     $self->condthrow;
 }
 
+# MySQL has an optimized version
+sub update_device_usages {
+    my ($self, $updates, $cb) = @_;
+    foreach my $upd (@$updates) {
+        $self->update_device_usage(%$upd);
+        $cb->();
+    }
+}
+
 # This is unimplemented at the moment as we must verify:
 # - no file_on rows exist
 # - nothing in file_to_queue is going to attempt to use it
@@ -1239,13 +1252,6 @@ sub delete_device {
     die "Unimplemented; needs further testing";
 }
 
-sub mark_fidid_unreachable {
-    my ($self, $fidid) = @_;
-    die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
-    $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
-                   undef, $fidid);
-}
-
 sub set_device_weight {
     my ($self, $devid, $weight) = @_;
     eval {
@@ -1665,14 +1671,6 @@ sub update_host {
     return 1;
 }
 
-sub update_host_property {
-    my ($self, $hostid, $col, $val) = @_;
-    $self->conddup(sub {
-        $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
-    });
-    return 1;
-}
-
 # return new hostid, or throw 'dup' on error.
 # NOTE: you need to put them into the initial 'down' state.
 sub create_host {
@@ -20,6 +20,7 @@ use fields (
             'db_monitor_ran',  # We announce "monitor_just_ran" every time the
                                # device checks are run, but only if the DB has
                                # been checked inbetween.
+            'devs_to_update'   # device table update queue
             );
 
 use Danga::Socket 1.56;
@@ -105,6 +106,7 @@ sub usage_refresh {
         # Fetch the freshest list of entries, to avoid excessive writes.
         $self->{updateable_devices} = { map { $_->{devid} => $_ }
             Mgd::get_store()->get_all_devices };
+        $self->{devs_to_update} = [];
     } else {
         $self->{updateable_devices} = undef;
     }
@@ -133,11 +135,6 @@ sub usage_refresh {
 sub usage_refresh_done {
     my ($self) = @_;
 
-    if ($self->{updateable_devices}) {
-        Mgd::get_store()->release_lock('mgfs:device_update');
-        $self->{updateable_devices} = undef;
-    }
-
     $self->{devutil}->{prev} = $self->{devutil}->{tmp};
     # Set the IOWatcher hosts (once old monitor code has been disabled)
 
@@ -179,6 +176,14 @@ sub usage_refresh_done {
             $self->send_to_parent(":monitor_just_ran");
         }
     }
+
+    if ($self->{updateable_devices}) {
+        my $sto = Mgd::get_store();
+        my $updates = delete $self->{devs_to_update};
+        $sto->update_device_usages($updates, sub { $self->still_alive });
+        $sto->release_lock('mgfs:device_update');
+        $self->{updateable_devices} = undef;
+    }
 }
 
 sub work {
@@ -410,10 +415,13 @@ sub check_usage_response {
     if ($self->{updateable_devices}) {
         my $devrow = $self->{updateable_devices}->{$devid};
         my $last = ($devrow && $devrow->{mb_asof}) ? $devrow->{mb_asof} : 0;
-        if ($last + UPDATE_DB_EVERY < time()) {
-            Mgd::get_store()->update_device_usage(mb_total => int($total / 1024),
-                                                  mb_used  => int($used / 1024),
-                                                  devid    => $devid);
+        my $now = time();
+        if ($last + UPDATE_DB_EVERY < $now) {
+            my %upd = (mb_total => int($total / 1024),
+                       mb_used  => int($used / 1024),
+                       mb_asof => $now,
+                       devid => $devid);
+            push @{$self->{devs_to_update}}, \%upd;
         }
     }
     return 1;
@@ -62,8 +62,13 @@ observed_state => 'writeable'});
             ok(!$dev->can_read_from, "can_read_from for device fails when host is $s");
             ok(!$dev->should_read_from, "device should not be readable when host is $s");
         }
+        $host->{status} = "readonly";
+        ok($dev->can_read_from, "device is readable from again");
+        ok(! $dev->should_get_new_files, "device should not get new files");
+
         $host->{status} = "alive";
         ok($dev->can_read_from, "device is readable from again");
+        ok($dev->should_get_new_files, "device should get new files again");
     }
 
     # first ensure device status is respected
@@ -32,11 +32,18 @@ my $host_args = {
     http_get_port => $http_get->sockport,
 };
 my $host = MogileFS::Host->new_from_args($host_args);
+
+# required, defaults to 20 in normal server
+MogileFS::Config->set_config("conn_pool_size", 13);
+
 MogileFS::Host->_init_pools;
+
 my $idle_pool = $MogileFS::Host::http_pool->{idle};
 is("MogileFS::Host", ref($host), "host created");
 MogileFS::Config->set_config("node_timeout", 1);
 
+is(13, $MogileFS::Host::http_pool->{total_capacity}, "conn_pool_size took effect");
+
 # hit the http_get_port
 {
     my $resp;