# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package POE::Component::Generic::Net::SSH2;
# $Id: SSH2.pm 762 2011-05-18 19:34:32Z fil $

use strict;

use POE::Component::Generic;
use POE::Component::Generic::Child;

use Carp qw(carp croak);
use vars qw( @ISA $TIMEOUT $VERSION );

use         # hide from CPANTS
    Net::SSH2;

$VERSION = '0.1400';
# The parent-side object is a POE::Component::Generic; only new() and exec()
# are specialised in this package.
@ISA = qw( POE::Component::Generic );
# Default poll timeout, in milliseconds.  new() overwrites it when given a
# 'timeout' parameter, and each child worker copies it in Child::new().
$TIMEOUT = 100;

# Refuse to load against a Net::SSH2 older than 0.18.
require Net::SSH2;
if( $Net::SSH2::VERSION < 0.18 ) {
    croak __PACKAGE__, " requires version Net::SSH2 0.18 or better";
}

##########################################################
#
# Create the worker object
# This is called from PoCo::Generic->spawn()
# We set-up everything for the package, so the user doesn't have to.
# Build the parent-side component object.
#   %params are the caller's parameters; any of package, factories,
#   packages, postbacks, alias and child_package that are unset (or false)
#   receive the defaults below.  A true 'timeout' is removed from %params
#   and stored in the package-wide $TIMEOUT instead of being passed on.
# Returns whatever the parent class' new() returns.
sub new
{
    my $package = shift;
    my %params  = @_;

    my %default_for = (
        package       => 'Net::SSH2',
        factories     => [ qw( channel real_exec ) ],
        packages      => {},
        postbacks     => { disconnect=>0, real_exec=>[1..4] },
        alias         => 'net-ssh',
        child_package => __PACKAGE__ . '::Child',
    );
    foreach my $key ( keys %default_for ) {
        $params{$key} ||= $default_for{$key};
    }

    # Channel methods whose code-ref arguments are postbacks
    $params{packages}{'Net::SSH2::Channel'}{ postbacks }
                    ||= [ qw(handler_stderr handler_stdout handler_closed) ];

    if( $params{timeout} ) {
        $TIMEOUT = delete $params{timeout};
    }

    return $package->SUPER::new( %params );
}


##########################################################
# This method converts the SomethingEvent hash into a linear
# argument list, so that the user doesn't have to.
# Flatten the named *Event parameters into the positional postback list
# that real_exec() expects: Stdout, Stderr, Closed, Error, in that order.
#   $data    - request hash handed through to real_exec
#   $command - command to run on the remote host
#   %events  - event parameters, looked up via __postback_argument()
# Returns whatever real_exec() returns.
sub exec
{
    my( $self, $data, $command, %events ) = @_;

    my @postbacks = map { $self->__postback_argument( $_, \%events ) }
                        qw(Stdout Stderr Closed Error);

    return $self->real_exec( $data, $command, @postbacks );
}

#####################################################################
# Child process worker
package POE::Component::Generic::Net::SSH2::Child;
use strict;

use IO::Poll qw(POLLRDNORM POLLIN);
use Scalar::Util qw( reftype blessed );
use vars qw( @ISA );

# Debug switch: change the 0 to a true value to enable the
# "DEBUG and warn ..." trace messages in this package.
sub DEBUG () { 0 }

# The worker extends the stock POE::Component::Generic child process.
@ISA = qw( POE::Component::Generic::Child );


# Construct the child-side worker.
# All arguments go to the parent class' constructor.  The poll timeout
# falls back to the package default when the object has none, and the
# worker is published in $Net::SSH2::CHILD so the code injected into
# Net::SSH2 and Net::SSH2::Channel can find it.
sub new
{
    my( $package, @args ) = @_;

    my $self = $package->SUPER::new( @args );

    unless( $self->{timeout} ) {
        $self->{timeout} = $POE::Component::Generic::Net::SSH2::TIMEOUT;
    }

    $Net::SSH2::CHILD = $self;
    return $self;
}

#######################################
# Poll stdin and the SSH connection
# Fetch the next batch of requests from the parent.
# With no SSH channel handlers registered this defers to the parent
# class.  Otherwise it alternates between servicing the SSH channels and
# polling STDIN until poll_stdin() returns a defined value.
sub get_requests
{
    my $self = shift;

    # no registered SSH channels
    return $self->SUPER::get_requests() unless $self->{ssh_channels};

    DEBUG and warn "we have ssh_channel handlers";

    for(;;) {
        # let SSH channels have a chance
        $self->poll_ssh;

        my $requests = $self->poll_stdin;
        return $requests if defined $requests;
    }
}

#######################################
# Give every registered SSH channel a chance to deliver data.
#
# Polls all channels in $self->{ssh_channels} for the 'in' and 'ext'
# events that have a handler, waiting at most $self->{timeout}.  For each
# event that fired, up to 1024 bytes are read and handed to the matching
# handler as ( $data, $byte_count ).  A channel that reports EOF gets its
# 'channel_closed' handler called (if one was registered) and is then
# dropped from the registry.
#
# Returns nothing.
sub poll_ssh
{
    my( $self ) = @_;

    my $registry = $self->{ssh_channels} or return;

    my @poll;
    foreach my $hdef ( values %$registry ) {
        push @poll, { handle=>$hdef->{channel},
                      events=>[ grep { $hdef->{$_} } qw( in ext ) ]
                    };
    }
    return unless @poll;
    DEBUG and warn "ssh_poll";
    # Blocking has to be off for polling
    $self->{obj}->blocking( 0 );
    $self->{obj}->poll( $self->{timeout}, [ @poll ] );

    DEBUG and warn "ssh_poll returned ", 0+@poll, " events";

    my( $n, $buf );
    foreach my $poll ( @poll ) {
        # Look the entry up again each time.  An earlier iteration may have
        # removed the last channel (and with it the whole registry), and a
        # plain nested lookup would autovivify it again.
        my $current = $self->{ssh_channels} or last;
        my $hdef    = $current->{ $poll->{handle} } or next;

        foreach my $ev ( qw( in ext ) ) {
            next unless $poll->{revents}{$ev};
            my $handler = $hdef->{$ev} or next;

            if( $n = $hdef->{channel}->read( $buf, 1024, $ev eq 'ext' ) ) {
                DEBUG and warn "$ev: $n bytes";
                $handler->( $buf, $n );
            }
        }
        if( $hdef->{channel}->eof ) {
            DEBUG and warn "EOF";
            # ClosedEvent is optional; without this guard the worker died
            # when a channel had only stdout/stderr handlers.
            $hdef->{channel_closed}->() if $hdef->{channel_closed};
            # Also drops the registry itself once it is empty, so that
            # get_requests can go back to the parent class' implementation.
            $self->__remove_channel( $poll->{handle} );
        }
    }
    return;
}

#######################################
# Register or unregister a handler for one event on one SSH channel.
#   $channel - the channel; its stringified form is the registry key
#   $event   - 'in', 'ext' or 'channel_closed'
#   $coderef - handler to install; a false value removes the handler
#
# Registry entries look like { channel => $channel, $event => $coderef, ... }.
# An entry is dropped when its last handler is removed, and the registry
# itself is dropped when its last entry goes.
#
# Dies if a true $coderef is not a code reference.
# Returns $coderef when registering, nothing when removing.
sub __handler
{
    my( $self, $channel, $event, $coderef ) = @_;

    if( $coderef ) {
        die "Must be a coderef"
                unless 'CODE' eq ( reftype( $coderef ) || '' );
        $self->{ssh_channels} ||= {};
        $self->{ssh_channels}{$channel} ||= { channel=> $channel };
        $self->{ssh_channels}{$channel}{$event} = $coderef;
        return $coderef;
    }

    # Removal.  Never touch entries that don't exist: a nested delete would
    # autovivify an empty entry with no 'channel' key, which poll_ssh
    # would later try to poll.
    my $registry = $self->{ssh_channels} or return;
    if( my $hdef = $registry->{$channel} ) {
        delete $hdef->{$event};
        # Only the 'channel' bookkeeping key left => no handlers remain
        delete $registry->{$channel}
                unless grep { $_ ne 'channel' } keys %$hdef;
    }
    delete $self->{ssh_channels} unless keys %$registry;
    return;
}

#######################################
# Forget every handler registered for $channel.
# The registry itself is discarded once it holds no more channels.
# Returns nothing.
sub __remove_channel
{
    my( $self, $channel ) = @_;

    my $registry = $self->{ssh_channels};
    return unless $registry;

    delete $registry->{$channel};
    unless( keys %$registry ) {
        delete $self->{ssh_channels};
    }
    return;
}


#######################################
# Wait up to $self->{timeout} milliseconds for a request on STDIN.
# Returns
#   undef()         nothing became readable before the timeout
#   0               STDIN was flagged but sysread got nothing (EOF or error)
#   $filter->get()  the filter's output for the bytes that were read
sub poll_stdin
{
    my $self = shift;

    DEBUG and warn "poll_stdin ($self->{timeout})";

    # IO::Poll wants seconds; our timeout is in milliseconds
    my $poller = IO::Poll->new();
    $poller->mask( *STDIN, POLLIN );
    $poller->poll( $self->{timeout}/1000 );

    my $ev = $poller->events( *STDIN );
    DEBUG and warn "ev=$ev";
    return unless $ev;

    # This should be encapsulated in Generic::Child somehow.
    my $raw;
    sysread( STDIN, $raw, $self->{size} ) or return 0;
    return $self->{filter}->get([$raw]);
}






#####################################################################
# Extend Net::SSH2
#   Net::SSH2 doesn't play nicely with subclassing, so we just throw
#   Things into their namespace.
package             # On 2 lines so the PAUSE indexer doesn't gripe
    Net::SSH2;

use strict;
use vars qw( $CHILD );
# Debug switch for the code injected into Net::SSH2: change the 0 to a
# true value to enable the "DEBUG and warn ..." trace messages.
sub DEBUG () { 0 }

# Keep the original Net::SSH2::channel reachable as true_channel, so the
# replacement installed below can still delegate to it.
*true_channel = \&channel;

#################################
# Replace Net::SSH2::channel with a wrapper that switches the session to
# blocking mode, calls the original, and warns (with the session's error)
# when no channel object comes back.  Returns whatever the original returned.
{
    no strict 'subs';
    *channel = sub {
        my( $self, @args ) = @_;
        # Blocking has to be on when setting up a channel
        # And for ->exec and ->cmd, it seems, but ...
        $self->blocking( 1 );
        my $obj = true_channel( $self, @args );
        warn "Can't create channel ".$self->error unless $obj;
        return $obj;
    };
}

#################################
# This method does the heavy-lifting for ->exec
# Open a channel, hook up the output handlers and start $command on it.
#   $command   - command line to execute remotely
#   $on_stdout - handler for the channel's 'in' data (optional)
#   $on_stderr - handler for the channel's 'ext' data (optional)
#   $on_closed - handler called when the channel is closed (optional)
#   $on_error  - called with $self->error on failure (optional)
# Returns the channel on success; nothing if the channel could not be
# created or the command could not be started.
sub real_exec
{
    my $self = shift;
    my( $command, $on_stdout, $on_stderr, $on_closed, $on_error ) = @_;

    my $channel = $self->channel;
    if( !$channel ) {
        $on_error->( $self->error ) if $on_error;
        return;
    }
    DEBUG and warn "Created channel";

    # Note: $on_error is not registered as a channel handler
    my %handler_for = ( in             => $on_stdout,
                        ext            => $on_stderr,
                        channel_closed => $on_closed );
    foreach my $event ( qw( in ext channel_closed ) ) {
        next unless $handler_for{$event};
        $CHILD->__handler( $channel, $event, $handler_for{$event} );
    }

    DEBUG and warn "Exec $command";
    return $channel if $channel->exec( $command );

    # The command never started: undo the registrations and report
    $CHILD->__remove_channel( $channel );
    warn "Couldn't exec $command";
    $on_error->( $self->error ) if $on_error;
    return;
}


# Run $command in a fresh channel and collect its output.
#   $command - command line to execute remotely
#   $input   - passed through to the channel's cmd()
# Returns what the channel's cmd() returns.  If no channel could be
# created, returns ( '', $error_message ) in list context and '' in scalar
# context.  Scalar callers expect STDOUT only, and a bare two-element list
# would hand them the error text as if it were the command's output.
sub cmd
{
    my( $self, $command, $input ) = @_;

    my $channel = $self->channel;
    unless( $channel ) {
        my $error = "Unable to create channel: ".$self->error;
        return wantarray ? ( '', $error ) : '';
    }

    return $channel->cmd( $command, $input );
}


#####################################################################
# Extend Net::SSH2::Channel
#   Net::SSH2 doesn't play nicely with subclassing, so we just throw
#   things into their namespace.
package                     # on 2 lines so PAUSE indexer doesn't gripe
    Net::SSH2::Channel;

use strict;
# Debug switch for the code injected into Net::SSH2::Channel: change the 0
# to a true value to enable the "DEBUG and warn ..." trace messages.
sub DEBUG () { 0 }

# Set (or, with a false $coderef, clear) the handler for data arriving on
# this channel's 'in' stream.
sub handler_stdout
{
    my $self    = shift;
    my $coderef = shift;

    DEBUG and warn "handler_stdout";
    return $Net::SSH2::CHILD->__handler( $self, 'in', $coderef );
}

# Set (or, with a false $coderef, clear) the handler for data arriving on
# this channel's 'ext' stream.
sub handler_stderr
{
    my $self    = shift;
    my $coderef = shift;

    DEBUG and warn "handler_stderr";
    return $Net::SSH2::CHILD->__handler( $self, 'ext', $coderef );
}

# Set (or, with a false $coderef, clear) the handler that is called when
# this channel is closed.
sub handler_closed
{
    my $self    = shift;
    my $coderef = shift;

    DEBUG and warn "handler_closed";
    return $Net::SSH2::CHILD->__handler( $self, 'channel_closed', $coderef );
}

# Execute $command on this channel and gather everything it prints.
# The channel is switched to non-blocking mode and polled until it reports
# EOF.  Returns ( $stdout, $stderr ) in list context, $stdout alone in
# scalar context, and nothing if the command could not be started.
sub cmd
{
    my( $self, $command ) = @_;

    DEBUG and warn "cmd: $command";

    return unless $self->exec( $command );

    my( $stdout, $stderr ) = ( '', '' );

    $self->blocking( 0 );
    my @poll = ( { handle => $self, events => [ qw( in ext ) ] } );

    # Only leaves through the EOF branch
    while( 1 ) {
        $self->session->poll( 100, \@poll );

        if( $self->eof ) {
            DEBUG and warn "EOF";
            return wantarray ? ( $stdout, $stderr ) : $stdout;
        }

        foreach my $ev ( qw( in ext ) ) {
            next unless $poll[0]->{revents}->{$ev};

            DEBUG and warn "cmd: read $ev";
            my $buf;
            my $n = $self->read( $buf, 1024, $ev eq 'ext' );
            next unless $n;

            DEBUG and warn "$ev: $n bytes";
            if( $ev eq 'in' ) {
                $stdout .= $buf;
            }
            else {
                $stderr .= $buf;
            }
        }
    }
}

no strict 'subs';
# Install a destructor that unregisters the channel from the child worker.
# If the package already has a DESTROY, keep it and chain to it, in the
# same way true_channel preserves the original channel().  Replacing it
# outright would stop the original cleanup from running at all.
# NOTE(review): presumably the original DESTROY is what releases the
# underlying libssh2 channel -- confirm against the installed Net::SSH2.
my $true_destroy = __PACKAGE__->can( 'DESTROY' );
*DESTROY = sub {
        my( $self ) = @_;
        $Net::SSH2::CHILD->__remove_channel( $self ) if $Net::SSH2::CHILD;
        $true_destroy->( $self ) if $true_destroy;
    };

1;

__END__

=head1 NAME

POE::Component::Generic::Net::SSH2 - A POE component that provides non-blocking access to Net::SSH2

=head1 SYNOPSIS

    use POE::Component::Generic::Net::SSH2;

    my $ssh = POE::Component::Generic::Net::SSH2->spawn( 
                    alias   => 'my-ssh',
                    verbose => 1,
                    debug   => 0 );

    my $channel;

    POE::Session->create( 
        inline_states => {
            _start => sub {
                $poe_kernel->delay( 'connect', $N );
            },

            connect => sub {
                $ssh->connect( {event=>'connected'}, $HOST, $PORT );
            },
            connected => sub {
                $ssh->auth_password( {event=>'login'}, $USER, $PASSWORD );
            },
            error => sub {
                my( $resp, $code, $name, $error ) = @_[ARG0..$#_];
                die "Error $name ($code) $error";
            },

            login => sub {
                my( $resp, $OK ) = @_[ARG0..$#_];
                unless( $OK ) {
                    $ssh->error( {event=>'error', wantarray=>1} );
                    return;
                }

                $poe_kernel->yield( 'cmd_do' );
            },

            ################
            cmd_do => sub {
                $ssh->cmd( { event=>'cmd_output', wantarray=>1 }, "ls -l" );
                return;
            },
            cmd_output => sub {
                my( $resp, $stdout, $stderr ) = @_[ARG0..$#_];
                warn "Contents of home directory on $HOST:\n$stdout\n";


                $poe_kernel->yield( 'exec_do' );
            },

            exec_do => sub {
                 
                $ssh->exec( { event=>'exec_running', wantarray=>0 }, 
                                "cat - >$FILE", 
                                StdoutEvent => 'exec_stdout', 
                                StderrEvent => 'exec_stderr', 
                                ClosedEvent => 'exec_closed', 
                                ErrorEvent  => 'exec_error' 
                          );
            },
            exec_running => sub {
                my( $resp, $ch ) = @_[ARG0..$#_];
                # keep channel alive
                $channel = $ch;
                $channel->write( {}, "$$-CONTENTS OF THE FILE\n" );
                $channel->send_eof( {event=>'done'} );
            },

            done => sub {
                undef( $channel );
                $ssh->shutdown;
            },

            exec_error => sub {
                my( $code, $name, $string ) = @_[ARG0..$#_];
                die "ERROR: $name $string";
            },        
            exec_stderr => sub {
                my( $text, $bytes ) = @_[ARG0..$#_];
                die "STDERR: $text";
                return;
            },
            exec_stdout => sub {
                my( $text, $bytes ) = @_[ARG0..$#_];
                warn "STDOUT: $text";
                return;
            },
            exec_closed => sub {
                undef( $channel );
                $ssh->shutdown;
            },
        }
    );


=head1 DESCRIPTION


L<POE::Component::Generic::Net::SSH2> is a component for handling SSH2
connections from POE.  It uses L<POE::Component::Generic> to wrap
L<Net::SSH2> into a non-blocking framework.

This component demonstrates many tricks that you might find useful when you
build your own components based on L<POE::Component::Generic>.

It is still ALPHA quality.  Missing are scp, sftp support and better error
handling.

Patches welcome.


=head1 METHODS

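=head2 spawn

    my $ssh = POE::Component::Generic::Net::SSH2->spawn( %params );

Takes the same parameters as L<POE::Component::Generic/spawn>.  Defaults are
supplied for C<package>, C<factories>, C<packages>, C<postbacks>,
C<child_package> and C<alias> (C<net-ssh>), so you normally only need to set
C<alias>.  One extra parameter is understood: C<timeout>, the poll timeout
in milliseconds used by the child process when watching the SSH channels
and STDIN (default 100).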


=head1 Net::SSH2 METHODS

L<POE::Component::Generic::Net::SSH2> supports most Net::SSH2 method calls 
using L<POE::Component::Generic>'s interface.  The following additional 
methods are added for better POE support.


=head2 cmd

    $ssh->cmd( $data_hash, $command, $input );

Ultra-simple command execution.  Runs C<$command> in a new channel on the
remote host.  C<$input> is then fed to it (NOT YET!).  All output is
accumulated until the command exits, then is sent to the response event
as C<ARG1> (STDOUT) and C<ARG2> (STDERR).


=head2 exec

    $ssh->exec( $data_hash, $command, %events );

Runs C<$command> in a new channel on the remote host.  The response event
will receive an SSH channel that it may use to write to or read from.

C<%events> is a hash that may contain the following keys:

=over 4

=item StdoutEvent

=item StderrEvent

Called when C<$command> writes to STDOUT or STDERR, respectively, with 2
arguments: C<ARG0> is the data, C<ARG1> is the number of bytes.

=item ErrorEvent

Called when there is an SSH error.  The documentation for libssh2 is
piss-poor, so I'm not really sure what these could be, nor how to detect
them all.  Arguments are the same as L<Net::SSH2/error>: C<ARG0> is the
error number, C<ARG1> is the error name and C<ARG2> is the error string.

=item ClosedEvent

Called when we encounter an SSH C<eof> on the channel, which normally
corresponds to when C<$command> exits.

No arguments.

=back


=head1 Net::SSH2::Channel METHODS

A channel is a conduit by which SSH communicates to a sub-process on the
remote side.  Each command, shell or sub-system uses its own channel.  A
channel may only run one command, shell or sub-system.

Channels are created with the C<channel> factory method:

    $ssh->channel( {event=>'got_channel'} );

    # Your got_channel event
    sub got_channel {
        my( $heap, $resp, $channel ) = @_[HEAP, ARG0, ARG1];
        die $resp->{error} if $resp->{error};

        $heap->{channel} = $channel;
    }

You may call most L<Net::SSH2::Channel> methods on a channel using the
normal L<POE::Component::Generic> calling conventions.

    $heap->{channel}->write( {}, $string );

Channels are closed when you drop all references to the object.

    delete $heap->{channel};


There are some extensions to the channel interface:

=head2 cmd

    $heap->{channel}->cmd( {event=>'cmd_response'}, $command, $input );

Runs C<$command> on the channel.  Response event receives 2 arguments:
C<ARG1> is the command's output to STDOUT,
C<ARG2> is the command's output to STDERR.

If you do not set C<wantarray> to 1, you will only receive STDOUT.

=head2 handler_stdout

=head2 handler_stderr

Registers a postback that is posted when data is present on STDOUT
(called 'in' in libssh2) or STDERR (called 'ext' in libssh2) respectively.

These could be used when you call L<Net::SSH2::Channel/exec> on a channel.

=head2 handler_closed

Registers a postback that is posted when the channel closes, which normally
happens when the command has finished.

These could be used when you call L<Net::SSH2::Channel/exec> on a channel.



=head1 AUTHOR

Philip Gwyn E<lt>gwyn-at-cpan.orgE<gt>

=head1 SEE ALSO

L<POE>, L<Net::SSH2>, L<POE::Component::Generic>.

=head1 RATING

Please rate this module.
L<http://cpanratings.perl.org/rate/?distribution=POE-Component-Generic>

=head1 BUGS

Probably.  Report them here:
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=POE%3A%3AComponent%3A%3AGeneric>

=head1 COPYRIGHT AND LICENSE

Copyright 2006, 2011 by Philip Gwyn.


This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut