The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Hg::Lib::Server::Pipe;

use 5.10.1;

use Carp;

use IO::Select;

# for perl 5.10.1
use FileHandle;

use System::Command;
use Try::Tiny;
use Types::Standard -all;
use Type::Params;

use Hg::Lib::Utils 'find_hg';

use Hg::Lib::Exception -aliases;

use Hg::Lib::Types '-all';

use Moo;

no if $] >= 5.018, 'warnings', "experimental::smartmatch";

with 'MooX::Attributes::Shadow::Role';

shadowable_attrs( qw[ hg args path configs encoding env timeout ] );

# path to hg executable; allow multiple components
has hg => (
    is      => 'ro',
    default => sub { find_hg() },
    isa     => StrList,
    coerce  => StrList->coercion,
);

# arguments to hg
has args => (
    is      => 'ro',
    isa     => StrList,
    coerce  => StrList->coercion,
    default => sub { [] },
);

has path => (
    is        => 'ro',
    isa       => Str,
    predicate => 1,
);

has configs => (
    is      => 'ro',
    isa     => StrList,
    coerce  => StrList->coercion,
    default => sub { [] },
);

# used for optional initial encoding; set to encoding returned in
# server's hello message
has encoding => (
    is        => 'rwp',
    clearer   => 1,
    predicate => 1,
    isa       => Str,
);

# what the server claims i can do
has capabilities => (
    is        => 'rwp',
    predicate => 1,
    init_arg  => undef,
);

# constructed command line; does not include environment variables
has cmdline => (
    is       => 'lazy',
    init_arg => undef,
    builder  => sub {

        my $self = shift;

        my @cmd = (
            @{ $self->hg },
            qw[ --config ui.interactive=True
              serve
              --cmdserver pipe
              ],
        );

        push @cmd, '-R', $self->path if $self->has_path;

        push @cmd, map { ( '--config' => $_ ) } @{ $self->configs };

        push @cmd, @{ $self->args };

        return \@cmd;
    },

);

# passed command environment
has env => (

    is      => 'ro',
    isa     => HashRef,
    default => sub { {} },

);

# Pipe object
has pipe => (
    is        => 'rwp',
    init_arg  => undef,
    handles   => [qw[ stdin stdout stderr ]],
    clearer   => 1,
    predicate => 1,
);

has timeout => (
    is      => 'rw',
    isa     => Num,
    default => 5,
);

sub BUILD {

    my $self = shift;

    my $env = $self->env;
    $env->{HGPLAIN}    = 1;
    $env->{HGENCODING} = $self->encoding
      if $self->has_encoding;

    $self->_set_pipe(
        System::Command->new( @{ $self->cmdline }, { env => $self->env } ) );
    $self->stderr->blocking( 0 );

    # get hello message; on failure make sure we pick up
    # any output on stderr vi $self->close
    try {

        $self->_get_hello;

    }
    catch {

        local $@ = $_;

        try {
	    my $e = $@;

            $self->close;

	    # make sure we rethrow the original error
	    die $e;

        }
        catch {

            local $@ = $_;

	    if ( /no Mercurial repository here/ ) {

		ENoRepo->throw( "no repository?" );

	    }
	    else {
		EHandshake->throw( "error in handshake with server" );
	    }
        };
    };

    return;
}


sub DEMOLISH {

    local $@;
    $_[0]->close;

}

sub close {

    my $self = shift;

    return unless $self->has_pipe;

    # whatever happens, $self->pipe must die!
    my $pipe = $self->pipe;
    $self->clear_pipe;

    # this signals hg to quit.
    $pipe->stdin->close;

    my $stderr
      = IO::Select->new( $pipe->stderr )->can_read( $self->timeout )
      ? join( '', $pipe->stderr->getlines )
      : '';

    chomp $stderr;

    $pipe->close;

    if ( $pipe->exit ) {
        EPipeTerminated->throw(
            join( "\n",
                "unexpected termination of server: exit code: " . $pipe->exit,
                "server stderr: $stderr", '' ) );
    }

    EPipeStderr->throw( "server stderr: $stderr" ) if length( $stderr );

    return;
}

sub _read {

    my $self = shift;

    # note that can_read returns an fh on its EOF as well as when it
    # is available for reading.
    EPipeTimeout->throw( "timed out waiting for data from server" )
      unless IO::Select->new( $self->stdout )->can_read( $self->timeout );

    # use aliased data in @_ to prevent copying
    return $self->stdout->sysread( @_ );
}

# always use aliased $_[0] as buffer to prevent copying
# call as get_chunk( $buf )
sub get_chunk {

    my $self = shift;

    # catch pipe errors from child
    local $SIG{'PIPE'} = sub {
        EPipe->throw( "SIGPIPE on read from server" );
    };

    my $nr = $self->_read( $_[0], 5 );
    EPipe->throw( "error reading chunk header from server: $!" )
      unless defined $nr;

    $nr > 0
      or EPipeEOF->throw(
        "unexpected end-of-file getting chunk header from server" );

    my ( $ch, $len ) = unpack( 'A[1] l>', $_[0] );

    if ( $ch =~ /[IL]/ ) {

        EPipe->throw(
            "get_chunk called incorrectly called in scalar context for channel $ch"
        ) unless wantarray();

        return $ch, $len;
    }

    else {

        if ( $len ) {

            my $nr = $self->_read( $_[0], $len );

            $nr == $len
              or EPipeEOF->throw(
                "unexpected end-of-file on channel $ch; expected $len bytes, got $nr"
              );
        }
        else {
            $_[0] = '';
        }

        return $ch;
    }

}

# call as $self->write( $buf, [ $len ] )
sub write {

    my $self = shift;
    my $len = @_ > 1 ? $_[1] : length( $_[0] );
    $self->stdin->syswrite( $_[0], $len ) == $len
      or EPipe->throw( "error writing $len bytes to server" );
}

sub writeblock {

    my $self = shift;

    $self->write( pack( "N/a*", $_[0] ) );
}

sub _get_hello {

    my $self = shift;

    my $buf;
    my $ch = $self->get_chunk( $buf );

    EPipe->throw(
        "corrupt or incomplete hello message from server: channel = $ch; length = "
          . length $buf )
      unless $ch eq 'o' && length $buf;

    my $requested_encoding = $self->has_encoding ? $self->encoding : undef;
    $self->clear_encoding;

    for my $item ( split( "\n", $buf ) ) {

        my ( $field, $value ) = $item =~ /([a-z0-9]+):\s*(.*)/;

        if ( $field eq 'capabilities' ) {

            $self->_set_capabilities(
                { map { $_ => 1 } split( ' ', $value ) } );
        }

        elsif ( $field eq 'encoding' ) {

            EEncoding->throw(
                "incorrect encoding returned: requested '$requested_encoding', got '$value'"
            ) if defined $requested_encoding && $requested_encoding ne $value;

            $self->_set_encoding( $value );

        }

        # ignore anything else 'cause we don't know what it means

    }

    # make sure hello message meets minimum standards
    ECapability->throw( "server did not provide capabilities?" )
      unless $self->has_capabilities;

    ECapability->throw( "server is missing runcommand capability" )
      unless exists $self->capabilities->{runcommand};

    EEncoding->throw( "server did not provide encoding?" )
      unless $self->has_encoding;

    return;
}

sub getencoding {

    my $self = shift;

    $self->write( "getencoding\n" );

    my $buffer;
    my ( $ch, $len ) = $self->get_chunk( $buffer );

    EPipe->throw( "unexpected return message for getencoding on channel $ch\n" )
      unless $ch eq 'r' && length( $buffer );

    return $buffer;

}

# $server->runcommand( [ $command, @args ],
#                      inchannels => \%callbacks,
#                      outchannels => \%callbacks )
sub runcommand {

    my $self = shift;

    # constraint check
    state $check = compile(
        ArrayRef [Str],
        slurpy Dict [
            inchannels  => Optional [ HashRef [CodeRef] ],
            outchannels => Optional [ HashRef [CodeRef] ],
        ] );

    state $outchannels = {
        o => sub { print STDOUT @_ },
        e => sub { print STDERR @_ },
    };

    my ( $args, $opts ) = $check->( @_ );

    $opts->{inchannels}  //= {};
    $opts->{outchannels} //= $outchannels;

    $self->write( "runcommand\n" );
    $self->writeblock( join( "\0", @$args ) );

    # read from server until a return channel is specified
    my $buffer;
    while ( 1 ) {

        my ( $ch, $len ) = $self->get_chunk( $buffer );

        for ( $ch ) {

            when ( $opts->{inchannels} ) {

                $self->writeblock( $opts->{inchannels}{$ch}->( $len ) );
            }

            when ( $opts->{outchannels} ) {

                $opts->{outchannels}{$ch}->( $buffer );
            }

            when ( 'r' ) {

                state $length_exp = length( pack( 'l>', 0 ) );
                EPipe->throw(
                    sprintf "incorrect message length (got %d, expected %d)",
                    length( $buffer ), $length_exp )
                  if length( $buffer ) != $length_exp;

                return unpack( 'l>', $buffer );
            }

            when ( /[[:upper:]]/ ) {

                EPipe->throw( "unexpected data on required channel $ch\n" );
            }
        }
    }
    return;
}

1;

__END__

=pod

=head1 NAME

Hg::Lib::Server::Pipe

=head1 VERSION

version 0.01_05

=head1 AUTHOR

Diab Jerius E<lt>djerius@cpan.orgE<gt>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by Diab Jerius E<lt>djerius@cpan.orgE<gt>.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut