The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
=head1 NAME

DynGig::Multiplex::CMD - Multiplexed Fork Client

=cut
package DynGig::Multiplex::CMD;

use warnings;
use strict;
use Carp;

use POSIX ":sys_wait_h";
use IPC::Open3;
use Errno qw( :POSIX );
use Time::HiRes qw( time sleep );
use IO::Poll 0.04 qw( POLLIN POLLERR POLLHUP POLLOUT );

use constant { MAX_BUF => 2 ** 20, MULTIPLEX => 2 ** 5 }; 

=head1 SYNOPSIS
 
 use DynGig::Multiplex::CMD;

 my @target = qw( host1 host2 ... );

 my %config =
 (
     timeout => 30,
     buffer => 'bob loblaw',
     ## {} is replaced with each of the individual 'targets'.
     ## also assuming no SSH password/pass phrase challenge.
     command => 'ssh {} wc',
 ); 

 my $client = DynGig::Multiplex::CMD->new( map { $_ => \%config } @target );

 my %option =
 (
     timeout => 300,     ## global timeout in seconds
     max_buf => 1024,    ## max number of bytes in each read buffer
     multiplex => 100,   ## max number of children processes
     verbose => *STDERR  ## report progress to STDERR
 );

 if ( $client->run( %option ) )
 {
     my $result = $client->result() || {};
     my $error = $client->error() || {};
 }
 else
 {
     print $client->error();
 }

=cut
sub new
{
    my ( $class, %config ) = @_;
    my %done;

    for my $target ( keys %config )
    {
        my $error = "$target: invalid definition"; 
        my $config = $config{$target};

        next if $done{$config};

        croak $error unless $config && ref $config eq 'HASH';

        my $buffer = $config->{buffer};
        my $timeout = $config->{timeout};
        my $command = $config->{command};
        my $ref = defined $command ? ref $command : 'UNDEF';

        croak "$error for timeout" if $timeout
            && ( ref $timeout || $timeout !~ /^\d+$/o );

        croak "$error for command" if $ref && $ref ne 'ARRAY';
        croak "$error for buffer" if $buffer && ref $buffer;

        $config->{buffer} = '' unless defined $buffer;
        $config->{length} = length $config->{buffer};
        $done{$config} = 1;
    }

    bless +{ config => \%config }, ref $class || $class;
}

=head1 DESCRIPTION

=head2 run

Launches client with the following parameter.
Returns 1 if successful. Returns 0 otherwise.

 timeout   : global timeout in seconds
 max_buf   : max number of bytes in each read buffer
 multiplex : max number of children processes
 verbose   : report progress to a file handle opened for write

