# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# AIO abstraction layer
#
# Copyright 2004, Danga Interactive, Inc.
# Copyright 2005-2007, Six Apart, Ltd.

package Perlbal::AIO;

use strict;
use POSIX qw(ENOENT EACCES EBADF);
use Fcntl qw(SEEK_CUR SEEK_SET SEEK_END O_RDWR O_CREAT O_TRUNC);

# Try and use IO::AIO, if it's around.
# The probe runs at compile time (BEGIN) via a string eval so that a missing
# or too-old IO::AIO (older than 1.6) simply leaves the flag false instead of
# aborting compilation.  The empty import list keeps IO::AIO's functions out
# of this namespace; they are always called fully qualified below.
BEGIN {
    $Perlbal::OPTMOD_IO_AIO        = eval "use IO::AIO 1.6 (); 1;";
}

# At interpreter shutdown, drop IO::AIO's parallelism to zero, but only if
# the module was actually loaded.  (Presumably this lets its worker threads
# wind down before exit -- confirm against the IO::AIO documentation.)
END {
    IO::AIO::max_parallel(0)
        if $Perlbal::OPTMOD_IO_AIO;
}

# Backend selector consulted by every aio_* wrapper in this file:
#   "none"  -> perform the operation synchronously with Perl builtins
#   "ioaio" -> hand the operation to IO::AIO
$Perlbal::AIO_MODE = "none";
$Perlbal::AIO_MODE = "ioaio" if $Perlbal::OPTMOD_IO_AIO;

############################################################################
# AIO functions available to callers
############################################################################

# aio_rename($srcpath, $dstpath, $user_cb)
#
# Rename $srcpath to $dstpath.  The request is routed through the AIO
# channel derived from $srcpath.  $user_cb is invoked with the result:
# whatever IO::AIO reports in "ioaio" mode, or 0 (success) / -1 (failure)
# in the synchronous fallback.
sub aio_rename {
    my ($srcpath, $dstpath, $user_cb) = @_;

    my $renamer = sub {
        my ($done) = @_;

        if ($Perlbal::AIO_MODE ne "ioaio") {
            # synchronous fallback: translate Perl's true/false into 0/-1
            my $status = rename($srcpath, $dstpath) ? 0 : -1;
            return $done->($status);
        }

        IO::AIO::aio_rename($srcpath, $dstpath, $done);
    };

    aio_channel_push(get_chan($srcpath), $user_cb, $renamer);
}

# aio_readahead($fh, $offset, $length, $user_cb)
#
# Ask IO::AIO to read ahead $length bytes of $fh starting at $offset.
# When IO::AIO is unavailable, or $fh no longer has a file descriptor by
# the time the request is dispatched, nothing is done and $user_cb is
# simply called with no arguments.
sub aio_readahead {
    my ($fh, $offset, $length, $user_cb) = @_;

    my $prefetch = sub {
        my ($done) = @_;

        # $fh may have been closed while the request sat in the queue
        my $usable = ($Perlbal::AIO_MODE eq "ioaio") && defined fileno($fh);
        return $done->() unless $usable;

        IO::AIO::aio_readahead($fh, $offset, $length, $done);
    };

    aio_channel_push(get_chan(), $user_cb, $prefetch);
}

# aio_stat($file, $user_cb)
#
# stat() $file, routed through the AIO channel derived from $file.  The
# stat results are left in Perl's special "_" filehandle, so the callback
# can use C<-e _>, C<-s _>, C<stat _> and friends.
#
# $user_cb receives a single status argument: 0 on success, -1 on failure.
# In "ioaio" mode that status comes from IO::AIO; the synchronous fallback
# now produces the same value so callers see one contract regardless of
# backend (previously the fallback invoked the callback with no arguments).
sub aio_stat {
    my ($file, $user_cb) = @_;

    aio_channel_push(get_chan($file), $user_cb, sub {
        my $cb = shift;
        if ($Perlbal::AIO_MODE eq "ioaio") {
            IO::AIO::aio_stat($file, $cb);
        } else {
            # stat() still fills the "_" buffer when its result is tested
            my $status = stat($file) ? 0 : -1;
            $cb->($status);
        }
    });
}

