package Yars::Client;
# ABSTRACT: Yet Another RESTful-Archive Service Client
our $VERSION = '1.04'; # VERSION
use strict;
use warnings;
use Clustericious::Client;
use Clustericious::Client::Command;
use Clustericious::Config;
use Mojo::Asset::File;
use Mojo::ByteStream 'b';
use Mojo::URL;
use Mojo::Base '-base';
use File::Basename;
use File::Spec;
use Log::Log4perl qw(:easy);
use Digest::file qw/digest_file_hex/;
use Data::Dumper;
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use Number::Bytes::Human qw( format_bytes parse_bytes );
use 5.10.0;
# Usage strings for the hand-written commands.
# NOTE(review): presumably shown as help text by the command-line client;
# no `content` sub is visible in this file -- confirm where it is defined.
route_doc upload => "<filename>";
route_doc content => "<filename> <md5>";
route_doc download => "<filename> <md5> [dir]";
route_doc remove => "<filename> <md5>";
# Cached bucket map. Starts as 0, is filled in by _server_for() the first
# time a map is fetched successfully, and is reset to 0 by remote().
has bucket_map_cached => sub { 0; }; # Computed on demand.
# Simple REST routes declared through Clustericious::Client's `route`:
#   name => HTTP method, URL path [, \"usage string"]
route 'welcome' => "GET", '/';
route 'bucket_map' => "GET", '/bucket_map';
route 'disk_usage' => "GET", '/disk/usage';
route 'bucket_usage' => "GET", '/bucket/usage';
route 'servers_status' => "GET", '/servers/status';
route 'get' => "GET", '/file', \"<md5> <filename>";
route 'check' => "HEAD", '/file', \"<md5> <filename>";
route 'set_status' => "POST", '/disk/status';
route 'check_files' => "POST", '/check/manifest';
# Per-route metadata. Every route declared above gets both auto_failover and
# dont_read_files.
route_meta 'welcome' => { auto_failover => 1, dont_read_files => 1 };
route_meta 'bucket_map' => { auto_failover => 1, dont_read_files => 1 };
route_meta 'disk_usage' => { auto_failover => 1, dont_read_files => 1 };
route_meta 'bucket_usage' => { auto_failover => 1, dont_read_files => 1 };
route_meta 'servers_status' => { auto_failover => 1, dont_read_files => 1 };
route_meta 'get' => { auto_failover => 1, dont_read_files => 1 };
route_meta 'check' => { auto_failover => 1, dont_read_files => 1 };
route_meta 'set_status' => { auto_failover => 1, dont_read_files => 1 };
route_meta 'check_files' => { auto_failover => 1, dont_read_files => 1 };
# The hand-written methods below only get dont_read_files (no auto_failover).
route_meta 'upload' => { dont_read_files => 1 };
route_meta 'download' => { dont_read_files => 1 };
route_meta 'check_manifest' => { dont_read_files => 1 };
# NOTE(review): 'check' already received metadata above, including
# auto_failover. Confirm whether this second call replaces or merges with the
# earlier one, and whether dropping auto_failover for 'check' is intended.
route_meta 'check' => { dont_read_files => 1 };
# Named arguments accepted by send().
# NOTE(review): the type strings look like Getopt::Long-style specs -- confirm.
route_args send => [
{ name => 'content', type => '=s', required => 1 },
{ name => 'name', type => '=s' },
];
# Named arguments accepted by retrieve(): either `location`, or `name` + `md5`.
route_args retrieve => [
{ name => 'location', type => '=s' },
{ name => 'name', type => '=s' },
{ name => 'md5', type => '=s' },
];
sub new {
    # Construct the client through the parent class, then raise the
    # redirect limit and the connect timeout of the underlying user agent
    # to 30 each.
    my $class = shift;
    my $self  = $class->SUPER::new(@_);

    $self->client->$_(30) for qw(max_redirects connect_timeout);

    return $self;
}
sub _get_url {
    # Build a Mojo::URL from the current server_url, optionally with its
    # path set to $path (a false/missing $path leaves the URL untouched).
    my $self = shift;
    my $path = shift;

    my $url = Mojo::URL->new( $self->server_url );
    return $url unless $path;

    $url->path($path);
    return $url;
}
sub _hex2b64 {
    # Convert a hex string (e.g. an MD5 digest) into its base64 form.
    # Returns nothing when the input is missing or false.
    my ($hex) = @_;
    return unless $hex;

    my $encoded = b(pack('H*', $hex))->b64_encode;

    # Strip one trailing newline, if the encoder left one.
    local $/ = "\n";
    chomp $encoded;

    return $encoded;
}
sub _b642hex {
    # Convert a base64 value (e.g. a Content-MD5 header) into a hex string.
    # Returns nothing when the input is missing or false.
    my ($b64) = @_;
    return unless $b64;

    # Header values sometimes arrive as an array ref; use the first element.
    my $encoded = ref($b64) eq 'ARRAY' ? $b64->[0] : $b64;

    return unpack('H*', b($encoded)->b64_decode);
}
sub location {
    # Compute the absolute URL of a file from its name and md5.
    #
    # Arguments: ($filename, $md5), accepted in either order -- they are
    # swapped when the first one looks like a 32-hex-digit digest.
    # Side effect: server_url is set to the server chosen by _server_for
    # for this md5.
    # Dies (LOGDIE) when the filename is undefined or the md5 is missing.
    my ($self, $filename, $md5) = @_;

    # Only pattern-match a defined value, so that a missing filename reaches
    # the LOGDIE below without first emitting an "uninitialized" warning.
    if (defined($filename) && $filename =~ /^[0-9a-f]{32}$/i) {
        ( $filename, $md5 ) = ( $md5, $filename );
    }

    LOGDIE "Can't compute location without filename" unless defined($filename);
    LOGDIE "Can't compute location without md5" unless $md5;

    $self->server_url($self->_server_for($md5));
    return $self->_get_url("/file/$md5/$filename")->to_abs->to_string;
}
sub download {
# Downloads a file, saves it to disk and verifies its MD5.
#
# Call forms (see the LOGDIE message below):
#   download($url)
#   download($filename, $md5, [$dest_dir])
# $filename and $md5 may be given in either order: they are swapped when the
# first one looks like a 32-hex-digit digest.
#
# Side effects: the last transaction and response are stored via tx() and
# res(); the file is written to $dest_dir/$filename, or to $filename when no
# $dest_dir is given.
#
# Returns 'ok' on success and '' on failure (after logging "Download failed.").
my $self = shift;
my ( $filename, $md5, $dest_dir ) = @_;
my $abs_url;
if (@_ == 1) {
# Single argument: treat it as an absolute URL and take the file name from
# its last path component.
$abs_url = shift;
($filename) = $abs_url =~ m|/([^/]+)$|;
}
( $filename, $md5 ) = ( $md5, $filename ) if $filename =~ /^[0-9a-f]{32}$/i;
if (!$md5 && !$abs_url) {
LOGDIE "Need either an md5 or a url: download(url) or download(filename, md5, [dir] )";
}
# Candidate hosts, with the server assigned to this md5 first. Left empty
# when an absolute URL was given.
my @hosts;
@hosts = $self->_all_hosts($self->_server_for($md5)) unless $abs_url;
my $tries = 0;
my $success = 0;
my $host = 0;
# At most 10 attempts.
while ($tries++ < 10) {
# Once the attempt number exceeds the number of hosts plus one, wait
# $tries seconds before each further attempt.
if ($tries > @hosts + 1) {
TRACE "Attempt $tries";
WARN "Waiting $tries seconds before retrying...";
sleep $tries;
}
my $url;
if ($abs_url) {
$url = $abs_url;
} else {
# Cycle through the hosts, wrapping around at the end of the list.
$host = 0 if $host > $#hosts;
$url = Mojo::URL->new($hosts[$host++]);
$url->path("/file/$filename/$md5");
}
TRACE "GET $url";
my $tx = $self->client->build_tx(GET => $url, { "Connection" => "Close", "Accept-Encoding" => "gzip" } );
# TODO: set timeout for mojo 4.0
# Response size limit comes from the max_message_size_client config value
# (default 53687091200 bytes).
$tx->res->max_message_size(parse_bytes($self->_config->max_message_size_client(default => 53687091200)));
$self->client->start($tx);
$self->res($tx->res);
$self->tx($tx);
# The next/last statements inside this do-block act on the enclosing while
# loop.
my $res = $tx->success or do {
my ($msg,$code) = $tx->error;
# An error that carries a code: log it and stop retrying.
if ($code) {
ERROR "$code $msg";
last;
}
# No code: log the message and try again.
if ($msg =~ /connection refused/i) {
WARN "Error : $msg (may retry)";
next;
}
WARN "Error (may retry) : $msg";
next;
};
DEBUG "Received asset with size ".$res->content->asset->size;
TRACE "Received headers : ".$res->headers->to_string;
my $out_file = $dest_dir ? $dest_dir . "/$filename" : $filename;
DEBUG "Writing to $out_file";
if (my $e = $res->headers->header("Content-Encoding")) {
LOGDIE "unsupported encoding" unless $e eq 'gzip';
# This violates the spec (MD5s depend on transfer-encoding,
# not content-encoding, per
# http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html)
# but we must support it.
TRACE "unzipping $out_file";
my $asset = $res->content->asset;
# Gunzip from the asset's file when it is file-backed, otherwise from its
# slurped content.
gunzip($asset->is_file ? $asset->path : \( $asset->slurp )
=> $out_file) or do {
unlink $out_file;
LOGDIE "Gunzip failed : $GunzipError";
};
} else {
$res->content->asset->move_to($out_file);
}
# Verify the file on disk. When the caller supplied no md5, fall back to the
# Content-MD5 response header (base64, converted to hex).
my $verify = digest_file_hex($out_file,'MD5');
$md5 ||= _b642hex($res->headers->header("Content-MD5"));
# NOTE(review): when no md5 is available, the file written above is left in
# place and the request is retried -- confirm this is intended.
unless ($md5) {
WARN "No md5 in response header";
next;
}
# Digest mismatch: remove the file and try again.
unless ($verify eq $md5) {
WARN "Bad md5 for file (got $verify instead of $md5)";
WARN "Response headers : ".$res->headers->to_string;
unlink $out_file or WARN "couldn't remove $out_file : $!";
WARN "Removed $out_file. This is attempt $tries.";
next;
}
$success = 1;
last;
}
ERROR "Download failed." unless $success;
return '' unless $success;
return 'ok'; # true value signals success
}
sub remove {
    # Delete a file from the server, identified by its name and md5.
    # Dies (LOGDIE) unless both are supplied. Returns whatever the
    # underlying _doit call returns.
    my $self = shift;
    my ( $filename, $md5 ) = @_;

    unless ($filename && $md5) {
        LOGDIE "file and md5 needed for remove";
    }

    my $target = $self->_get_url("/file/$md5/$filename");
    TRACE("removing $filename $md5 from ", $target->to_string);

    return $self->_doit(DELETE => $target);
}
# Pick the server responsible for an md5, based on the bucket map
# (prefix => server), which is fetched once and then kept in
# bucket_map_cached. Falls back to server_url (with a warning) when no
# usable map can be obtained, and dies when no prefix of the md5 is in the map.
sub _server_for {
    my $self = shift;
    my $md5 = shift or LOGDIE "Missing argument md5";

    my $map = $self->bucket_map_cached;
    my $cache_usable = $map && ref($map) eq 'HASH' && keys(%$map) > 0;

    if (!$cache_usable) {
        # Nothing cached yet: ask the server, and remember a copy of the
        # answer if it is a non-empty map.
        $map = $self->bucket_map or WARN $self->errorstring;
        if ($map && ref $map && (keys(%$map) > 0)) {
            $self->bucket_map_cached({ %$map });
        }
    }

    if (!($map && ref $map && (keys(%$map) > 0))) {
        WARN "Failed to retrieve bucket map, using ".$self->server_url;
        return $self->server_url;
    }

    # Try prefixes of the md5 from shortest to longest, lower case before
    # upper case, and return the first server found.
    for my $len (0 .. length($md5)) {
        my $prefix = substr($md5, 0, $len);
        for my $key (lc $prefix, uc $prefix) {
            return $map->{$key} if exists($map->{$key});
        }
    }

    LOGDIE "Can't find url for $md5 in bucket map : ".Dumper($map);
}
sub put {
    # PUT in-memory content to the server under the given remote file name.
    #
    # Arguments: ($remote_filename, $content). When $content is undefined or
    # the empty string, the content is read from STDIN instead.
    # Side effects: the transaction and response are stored via tx()/res().
    # Returns 'ok' on success, '' on failure.
    my $self = shift;
    my $remote_filename = shift;
    my $content = shift;

    # Test for "defined and non-empty" rather than truth, so that the
    # legitimate content "0" is uploaded instead of triggering a STDIN read.
    $content = join '', <STDIN> unless defined($content) && length($content);

    # NB: slow for large content.
    my $md5 = b($content)->md5_sum;
    my $url = Mojo::URL->new($self->_server_for($md5));
    $url->path("/file/$remote_filename");
    TRACE "PUT $url";
    my $tx = $self->client->put("$url" => { "Content-MD5" => _hex2b64($md5), "Connection" => "Close" } => $content);
    $self->res($tx->res);
    $self->tx($tx);
    return $tx->success ? 'ok' : '';
}
sub _all_hosts {
    # List every known host without duplicates, in this order: the host
    # passed in, the current server_url, the configured url, then the
    # configured failover_urls.
    my ($self, $assigned) = @_;

    my @candidates = (
        $assigned,
        $self->server_url,
        $self->_config->url,
        @{ $self->_config->failover_urls(default => []) },
    );

    my %seen;
    return grep { !$seen{$_}++ } @candidates;
}
sub upload {
    # Upload a local file, streaming it from disk rather than reading it
    # into memory.
    #
    # Arguments: ([ "--nostash" => $flag, ] $filename)
    # Hosts are tried in the order given by _all_hosts until one returns a
    # response code. The last transaction/response are stored via
    # tx()/res(). Returns 'ok' for a 2xx response, '' otherwise.
    my $self = shift;
    my $filename = pop;

    # To avoid failover :
    #   yarsclient upload --nostash 1 foo
    #   $yc->upload("--nostash", 1, "foo")
    # This is undocumented since it is only intended to be
    # used on a server when balancing, not as a public interface.
    my $nostash;
    if (@_ && $_[0] =~ /nostash$/) {
        (undef, $nostash) = splice(@_, 0, 2);
    }
    LOGDIE "unknown options to upload : @_" if @_;

    LOGDIE "file needed for upload" unless $filename;
    $filename = File::Spec->rel2abs($filename);
    -r $filename or LOGDIE "Could not read " . $filename;

    # The asset points at the file; its content is not loaded here.
    my $basename = basename($filename);
    my $asset = Mojo::Asset::File->new( path => $filename );
    my $md5 = digest_file_hex($filename, 'MD5');

    my @servers = $self->_all_hosts( $self->_server_for($md5) );

    my ($tx, $code);
    while (!$code) {
        my $host = shift @servers or last;

        my $url = Mojo::URL->new($host);
        $url->path("/file/$basename/$md5");
        DEBUG "Sending $md5 to $url";

        my %headers = (
            "Content-MD5" => _hex2b64($md5),
            "Connection" => "Close",
        );
        $headers{"X-Yars-NoStash"} = 1 if $nostash;

        $tx = $self->client->build_tx(PUT => "$url" => \%headers);
        $tx->req->content->asset($asset);
        # TODO: set timeout for mojo 4.0
        $tx = $self->client->start($tx);
        $code = $tx->res->code;
        $self->res($tx->res);
        $self->tx($tx);

        next if $tx->success;

        # Log the failure; the loop moves on to the next host only when no
        # response code was received.
        my ($err_msg, $err_code) = $tx->error;
        if ($err_code) {
            INFO "Failed to PUT to $host : $err_code $err_msg";
        } else {
            INFO "PUT to $host failed : ".$err_msg;
        }
    }

    return '' unless $code && $tx->res->is_status_class(200);

    DEBUG "Response : ".$tx->res->code." ".$tx->res->message;
    $self->res($tx->res);
    return 'ok';
}
sub _rand_filename {
    # Generate a random 43-character name made of ASCII letters and digits.
    # Any arguments (including an invocant) are ignored.
    my @alphabet = ('a'..'z', 'A'..'Z', 0..9);
    return join '', map { $alphabet[rand(62)] } 1..43;
}
sub send {
    # Store content on the server and return the Location header of the
    # response. Named arguments: content (required), name (optional; a
    # random name is generated when absent). Returns nothing on failure.
    my ($self, @params) = @_;

    # meta_for is called directly from this sub on purpose.
    my %args = $self->meta_for->process_args(@params);

    my $remote_name = $args{name} || $self->_rand_filename;
    return unless $self->put($remote_name, $args{content}) eq 'ok';

    return $self->res->headers->location;
}
sub retrieve {
    # Fetch content either from a full location URL, or by name and md5.
    # Named arguments: location, or md5 + name.
    # With a location: returns the response body, or nothing on failure
    # (the failed transaction and response are then stored via tx()/res()).
    # Without one: delegates to get($md5, $name); dies if either is missing.
    my ($self, @params) = @_;

    # meta_for is called directly from this sub on purpose.
    my %args = $self->meta_for->process_args(@params);

    my $location = $args{location};
    unless ($location) {
        my $md5 = $args{md5} or LOGDIE "need md5 or location to retrieve";
        my $name = $args{name} or LOGDIE "need name or location to retrieve";
        return $self->get($md5,$name);
    }

    my $tx = $self->client->get($location);
    if (my $res = $tx->success) {
        return $res->body;
    }

    $self->tx($tx);
    $self->res($tx->res);
    return;
}
sub res_md5 {
    # Extract the md5 (hex) from the last response: from the Content-MD5
    # header when present, otherwise from a /file/<md5>/ component of the
    # Location header. Returns nothing when there is no response or
    # neither header is set.
    my ($self) = @_;

    my $res = $self->res;
    return unless $res;

    my $b64 = $res->headers->header("Content-MD5");
    return _b642hex($b64) if $b64;

    my $location = $res->headers->location;
    return unless $location;

    my ($md5) = $location =~ m[/file/([0-9a-f]{32})/];
    return $md5;
}
sub check_manifest {
    # Send the contents of a manifest file to /check/manifest and report
    # the result.
    #
    # Arguments (any order):
    #   "-c"                      return only { $manifest => 'ok'|'not ok' }
    #   "--show_corrupt" => $val  adds ?show_corrupt=$val to the request
    #   $manifest                 path of the manifest file (required)
    # Argument parsing stops at the first false argument.
    #
    # Returns the result of the request; on success it also gets a
    # $manifest key set to 'ok' when nothing is missing, 'not ok' otherwise.
    # Dies (LOGDIE) when the manifest is missing or does not exist.
    my $self = shift;
    my $check = 0;
    my $params = "";
    my $manifest;

    # Use a lexical for each argument instead of assigning to the global $_,
    # which could overwrite a caller's aliased loop variable.
    while (my $arg = shift @_) {
        if ($arg =~ /^-c$/) {
            $check = 1;
            next;
        }
        if ($arg =~ /^--show_corrupt$/) {
            $params = "?show_corrupt=" . shift(@_);
            next;
        }
        $manifest = $arg;
    }

    LOGDIE "Missing manifest" unless $manifest;
    LOGDIE "Cannot open manifest $manifest" unless -e $manifest;

    my $contents = Mojo::Asset::File->new(path => $manifest)->slurp;
    my $got = $self->_doit(POST => "/check/manifest$params", { manifest => $contents });
    return $got unless $self->tx->success;

    $got->{$manifest} = (@{$got->{missing}}==0 ? 'ok' : 'not ok');
    return { $manifest => $got->{$manifest} } if $check;
    return $got;
}
sub remote {
    # Switch remotes via the parent class, after dropping the cached
    # bucket map so that it is fetched again for the new remote.
    my ($self, @args) = @_;
    $self->bucket_map_cached(0);
    return $self->SUPER::remote(@args);
}
1;
=pod
=head1 NAME
Yars::Client - Yet Another RESTful-Archive Service Client
=head1 VERSION
version 1.04
=head1 SYNOPSIS
my $r = Yars::Client->new;
# Send and retrieve content.
my $location = $r->send(content => 'hello, world') or die $r->errorstring;
say $r->retrieve(location => $location);
# Alternatively, use names and md5s explicitly.
$location = $r->send(content => 'hello there', name => "greeting");
my $md5 = $r->res_md5;
say $r->retrieve(name => 'greeting', md5 => $md5);
# Upload a file.
$r->upload($filename) or die $r->errorstring;
print $r->res->headers->location;
# Download a file.
$r->download($filename, $md5) or die $r->errorstring;
$r->download($filename, $md5, '/tmp'); # download it to the /tmp directory
$r->download("http://yars/0123456890abc/filename.txt"); # Write filename.txt to current directory.
# More concise version of retrieve.
my $content = $r->get($filename,$md5);
# Delete a file.
$r->remove($filename, $md5) or die $r->errorstring;
# Compute the URL of a file based on the md5 and the buckets.
print $r->location($filename, $md5);
print "Server version is ".$r->status->{server_version};
my $usage = $r->disk_usage(); # Returns usage for a single server.
my $nother_usage = Yars::Client->new(url => "http://host1:9999")->disk_usage();
my $status = $r->servers_status(); # return a hash of servers, disks, and their statuses
# Mark a disk down.
my $ok = $r->set_status({ root => "/acps/disk/one", state => "down" });
my $ok = $r->set_status({ root => "/acps/disk/one", state => "down", host => "http://host2" });
# Mark a disk up.
my $ok = $r->set_status({ root => "/acps/disk/one", state => "up" });
# Check a manifest file or list of files.
my $details = $r->check_manifest( $filename );
my $check = $r->check_manifest( "-c", $filename );
my $check = $r->check_manifest( "--show_corrupt" => 1, $filename );
my $ck = $r->check_files({ files => [
{ filename => $f1, md5 => $m1 },
{ filename => $f2, md5 => $m2 } ] });
=head1 DESCRIPTION
Client for L<Yars>.
=head1 SEE ALSO
L<yarsclient>, L<Clustericious::Client>
=cut
=head1 AUTHOR
original author: Marty Brandon
current maintainer: Graham Ollis <plicease@cpan.org>
contributors:
Brian Duggan
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2013 by NASA GSFC.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut
__END__