The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# BEGIN BPS TAGGED BLOCK {{{
# COPYRIGHT:
# 
# This software is Copyright (c) 2003-2006 Best Practical Solutions, LLC
#                                          <clkao@bestpractical.com>
# 
# (Except where explicitly superseded by other copyright notices)
# 
# 
# LICENSE:
# 
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of either:
# 
#   a) Version 2 of the GNU General Public License.  You should have
#      received a copy of the GNU General Public License along with this
#      program.  If not, write to the Free Software Foundation, Inc., 51
#      Franklin Street, Fifth Floor, Boston, MA 02110-1301 or visit
#      their web page on the internet at
#      http://www.gnu.org/copyleft/gpl.html.
# 
#   b) Version 1 of Perl's "Artistic License".  You should have received
#      a copy of the Artistic License with this package, in the file
#      named "ARTISTIC".  The license is also available at
#      http://opensource.org/licenses/artistic-license.php.
# 
# This work is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
# 
# CONTRIBUTION SUBMISSION POLICY:
# 
# (The following paragraph is not intended to limit the rights granted
# to you to modify and distribute this software under the terms of the
# GNU General Public License and is only of importance to you if you
# choose to contribute your changes and enhancements to the community
# by submitting them to Best Practical Solutions, LLC.)
# 
# By intentionally submitting any modifications, corrections or
# derivatives to this work, or any other work intended for use with SVK,
# to Best Practical Solutions, LLC, you confirm that you are the
# copyright holder for those contributions and you grant Best Practical
# Solutions, LLC a nonexclusive, worldwide, irrevocable, royalty-free,
# perpetual, license to use, copy, create derivative works based on
# those contributions, and sublicense and distribute those contributions
# and any derivatives thereof.
# 
# END BPS TAGGED BLOCK }}}
package SVK::Mirror::Backend::SVNRaPipe;
use strict;

use base 'Class::Accessor::Fast';
__PACKAGE__->mk_accessors(qw(ra requests fh unsent_buf buf_call current_editors pid));

use POSIX 'EPIPE';
use Socket;
use Storable qw(nfreeze thaw);
use SVK::Editor::Serialize;
use SVK::Util qw(slurp_fh);
use SVK::Config;
use SVK::I18N;

=head1 NAME

SVK::Mirror::Backend::SVNRaPipe - Transparent SVN::Ra requests pipelining

