The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package MogileFS::Worker::Reaper;
# deletes files

use strict;
use base 'MogileFS::Worker';
use MogileFS::Server;
use MogileFS::Util qw(error debug);
use MogileFS::Config qw(DEVICE_SUMMARY_CACHE_TIMEOUT);
use constant REAP_INTERVAL => 5;
use constant REAP_BACKOFF_MIN => 60;

# completely forget about devices we've reaped after 2 hours of idleness
use constant REAP_BACKOFF_MAX => 7200;

sub new {
    my ($class, $psock) = @_;
    my $self = fields::new($class);
    $self->SUPER::new($psock);

    return $self;
}

sub watchdog_timeout {
    return 240;
}

# order is important here:
#
# first, add fid to file_to_replicate table.  it
# shouldn't matter if the replicator gets to this
# before the subsequent 'forget_about' method, as the
# replicator will treat dead file_on devices as
# non-existent anyway.  however, it is important that
# we enqueue it for replication first, before we
# forget about that file_on row, otherwise a failure
# after/during 'forget_about' could leave a stranded
# file on a dead device and we'd never fix it.
sub reap_fid {
    my ($self, $fid, $dev) = @_;

    $fid->enqueue_for_replication(in => 1);
    $dev->forget_about($fid);
}

# this returns 1000 by default
sub reaper_inject_limit {
    my ($self) = @_;

    my $sto = Mgd::get_store();
    my $max = MogileFS::Config->server_setting_cached('queue_size_for_reaper');
    my $limit = MogileFS::Config->server_setting_cached('queue_rate_for_reaper') || 1000;

    # max defaults to zero, meaning we inject $limit every wakeup
    if ($max) {
        # if a queue size limit is configured for reaper, prevent too many
        # files from entering the repl queue:
        my $len = $sto->deferred_repl_queue_length;
        my $space_left = $max - $len;

        $limit = $space_left if ($limit > $space_left);

        # limit may end up being negative here since other processes
        # can inject into the deferred replication queue, reaper is
        # the only one which can respect this queue size
        $limit = 0 if $limit < 0;
    }

    return $limit;
}

# we pass the $devid here (instead of a Device object) to avoid
# potential memory leaks since this sub reschedules itself to run
# forever.   $delay is the current delay we were scheduled at
sub reap_dev {
    my ($self, $devid, $delay) = @_;

    # ensure the master DB is up, retry in REAP_INTERVAL if down
    unless ($self->validate_dbh) {
        $delay = REAP_INTERVAL;
        Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
        return;
    }

    my $limit = $self->reaper_inject_limit;

    # just in case a user mistakenly nuked a devid from the device table:
    my $dev = Mgd::device_factory()->get_by_id($devid);
    unless ($dev) {
        error("No device row for dev$devid, cannot reap");
        $delay = undef;
    }

    # limit == 0 if we hit the queue size limit, we'll just reschedule
    if ($limit && $dev) {
        my $sto = Mgd::get_store();
        my $lock = "mgfs:reaper";
        my $lock_timeout = $self->watchdog_timeout / 4;
        my @fids;

        if ($sto->get_lock($lock, $lock_timeout)) {
            @fids = $dev->fid_list(limit => $limit);
            if (@fids) {
                $self->still_alive;
                foreach my $fid (@fids) {
                    $self->reap_fid($fid, $dev);
                }
            }
            $sto->release_lock($lock);

            # if we've found any FIDs (perhaps even while backing off)
            # ensure we try to find more soon:
            if (@fids) {
                $delay = REAP_INTERVAL;
            } else {
                $delay = $self->reap_dev_backoff_delay($delay);
            }
        } else {
            # No lock after a long lock_timeout?  Try again soon.
            # We should never get here under MySQL, and rarely for other DBs.
            debug("get_lock($lock, $lock_timeout) failed");
            $delay = REAP_INTERVAL;
        }
    }

    return unless defined $delay;

    # schedule another update, delay could be REAP_BACKOFF_MAX
    Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
}

# called when we're hopefully all done with a device, but reschedule
# into the future in case the replicator had an out-of-date cache and the
# "dead" device was actually writable.
sub reap_dev_backoff_delay {
    my ($self, $delay) = @_;

    return REAP_BACKOFF_MIN if ($delay < REAP_BACKOFF_MIN);

    $delay *= 2;
    return $delay > REAP_BACKOFF_MAX ? undef : $delay;
}

# looks for dead devices
sub work {
    my $self = shift;

    # we just forked from our parent process, also using Danga::Socket,
    # so we need to lose all that state and start afresh.
    Danga::Socket->Reset;

    # ensure we get monitor updates
    Danga::Socket->AddOtherFds($self->psock_fd, sub{ $self->read_from_parent });

    my %devid_seen;
    my $reap_check;
    $reap_check = sub {
        # get db and note we're starting a run
        $self->parent_ping;
        debug("Reaper running; looking for dead devices");

        foreach my $dev (grep { $_->dstate->is_perm_dead }
                         Mgd::device_factory()->get_all)
        {
            next if $devid_seen{$dev->id};

            # delay the initial device reap in case any replicator cache
            # thinks the device is still alive
            Danga::Socket->AddTimer(DEVICE_SUMMARY_CACHE_TIMEOUT + 1, sub {
                $self->reap_dev($dev->id, REAP_INTERVAL);
            });

            # once we've seen a device, reap_dev will takeover scheduling
            # reaping for the given device.
            $devid_seen{$dev->id} = 1;
        }

        Danga::Socket->AddTimer(REAP_INTERVAL, $reap_check);
    };

    # kick off the reaper and loop forever
    $reap_check->();
    Danga::Socket->EventLoop;
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: