The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Yars::Tools;

use strict;
use warnings;
use Clustericious::Config;
use List::Util qw/shuffle/;
use List::MoreUtils qw/uniq/;
use Hash::MoreUtils qw/safe_reverse/;
use Clustericious::Log;
use File::Find::Rule;
use File::Basename qw/dirname/;
use Data::Dumper;
use File::Path qw/mkpath/;
use File::Temp;
use File::Compare;
use JSON::XS;
# TODO: rm dep on stat
use File::stat qw/stat/;
use Mojo::ByteStream qw/b/;
use File::HomeDir;
use File::Spec;
use Mojo::UserAgent;
use File::Spec;


# ABSTRACT: various utility functions dealing with servers, hosts, etc
our $VERSION = '0.98'; # VERSION


sub new
{
  my($class, $config) = @_;
  WARN "No url found in config file" unless eval { $config->url };
  bless {
    bucket_to_url                => { }, # map buckets to server urls
    bucket_to_root               => { }, # map buckets to disk roots
    disk_is_local                => { }, # our disk roots (values are just 1)
    servers                      => { }, # all servers
    our_url                      => '',  # our server url
    state_file                   => '',  # name of file with disk states
    ua                           => '',  # UserAgent
    server_status_cache          => {},
    server_status_cache_lifetime => 3,
    default_dir                  => '',
  }, $class;
}

sub _set_ua
{
  my($self, $ua) = @_;
  $self->{ua} = $ua;
  return;
}

sub _ua
{
  my($self) = @_;
  
  unless($self->{ua})
  {
    $self->{ua} = Mojo::UserAgent->new;
  }
  
  return $self->{ua};
}


sub refresh_config {
  my $self = shift;
  my $config = shift;
  return 1 if defined($self->{our_url}) && keys %{ $self->{bucket_to_root} } > 0 && keys %{ $self->{bucket_to_url} } > 0;
  $config ||= Clustericious::Config->new("Yars");
  $self->{our_url} ||= $config->url or WARN "No url found in config file";
  TRACE "Our url is " . $self->{our_url};
  for my $server ($config->servers) {
    $self->{servers}->{$server->{url}} = 1;
    for my $disk (@{ $server->{disks} }) {
        for my $bucket (@{ $disk->{buckets} }) {
            $self->{bucket_to_url}->{$bucket} = $server->{url};
            next unless $server->{url} eq $self->{our_url};
            $self->{bucket_to_root}->{$bucket} = $disk->{root};
            LOGDIE "Disk root not given" unless defined($disk->{root});
            $self->{disk_is_local}->{$disk->{root}} = 1;
        }
    }
  }
  my $default_dir = $self->{default_dir} = File::HomeDir->my_home . "/var/run/yars";
  
  my $state_file = $self->{state_file} = $config->state_file(default => "$default_dir/state.txt");
  -e $state_file or do {
    INFO "Writing new state file ($state_file)";
    my %disks = map { ($_ => "up") } keys %{ $self->{disk_is_local} };
    $self->_write_state({disks => \%disks});
  };
  -e $state_file or LOGDIE "Could not write state file $state_file";
  TRACE "bucket2url : ".Dumper($self->{bucket_to_url});
}

sub _dir_is_empty {
    # stolen from File::Find::Rule::DirectoryEmpty
    my $dir = shift;
    opendir( DIR, $dir ) or return;
    while ( $_ = readdir DIR ) {
        if ( !/^\.\.?$/ ) {
            closedir DIR;
            return 0;
        }
    }
    closedir DIR;
    return 1;
}


sub disk_for {
    my $self = shift;
    my $digest = shift;
    unless (keys %{ $self->{bucket_to_root} }) {
        $self->refresh_config;
        LOGDIE "No config data" unless keys %{ $self->{bucket_to_root} } > 0;
    }
    my ($bucket) = grep { $digest =~ /^$_/i } keys %{ $self->{bucket_to_root} };
    TRACE "no local disk for $digest in ".(join ' ', keys %{ $self->{bucket_to_root} }) unless defined($bucket);
    return unless defined($bucket);
    return $self->{bucket_to_root}->{$bucket};
}


sub local_buckets {
    my($self) = @_;
    $self->refresh_config unless keys %{ $self->{bucket_to_root} };
    my %r = safe_reverse $self->{bucket_to_root};
    do {$_ = [ $_ ] unless ref $_} for values %r;
    return %r;
}

sub _state {
    my $self = shift;
    $self->refresh_config() unless $self->{state_file} && -e $self->{state_file};
    # TODO: rm dep on File::stat
    return $self->{_state}->{cached} if $self->{_state}->{mod_time} && $self->{_state}->{mod_time} == stat($self->{state_file})->mtime;
    our $j ||= JSON::XS->new;
    -e $self->{state_file} or LOGDIE "Missing state file " . $self->{state_file};
    $self->{_state}->{cached} = $j->decode(Mojo::Asset::File->new(path => $self->{state_file})->slurp);
    # TODO: rm dep on File::stat
    $self->{_state}->{mod_time} = stat($self->{state_file})->mtime;
    return $self->{_state}->{cached};
}

sub _write_state {
    my $self = shift;
    my $state = shift;
    my $dir = dirname($self->{state_file});
    our $j ||= JSON::XS->new;
    mkpath $dir;
    my $temp = File::Temp->new(DIR => $dir, UNLINK => 0);
    print $temp $j->encode($state);
    $temp->close;
    rename "$temp", $self->{state_file} or return 0;
    return 1;
}


sub disk_is_up {
    my $class = shift;
    my $root = shift;
    return 0 if -d $root && ! -w $root;
    return 1 if ($class->_state->{disks}{$root} || 'up') eq 'up';
    return 0;
}


sub disk_is_up_verified
{
    my($self, $root) = @_;
    return unless $self->disk_is_up($root);
    my $tmpdir = File::Spec->catdir($root, 'tmp');
    my $temp;
    eval {
        use autodie;
        unless(-d $tmpdir)
        {
            mkpath $tmpdir;
            chmod 0777, $tmpdir;
        };
        $temp = File::Temp->new("disk_is_up_verifiedXXXXX", DIR => $tmpdir, SUFFIX => '.txt');
        print $temp "test";
        close $temp;
        die "file has zero size" if -z $temp->filename;
        unlink $temp->filename;
    };
    if(my $error = $@)
    {
        INFO "Create temp file in $tmpdir FAILED: $error";
        return;
    }
    else
    {
        INFO "created temp file to test status: " . $temp->filename;
        return 1;
    }
}


sub disk_is_down {
    return not shift->disk_is_up(@_);
}


sub disk_is_local {
    my $self = shift;
    my $root = shift;
    return $self->{disk_is_local}->{$root};
}


sub server_is_up {
    # TODO use state file for this
    my $self = shift;
    my $server_url = shift;
    if (exists($self->{server_status_cache}->{$server_url}) && $self->{server_status_cache}->{$server_url}{checked} > time - $self->{server_status_cache_lifetime}) {
        return $self->{server_status_cache}->{$server_url}{result};
    }
    TRACE "Checking $server_url/status";
    my $tx = $self->_ua->get( "$server_url/status" );
    $self->{server_status_cache}->{$server_url}{checked} = time;
    if (my $res = $tx->success) {
        my $got = $res->json;
        if (defined($got->{server_version}) && length($got->{server_version})) {
            return ($self->{server_status_cache}->{$server_url}{result} = 1);
        }
        TRACE "/status did not return version, got : ".Dumper($got);
        return ($self->{server_status_cache}->{$server_url}{result} = 0);
    }
    TRACE "Server $server_url is not up : response was ".$tx->error;
    return ($self->{server_status_cache}->{$server_url}{result} = 0);
}
sub server_is_down {
    return not shift->server_is_up(@_);
}

sub _touch {
    my $path = shift;
    my $dir = dirname($path);
    -d $dir or do {
        my $ok;
        eval { mkpath($dir); $ok = 1; };
        if($@) { WARN "mkpath $dir failed : $@;"; $ok = 0; };
        return 0 unless $ok;
    };
    open my $fp, ">>$path" or return 0;
    close $fp;
    return 1;
}


sub mark_disk_down {
    my $class = shift;
    my $root = shift;
    return 1 if $class->disk_is_down($root);
    my $state = $class->_state;
    INFO "Marking disk $root down";
    exists($state->{disks}{$root}) or WARN "$root not present in state file";
    $state->{disks}{$root} = 'down';
    $class->_write_state($state) and return 1;
    ERROR "Could not mark disk $root down";
    return 0;
}

sub mark_disk_up {
    my $class = shift;
    my $root = shift;
    return 1 if $class->disk_is_up($root);
    my $state = $class->_state;
    INFO "Marking disk $root up";
    $state->{disks}{$root} = 'up';
    $class->_write_state($state) and return 1;
    ERROR "Could not mark disk up";
    return 0;
}


sub server_for {
    my $self = shift;
    my $digest = shift;
    my $found;
    $self->refresh_config unless keys %{ $self->{bucket_to_url} } > 0;
    for my $i (0..length($digest)) {
        last if $found = $self->{bucket_to_url}->{ uc substr($digest,0,$i) };
        last if $found = $self->{bucket_to_url}->{ lc substr($digest,0,$i) };
    }
    return $found;
}


sub bucket_map {
    return shift->{bucket_to_url};
}


sub storage_path {
    my $class = shift;
    my $digest = shift;
    my $root = shift || $class->disk_for($digest) || LOGDIE "No local disk for $digest";
    return join "/", $root, ( grep length, split /(..)/, $digest );
}


sub remote_stashed_server {
    my $self = shift;
    my ($c,$filename,$digest) = @_;

    my $assigned_server = $self->server_for($digest);
    # TODO broadcast these requests all at once
    for my $server (shuffle(keys %{ $self->{servers} })) {
        next if $server eq $self->{our_url};
        next if $server eq $assigned_server;
        DEBUG "Checking remote $server for $filename";
        my $tx = $self->_ua->head( "$server/file/$filename/$digest", { "X-Yars-Check-Stash" => 1, "Connection" => "Close" } );
        if (my $res = $tx->success) {
            # Found it!
            return $server;
        }
    }
    return '';
}


sub local_stashed_dir {
    my $self = shift;
    my ($filename,$md5) = @_;
    for my $root ( shuffle(keys %{ $self->{disk_is_local} })) {
        my $dir = $self->storage_path($md5,$root);
        TRACE "Checking for $dir/$filename";
        return $dir if -r "$dir/$filename";
    }
    return '';
}


sub server_exists {
    my $self = shift;
    my $server_url = shift;
    return exists($self->{servers}->{$server_url}) ? 1 : 0;
}


sub server_url {
    return shift->{our_url};
}


sub disk_roots {
    return keys %{ shift->{disk_is_local} };
}


sub server_urls {
    return keys %{ shift->{servers} }
}


sub cleanup_tree {
    my $self = shift;
    my ($dir) = @_;
    while (_dir_is_empty($dir)) {
        last if $self->{disk_is_local}->{$dir};
        rmdir $dir or do { warn "cannot rmdir $dir : $!"; last; };
        $dir =~ s[/[^/]+$][];
     }
}


sub count_files {
    my $class = shift;
    my $dir = shift;
    -d $dir or return 0;
    my @list = File::Find::Rule->file->in($dir);
    return scalar @list;
}


sub human_size {
    my $class = shift;
    my $val   = shift;
    my @units = qw/B K M G T P/;
    my $unit = shift @units;
    do {
        $unit = shift @units;
        $val /= 1024;
    } until $val < 1024 || !@units;
    return sprintf( "%.0f%s", $val + 0.5, $unit );
}


sub content_is_same {
    my $class = shift;
    my ($filename,$asset) = @_;
    my $check;
    if ($asset->isa("Mojo::Asset::File")) {
        $asset->handle->flush;
        $check = ( compare($filename,$asset->path) == 0 );
    } else {
        # Memory asset.  Assume that if one can fit in memory, two can, too.
        my $existing = Mojo::Asset::File->new(path => $filename);
        $check = ( $existing->size == $asset->size && $asset->slurp eq $existing->slurp );
    }
    return $check;
}


sub hex2b64 {
    my $class = shift;
    my $hex = shift;
    my $b64 = b(pack 'H*', $hex)->b64_encode;
    local $/="\n";
    chomp $b64;
    return $b64;
}

sub b642hex {
    my $class = shift;
    my $b64 = shift;
    return unpack 'H*', b($b64)->b64_decode;
}


1;


__END__
=pod

=head1 NAME

Yars::Tools - various utility functions dealing with servers, hosts, etc

=head1 VERSION

version 0.98

=head1 DESCRIPTION

Just some useful functions here.

=head1 FUNCTIONS

=head2 new

Create a new instance of Yars::Tools

=head2 refresh_config

Refresh the configuration data cached in memory.

=head2 disk_for

Given an md5 digest, calculate the root directory of this file.
Undef is returned if this file does not belong on the current host.

=head2 local_buckets

Get a hash from disk to list of buckets for this server.

=head2 disk_is_up

Given a disk root, return true unless the disk is marked down.
A disk is down if the state file indicates it, or if it exists
but is unwriteable.

=head2 disk_is_up_verified

This is the same as disk_is_up, but doesn't trust the operating system, and
tries to write a file to the disk's temp directory and verify that the file
is not of zero size.

=head2 disk_is_down

Disk is not up.

=head2 disk_is_local

Return true if the disk is on this server.

=head2 server_is_up, server_is_down

Check to see if a remote server is up or down.

=head2 mark_disk_down, mark_disk_up

Mark a disk as up or down.

=head2 server_for

Given an md5, return the url for the server for this file.

=head2 bucket_map

Return a map from bucket prefix to server url.

=head2 storage_path

Calculate the directory of an md5 on disk.
Optionally pass a second parameter to force it onto a particular disk.

=head2 remote_stashed_server

Find a server which is stashing this file, if one exists.
Parameters :
    $c - controller
    $filename - filename
    $digest - digest

=head2 local_stashed_dir

Find a local directory stashing this file, if one exists.
Parameters :
    $filename - filename
    $digest - digest
Returns :
    The directory or false.

=head2 server_exists

Does this server exist?

=head2 server_url

Returns the url of the current server.

=head2 disk_roots

Return all the local directory roots, in a random order.

=head2 server_urls

Return all the other urls, in a random order.

=head2 cleanup_tree

Given a directory, traverse upwards until encountering
a local disk root or a non-empty directory, and remove
all empty directories.

=head2 count_files

Count the number of files in a directory tree.

=head2 human_size

Given a size, format it like df -kh

=head2 content_is_same

Given a filename and an Asset, return true if the
content is the same for both.

=head2 hex2b64, b642hex

Convert from hex to base 64.

=head1 SEE ALSO

L<Yars>, L<Yars::Client>

=head1 AUTHOR

original author: Marty Brandon

current maintainer: Graham Ollis <plicease@cpan.org>

contributors:

Brian Duggan

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by NASA GSFC.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut