package MogileFS::Worker::Delete;
# deletes files
use strict;
use base 'MogileFS::Worker';
use MogileFS::Util qw(error);
use MogileFS::Server;
# we select 1000 but only do a random 100 of them, to allow
# for stateless parallelism
use constant LIMIT => 1000;
use constant PER_BATCH => 100;
# TODO: use LWP and persistent connections to do deletes. less local ports used.
# Build a delete worker bound to the parent socket $psock; the base class
# (MogileFS::Worker) does the actual initialisation.
sub new {
    my $class = shift;
    my $psock = shift;

    my $self = fields::new($class);
    $self->SUPER::new($psock);

    return $self;
}
# Watchdog limit reported by this worker type (presumably seconds between
# still_alive pings before the parent gives up on us -- confirm in
# MogileFS::Worker).
sub watchdog_timeout {
    my $limit = 120;
    return $limit;
}
# Main loop of the delete worker; it never returns.
#
# Every pass reports "worker_bored" to the parent, reads pending parent
# messages (read_from_parent(1)), checks the database handle, and then runs
# the three delete sources in this order:
#   1. tempfile sweep  -- only when the non-blocking 'mgfs:tempfiles' lock
#                         is obtained;
#   2. old-style queue -- file_to_delete / file_to_delete_later, throttled
#                         by a crawling backoff while it stays idle;
#   3. new-style queue -- process_deletes2.
# Each source returns true when it did work. If none did, the worker sleeps
# for an interval that grows by one second per idle pass (capped at 5);
# any work resets that interval to zero.
sub work {
    my $self = shift;

    my $idle_sleep     = 0;    # current idle sleep, grows by 1 per idle pass
    my $max_idle_sleep = 5;    # cap for $idle_sleep

    # Old-queue throttle: the old queue is visited only once time() has
    # passed $old_check_at; $old_backoff counts consecutive idle visits.
    my $old_check_at = 0;
    my $old_backoff  = 0;

    PASS: while (1) {
        $self->send_to_parent("worker_bored 50 delete");
        $self->read_from_parent(1);
        next PASS unless $self->validate_dbh;

        # 1. Tempfile sweep. We don't wait for the lock: it's enough that
        #    *someone* is sweeping.
        my $tempfile_lock = 'mgfs:tempfiles';
        my $swept_tempfiles;
        if (Mgd::get_store()->get_lock($tempfile_lock, 0)) {
            $swept_tempfiles = $self->process_tempfiles;
            Mgd::get_store()->release_lock($tempfile_lock);
        }

        # 2. Old-style queue with a crawling backoff.
        my $drained_old_queue;
        if (time() > $old_check_at) {
            $self->reenqueue_delayed_deletes;
            $drained_old_queue = $self->process_deletes;
            if (!$drained_old_queue) {
                # After more than 360 consecutive idle visits, stay away for
                # $old_backoff seconds; the counter itself stops at 1801.
                if ($old_backoff > 360) {
                    $old_check_at = time() + $old_backoff;
                }
                if ($old_backoff <= 1800) {
                    $old_backoff++;
                }
            }
            else {
                $old_backoff  = 0;
                $old_check_at = 0;
            }
        }

        # 3. New-style queue.
        my $drained_new_queue = $self->process_deletes2;

        if ($swept_tempfiles || $drained_old_queue || $drained_new_queue) {
            $idle_sleep = 0;
        }
        else {
            $idle_sleep++ if $idle_sleep < $max_idle_sleep;
            sleep $idle_sleep;
        }
    }
}
# Sweep stale rows out of the tempfile table.
#
# Rows older than $ENV{T_TEMPFILE_TOO_OLD} seconds (default 3600) are
# fetched with old_tempfiles. Each row is handled as follows:
#   * if its fid is still loadable (MogileFS::FID->exists), only the
#     tempfile row is removed -- the file is real and must NOT be deleted;
#   * otherwise a file_on row is recorded for every devid listed in the
#     row's devids column, and the fid is enqueued on the new-style delete
#     queue (enqueue_fids_to_delete2).
#
# Why file_on rows instead of just dropping the tempfile row: the client
# may have written part of the file to one of the assigned devices and then
# died before create_close. Recording both devids lets the regular delete
# process hunt the fid down; a 404 there simply means nothing was written.
#
# Example of the rows this targets:
#   mysql> select * from tempfile where createtime < unix_timestamp() - 86400 limit 50;
#   +--------+------------+---------+------+---------+--------+
#   | fid    | createtime | classid | dmid | dkey    | devids |
#   +--------+------------+---------+------+---------+--------+
#   |   3253 | 1149451058 |       1 |    1 | file574 | 1,2    |
#   |   4559 | 1149451156 |       1 |    1 | file83  | 1,2    |
#   |  11024 | 1149451697 |       1 |    1 | file836 | 2,1    |
#   |  19885 | 1149454542 |       1 |    1 | file531 | 1,2    |
#
# Returns 1 if at least one fid was queued for deletion, 0 otherwise.
sub process_tempfiles {
    my $self = shift;

    my $sto       = Mgd::get_store();
    my $too_old   = int($ENV{T_TEMPFILE_TOO_OLD} || 3600);
    my $tempfiles = $sto->old_tempfiles($too_old);
    return 0 unless $tempfiles && @$tempfiles;

    my (@devfids, @fidids);
    foreach my $row (@$tempfiles) {
        my ($fidid, $devids) = @$row;

        # If FID is still loadable, we've arrived here due to a bug or race
        # condition elsewhere. Remove the tempfile row but don't delete the
        # file!
        my $fid = MogileFS::FID->new($fidid);
        if ($fid->exists) {
            $sto->delete_tempfile_row($fidid);
            next;
        }
        push @fidids, $fidid;

        # Sanity check the free-form devids column. Accept only "N" or
        # "N,N,...", with surrounding whitespace tolerated and stripped, and
        # split the *captured* list. \A/\z are used instead of ^/$ because
        # $ lets a trailing newline through, which used to end up inside the
        # last devid. Anything else yields no devices.
        my @row_devids;
        if (defined $devids && $devids =~ /\A\s*(\d+(?:,\d+)*)\s*\z/) {
            @row_devids = split /,/, $1;
        }
        push @devfids, map { MogileFS::DevFID->new($_, $fidid) } @row_devids;
    }

    # We might've done no work due to discovering the tempfiles are real.
    return 0 unless @fidids;

    $sto->mass_insert_file_on(@devfids);
    $sto->enqueue_fids_to_delete2(@fidids);

    # The fids come from the tempfile table's fid column (presumably an
    # integer column), so they are joined directly into the IN list.
    # The outcome of this DELETE used to be ignored entirely. Log a failure
    # rather than throwing: the caller (work) takes the mgfs:tempfiles lock
    # around this call, and a die here would skip its release_lock.
    my $dbh = $sto->dbh;
    $dbh->do("DELETE FROM tempfile WHERE fid IN (" . join(',', @fidids) . ")");
    error("Failed to delete expired rows from tempfile: " . $dbh->errstr)
        if $dbh->err;

    return 1;
}
# new style delete queueing. I'm not putting a lot of effort into commonizing
# code between the old one and the new one. Feel free to send a patch!
#
# process_deletes2: work through the fids handed to this worker by
# queue_todo('delete').
#
# Each fid is processed as follows:
#   1. If should_begin_replicating_fidid refuses it, the fid is rescheduled
#      1 second out.
#   2. Otherwise delete_fidid_enqueued is called on it.
#   3. Every device the fid is still recorded on (via $fid->devids) is then
#      visited.
#
# A device copy is forgotten (remove_fidid_from_devid) in two cases: the
# device is permanently dead, or the storage node answers an HTTP DELETE
# with 2xx or 404. Devices that can't be handled right now get the fid
# rescheduled with a delay that scales with $todo->{failcount}. The queue
# entry itself (delete_fid_from_file_to_delete2) is removed only once no
# device copies remain.
#
# Returns 0 if the queue was empty, 1 otherwise (even if every fid was
# merely rescheduled).
sub process_deletes2 {
my $self = shift;
my $sto = Mgd::get_store();
my $queue_todo = $self->queue_todo('delete');
unless (@$queue_todo) {
# No work.
return 0;
}
while (my $todo = shift @$queue_todo) {
# Ping the parent once per fid (presumably resets the watchdog whose limit
# is watchdog_timeout).
$self->still_alive;
# load all the devids related to this fid, and delete.
my $fid = MogileFS::FID->new($todo->{fid});
my $fidid = $fid->id;
# if it's currently being replicated, wait for replication to finish
# before deleting to avoid stale files
# (presumably this takes a per-fid replication claim, which
# note_done_replicating at the bottom of the loop gives back -- confirm)
if (! $sto->should_begin_replicating_fidid($fidid)) {
$sto->reschedule_file_to_delete2_relative($fidid, 1);
next;
}
$sto->delete_fidid_enqueued($fidid);
my @devids = $fid->devids;
# %devids = copies still believed to exist; the queue entry is dropped only
# once this ends up empty.
my %devids = map { $_ => 1 } @devids;
for my $devid (@devids) {
my $dev = $devid ? Mgd::device_factory()->get_by_id($devid) : undef;
error("deleting fid $fidid, on devid ".($devid || 'NULL')."...") if $Mgd::DEBUG >= 2;
# Unknown device (or devid 0): skipped. Unlike process_deletes, the copy is
# NOT treated as gone, so the devid stays in %devids and the queue entry is
# kept. NOTE(review): confirm this is intentional.
unless ($dev) {
next;
}
if ($dev->dstate->is_perm_dead) {
$sto->remove_fidid_from_devid($fidid, $devid);
delete $devids{$devid};
next;
}
# devid is observed down/readonly: delay for at least
# 10 minutes (plus one minute per previous failure).
unless ($dev->observed_writeable) {
$sto->reschedule_file_to_delete2_relative($fidid,
60 * (10 + $todo->{failcount}));
next;
}
# devid is marked readonly/down/etc: delay for
# at least 1 hour (times 1 + failcount).
unless ($dev->can_delete_from) {
$sto->reschedule_file_to_delete2_relative($fidid,
60 * 60 * (1 + $todo->{failcount}));
next;
}
my $dfid = MogileFS::DevFID->new($dev, $fidid);
my $path = $dfid->url;
# dormando: "There are cases where url can return undefined,
# Mogile appears to try to replicate to bogus devices
# sometimes?"
# The copy stays in %devids and the fid is not rescheduled here.
unless ($path) {
error("in deleter, url(devid=$devid, fid=$fidid) returned nothing");
next;
}
my $urlparts = MogileFS::Util::url_parts($path);
# hit up the server and delete it
# TODO: (optimization) use MogileFS->get_observed_state and don't
# try to delete things known to be down/etc
# NOTE(review): IO::Socket's Timeout bounds the connect; the <$sock> read
# below has no explicit timeout of its own.
my $sock = IO::Socket::INET->new(PeerAddr => $urlparts->[0],
PeerPort => $urlparts->[1],
Timeout => 2);
# this used to mark the device as down for the whole tracker.
# if the device is actually down, we can struggle until the
# monitor job figures it out... otherwise an occasional timeout
# due to high load will prevent delete from working at all.
unless ($sock) {
$sto->reschedule_file_to_delete2_relative($fidid,
60 * 60 * (1 + $todo->{failcount}));
next;
}
# send delete request
error("Sending delete for $path") if $Mgd::DEBUG >= 2;
$sock->write("DELETE $urlparts->[2] HTTP/1.0\r\n\r\n");
# Only the status line is read; the rest of the response is ignored.
my $response = <$sock>;
if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
if (($1 >= 200 && $1 <= 299) || $1 == 404) {
# effectively means all went well
# (404: the file is already gone, which is the goal anyway)
$sto->remove_fidid_from_devid($fidid, $devid);
delete $devids{$devid};
} else {
# remote file system error? Reschedule the fid (30 minutes times
# 1 + failcount); the device itself is not marked down here.
my $httpcode = $1;
error("Error: unlink failure: $path: HTTP code $httpcode");
$sto->reschedule_file_to_delete2_relative($fidid,
60 * 30 * (1 + $todo->{failcount}));
next;
}
} else {
# No parseable status line (including an empty read): log only. The devid
# stays in %devids and the fid is not rescheduled.
error("Error: unknown response line deleting $path: $response");
}
}
# fid has no pants.
# (no device copies remain, so the queue entry can go)
unless (keys %devids) {
$sto->delete_fid_from_file_to_delete2($fidid);
}
$sto->note_done_replicating($fidid);
}
# did work.
return 1;
}
# Old-style delete queue (file_to_delete).
#
# Selects up to LIMIT (fid, devid) pairs from file_to_delete, LEFT JOINed to
# file_on; devid is NULL once no file_on rows remain for a fid. The pairs
# are shuffled and at most PER_BATCH of them are handled, so several delete
# workers can share the queue without coordinating. Each pair ends in one of
# four outcomes:
#   * the fid is finished;
#   * one device copy is forgotten;
#   * the fid is moved to file_to_delete_later with a delay;
#   * an HTTP DELETE is sent to the storage node.
#
# Returns 0 if the select produced no rows (or failed), 1 otherwise.
# NOTE(review): List::Util and IO::Socket::INET are not `use`d in the
# visible part of this file; presumably another module loads them -- confirm.
sub process_deletes {
my $self = shift;
my $sto = Mgd::get_store();
my $dbh = $sto->dbh;
my $delmap = $dbh->selectall_arrayref("SELECT fd.fid, fo.devid ".
"FROM file_to_delete fd ".
"LEFT JOIN file_on fo ON fd.fid=fo.fid ".
"LIMIT " . LIMIT);
my $count = $delmap ? scalar @$delmap : 0;
return 0 unless $count;
my $done = 0;
foreach my $dm (List::Util::shuffle(@$delmap)) {
last if ++$done > PER_BATCH;
$self->still_alive;
my ($fid, $devid) = @$dm;
error("deleting fid $fid, on devid ".($devid || 'NULL')."...") if $Mgd::DEBUG >= 2;
# Per-row helpers; they close over this iteration's $fid / $devid.
# The $reason arguments are informational only: the two done_with_*
# closures ignore theirs, and reschedule_fid logs its own at DEBUG >= 2.
# Finish the fid: drop it from file_to_delete.
my $done_with_fid = sub {
my $reason = shift;
$dbh->do("DELETE FROM file_to_delete WHERE fid=?", undef, $fid);
$sto->condthrow("Failure to delete from file_to_delete for fid=$fid");
};
# Forget this one device copy: drop the (fid, devid) row from file_on.
my $done_with_devid = sub {
my $reason = shift;
$dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
undef, $fid, $devid);
$sto->condthrow("Failure to delete from file_on for $fid/$devid");
# NOTE(review): likely redundant if condthrow already dies on $dbh->err.
die "Failed to delete from file_on: " . $dbh->errstr if $dbh->err;
};
# Park the fid in file_to_delete_later, due $secs seconds from now
# (presumably INSERT IGNORE semantics, so an existing row is kept), and
# drop it from file_to_delete.
my $reschedule_fid = sub {
my ($secs, $reason) = (int(shift), shift);
$sto->insert_ignore("INTO file_to_delete_later (fid, delafter) ".
"VALUES (?,".$sto->unix_timestamp."+$secs)", undef,
$fid);
error("delete of fid $fid rescheduled: $reason") if $Mgd::DEBUG >= 2;
$done_with_fid->("rescheduled");
};
# Cases:
# devid is null: doesn't exist anywhere anymore, we're done with this fid.
# devid is observed down/readonly: delay for 10 minutes
# devid is marked readonly: delay for 2 hours
# devid is marked dead or doesn't exist: consider it deleted on this devid.
# ("delay" = reschedule_fid: move the fid to file_to_delete_later)
# CASE: devid is null, which means we're done deleting all instances.
unless (defined $devid) {
$done_with_fid->("no_more_locations");
next;
}
# CASE: devid is marked dead or doesn't exist: consider it deleted on this devid.
# (Note: we're tolerant of '0' as a devid, due to old buggy version which
# would sometimes put that in there)
my $dev = $devid ? Mgd::device_factory()->get_by_id($devid) : undef;
unless ($dev) {
$done_with_devid->("devid_doesnt_exist");
next;
}
if ($dev->dstate->is_perm_dead) {
$done_with_devid->("devid_marked_dead");
next;
}
# CASE: devid is observed down/readonly: delay for 10 minutes
unless ($dev->observed_writeable) {
$reschedule_fid->(60 * 10, "not_observed_writeable");
next;
}
# CASE: devid is marked readonly/down/etc: delay for 2 hours
unless ($dev->can_delete_from) {
$reschedule_fid->(60 * 60 * 2, "devid_marked_not_alive");
next;
}
my $dfid = MogileFS::DevFID->new($dev, $fid);
my $path = $dfid->url;
# dormando: "There are cases where url can return undefined,
# Mogile appears to try to replicate to bogus devices
# sometimes?"
# The row is left untouched and will come up again on a later pass.
unless ($path) {
error("in deleter, url(devid=$devid, fid=$fid) returned nothing");
next;
}
my $urlparts = MogileFS::Util::url_parts($path);
# hit up the server and delete it
# TODO: (optimization) use MogileFS->get_observed_state and don't try to delete things known to be down/etc
# NOTE(review): Timeout bounds the connect only; the <$sock> read below has
# no explicit timeout.
my $sock = IO::Socket::INET->new(PeerAddr => $urlparts->[0],
PeerPort => $urlparts->[1],
Timeout => 2);
unless ($sock) {
# timeout or something: reschedule the fid for 2 hours and move on
# (the device itself is not marked down here)
$reschedule_fid->(60 * 60 * 2, "no_sock_to_hostid");
next;
}
# send delete request
error("Sending delete for $path") if $Mgd::DEBUG >= 2;
$sock->write("DELETE $urlparts->[2] HTTP/1.0\r\n\r\n");
# Only the status line is read; the rest of the response is ignored.
my $response = <$sock>;
if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
if (($1 >= 200 && $1 <= 299) || $1 == 404) {
# effectively means all went well
# Only the file_on row goes; the fid stays in file_to_delete until a
# later pass sees devid NULL (no file_on rows left) and finishes it.
$done_with_devid->("deleted");
} else {
# remote file system error? Reschedule the fid for 30 minutes
# (the device itself is not marked down here)
my $httpcode = $1;
error("Error: unlink failure: $path: HTTP code $httpcode");
$reschedule_fid->(60 * 30, "http_code_$httpcode");
next;
}
} else {
# No parseable status line (including an empty read): log only; the row
# stays and is retried on a later pass.
error("Error: unknown response line deleting $path: $response");
}
}
# as far as we know, we have more work to do
return 1;
}
# Move fids whose delay in file_to_delete_later has expired back onto the
# old-style delete queue.
#
# fids_to_delete_again supplies the due fids. They are re-enqueued with
# enqueue_fids_to_delete and then removed from file_to_delete_later.
# Returns nothing when no fid is due; otherwise returns condthrow's result
# (condthrow raises on a database error).
sub reenqueue_delayed_deletes {
    my $self  = shift;
    my $store = Mgd::get_store();
    my $dbh   = $store->dbh;

    my @due = $store->fids_to_delete_again;
    return unless @due;

    $store->enqueue_fids_to_delete(@due);

    my $in_list = join ",", @due;
    $dbh->do("DELETE FROM file_to_delete_later WHERE fid IN ($in_list)");
    $store->condthrow("reenqueue file_to_delete_later delete");
}
1;
# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: