package MogileFS::Client::Async;
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
use AnyEvent::Socket;
use URI;
use Carp qw/confess/;
use POSIX qw( EAGAIN );
use Socket qw/ IPPROTO_TCP /;
use base qw/ MogileFS::Client /;
use IO::AIO qw/ fadvise /;
use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # XXX
our $VERSION = '0.022';
=head1 NAME
MogileFS:Client::Async
=head1 SYNOPSIS
my $mogfs = MogileFS::Client::Async->new( ... )
$mogfs->read_to_file($key, $filename);
$mogfs->store_file($key, $class, $filename, \%opts );
$mogfs->store_content($key, $class, \$content, \%opts );
=head1 DESCRIPTION
This package provides replacement implementations of some methods in
L<MogileFS::Client> to allow for non-blocking IO under L<AnyEvent> and the
ability to read and write files stored in MogileFS without needing to store
the entire file in memory.
=head1 SEE ALSO
=over
=item *
L<AnyEvent>
=item *
L<MogileFS::Client>
=item *
L<http://danga.com/mogilefs/>
=back
=cut
use namespace::clean;
sub new_file { confess("new_file is unsupported in " . __PACKAGE__) }
sub edit_file { confess("edit_file is unsupported in " . __PACKAGE__) }
sub read_file { confess("read_file is unsupported in " . __PACKAGE__) }
sub read_to_file {
my $self = shift;
my $key = shift;
my $fn = shift;
my @paths = $self->get_paths($key);
die("No paths for $key") unless @paths;
for (1..2) {
foreach my $path (@paths) {
my ($bytes, $write) = (0, undef);
open $write, '>', $fn or confess("Could not open $fn to write");
my $cv = AnyEvent->condvar;
my $h;
my $guard = http_request
GET => $path,
timeout => 120, # 2m
on_header => sub {
my ($headers) = @_;
return 0 if ($headers->{Status} != 200);
$h = $headers;
1;
},
on_body => sub {
syswrite $write, $_[0] or return 0;
$bytes += length($_[0]);
1;
},
sub { # On complete!
my (undef, $headers) = @_;
$h = $headers;
close($write);
undef $write;
$cv->send;
1;
};
$cv->recv;
return $bytes if ($bytes && !$write);
# Error..
$h->{Code} = 590;
$h->{Reason} = "Unknown error";
warn("HTTP error getting mogile $key: " . $h->{Reason} . "\n");
close $write;
unlink $fn;
}
}
confess("Could not read $key from mogile");
}
sub store_file {
my $self = shift;
return undef if $self->{readonly};
my ($key, $class, $file, $opts) = @_;
$opts ||= {};
# Extra args to be passed along with the create_open and create_close commands.
# Any internally generated args of the same name will overwrite supplied ones in
# these hashes.
my $create_open_args = $opts->{create_open_args} || {};
my $create_close_args = $opts->{create_close_args} || {};
$self->run_hook('store_file_start', $self, $key, $class, $opts);
$self->run_hook('new_file_start', $self, $key, $class, $opts);
my $res = $self->{backend}->do_request(
create_open => {
%$create_open_args,
domain => $self->{domain},
class => $class,
key => $key,
fid => $opts->{fid} || 0, # fid should be specified, or pass 0 meaning to auto-generate one
multi_dest => 1,
}
) or return undef;
my $dests = []; # [ [devid,path], [devid,path], ... ]
# determine old vs. new format to populate destinations
unless (exists $res->{dev_count}) {
push @$dests, [ $res->{devid}, $res->{path} ];
} else {
for my $i (1..$res->{dev_count}) {
push @$dests, [ $res->{"devid_$i"}, $res->{"path_$i"} ];
}
}
my ($length, $error, $devid, $path);
my @dests = (@$dests, @$dests, @$dests); # 2 retries
my $try = 0;
foreach my $dest (@dests) {
$try++;
($devid, $path) = @$dest;
my $uri = URI->new($path);
my $cv = AnyEvent->condvar;
my ($socket_guard, $socket_fh);
$socket_guard = tcp_connect $uri->host, $uri->port, sub {
my ($fh, $host, $port) = @_;
$error = $!;
if (!$fh) {
$cv->send;
return;
}
$socket_fh = $fh;
setsockopt($socket_fh, IPPROTO_TCP, TCP_CORK, 1) or warn "could not set TCP_CORK" if TCP_CORK;
$cv->send;
}, sub { 10 };
$cv->recv;
if (! $socket_fh) {
$error ||= 'unknown error';
warn("Connection error: $error to $path");
next;
}
undef $error;
# We are connected!
open my $fh_from, "<", $file or confess("Could not open $file");
# Hint to Linux that doubling readahead will probably pay off.
fadvise($fh_from, 0, 0, IO::AIO::FADV_SEQUENTIAL());
$length = -s $file;
my $buf = 'PUT ' . $uri->path . " HTTP/1.0\r\nConnection: close\r\nContent-Length: $length\r\n\r\n";
$cv = AnyEvent->condvar;
my $w;
my $timeout;
my $reset_timer = sub {
my ($type, $time) = @_;
$type ||= 'unknown';
$time ||= 60;
my $start = time();
$timeout = AnyEvent->timer(
after => $time,
cb => sub {
undef $w;
my $took = time() - $start;
$error = "Connection timed out duing data transfer of type $type (after $took seconds)";
$cv->send;
},
);
};
$w = AnyEvent->io( fh => $socket_fh, poll => 'w', cb => sub {
$reset_timer->('read');
if (!length($buf)) {
my $bytes = sysread $fh_from, $buf, '4096';
$reset_timer->('write');
if (!defined $bytes) { # Error, read FH blocking, no need to check EAGAIN
$error = $!;
$cv->send;
return;
}
if (0 == $bytes) { # EOF reading, and we already wrote everything
$cv->send;
return;
}
}
my $len = syswrite $socket_fh, $buf;
$reset_timer->('loop');
if ($len && $len > 0) {
$buf = substr $buf, $len;
}
if (!defined $len && $! != EAGAIN) { # Error, we could get EAGAIN as write sock non-blocking
$error = $!;
$cv->send;
return;
}
});
$reset_timer->('start PUT');
$cv->recv;
setsockopt($socket_fh, IPPROTO_TCP, TCP_CORK, 0) or warn "could not unset TCP_CORK" if TCP_CORK;
$cv = AnyEvent->condvar;
# FIXME - Cheat here, the response should be small, so we assume it'll allways all be
# readable at once, THIS MAY NOT BE TRUE!!!
$w = AnyEvent->io( fh => $socket_fh, poll => 'r', cb => sub {
undef $timeout;
undef $w;
$cv->send;
my $buf;
do {
if ($socket_fh->eof) {
$error = "Connection closed unexpectedly without response";
return;
}
my $res; $socket_fh->read($res, 4096); $buf .= $res;
} while (!length($buf));
my ($top, @headers) = split /\r?\n/, $buf;
if ($top =~ m{HTTP/1.[01]\s+2\d\d}) {
# Woo, 200!
undef $error;
}
else {
$error = "Got non-200 from remote server $top";
}
});
$reset_timer->('response', 1200); # Wait up to 20m, as lighty
# may have to copy the file between
# disks. EWWWW
$cv->recv;
undef $timeout;
if ($error) {
warn("Error sending data (try $try) to $uri: $error");
next; # Retry
}
last; # Success
}
die("Could not write to any mogile hosts, should have tried " . scalar(@$dests) . " did try $try")
if $error;
$self->run_hook('new_file_end', $self, $key, $class, $opts);
my $rv = $self->{backend}->do_request
("create_close", {
fid => $res->{fid},
devid => $devid,
domain => $self->{domain},
size => $length,
key => $key,
path => $path,
});
unless ($rv) {
die "$self->{backend}->{lasterr}: $self->{backend}->{lasterrstr}";
return undef;
}
$self->run_hook('store_file_end', $self, $key, $class, $opts);
return $length;
}
sub store_content {
my MogileFS::Client $self = shift;
return undef if $self->{readonly};
my($key, $class, $content, $opts) = @_;
$self->run_hook('store_content_start', $self, $key, $class, $opts);
my $fh = $self->new_file($key, $class, undef, $opts) or return;
$content = ref($content) eq 'SCALAR' ? $$content : $content;
$fh->print($content);
$self->run_hook('store_content_end', $self, $key, $class, $opts);
$fh->close or return;
length($content);
}
1;