=cut
sub run
{
    my ( $this, %param ) = @_;
    my $config = $this->{config};

    return 1 unless my @target = keys %$config;

    map { delete $this->{$_} } qw( result error );

    for my $key ( qw( timeout max_buf multiplex ) )
    {
        my $value = $param{$key};
        croak "invalid definition for $key"
            if $value && ( ref $value || $value !~ /^\d+$/o );
    }

    my $poll = IO::Poll->new();

    unless ( $poll )
    {
        $this->{error} = "poll: $!";
        return 0;
    }

    my %mask =
    (
        in => POLLIN | POLLHUP | POLLERR,
        out => POLLOUT,
    );

    my $timeout = $param{timeout};
    my $verbose = $param{verbose};
    my $multiplex = $param{multiplex} || MULTIPLEX;
    my $max_buf = $param{max_buf} || MAX_BUF;
    my @io = qw( stdin stdout stderr );
    my ( %error, %result, %lookup, %pid );
    my $epoch = time;
    my $current = 0;

    $multiplex = @target if $multiplex > @target;
    $verbose = 0 unless $verbose && fileno $verbose;

    while ( $poll->handles || @target )
    {
        my $time = time;

        if ( $timeout && $time - $epoch > $timeout )
        {
            $this->{error} = 'timeout';
            return 0;
        }

        while ( $current < $multiplex && @target )
        {
            my $target = shift @target;
            my $config = $config->{$target};
            my $command = $config->{command};
            my @handle = ( undef, undef, Symbol::gensym );
            my @command = ref $command ? @$command : $command;

            @command = map { $_ =~ s/{}/$target/g; $_; } @command;

            my $pid = eval { IPC::Open3::open3( @handle, @command ); };

            if ( $@ )
            {
                $error{$target} = "open3: $@";
                next;
            }
            $pid{$pid} = 1;

            $lookup{target}{$target} = +
            {
                handle => \@handle,
                epoch => $time,
                pid => $pid,
            };

            my %config =
            (
                target => $target,
                stdout => '',
                stderr => '',
                stdin => \ $config->{buffer},
                length => $config->{length},
                offset => 0,
            );

            for my $i ( 0 .. $#handle )
            {
                my $handle = $handle[$i];

                if ( $i )
                {
                    $poll->mask( $handle => POLLIN );
                }
                else
                {
                    next unless $config{length};
                    $poll->mask( $handle => POLLOUT );
                }

                $lookup{handle}{$handle} = [ \%config, $io[$i] ];
            }

            $current ++;
        }

        $poll->poll( 0.1 );

        for my $handle ( $poll->handles( $mask{in} ) )
        {
            my $buffer;
            my $lookup = $lookup{handle}{$handle};
            my $read = sysread $handle, $buffer, $max_buf;
            my $io = $lookup->[1];

            if ( $read )
            {
                $lookup->[0]{$io} .= $buffer;
                next;
            }

            my $target = $lookup->[0]{target};

            if ( defined $read )
            {
                $result{$target}{$io} = $lookup->[0]{$io};
            }
            else
            {
                next if $! == EAGAIN;
                $error{$target} = "sysread: $!";
            }

            delete $lookup{handle}{$handle};

            $poll->remove( $handle );
            close $handle;
        }

        for my $handle ( $poll->handles( $mask{out} ) )
        {
            my $lookup = $lookup{handle}{$handle};
            my $length = $lookup->[0]{length};
            my $wrote = eval { syswrite $handle, ${ $lookup->[0]{stdin} },
                $length, $lookup->[0]{offset} };

            if ( defined $wrote )
            {
                next if $length != ( $lookup->[0]{offset} += $wrote );
            }
            else
            {
                next if $! == EAGAIN;
                $error{ $lookup->[0]{target} } = "syswrite: $!";
            }

            delete $lookup{handle}{$handle};

            $poll->remove( $handle );
            close $handle;
        }

        for my $target ( keys %{ $lookup{target} } )
        {
            my $timeout = $config->{$target}{timeout};
            my $lookup = $lookup{target}{$target};
            my $handle = $lookup->{handle};
            my $status = 'complete';

            goto NEXT unless grep { $lookup{handle}{$_} } @$handle;
            next if ! $timeout || $timeout > $time - $lookup->{epoch};

            $status = $error{$target} = 'timeout';
            die sprintf( "kill 9, %s", $lookup{pid} )
                unless kill 9, $lookup->{pid};

            for my $handle ( @$handle )
            {
                delete $lookup{handle}{$handle};
                $poll->remove( $handle );
                close $handle;
            }

            NEXT: $current --;
            delete $lookup{target}{$target};
            print $verbose "$target $status.\n" if $verbose;
        }
    }

    while ( keys %pid )
    {
        my $pid = waitpid( -1, WNOHANG );
        last if $! == ECHILD;
        sleep 0.1, next unless $pid > 0;
        delete $pid{$pid};
    }
  
    $this->{result} = \%result if %result;
    $this->{error} = \%error if %error;

    return 1;
}

=head2 result()

Returns undef if no result. Returns a HASH reference indexed by 'target'.

=cut
sub result
{
    my $this = shift;
    return $this->{result};
}

=head2 error()

Returns undef if no error.
Returns a string if a global error occurred, else if errors occurred with
children processes, returns a HASH reference indexed by 'target'.

=cut
sub error
{
    my $this = shift;
    return $this->{error};
}

=head1 SEE ALSO

IPC::Open3 and IO::Poll

=head1 NOTE

See DynGig::Multiplex

=cut

1;

__END__