The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package MogileFS::Rebalance;
use strict;
use warnings;
use Carp qw(croak);
use List::Util ();
use MogileFS::Server ();

# Note: The filters aren't written for maximum speed, as they're not likely
# in the slow path. They're supposed to be readable/extensible. Please don't
# cram them down unless you have to.
# TODO: allow filters to note by dev why they were filtered in/out, and return
# that info for DEBUG display.
# TODO: Add "debug trace" lines to most functions. "choosing sdev to work on",
# etc.
# TODO: tally into the state how many fids/size/etc it's done so far.
# TODO: should track old device state and return to it. Overall this is
# probably better fit by switching "device state" to a set of "device flags",
# so we can disable specifically "stop getting new files" while we work :(

# Default rebalance policy: every setting a policy may carry, with its
# default value. policy() hands this hash to _parse_settings(), which uses it
# both for defaults and to reject setting names that are not listed here.
# A minimum set of fields should be defined for a policy to be valid..
my %default_policy = (
    # --- source side (see filter_source_devices) ---
    from_all_devs       => 1,
    from_hosts          => [],       # host ids.
    from_devices        => [],       # dev ids.
    from_percent_used   => undef,    # 0.nn * 100
    from_percent_free   => undef,
    from_space_used     => undef,
    from_space_free     => undef,
    fid_age             => 'old',    # old|new
    limit_type          => 'device', # global|device
    limit_by            => 'none',   # size|count|percent|none
    limit               => undef,    # 100g|10%|5000

    # --- target side (see filter_dest_devices) ---
    to_all_devs         => 1,
    to_hosts            => [],
    to_devices          => [],
    to_percent_used     => undef,
    to_percent_free     => undef,
    to_space_used       => undef,
    to_space_free       => undef,
    not_to_hosts        => [],
    not_to_devices      => [],
    use_dest_devs       => 'all',    # all|N (list up to N devices to rep pol)
    leave_in_drain_mode => 0,        # true: don't flip devices back to 'alive'
);

# Default run state. init() starts from a copy of this hash and load_state()
# uses it as the set of allowed keys when parsing a saved state string.
my %default_state = (
    completed_devs        => [], # source devices already finished
    source_devs           => [], # source devices still waiting to be worked on
    sdev_current          => 0,  # id of the source device being drained
    sdev_lastfid          => 0,  # last fid handed out for the current device
    sdev_limit            => 0,  # remaining per-device limit
    limit                 => 0,  # remaining global limit
    fids_queued           => 0,
    bytes_queued          => 0,
    time_started          => 0,
    time_finished         => 0,
    time_stopped          => 0,
    time_latest_run       => 0,
    time_latest_empty_run => 0,
    empty_runs            => 0,
);

# Constructor.
#   $policy - optional policy settings string, passed to policy().
#   $state  - optional saved state string, passed to load_state().
# Either slot stays '' when its argument is missing or false.
sub new {
    my ($class, $policy, $state) = @_;

    # Validate policy here?
    my %fields = (policy => '', state => '');
    my $self   = bless \%fields, $class;

    if ($policy) {
        $self->policy($policy);
    }
    if ($state) {
        $self->load_state($state);
    }

    return $self;
}

# Start a brand new rebalance run.
#   $devs - array ref of device objects; filter_source_devices() picks the
#           source devices out of it according to the loaded policy.
# Croaks if a state is already loaded or if $devs is not a reference.
# Returns the freshly built state hash ref.
sub init {
    my $self = shift;
    my $devs = shift;

    croak "policy object already initialized" if $self->{state};
    croak "please pass in devices to filter" unless $devs && ref($devs);

    # Copy the defaults, cloning list values. A plain %state = %default_state
    # would share the completed_devs array between every instance (and with
    # the defaults themselves), and _finish_source_device() pushes onto it.
    my %state;
    for my $key (keys %default_state) {
        my $default = $default_state{$key};
        $state{$key} = ref($default) eq 'ARRAY' ? [@$default] : $default;
    }

    # If we don't have an initial source device list, discover them.
    # Used to filter destination devices later.
    $state{source_devs} = $self->filter_source_devices($devs);

    # Seed the global limit from the policy. _check_limits() counts down
    # $state{limit} for limit_type=global; left at the default of 0 it would
    # reject every fid. Per-device limits are seeded in _find_source_device().
    my $policy = $self->{policy};
    if (($policy->{limit_type} || '') eq 'global' && defined $policy->{limit}) {
        my $limit_by = $policy->{limit_by} || '';
        if ($limit_by eq 'size') {
            $state{limit} = $self->_human_to_bytes($policy->{limit});
        } elsif ($limit_by eq 'count') {
            $state{limit} = $policy->{limit};
        }
    }

    $state{time_started} = time();
    $self->{state} = \%state;
}

# Halt the run: unless the policy asks to leave devices in drain mode, flip
# the source device currently being worked on back to 'alive', then record
# the stop time in the state.
sub stop {
    my $self = shift;
    my $p    = $self->{policy};
    my $s    = $self->{state};
    # The current source device is tracked in the state, not on the object
    # itself; reading it from $self would always yield undef and leave the
    # device stuck in drain mode.
    my $sdev = $s->{sdev_current};
    unless ($p->{leave_in_drain_mode}) {
        Mgd::get_store()->set_device_state($sdev, 'alive') if $sdev;
    }
    $s->{time_stopped} = time();
}

# Record the completion time of the run in the state.
sub finish {
    my ($self) = @_;
    my $state  = $self->{state};
    return $state->{time_finished} = time();
}

# Resume from saved as_string state.
#   $state - settings string ("key=value key=value ..."); parsed with
#            %default_state as the set of defaults and allowed keys.
# Stores the parsed result as the current state and returns it.
sub load_state {
    my ($self, $state) = @_;
    # TODO: validate state?
    return $self->{state} = $self->_parse_settings($state, \%default_state);
}

# Serialize the current state into a settings string via _save_settings().
# Call as_string()? merge into load_state as "state"?
sub save_state {
    my ($self) = @_;
    my $state  = $self->{state};
    return $self->_save_settings($state);
}

# Return the array ref of source device ids still waiting to be processed,
# or undef when no state has been initialized/loaded yet.
sub source_devices {
    my $self  = shift;
    my $state = $self->{state};
    # The list is kept in the state (init() stores it there); nothing ever
    # sets a top-level 'source_devs' slot on the object.
    return undef unless ref($state) eq 'HASH';
    return $state->{source_devs};
}

# Combined getter/setter for the policy.
#   no argument - return the currently loaded policy.
#   $policy     - settings string; parsed with %default_policy as the set of
#                 defaults and allowed keys, stored, and returned.
sub policy {
    my ($self, @args) = @_;

    # TODO: serialize it or pass a structure?
    return $self->{policy} if !@args;

    my $parsed = $self->_parse_settings($args[0], \%default_policy);
    $self->{policy} = $parsed;
    return $self->{policy};
}

# Serialize a settings hash into "key=value key=value ..." form, the format
# _parse_settings() reads back.
#   $settings - hash ref; values are plain scalars or array refs.
# Array values are joined with ','. Undefined values are written as an empty
# value ("key=") without warning. Keys are emitted in sorted order so the
# output is stable from call to call. Returns '' when $settings is not a
# hash ref (e.g. no state has been loaded yet).
sub _save_settings {
    my $self     = shift;
    my $settings = shift;
    return '' unless ref($settings) eq 'HASH';

    my @tosave = ();
    for my $key (sort keys %{$settings}) {
        my $val = $settings->{$key};
        # Only ref we support is ARRAY at the mo'...
        if (ref($val) eq 'ARRAY') {
            $val = join(',', grep { defined $_ } @$val);
        }
        $val = '' unless defined $val;
        push(@tosave, $key . '=' . $val);
    }
    return join(' ', @tosave);
}

