package MogileFS::Rebalance;
use strict;
use warnings;
use Carp qw(croak);
use List::Util ();
use MogileFS::Server ();
# Note: The filters aren't written for maximum speed, as they're not likely
# in the slow path. They're supposed to be readable/extensible. Please don't
# cram them down unless you have to.
# TODO: allow filters to note by dev why they were filtered in/out, and return
# that info for DEBUG display.
# TODO: Add "debug trace" lines to most functions. "choosing sdev to work on",
# etc.
# TODO: tally into the state how many fids/size/etc it's done so far.
# TODO: should track old device state and return to it. Overall this is
# probably better fit by switching "device state" to a set of "device flags",
# so we can disable specifically "stop getting new files" while we work :(
# Default policy: every recognised policy field together with its default.
# policy() hands this hash to _parse_settings(), which uses it both as the
# set of defaults and as the list of valid keys (an unknown key croaks).
# A minimum set of fields should be defined for a policy to be valid.
my %default_policy = (
# --- source device selection (see filter_source_devices) ---
# NOTE(review): from_all_devs is not read anywhere in the visible code.
from_all_devs => 1,
from_hosts => [], # host ids; when non-empty only devices on these hosts are used.
from_devices => [], # dev ids; when non-empty only these devices are used.
from_percent_used => undef, # 0.nn * 100; device must be strictly above this.
from_percent_free => undef, # compared against percent_free * 100.
from_space_used => undef, # compared against the device's mb_used.
from_space_free => undef, # compared against the device's mb_free.
fid_age => 'old', # old|new; passed as "age" to fid_chunks.
limit_type => 'device', # global|device; selects state "limit" vs "sdev_limit".
limit_by => 'none', # size|count|percent|none ("percent" croaks as unimplemented).
limit => undef, # 100g|10%|5000; run through _human_to_bytes when limit_by is size.
# --- target device selection (see filter_dest_devices) ---
# NOTE(review): to_all_devs is not read anywhere in the visible code.
to_all_devs => 1,
to_hosts => [], # host ids; when non-empty only devices on these hosts qualify.
to_devices => [], # dev ids; when non-empty only these devices qualify.
to_percent_used => undef, # compared against percent_full * 100.
to_percent_free => undef, # compared against percent_free * 100.
to_space_used => undef, # compared against the device's mb_used.
to_space_free => undef, # compared against the device's mb_free.
not_to_hosts => [], # host ids that are never used as a destination.
not_to_devices => [], # dev ids that are never used as a destination.
use_dest_devs => 'all', # all|N (list up to N devices to rep pol)
leave_in_drain_mode => 0, # true: do not set the source device back to 'alive'.
);
# Default run state: every recognised state field with its initial value.
# init() starts from a copy of this hash, and load_state() passes it to
# _parse_settings() as the set of defaults / valid keys.
my %default_state = (
# device ids pushed by _finish_source_device once a source is done.
completed_devs => [],
# device ids chosen by filter_source_devices; _find_source_device shifts
# the next source device off the front of this list.
source_devs => [],
# id of the source device currently being worked on (false when none).
sdev_current => 0,
# id of the last fid fetched for the current source device.
sdev_lastfid => 0,
# remaining limit for the current source device (see _check_limits).
sdev_limit => 0,
# limit consulted by _check_limits when limit_type is 'global'.
# NOTE(review): nothing in the visible code initialises this from the
# policy -- confirm where the global limit is supposed to be set.
limit => 0,
# running totals updated by next_fids_to_rebalance.
fids_queued => 0,
bytes_queued => 0,
# epoch timestamps written by init(), finish() and stop() respectively.
time_started => 0,
time_finished => 0,
time_stopped => 0,
# epoch timestamps / counter maintained by next_fids_to_rebalance; a run
# is "empty" when it queued no fids.
time_latest_run => 0,
time_latest_empty_run => 0,
empty_runs => 0,
);
# Constructor.
#   $policy - optional policy string ("key=value key=value"); parsed via
#             policy() when true.
#   $state  - optional saved state string; parsed via load_state() when true.
# Returns the new object. Both fields start out as empty strings until a
# policy/state has been loaded.
sub new {
    my ($class, $policy, $state) = @_;
    $policy ||= "";
    $state  ||= '';

    # Validate policy here?
    my %fields = (
        policy => '',
        state  => '',
    );
    my $self = bless \%fields, $class;

    if ($policy) {
        $self->policy($policy);
    }
    if ($state) {
        $self->load_state($state);
    }
    return $self;
}
# Start a fresh rebalance run.
#   $devs - reference to the list of device objects to pick sources from.
# Croaks if a state is already present or if no device list was passed.
# Builds a new state from %default_state, records the source device ids
# selected by filter_source_devices() and the start time, and returns the
# new state hashref.
sub init {
    my $self = shift;
    my $devs = shift;
    croak "policy object already initialized" if $self->{state};
    croak "please pass in devices to filter" unless $devs && ref($devs);

    # Copy the defaults, duplicating array values: a plain hash copy would
    # share the default arrays, and pushing onto completed_devs later
    # would then modify %default_state for every other object.
    my %state;
    for my $key (keys %default_state) {
        my $default = $default_state{$key};
        $state{$key} = ref($default) eq 'ARRAY' ? [@$default] : $default;
    }

    # If we don't have an initial source device list, discover them.
    # Used to filter destination devices later.
    $state{source_devs}  = $self->filter_source_devices($devs);
    $state{time_started} = time();
    $self->{state} = \%state;
    return $self->{state};
}
# Stop the run: unless the policy asks to leave devices in drain mode,
# put the source device currently being worked on back to 'alive', then
# record the stop time in the state. Returns the recorded timestamp.
sub stop {
    my $self = shift;
    my $p = $self->{policy};
    my $s = $self->{state};

    # The current source device is tracked in the state hash, not directly
    # on the object.
    my $sdev = $s->{sdev_current};
    if ($sdev && !$p->{leave_in_drain_mode}) {
        Mgd::get_store()->set_device_state($sdev, 'alive');
    }
    $s->{time_stopped} = time();
}
# Mark the run as finished by recording the current time in the state.
# Returns the recorded timestamp.
sub finish {
    my ($self) = @_;
    my $state = $self->{state};
    return $state->{time_finished} = time();
}
# Resume from a state string previously produced by save_state().
#   $state - "key=value key=value" string.
# The string is parsed against %default_state (defaults and valid keys) and
# the result becomes the object's state. Returns the parsed state.
sub load_state {
    my ($self, $state) = @_;
    # TODO: validate state?
    my $parsed = $self->_parse_settings($state, \%default_state);
    return $self->{state} = $parsed;
}
# Serialise the current state into a "key=value key=value" string that
# load_state() can read back.
# Call as_string()? merge into load_state as "state"?
sub save_state {
    my ($self) = @_;
    my $state = $self->{state};
    return $self->_save_settings($state);
}
# Return the arrayref of source device ids still waiting to be processed.
# The list is kept in the state hash (init() stores it there), so undef is
# returned while no state has been initialised or loaded.
sub source_devices {
    my $self  = shift;
    my $state = $self->{state};
    return ref($state) ? $state->{source_devs} : undef;
}
# Combined getter/setter for the policy.
#   no argument - returns the currently loaded policy.
#   $policy     - "key=value key=value" string; parsed against
#                 %default_policy, stored, and returned.
sub policy {
    my ($self, @args) = @_;

    # TODO: serialize it or pass a structure?
    return $self->{policy} if !@args;

    my $parsed = $self->_parse_settings($args[0], \%default_policy);
    $self->{policy} = $parsed;
    return $parsed;
}
# Serialise a settings hash into "key=value key=value" form.
#   $settings - hashref whose values are plain scalars or array refs.
# Array values are joined with ','. Undefined values are written as an
# empty value ("key=") without raising warnings. Keys are emitted in sorted
# order so the same settings always produce the same string.
# Returns the serialised string.
sub _save_settings {
    my $self     = shift;
    my $settings = shift;
    my @tosave   = ();
    for my $key (sort keys %{$settings}) {
        my $val = $settings->{$key};
        my $text;
        # Only ref we support is ARRAY at the mo'...
        if (ref($val) eq 'ARRAY') {
            $text = join(',', map { defined $_ ? $_ : '' } @$val);
        } else {
            $text = defined $val ? $val : '';
        }
        push(@tosave, $key . '=' . $text);
    }
    return join(' ', @tosave);
}
# Parse a settings string of the form
#   foo=bar foo2=bar2 foo3=baaz,quux
# into a hashref.
#   $settings   - whitespace separated key=value tuples; ',' separates the
#                 elements of a multi-value setting.
#   $constraint - optional hashref that supplies the defaults and the set of
#                 valid keys. A key whose default is an array ref always
#                 parses to an array ref.
# Returns a hashref of defaults overlaid with the parsed values, or nothing
# when $settings is empty. Croaks with "Invalid setting <key>" for a key
# missing from $constraint.
sub _parse_settings {
    my $self       = shift;
    my $settings   = shift;
    my $constraint = shift || '';
    my %parsed     = ();

    # The constraint also serves as a set of defaults. Array defaults are
    # duplicated so the result never shares storage with the constraint.
    if ($constraint) {
        for my $key (keys %{$constraint}) {
            my $default = $constraint->{$key};
            $parsed{$key} = ref($default) eq 'ARRAY' ? [@$default] : $default;
        }
    }
    return unless $settings;

    # split ' ' skips leading whitespace and treats runs of whitespace as
    # one separator, so stray spaces do not produce empty tuples.
    for my $tuple (split ' ', $settings) {
        # Limit of 2 keeps any further '=' as part of the value.
        my ($key, $value) = split /=/, $tuple, 2;

        # In the future we could do stronger type checking at load
        # time, but for now this will happen at use time :/
        if ($constraint && !exists $constraint->{$key}) {
            croak "Invalid setting $key";
        }
        # default says we should be an array.
        my $want_array = $constraint && ref($constraint->{$key}) eq 'ARRAY';

        if (!defined $value || $value eq '') {
            # "key=" is how an empty list or an undefined value is saved.
            $parsed{$key} = $want_array ? [] : undef;
        } elsif (index($value, ',') > -1) {
            # ',' is reserved for multivalue types.
            $parsed{$key} = [split /,/, $value];
        } else {
            $parsed{$key} = $want_array ? [$value] : $value;
        }
    }
    return \%parsed;
}
# Step through the filters and find the next set of fids to rebalance.
#   $devs  - list (arrayref) of device objects, used to pick destinations.
#   $sto   - storage object; only checked for presence here.
#   $limit - maximum number of fids to fetch in this call (default 100).
# Returns:
#   undef    - there is nothing left to do (no source device available).
#   []       - the current source device was finished; try again.
#   arrayref - of [fidid, source devid, [destination devids]] entries.
# Croaks when no policy is loaded, when an argument is missing, or when no
# destination device passes the policy filters.
# Should $sto be passed in here or should we fetch it ourselves?
# Also, should device info be passed in? I think so.
sub next_fids_to_rebalance {
    my ($self, $devs, $sto, $limit) = @_;
    $limit ||= 100; # random low default.

    # Balk unless we have a policy or a state?
    my $policy = $self->{policy};
    $policy or croak "No policy loaded";
    $devs   or croak "Must pass in device list";
    $sto    or croak "Must pass in storage object";

    my $state = $self->{state};

    # If we're not working against a source device, discover one
    my $source_id = $self->_find_source_device($state->{source_devs});
    if (!$source_id) {
        return undef;
    }
    my $source = Mgd::device_factory()->get_by_id($source_id);

    my $candidates = $self->filter_dest_devices($devs);
    if (!@$candidates) {
        croak("rebalance cannot find suitable destination devices");
    }

    my @chunk = $source->fid_chunks(
        age   => $policy->{fid_age},
        fidid => $state->{sdev_lastfid},
        limit => $limit,
    );

    # We'll wait until the next cycle to find a new sdev.
    unless (@chunk && $self->_check_limits) {
        $self->_finish_source_device;
        return [];
    }

    # In both old or new cases, the "last" fid in the list is correct.
    $state->{sdev_lastfid} = $chunk[-1]->id;

    # TODO: create a filterset for $fid settings. filesize, class, domain, etc.
    my @queued = ();
    for my $fid (@chunk) {
        # Skip fids that no longer exist; otherwise count the fid or its
        # size against the limit and skip it when it does not fit.
        if ($fid->exists && $self->_check_limits($fid)) {
            my $targets = $self->_choose_dest_devs($fid, $candidates);
            # Update internal stats.
            $state->{fids_queued}++;
            $state->{bytes_queued} += $fid->length;
            push(@queued, [$fid->id, $source->id, $targets]);
        }
    }

    $state->{time_latest_run} = time;
    if (!@queued) {
        $state->{empty_runs}++;
        $state->{time_latest_empty_run} = time;
    }

    # return block of fiddev combos.
    return \@queued;
}
# Ensure this fid wouldn't overrun a limit.
#   $fid - optional fid object. When given, the fid is charged against the
#          limit if it fits. When omitted, only reports whether any budget
#          is left.
# The budget is the state's "limit" when the policy's limit_type is
# 'global', otherwise the state's "sdev_limit".
# Returns 1 when the fid may be queued (or budget remains), 0 otherwise.
# Croaks on an unsupported limit_by value.
sub _check_limits {
    my $self = shift;
    my $fid  = shift;
    my $p    = $self->{policy};
    my $s    = $self->{state};
    return 1 if ($p->{limit_by} eq 'none');

    my $limit = $p->{limit_type} eq 'global'
        ? \$s->{limit}
        : \$s->{sdev_limit};

    if ($p->{limit_by} eq 'count') {
        # Never let the counter drop below zero: a negative count is a
        # true value and would switch the limit off again.
        return 0 unless defined $$limit && $$limit > 0;
        $$limit-- if $fid;
        return 1;
    } elsif ($p->{limit_by} eq 'size') {
        if ($fid) {
            if ($fid->length() <= $$limit) {
                $$limit -= $fid->length();
                return 1;
            } else {
                return 0;
            }
        } else {
            # Arbitrary "give up if there's less than 1kb in the limit"
            # FIXME: Make this configurable
            return $$limit < 1024 ? 0 : 1;
        }
    } else {
        croak("uknown limit_by type");
    }
}
# Pick destination devices for a fid: shuffle the filtered device list and
# return either all of it (policy use_dest_devs is 'all') or at most the
# first use_dest_devs entries of the shuffled list.
#   $fid           - fid being placed (currently unused).
#   $filtered_devs - arrayref of candidate destination device ids.
# Returns an arrayref.
# TODO: use the fid->length to ensure we don't send the file to devices
# that can't handle it.
sub _choose_dest_devs {
    my ($self, $fid, $filtered_devs) = @_;
    my $policy     = $self->{policy};
    my @candidates = List::Util::shuffle(@$filtered_devs);
    my $wanted     = $policy->{use_dest_devs};

    if ($wanted ne 'all') {
        my @picked = splice(@candidates, 0, $wanted);
        return \@picked;
    }
    return \@candidates;
}
# Apply every source constraint of the policy to the device list and
# return the ids of the devices that pass all of them.
#   $devs - arrayref of device objects.
# A device is kept only if can_delete_from is true and it satisfies each
# from_* setting that is set in the policy.
# Returns an arrayref of device ids, in input order.
sub filter_source_devices {
    my $self   = shift;
    my $devs   = shift;
    my $policy = $self->{policy};
    my @only_devices = @{ $policy->{from_devices} || [] };
    my @only_hosts   = @{ $policy->{from_hosts}   || [] };
    my @sdevs = ();
    for my $dev (@$devs) {
        next unless $dev->can_delete_from;
        my $id = $dev->id;
        if (@only_devices) {
            next unless grep { $_ == $id } @only_devices;
        }
        if (@only_hosts) {
            my $hostid = $dev->hostid;
            next unless grep { $_ == $hostid } @only_hosts;
        }
        # More than this percentage used.
        if ($policy->{from_percent_used}) {
            # percent_full returns undef if it doesn't have stats on the
            # device, so test for that before doing arithmetic on it.
            my $fraction = $dev->percent_full;
            next unless defined $fraction;
            next unless $fraction * 100 > $policy->{from_percent_used};
        }
        # More than this percentage free.
        if ($policy->{from_percent_free}) {
            # returns *0* if lacking stats. Must fix :(
            my $fraction = $dev->percent_free;
            next unless $fraction; # hope this never lands at exact zero.
            next unless $fraction * 100 > $policy->{from_percent_free};
        }
        # More than this much space used.
        if ($policy->{from_space_used}) {
            my $used = $dev->mb_used;
            next unless $used && $used > $policy->{from_space_used};
        }
        # More than this much space free.
        if ($policy->{from_space_free}) {
            my $free = $dev->mb_free;
            next unless $free && $free > $policy->{from_space_free};
        }
        push @sdevs, $id;
    }
    return \@sdevs;
}
# Close out the source device currently being worked on: forget its
# per-device progress, return it to 'alive' (unless the policy leaves
# devices in drain mode) and append its id to completed_devs.
# Croaks if no source device is currently selected.
sub _finish_source_device {
    my ($self) = @_;
    my $state  = $self->{state};
    my $policy = $self->{policy};

    $state->{sdev_current}
        or croak "Not presently working on a source device";

    # Drop the per-device bookkeeping in one go.
    delete @{$state}{qw(sdev_lastfid sdev_limit)};
    my $finished = delete $state->{sdev_current};

    # Unless the user wants a device to never get new files again (sticking in
    # drain mode), return to alive.
    if (!$policy->{leave_in_drain_mode}) {
        Mgd::get_store()->set_device_state($finished, 'alive');
    }
    push @{$state->{completed_devs}}, $finished;
}
# Return the id of the source device to work on.
#   $sdevs - arrayref of pending source device ids; the next id is shifted
#            off the front when no device is currently selected.
# When a new device is selected it is recorded in the state, its last-fid
# marker is reset, its per-device limit is derived from the policy and the
# device is put into 'drain' state. Returns a false value when no source
# device is left.
# Croaks when the policy asks for a per-device limit by percent.
# TODO: Be aware of down/unavail devices. temp skip them?
sub _find_source_device {
    my ($self, $sdevs) = @_;
    my $state  = $self->{state};
    my $policy = $self->{policy};

    # Already working on a device: keep going with it.
    return $state->{sdev_current} if $state->{sdev_current};

    my $next = shift @$sdevs;
    return undef, undef unless $next;

    $state->{sdev_current} = $next;
    $state->{sdev_lastfid} = 0;

    my $limit;
    if ($policy->{limit_type} eq 'device') {
        my $by = $policy->{limit_by};
        if ($by eq 'none') {
            $limit = 'none';
        } elsif ($by eq 'count') {
            $limit = $policy->{limit};
        } elsif ($by eq 'size') {
            # Parse the size (default in megs?) out into bytes.
            $limit = $self->_human_to_bytes($policy->{limit});
        } elsif ($by eq 'percent') {
            croak("policy size limits by percent are unimplemented");
        }
    }

    # Must mark device in "drain" mode while we work on it.
    Mgd::get_store()->set_device_state($next, 'drain');
    $state->{sdev_limit} = $limit;

    return $state->{sdev_current};
}
# FIXME: Move to MogileFS::Util
# Take a numeric string with an optional single-letter suffix and turn it
# into bytes.
#   $num - digits optionally followed by one of b, k, m, g, t, p (any
#          case). No suffix means it's already bytes.
# Each suffix step multiplies by 1024 (k = 1024, m = 1024**2, ...).
# Returns the number of bytes; croaks if the string is not in that form.
sub _human_to_bytes {
    my $self = shift;
    my $num  = shift;

    unless (defined $num && $num =~ m/^(\d+)([bkmgtp])?$/i) {
        croak("Don't know what this number is: " . (defined $num ? $num : ''));
    }
    my ($digits, $suffix) = ($1, $2);

    # No suffix: the value is already in bytes.
    return $digits unless defined $suffix;

    # The position of the suffix in this string is the power of 1024
    # ('b' is at index 0, so it multiplies by 1).
    return $digits * (1024 ** index('bkmgtpezy', lc $suffix));
}
# Apply policy to destination devices.
#   $devs - arrayref of device objects.
# Devices that are (or were) rebalance sources -- the pending source list,
# the completed list and the current source device -- are excluded first.
# A remaining device is kept only if should_get_new_files is true and it
# satisfies each to_* / not_to_* setting that is set in the policy.
# Returns an arrayref of device ids, in input order.
sub filter_dest_devices {
    my $self   = shift;
    my $devs   = shift;
    my $policy = $self->{policy};
    my $state  = $self->{state};

    # skip anything we would source from.
    # FIXME: ends up not skipping stuff out of completed_devs? :/
    # Undefined ids (e.g. no current source device) are left out so they
    # do not become hash keys.
    my %sdevs = map { $_ => 1 }
        grep { defined $_ } (
            @{ $state->{source_devs}    || [] },
            @{ $state->{completed_devs} || [] },
            $state->{sdev_current},
        );
    my @devs = grep { !$sdevs{ $_->id } } @$devs;

    my @to_devices     = @{ $policy->{to_devices}     || [] };
    my @to_hosts       = @{ $policy->{to_hosts}       || [] };
    my @not_to_devices = @{ $policy->{not_to_devices} || [] };
    my @not_to_hosts   = @{ $policy->{not_to_hosts}   || [] };

    my @ddevs = ();
    for my $dev (@devs) {
        next unless $dev->should_get_new_files;
        my $id     = $dev->id;
        my $hostid = $dev->hostid;
        if (@to_devices) {
            next unless grep { $_ == $id } @to_devices;
        }
        if (@to_hosts) {
            next unless grep { $_ == $hostid } @to_hosts;
        }
        if (@not_to_devices) {
            next if grep { $_ == $id } @not_to_devices;
        }
        if (@not_to_hosts) {
            next if grep { $_ == $hostid } @not_to_hosts;
        }
        if ($policy->{to_percent_used}) {
            # Test for a missing value before doing arithmetic on it;
            # multiplying first would turn undef into 0 (with a warning).
            my $fraction = $dev->percent_full;
            next unless defined $fraction;
            next unless $fraction * 100 > $policy->{to_percent_used};
        }
        if ($policy->{to_percent_free}) {
            my $fraction = $dev->percent_free;
            next unless $fraction; # hope this never lands at exact zero.
            next unless $fraction * 100 > $policy->{to_percent_free};
        }
        if ($policy->{to_space_used}) {
            my $used = $dev->mb_used;
            next unless $used && $used > $policy->{to_space_used};
        }
        if ($policy->{to_space_free}) {
            my $free = $dev->mb_free;
            next unless $free && $free > $policy->{to_space_free};
        }
        push @ddevs, $id;
    }
    return \@ddevs;
}