# aio_open($file, $flags, $mode, $user_cb)
#
# Open $file with sysopen-style $flags and permission $mode, routed
# through the AIO channel derived from $file.  In the synchronous
# fallback $user_cb receives the opened filehandle, or undef on failure.
sub aio_open {
    my ($file, $flags, $mode, $user_cb) = @_;

    my $opener = sub {
        my ($done) = @_;

        if ($Perlbal::AIO_MODE ne "ioaio") {
            # synchronous fallback
            my $fh;
            my $opened = sysopen($fh, $file, $flags, $mode);
            return $done->($opened ? $fh : undef);
        }

        IO::AIO::aio_open($file, $flags, $mode, $done);
    };

    aio_channel_push(get_chan($file), $user_cb, $opener);
}

# aio_unlink($file, $user_cb)
#
# Remove $file, routed through the AIO channel derived from $file.
# $user_cb is invoked with the result: whatever IO::AIO reports in
# "ioaio" mode, or 0 (success) / -1 (failure) in the synchronous fallback.
sub aio_unlink {
    my ($file, $user_cb) = @_;

    my $remover = sub {
        my ($done) = @_;

        if ($Perlbal::AIO_MODE ne "ioaio") {
            # synchronous fallback: translate unlink's count into 0/-1
            my $status = unlink($file) ? 0 : -1;
            return $done->($status);
        }

        IO::AIO::aio_unlink($file, $done);
    };

    aio_channel_push(get_chan($file), $user_cb, $remover);
}

# aio_write($fh, $offset, $length, $data, $user_cb)
#
# Write $length bytes of $data to $fh at absolute position $offset.
# If $fh is false the problem is logged via no_fh() and $user_cb is
# called with undef.  Otherwise $user_cb receives the write result
# (in the synchronous fallback: syswrite's return value).
#
# The data argument is deliberately not copied into a lexical: it is
# reached through a reference to @_ so the caller's own scalar is used.
sub aio_write {
    # arg positions:  0  1        2        3 (data)  4
    my ($fh, $offset, $length, undef, $user_cb) = @_;
    unless ($fh) {
        return no_fh($user_cb);
    }
    my $args = \@_;   # $args->[3] stays aliased to the caller's buffer

    my $writer = sub {
        my ($done) = @_;

        if ($Perlbal::AIO_MODE ne "ioaio") {
            # synchronous fallback: remember the handle's position, write
            # at the requested offset, then put the position back.
            my $saved_pos = sysseek($fh, 0, SEEK_CUR);
            sysseek($fh, $offset, SEEK_SET);
            my $written = syswrite($fh, $args->[3], $length, 0);
            sysseek($fh, $saved_pos, SEEK_SET);
            return $done->($written);
        }

        IO::AIO::aio_write($fh, $offset, $length, $args->[3], 0, $done);
    };

    aio_channel_push(get_chan(), $user_cb, $writer);
}

# aio_read($fh, $offset, $length, $data, $user_cb)
#
# Read $length bytes from $fh at absolute position $offset into the
# caller's $data scalar.  If $fh is false the problem is logged via
# no_fh() and $user_cb is called with undef.  Otherwise $user_cb receives
# the read result (in the synchronous fallback: sysread's return value).
#
# The buffer is reached through a reference to @_ rather than a copy, so
# the bytes land in the variable the caller passed in.
sub aio_read {
    # arg positions:  0  1        2        3 (data)  4
    my ($fh, $offset, $length, undef, $user_cb) = @_;
    unless ($fh) {
        return no_fh($user_cb);
    }
    my $args = \@_;   # $args->[3] stays aliased to the caller's buffer

    my $reader = sub {
        my ($done) = @_;

        if ($Perlbal::AIO_MODE ne "ioaio") {
            # synchronous fallback: remember the handle's position, read
            # from the requested offset, then put the position back.
            my $saved_pos = sysseek($fh, 0, SEEK_CUR);
            sysseek($fh, $offset, SEEK_SET);
            my $got = sysread($fh, $args->[3], $length, 0);
            sysseek($fh, $saved_pos, SEEK_SET);
            return $done->($got);
        }

        IO::AIO::aio_read($fh, $offset, $length, $args->[3], 0, $done);
    };

    aio_channel_push(get_chan(), $user_cb, $reader);
}