# Parse a settings string into a hash ref.
#   foo=bar foo2=bar2 foo3=baaz,quux
#
#   $settings   - whitespace separated key=value tuples. ',' inside a value
#                 is reserved for multivalue types and yields an array ref.
#   $constraint - optional hash ref of defaults. When given, it seeds the
#                 result and any key not present in it croaks with
#                 "Invalid setting <key>". Keys whose default is an array ref
#                 always end up as array refs in the result.
#
# Returns nothing (undef in scalar context) when $settings is empty/false.
# A tuple with an empty value ("key=") yields undef for scalar settings and
# an empty list for list settings, so _save_settings() output round-trips.
sub _parse_settings {
    my $self       = shift;
    my $settings   = shift;
    my $constraint = shift || '';

    return unless $settings;

    # the constraint also serves as a set of defaults. List defaults are
    # cloned so that later push/shift on the parsed settings cannot leak
    # into the defaults (and through them into other instances).
    my %parsed = ();
    if ($constraint) {
        for my $key (keys %{$constraint}) {
            my $default = $constraint->{$key};
            $parsed{$key} = ref($default) eq 'ARRAY' ? [@$default] : $default;
        }
    }

    # parse out from a string: key=value key=value
    # split ' ' ignores leading whitespace and collapses runs of it, so
    # stray spaces never produce empty tuples.
    for my $tuple (split ' ', $settings) {
        my ($key, $value) = split /=/, $tuple, 2;
        $value = undef if defined $value && $value eq '';

        if (defined $value && index($value, ',') > -1) {
            # ',' is reserved for multivalue types.
            $value = [split /,/, $value];
        }

        # In the future we could do stronger type checking at load
        # time, but for now this will happen at use time :/
        if ($constraint) {
            croak "Invalid setting $key" unless exists $constraint->{$key};
            # default says we should be an array.
            if (ref($constraint->{$key}) eq 'ARRAY' && !ref($value)) {
                $value = defined $value ? [$value] : [];
            }
        }
        $parsed{$key} = $value;
    }
    return \%parsed;
}

# step through the filters and find the next set of fids to rebalance.
# should $sto be passed in here or should we fetch it ourselves?
# also, should device info be passed in? I think so.
#
#   $devs  - device list, used to work out destination devices.
#   $sto   - storage object (required, only checked for presence here).
#   $limit - maximum number of fids to fetch per call, default 100.
#
# returning 'undef' means there's nothing left
# returning an empty array means "try again"
# Otherwise returns an array ref of [fidid, source devid, dest devids].
sub next_fids_to_rebalance {
    my ($self, $devs, $sto, $limit) = @_;
    $limit ||= 100; # random low default.

    # Balk unless we have a policy or a state?
    my $policy = $self->{policy};
    croak "No policy loaded" unless $policy;
    croak "Must pass in device list" unless $devs;
    croak "Must pass in storage object" unless $sto;
    my $state = $self->{state};

    # If we're not working against a source device, discover one
    my $sdev_id = $self->_find_source_device($state->{source_devs});
    return undef unless $sdev_id;
    my $sdev = Mgd::device_factory()->get_by_id($sdev_id);

    my $candidates = $self->filter_dest_devices($devs);
    croak("rebalance cannot find suitable destination devices")
        unless (@$candidates);

    my @fids = $sdev->fid_chunks(
        age   => $policy->{fid_age},
        fidid => $state->{sdev_lastfid},
        limit => $limit,
    );

    # Out of fids, or out of limit: close this device off.
    # We'll wait until the next cycle to find a new sdev.
    if (!@fids || !$self->_check_limits) {
        $self->_finish_source_device;
        return [];
    }

    # In both old or new cases, the "last" fid in the list is correct.
    $state->{sdev_lastfid} = $fids[-1]->id;

    # TODO: create a filterset for $fid settings. filesize, class, domain, etc.
    my @queued = ();
    FID: for my $fid (@fids) {
        next FID if !$fid->exists;
        # count the fid or size against device limit.
        next FID if !$self->_check_limits($fid);

        my $destdevs = $self->_choose_dest_devs($fid, $candidates);

        # Update internal stats.
        $state->{fids_queued}  += 1;
        $state->{bytes_queued} += $fid->length;
        push @queued, [$fid->id, $sdev->id, $destdevs];
    }

    $state->{time_latest_run} = time;
    if (!@queued) {
        $state->{empty_runs}++;
        $state->{time_latest_empty_run} = time;
    }

    # return block of fiddev combos.
    return \@queued;
}

# ensure this fid wouldn't overrun a limit.
#   $fid - optional fid object. When given, the fid is charged against the
#          remaining limit if it fits. When omitted, this only asks "is there
#          any limit left to work with?" and changes nothing.
# The remaining limit is $state->{limit} for limit_type=global and
# $state->{sdev_limit} otherwise. Returns 1 (ok) or 0 (limit reached).
# Croaks on a limit_by other than none|count|size.
sub _check_limits {
    my $self = shift;
    my $fid  = shift;
    my $p = $self->{policy};
    my $s = $self->{state};
    return 1 if ($p->{limit_by} eq 'none');

    my $limit = $p->{limit_type} eq 'global'
        ? \$s->{limit}
        : \$s->{sdev_limit};

    if ($p->{limit_by} eq 'count') {
        # Never let the counter drop below zero: a negative count is a true
        # value, so a plain post-decrement would start accepting fids again
        # right after the limit was hit.
        my $remaining = defined $$limit ? $$limit : 0;
        return 0 unless $remaining > 0;
        $$limit-- if $fid;
        return 1;
    } elsif ($p->{limit_by} eq 'size') {
        if ($fid) {
            my $length = $fid->length();
            return 0 unless $length <= $$limit;
            $$limit -= $length;
            return 1;
        }
        # Arbitrary "give up if there's less than 1kb in the limit"
        # FIXME: Make this configurable
        return $$limit < 1024 ? 0 : 1;
    } else {
        croak("uknown limit_by type");
    }
}

# shuffle the list and return by limit.
#   $fid           - the fid being placed (currently unused).
#   $filtered_devs - array ref of candidate destination devices.
# Returns an array ref holding the candidates in random order: all of them
# when the policy's use_dest_devs is 'all', otherwise the first N.
# TODO: use the fid->length to ensure we don't send the file to devices
# that can't handle it.
sub _choose_dest_devs {
    my ($self, $fid, $filtered_devs) = @_;
    my $policy = $self->{policy};

    my @candidates = List::Util::shuffle(@$filtered_devs);
    my $wanted     = $policy->{use_dest_devs};
    if ($wanted eq 'all') {
        return \@candidates;
    }

    my @picked = splice(@candidates, 0, $wanted);
    return \@picked;
}

# Apply the "from_*" policy constraints to a device list.
#   $devs - array ref of device objects.
# A device is kept only if it can be deleted from and passes every
# constraint the policy sets (an empty list / false value means "not set").
# Returns an array ref of the ids of the devices that qualify.
sub filter_source_devices {
    my $self = shift;
    my $devs = shift;
    my $policy = $self->{policy};

    my @sdevs = ();
    for my $dev (@$devs) {
        next unless $dev->can_delete_from;
        my $id = $dev->id;
        if (@{$policy->{from_devices}}) {
            next unless grep { $_ == $id } @{$policy->{from_devices}};
        }
        if (@{$policy->{from_hosts}}) {
            my $hostid = $dev->hostid;
            next unless grep { $_ == $hostid } @{$policy->{from_hosts}};
        }
        # "at least this much used"
        if ($policy->{from_percent_used}) {
            # returns undef if it doesn't have stats on the device. Check
            # before scaling: undef * 100 is 0 (plus a warning), which would
            # make the defined test pointless.
            my $full = $dev->percent_full;
            next unless defined $full;
            next unless $full * 100 > $policy->{from_percent_used};
        }
        # "at least this much free"
        if ($policy->{from_percent_free}) {
            # returns *0* if lacking stats. Must fix :(
            my $free = $dev->percent_free;
            next unless $free; # hope this never lands at exact zero.
            next unless $free * 100 > $policy->{from_percent_free};
        }
        # "at least this much used"
        if ($policy->{from_space_used}) {
            my $used = $dev->mb_used;
            next unless $used && $used > $policy->{from_space_used};
        }
        # "at least this much free"
        if ($policy->{from_space_free}) {
            my $free = $dev->mb_free;
            next unless $free && $free > $policy->{from_space_free};
        }
        push @sdevs, $id;
    }

    return \@sdevs;
}

# Close off the source device currently being worked on: drop its per-device
# bookkeeping from the state and append it to completed_devs.
# Croaks when no source device is current.
sub _finish_source_device {
    my ($self) = @_;
    my ($state, $policy) = @{$self}{qw(state policy)};

    croak "Not presently working on a source device"
        unless $state->{sdev_current};

    my $sdev = delete $state->{sdev_current};
    delete @{$state}{qw(sdev_lastfid sdev_limit)};

    # Unless the user wants a device to never get new files again (sticking in
    # drain mode), return to alive.
    if (!$policy->{leave_in_drain_mode}) {
        Mgd::get_store()->set_device_state($sdev, 'alive');
    }
    push @{$state->{completed_devs}}, $sdev;
}