=head1 SYNOPSIS

 my @req = (['rev_proplist', 3'], ['replay', 3 0, 1, 'EDITOR'])
 $generator = sub { shift @req };
 $pra = SVK::Mirror::Backend::SVNRaPipe->new($ra, $generator);

 $pra->rev_proplsit(3);
 $pra->replay(3, 0, 1, SVK::Editor->new);

=head1 DESCRIPTION



=cut

sub new {
    my ($class, $ra , $gen) = @_;

    socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
	or  die "socketpair: $!";

    my $self = $class->SUPER::new(
        {
            ra              => $ra,
            requests        => $gen,
            fh              => $c,
            current_editors => 0,
            buf_call        => [],
            unsent_buf      => ''
        }
    );

    if (my $pid = fork) {
	close $p;
	$self->pid($pid);
	return $self;
    }
    else {
	die "cannot fork: $!" unless defined $pid;
	close $c;
    }

    $self->fh($p);
    $File::Temp::KEEP_ALL = 1;
    # Begin external process for buffered ra requests and send response to parent.
    my $config = SVK::Config->svnconfig;
    my $max_editor_in_buf
        = $config ? $config->{config}->get( 'svk', 'ra-pipeline-buffer', '5' ) : 5;
    my $pool = SVN::Pool->new_default;
    local $SIG{INT} = 'IGNORE';
    while ( my $req = $gen->() ) {
        $pool->clear;
        my ( $cmd, @arg ) = @$req;

        my $default_threshold = 2 * 1024 * 1024;
        @arg = map {
            $_ eq 'EDITOR'
                ? SVK::Editor::Serialize->new(
                  { textdelta_threshold => $config ? $config->{config}->get(
                        'svk', 'ra-pipeline-delta-threshold',
                        "$default_threshold"
                    ) : $default_threshold,
                    cb_serialize_entry =>
                        sub { $self->_enqueue(@_); $self->try_flush }
                  } )
                : $_
        } @arg;

	# Note that we might want to switch to bandwidth based buffering,
	while ($self->current_editors > $max_editor_in_buf) {
	    $self->try_flush(1);
	}

	my $ret = $self->ra->$cmd(@arg);
	if ($cmd eq 'replay') { # XXX support other requests using editors
	    ++$self->{current_editors};
	    $self->_enqueue([undef, 'close_edit']);
	}
	else {
	    $self->_enqueue([$ret, $cmd]);
	}
	$self->try_flush();
    }

    while ($#{$self->buf_call} >= 0) {
	$self->try_flush($p, 1) ;
    }
    exit;
}

sub _enqueue {
    my ($self, $entry) = @_;
    push @{$self->buf_call}, $entry;
}

sub try_flush {
    my $self = shift;
    my $wait = shift;
    my $max_write = $wait ? -1 : 10;
    if ($wait) {
	$self->fh->blocking(1);
    }
    else {
	$self->fh->blocking(0);
	my $wstate = '';
	vec($wstate,fileno($self->fh),1) = 1;
	select(undef, $wstate, undef, 0);;
	return unless vec($wstate,fileno($self->fh),1);
    }
    my $i = 0;
    my $buf = $self->buf_call;
    while ( $#{$buf} >= 0 || length($self->unsent_buf) ) {
	if (my $len = length $self->unsent_buf) {
	    if (my $ret = syswrite($self->fh, $self->unsent_buf)) {
		substr($self->{unsent_buf}, 0, $ret, '');
		last if $ret != $len;
	    }
	    else {
		die if $! == EPIPE;
		return;
	    }
	}
	last if $#{$buf} < 0;
	my $msg = nfreeze($buf->[0]);
	$msg = pack('N', length($msg)).$msg;

	if (my $ret = syswrite($self->fh, $msg)) {
	    $self->{unsent_buf} .= substr($msg, $ret)  if length($msg) != $ret;
	    if ((shift @$buf)->[1] eq 'close_edit') {
		--$self->{current_editors} ;
	    }
	}
	else {
	    die if $! == EPIPE;
	    # XXX: check $! for fatal
	    last;
	}
    }
}

# Client code reading pipelined responses

sub read_msg {
    my $self = shift;
    my ($len, $msg);
    read $self->fh, $len, 4 or Carp::confess $!;
    $len = unpack ('N', $len);
    my $rlen = read $self->fh, $msg, $len or die $!;
    return \$msg;
}

sub ensure_client_cmd {
    my ($self, @arg) = @_;
    # XXX: error message
    my @exp = @{$self->requests->()};
    for (@exp) {
	my $arg = shift @arg;
	if ($_ eq 'EDITOR') {
	    die unless UNIVERSAL::isa($arg, 'SVK::Editor');
	    return $arg;
	}
	Carp::confess "pipeline ra error: got $arg but expecting $_" if ($_ cmp $arg);
    }
    die join(',',@arg) if @arg;
}

sub rev_proplist {
    my $self = shift;
    $self->ensure_client_cmd('rev_proplist', @_);
    # read synchronous msg
    my $data = thaw( ${$self->read_msg} );
    die 'inconsistent response' unless $data->[1] eq 'rev_proplist';
    return $data->[0];
}


sub replay {
    my $self = shift;
    my $editor = $self->ensure_client_cmd('replay', @_);
    my $baton_map = {};
    my $baton_pool = {};

    eval {

    while ((my $data = $self->read_msg )) {
	my ($next, $func, @arg) = @{thaw($$data)};
	my $baton_at = SVK::Editor->baton_at($func);
	my $baton = $arg[$baton_at];
	if ($baton_at >= 0) {
	    $arg[$baton_at] = $baton_map->{$baton};
	}

	my $pool = SVN::Pool->new;
	my $ret = $self->emit_editor_call($editor, $func, $pool, @arg);

	last if $func eq 'close_edit';

	if ($func =~ m/^close/) {
	    Carp::cluck $func unless $baton_map->{$baton};
	    delete $baton_map->{$baton};
	    delete $baton_pool->{$baton};
	}

	if ($next) {
	    # if we are keeping this parent baton, set the pool as the
	    # default pool as well.
	    $pool->default if $pool;
	    $baton_pool->{$next} = $pool if $pool;
	    $baton_map->{$next} = $ret
	}
    }
    };

    if ($@) {
	kill 15, $self->pid;
	waitpid $self->pid, 0;
	$self->pid(undef);
    }

    # destroy the remaining pool that became default pools in order.
    delete $baton_pool->{$_} 
        for reverse sort keys %$baton_pool;

    die $@ if $@;
}

sub emit_editor_call {
    my ($self, $editor, $func, $pool, @arg) = @_;
    my $ret;
    if ($func eq 'apply_textdelta') {
	my $svndiff = pop @arg;
	$ret = $editor->apply_textdelta(@arg, $pool);

	if ($ret && $#$ret > 0) {
	    my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool);
	    if (ref $svndiff) { # inline
		print $stream $$svndiff;
	    }
	    else { # filename
		open my $fh, '<', $svndiff or die $!;
		slurp_fh($fh, $stream);
		close $fh;
		unlink $svndiff;
	    }
	    close $stream;
	}
    }
    else {
	# do not emit the fabricated close_edit, as replay doesn't
	# give us that.  We need that in the stream so the client code
	# of replay knows the end of response has reached.
	$ret = $editor->$func(@arg, $pool)
	    unless $func eq 'close_edit';
    }
    return $ret;
}

sub DESTROY {
    my $self = shift;
    return unless $self->pid;
    wait;
}

1;