############################################################################
# AIO channel stuff
#    prevents all AIO threads from being consumed by requests for same
#    failing/overloaded disk by isolating them into separate 'channels' in
#    parent process and not dispatching more than the max in-flight count
#    allows.  think of a channel as a named queue.  or in reality, a disk.
############################################################################

# Per-channel bookkeeping.  Every hash below is keyed by channel name.
my %chan_outstanding;  # $channel_name -> number of requests currently dispatched and not yet completed
my %chan_pending;      # $channel_name -> [ [$action_subref, $wrapped_cb], ... ]  (FIFO of requests waiting for a slot)
my %chan_hitmaxdepth;  # $channel_name -> count of requests that had to be queued instead of dispatched immediately
my %chan_submitct;     # $channel_name -> total number of requests ever submitted to this channel
my $use_aio_chans = 0; # channels are off by default; set_file_to_chan_hook() switches them on
my $file_to_chan_hook; # coderef that returns $chan_name given a $filename

my %chan_concurrency;  # $channel_name -> max in-flight requests for the channel
                       #  (cache only; filled lazily from aio_chan_max_concurrent())

# get_aio_stats()
#
# Returns a hashref of per-channel statistics, keyed by channel name.
# Each value is a hashref that may contain:
#   cur_running - requests currently in flight
#   ctr_queued  - how many requests have ever had to wait in the queue
#   ctr_total   - how many requests have ever been submitted
#   cur_queued  - requests waiting in the queue right now
sub get_aio_stats {
    my %stats;

    for my $chan (keys %chan_outstanding) {
        $stats{$chan} = {
            cur_running => $chan_outstanding{$chan},
            ctr_queued  => $chan_hitmaxdepth{$chan} || 0,
            ctr_total   => $chan_submitct{$chan},
        };
    }

    # a channel may have a pending list without an outstanding counter
    for my $chan (keys %chan_pending) {
        $stats{$chan} ||= {};
        $stats{$chan}{cur_queued} = scalar @{ $chan_pending{$chan} };
    }

    return \%stats;
}

# (external API)
# set_file_to_chan_hook($coderef)
#
# Installs the filename -> channel-name translation hook.  As a side
# effect this also turns AIO channels on for all subsequent requests.
sub set_file_to_chan_hook {
    my ($hook) = @_;   # coderef: takes a filename, returns a channel name

    $file_to_chan_hook = $hook;
    $use_aio_chans     = 1;
}

# (internal API)
# aio_channel_push($chan, $user_cb, $action)
#
# Submit one request to channel $chan.  $action is a coderef that starts
# the actual I/O and takes exactly one argument: the completion callback
# it must eventually invoke.
#
# With channels disabled, $action is run right away with $user_cb.
# With channels enabled, the request is dispatched immediately if the
# channel is below its concurrency limit; otherwise it is appended to the
# channel's pending queue and started later by aio_channel_cond_run().
sub aio_channel_push {
    my ($chan, $user_cb, $action) = @_;

    # channels off: no bookkeeping, just run it
    if (!$use_aio_chans) {
        $action->($user_cb);
        return;
    }

    # The backend accepts a single callback, so hand it a wrapper that
    # runs the caller's callback first and then frees this request's slot
    # and starts any queued work.
    my $on_complete = sub {
        $user_cb->(@_);
        $chan_outstanding{$chan}--;
        aio_channel_cond_run($chan);
    };

    # first use of this channel initializes its queue and counters
    my $queue = ($chan_pending{$chan} ||= []);
    $chan_outstanding{$chan} ||= 0;
    $chan_submitct{$chan}++;

    my $limit = $chan_concurrency{$chan} ||= aio_chan_max_concurrent($chan);

    # room available: dispatch now
    if ($chan_outstanding{$chan} < $limit) {
        $chan_outstanding{$chan}++;
        $action->($on_complete);
        return;
    }

    # channel is saturated: remember the request for later
    $chan_hitmaxdepth{$chan}++;
    push @$queue, [$action, $on_complete];
}

# aio_chan_max_concurrent($chan)
#
# Returns the maximum number of in-flight requests allowed on channel
# $chan: 100 for the '[default]' channel, 10 for every other channel.
sub aio_chan_max_concurrent {
    my ($chan) = @_;
    my $is_default = $chan eq '[default]';
    return $is_default ? 100 : 10;
}

# aio_channel_cond_run($chan)
#
# Start queued requests on channel $chan for as long as the channel is
# below its concurrency limit and the queue is non-empty.  Does nothing
# if the channel has never had a pending queue created.
sub aio_channel_cond_run {
    my ($chan) = @_;

    my $queue = $chan_pending{$chan};
    return unless $queue;

    my $limit = $chan_concurrency{$chan} ||= aio_chan_max_concurrent($chan);

    # The in-flight count is re-read on every pass because a dispatched
    # job may complete (and decrement it) before we get back here.
    while ($chan_outstanding{$chan} < $limit) {
        my $job = shift @$queue or last;
        my ($action, $wrapped_cb) = @$job;

        $chan_outstanding{$chan}++;
        $action->($wrapped_cb);
    }
}

# Channel name to be used by the next get_chan() call, if any.
my $next_chan;

# set_channel($chan)
#
# Explicitly choose the channel name for the next request.
sub set_channel {
    my ($chan) = @_;
    $next_chan = $chan;
}

# set_file_for_channel($file)
#
# Pick the channel for the next request from a filename: the registered
# file-to-channel hook is asked for a name, or, when no hook has been
# registered, any previously chosen channel is cleared.
sub set_file_for_channel {
    my ($file) = @_;

    $next_chan = $file_to_chan_hook
               ? $file_to_chan_hook->($file)
               : undef;
}

# get_chan([$file])
#
# Returns the channel name for the request about to be submitted and
# clears the "next channel" selection so it applies to one request only.
# If a true $file is given, the registered hook (if any) first gets to
# choose the channel from it.  With nothing selected, the '[default]'
# channel is returned (see aio_chan_max_concurrent() for its limit).
# Returns undef when AIO channels are disabled.
#
# NOTE: this is called inside argument lists, so it must always return
# exactly one value -- hence the explicit "return undef".
sub get_chan {
    my ($file) = @_;

    return undef unless $use_aio_chans;

    set_file_for_channel($file) if $file;

    my $chan = $next_chan;
    return "[default]" unless $chan;

    # consume the selection
    $next_chan = undef;
    return $chan;
}

############################################################################
# misc util functions
############################################################################

# _fh_of_fd_mode($fd, $mode)
#
# Wrap numeric file descriptor $fd in an IO::Handle opened read/write
# ('r+').  Returns the handle, or undef when $fd is undefined or negative
# (or when IO::Handle cannot open the descriptor).
#
# $mode is accepted but currently ignored; see the TODO below.
sub _fh_of_fd_mode {
    my ($fd, $mode) = @_;
    return undef unless defined $fd && $fd >= 0;

    # Nothing in this file loads IO::Handle, so pull it in on demand
    # rather than relying on some other module having done so.
    require IO::Handle;

    #TODO: use the write MODE for the given $mode;
    my $fh = IO::Handle->new_from_fd($fd, 'r+');
    return $fh;
}

# no_fh($cb)
#
# Error path for aio_read/aio_write when they are handed a false
# filehandle: logs a "crit" message containing the call stack (starting
# one frame above our immediate caller), invokes $cb with undef, and
# returns undef.
sub no_fh {
    my ($cb) = @_;

    # one " at FILE:LINE SUB\n" entry per stack frame
    my @trace_lines;
    my $depth = 1;
    while (my @frame = caller($depth++)) {
        my (undef, $filename, $line, $subroutine) = @frame;
        push @trace_lines, " at $filename:$line $subroutine\n";
    }
    my $stack_trace = join "", @trace_lines;

    Perlbal::log("crit", "Undef \$fh: $stack_trace");
    $cb->(undef);
    return undef;
}

1;