# Return the id of the source device to work on.
#   $sdevs - array ref of pending source device ids; the next one is shifted
#            off it when no device is current.
# When a device is already current it is returned untouched. Otherwise the
# next pending device becomes current, its per-device limit is set up from
# the policy and it is put into 'drain' state. Returns undef when nothing is
# pending.
# TODO: Be aware of down/unavail devices. temp skip them?
sub _find_source_device {
    my ($self, $sdevs) = @_;
    my $state = $self->{state};
    my $p     = $self->{policy};

    # Already busy with a device: keep going with it.
    return $state->{sdev_current} if $state->{sdev_current};

    my $next = shift @$sdevs;
    return (undef, undef) unless $next;

    $state->{sdev_current} = $next;
    $state->{sdev_lastfid} = 0;

    my $limit;
    if ($p->{limit_type} eq 'device') {
        my $limit_by = $p->{limit_by};
        if ($limit_by eq 'size') {
            # Parse the size (default in megs?) out into bytes.
            $limit = $self->_human_to_bytes($p->{limit});
        } elsif ($limit_by eq 'count') {
            $limit = $p->{limit};
        } elsif ($limit_by eq 'percent') {
            croak("policy size limits by percent are unimplemented");
        } elsif ($limit_by eq 'none') {
            $limit = 'none';
        }
    }

    # Must mark device in "drain" mode while we work on it.
    Mgd::get_store()->set_device_state($next, 'drain');
    $state->{sdev_limit} = $limit;

    return $state->{sdev_current};
}

# FIXME: Move to MogileFS::Util
# take a numeric string with a char suffix and turn it into bytes.
# no suffix means it's already bytes.
#   $num - whole number optionally followed by one of b, k, m, g, t, p
#          (case-insensitive); each step up the list is a factor of 1024.
# Returns the number of bytes. Croaks on anything else (including undef).
sub _human_to_bytes {
    my $self = shift;
    my $num  = shift;

    # In list context the match returns its captures; the suffix capture is
    # undef when there is no suffix.
    my ($digits, $type) = defined $num
        ? $num =~ m/^(\d+)([bkmgtp])?$/i
        : ();
    croak("Don't know what this number is: " . (defined $num ? $num : ''))
        unless defined $digits;

    $type = defined $type ? lc($type) : 'b';
    return $digits if $type eq 'b';

    # Sorry, being cute here :P
    # The position of the suffix in the string is its power of 1024.
    return $digits * (1024 ** index('bkmgtpezy', $type));
}

# Apply policy to destination devices.
#   $devs - array ref of device objects.
# Devices that are (or were) rebalance sources are dropped first; the rest
# must accept new files and pass every "to_*" / "not_to_*" constraint the
# policy sets (an empty list / false value means "not set").
# Returns an array ref of the ids of the devices that qualify.
sub filter_dest_devices {
    my $self = shift;
    my $devs = shift;
    my $policy = $self->{policy};
    my $state  = $self->{state};

    # skip anything we would source from.
    # FIXME: ends up not skipping stuff out of completed_devs? :/
    # sdev_current and the lists may be missing from the state (e.g. between
    # source devices), so tolerate undef instead of using it as a hash key.
    my %sdevs = map { $_ => 1 }
        grep { defined $_ }
        @{ $state->{source_devs} || [] },
        @{ $state->{completed_devs} || [] },
        $state->{sdev_current};
    my @devs  = grep { ! $sdevs{$_->id} } @$devs;

    my @ddevs = ();
    for my $dev (@devs) {
        next unless $dev->should_get_new_files;
        my $id = $dev->id;
        my $hostid = $dev->hostid;

        if (@{$policy->{to_devices}}) {
            next unless grep { $_ == $id } @{$policy->{to_devices}};
        }
        if (@{$policy->{to_hosts}}) {
            next unless grep { $_ == $hostid } @{$policy->{to_hosts}};
        }
        if (@{$policy->{not_to_devices}}) {
            next if grep { $_ == $id } @{$policy->{not_to_devices}};
        }
        if (@{$policy->{not_to_hosts}}) {
            next if grep { $_ == $hostid } @{$policy->{not_to_hosts}};
        }
        if ($policy->{to_percent_used}) {
            # Test for missing stats before scaling; undef * 100 would be 0
            # and always "defined".
            my $full = $dev->percent_full;
            next unless defined $full;
            next unless $full * 100 > $policy->{to_percent_used};
        }
        if ($policy->{to_percent_free}) {
            my $free = $dev->percent_free;
            next unless $free; # hope this never lands at exact zero.
            next unless $free * 100 > $policy->{to_percent_free};
        }
        if ($policy->{to_space_used}) {
            my $used = $dev->mb_used;
            next unless $used && $used > $policy->{to_space_used};
        }
        if ($policy->{to_space_free}) {
            my $free = $dev->mb_free;
            next unless $free && $free > $policy->{to_space_free};
        }
        push @ddevs, $id;
    }

    return \@ddevs;
}