The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/env perl

use strict;
use warnings;

#use Smart::Comments::JSON '##';
use lib 'lib';
use Net::OpenSSH;
use Term::ReadKey;
use SSH::Batch::ForNodes;
use File::Temp qw/ :POSIX /;

# Forward declaration: help() is defined near the end of the script but is
# called (with its prototype in effect) before that point.
sub help ($);

# Invoked with no arguments at all: complain and show the usage text.
# help() exits the program with the status it is given.
unless (@ARGV) {
    warn "No argument specified.\n\n";
    help(1);
}

# ---- Command-line parsing ----
#
# Options may appear anywhere on the command line.  Everything that is not
# an option (or an option's value) is collected in @exprs; a literal "--"
# moves what has been collected so far into @files (the local files) and
# starts a fresh @exprs (the cluster expression and target path).
my $list_hosts_only = 0;
my ($user, $port, $timeout, $verbose, $ask_for_pass, $ask_for_passphrase, $recursive, $use_glob);
my ($bwlimit, $use_rsync);
my $concurrency = 20;
my (@files, @exprs);
my $fetch_value;    # set while the previous option still awaits its value
my $found_sep;      # true once "--" has been seen
my $last_option;    # most recent option, used in the "takes a value" error
my $line_mode = $ENV{SSH_BATCH_LINE_MODE};

my $rsync_archive;
my $rsync_update;
my $rsync_compress;

# Boolean switches: option name => variable that is set to 1.
my %flag_for = (
    l        => \$list_hosts_only,
    v        => \$verbose,
    w        => \$ask_for_pass,
    P        => \$ask_for_passphrase,
    r        => \$recursive,
    g        => \$use_glob,
    rsync    => \$use_rsync,
    archive  => \$rsync_archive,
    update   => \$rsync_update,
    compress => \$rsync_compress,
    L        => \$line_mode,
);

# Options that consume the following argument: name => variable to fill.
my %value_for = (
    u => \$user,
    t => \$timeout,
    p => \$port,
    b => \$bwlimit,
    c => \$concurrency,
);

for my $arg (@ARGV) {
    # A pending option value wins over everything else, so values that
    # start with "-" (or are "--") are still accepted as values.
    if (defined $fetch_value) {
        $fetch_value->($arg);
        undef $fetch_value;
        next;
    }
    if ($arg eq '--') {
        @files = @exprs;
        @exprs = ();
        $found_sep = 1;
        next;
    }
    if ($arg =~ /^-(.+)/) {
        my $name = $1;
        if ($name eq 'h') {
            help(0);    # prints the usage and exits
        } elsif (exists $flag_for{$name}) {
            ${ $flag_for{$name} } = 1;
        } elsif (exists $value_for{$name}) {
            my $slot = $value_for{$name};
            $fetch_value = sub { $$slot = shift };
        } else {
            die "Unknown option: $arg\n";
        }
        $last_option = $arg;
        next;
    }
    push @exprs, $arg;
}
if (defined $fetch_value) {
    die "ERROR: Option $last_option takes a value.\n";
}

# The transfer loop only starts work while fewer than $concurrency
# transfers are active; a limit of 0 (or a non-number) would therefore
# never start anything and never finish.  Reject it up front.
if ($concurrency !~ /\A\d+\z/ || $concurrency < 1) {
    die "ERROR: Invalid concurrency limit (-c): $concurrency\n";
}

# ---- Local files to upload ----
#
# Without a "--" separator the first non-option argument is the single
# local file and the remaining ones form the cluster expression.  Only
# take it when there actually is one, so that an empty command line is
# reported as "no files" instead of pushing undef into @files.
if (!$found_sep && !@files && @exprs) {
    push @files, shift @exprs;
}
if (!@files) {
    die "No local files/directories specified.\n";
}

# -g: let Perl expand shell-style patterns in the file arguments.
if ($use_glob) {
    @files = map { glob $_ } @files;
}

# Every remaining entry must exist.  Directories are only transferred
# when -r was given; otherwise they are skipped with a warning.
my @transferable;
for my $file (@files) {
    if (!-e $file) {
        die "Local file/directory $file not found.\n";
    }
    if (!$recursive && -d $file) {
        warn "Warning: Skipped directory $file.\n";
        next;
    }
    push @transferable, $file;
}
@files = @transferable;

# Covers both "everything was a skipped directory" and "the glob patterns
# matched nothing".
if (!@files) {
    die "ERROR: No files to be transferred.\n";
}

if ($verbose) {
    warn "Using ", $use_rsync ? 'Rsync' : 'Scp', " method.\n";
    warn "Local files: ", (map { "[$_]" } @files), "\n";
}

# ---- Cluster expression and remote target path ----
#
# The remaining arguments are joined into one string whose tail must be
# ":<path>" (the path may not start with a digit); that tail is cut off
# and becomes the remote target path, the rest is the cluster expression.
my $expr = join ' ', @exprs;
my $target_path;
if ($expr =~ s/\s*:(\D\S*)\s*$//) {
    $target_path = $1;
} else {
    die "No remote target path specified.\n",
        "  (You forgot to specify \":/path/to/target\" at the end of the command line?)\n";
}

# A leading "~" is expanded locally to /home/<user>.  The account that
# matters is the one used for the remote login, so an explicit -u value
# takes precedence over the local $USER.
# XXX FIXME Maybe we should expand ~ on the remote server?
my $expanded_path = 0;
my $login_user = (defined $user && length $user) ? $user : $ENV{USER};
if (defined $login_user) {
    my $old_target = $target_path;
    if ($target_path =~ s{^~(/|$)}{/home/${login_user}$1}) {
        warn "WARNING: Expanding target path $old_target to $target_path\n";
        $expanded_path = 1;
    }
}
# "~name..." refers to another account's home directory.
if (!$expanded_path) {
    my $old_target = $target_path;
    if ($target_path =~ s{^~(\w+)}{/home/$1}) {
        warn "WARNING: Expanding target path $old_target to $target_path\n";
        $expanded_path = 1;
    }
}

if ($expr =~ /^\s*$/) {
    die "No cluster expression specified.\n";
}

if ($verbose) {
    warn "Cluster expression: $expr\n";
    # $target_path is always defined here: the script dies above otherwise.
    warn "Target path: $target_path\n";
}

# ---- Resolve the cluster expression into a host list ----
SSH::Batch::ForNodes::init_rc();
my $set = SSH::Batch::ForNodes::parse_expr($expr);

die "No machine to be operated.\n" if $set->is_empty;

my @hosts = sort $set->elements;

# Verbose runs report the host list on STDERR; a plain -l run prints it
# on STDOUT.  When both are given only the STDERR report is produced.
my $host_report = "Cluster set: @hosts\n";
if ($verbose) {
    warn $host_report;
}
elsif ($list_hosts_only) {
    print $host_report;
}

# -l: listing the hosts is all that was asked for.
exit(0) if $list_hosts_only;
# ---- Credentials ----
#
# -P asks for a key passphrase, -w for a login password (-P wins when both
# are given).  Either one can be supplied through the environment
# (SSH_BATCH_PASSPHRASE / SSH_BATCH_PASSWORD) to avoid the prompt.  Both
# variables stay undef when the corresponding option was not used.
my ($passphrase, $password);

# Prompts for a secret with terminal echo switched off and returns the
# line that was typed, without its trailing newline.  Returns undef when
# nothing could be read (e.g. STDIN is at end-of-file) instead of retrying
# forever.  The prompt and the closing newline both go to STDERR so that
# STDOUT carries transfer output only.
my $read_secret = sub {
    my ($label) = @_;
    print STDERR "$label:";
    ReadMode(2);                  # Term::ReadKey: echo off
    my $secret = ReadLine(0);     # Term::ReadKey: ordinary blocking read
    ReadMode(0);                  # Term::ReadKey: restore terminal settings
    print STDERR "\n";
    chomp $secret if defined $secret;
    return $secret;
};

# True when a secret is missing.  Tests for undef/empty rather than plain
# falseness so that a secret consisting of "0" is accepted.
my $is_blank = sub {
    my ($value) = @_;
    return !defined $value || $value eq '';
};

if ($ask_for_passphrase) {
    $passphrase = $ENV{SSH_BATCH_PASSPHRASE};
    if ($is_blank->($passphrase)) {
        $passphrase = $read_secret->('Passphrase');
    }
    if ($is_blank->($passphrase)) {
        die "No passphrase specified.\n";
    }
} elsif ($ask_for_pass) {
    $password = $ENV{SSH_BATCH_PASSWORD};
    if ($is_blank->($password)) {
        $password = $read_secret->('Password');
    }
    if ($is_blank->($password)) {
        die "No password specified.\n";
    }
}

# ---- Connect, transfer, and report ----
#
# Each pass of the outer loop
#   1. opens SSH connections until $concurrency transfers are in flight or
#      no hosts are left,
#   2. starts an asynchronous scp/rsync upload on every connection opened
#      in this pass, with the child's stdout+stderr captured in a
#      temporary file, and
#   3. waits for the oldest pending transfer and prints what it captured.
#
# @pids and @outs are parallel FIFO queues: the capture file of a child is
# pushed together with its pid and must be shifted together with it,
# whether or not the transfer succeeded.  Otherwise later hosts would be
# reported with another host's output.
my (%conns, @pids, @outs);
my %pid2host;
my $active_count = 0;
while (@hosts || @pids) {
    # -- 1. open connections ------------------------------------------
    my @active_hosts;
    while ($active_count < $concurrency && @hosts) {
        my $host = shift @hosts;
        ## connecting to host: $host
        my $ssh = Net::OpenSSH->new(
            $host,
            async => 1,
            defined $timeout ? (timeout => $timeout) : (),
            defined $user ? (user => $user) : (),
            defined $port ? (port => $port) : (),
            defined $passphrase ? (passphrase => $passphrase) : (),
            defined $password ? (password => $password) : (),
        );
        if ($ssh->error) {
            if ($line_mode) {
                print STDERR "$host: ";
            } else {
                print "===" x 7, " $host ", "===" x 7, "\n";
            }
            warn "ERROR: Failed to establish SSH connection: ",
                $ssh->error, "\n";
            next;
        }
        $conns{$host} = $ssh;
        $active_count++;
        push @active_hosts, $host;
    }

    # -- 2. start the uploads -----------------------------------------
    for my $host (@active_hosts) {
        # In list context File::Temp's tmpnam() returns an open handle
        # together with the file name.
        my ($out, $outfile) = tmpnam();
        my $meth = $use_rsync ? 'rsync_put' : 'scp_put';

        ## starting transfer to host: $host
        my $ssh = $conns{$host};
        my $pid = $ssh->$meth({
            stdout_fh => $out,
            stderr_to_stdout => 1,
            async => 1,
            defined $recursive ? (recursive => $recursive) : (),
            defined $bwlimit ? (bwlimit => $bwlimit) : (),

            # The rsync-only switches are passed on only with -rsync.
            $use_rsync && $rsync_archive ? (archive => 1) : (),
            $use_rsync && $rsync_update ? (update => 1) : (),
            $use_rsync && $rsync_compress ? (compress => 1) : (),
            # XXX recursive
        }, @files, $target_path);
        if (!defined $pid or $pid == -1) {
            $active_count--;
            if ($line_mode) {
                print STDERR "$host: ";
            } else {
                print "===" x 7, " $host ", "===" x 7, "\n";
            }
            if ($ssh->error) {
                warn "ERROR: ", $ssh->error, "\n";
            } else {
                warn "ERROR: Failed to transfer files.\n";
            }
            close $out;
            unlink $outfile;
            delete $conns{$host};
            next;
        }
        push @outs, $outfile;
        push @pids, $pid;
        $pid2host{$pid} = $host;
    }

    # -- 3. reap the oldest transfer ----------------------------------
    if (@pids) {
        # Only defined pids are ever queued, so no undef check is needed.
        my $pid = shift @pids;
        my $outfile = shift @outs;
        my $host = delete $pid2host{$pid};
        $active_count--;
        ## waiting transfer to host: $host
        if (!$line_mode) {
            print "===" x 7, " $host ", "===" x 7, "\n";
        }

        # Capture $? and $! right away: dropping the connection object
        # below may run further system calls.
        my $ret = waitpid($pid, 0);
        my ($status, $wait_err) = ($?, $!);

        delete $conns{$host};

        if ($ret <= 0) {
            #redo if ($! == EINTR);
            if ($line_mode) {
                print STDERR "$host: ";
            }
            warn "$host: waitpid($pid) failed: $wait_err\n";
            unlink $outfile;
            next;
        }

        # Show whatever the child wrote, even when it failed: stderr was
        # redirected into the capture file, so this is where the actual
        # scp/rsync diagnostics are.
        my $in;
        my $opened = open $in, '<', $outfile;
        my $open_err = $!;
        if ($opened) {
            while (my $line = <$in>) {
                chomp $line;
                if ($line_mode) {
                    print "$host: ";
                }
                print "$line\n";
            }
            close $in;
        }
        unlink $outfile;

        # Low 7 bits of the wait status: terminating signal, if any;
        # high byte: exit code.
        my $signal = $status & 127;
        my $exit = $status >> 8;
        if ($signal) {
            warn "$host: Transfer of files failed (killed by signal $signal)\n";
            next;
        }
        if ($exit > 0) {
            warn "$host: Transfer of files failed (status code: $exit)\n";
            next;
        }
        if (!$opened) {
            if ($line_mode) {
                print STDERR "$host: ";
            }
            warn "Can't open $outfile for reading: $open_err\n";
            next;
        }
        if (!$line_mode) {
            print "\n";
        }
    }
}

# Prints the usage text and terminates the program; never returns.
#
# Takes one argument, the process exit status.  With status 0 (help was
# requested) the text is printed on STDOUT; with any other status (the
# command line was wrong) it is emitted on STDERR via warn.
sub help ($) {
    my $exit_code = shift;
    my $msg = <<'_EOC_';
USAGE:

    tonodes [OPTIONS] FILE... -- HOST_PATTERN... [OPTIONS]
    tonodes [OPTIONS] FILE HOST_PATTERN... [OPTIONS]

OPTIONS:
    -c <num>      Set SSH concurrency limit. (default: 20)
    -b <num>      bandwidth limit in Kbits/sec.
    -g            Use glob to process the input files/directories.
    -h            Print this help.
    -l            List the hosts and do nothing else.
    -L            Use the line-mode output format, i.e., prefixing
                  every output line with the machine name.
                  (could be controlled by the env SSH_BATCH_LINE_MODE)
    -p <port>     Port for the remote SSH service.
    -r            Recurse into directories too.
    -rsync        Use "rsync" to transfer files.
    -archive      Enable rsync archive mode
    -update       Enable rsync update
    -compress     Enable rsync compress
    -t <timeout>  Specify timeout for net traffic.
    -u <user>     User account for SSH login.
    -v            Be verbose.
    -w            Prompt for password (used mostly for login and sudo,
                  could be provided by SSH_BATCH_PASSWORD).
    -P            Prompt for passphrase (used mostly for login,
                  could be provided by SSH_BATCH_PASSPHRASE).
_EOC_
    if ($exit_code == 0) {
        print $msg;
        exit(0);
    }

    warn $msg;
    exit($exit_code);
}
__END__

=encoding utf-8

=head1 NAME

tonodes - Upload local files/directories to remote clusters

=head1 SYNOPSIS

    # tonodes calls fornodes internally...

    $ tonodes /tmp/*.inst -- '{as}:/tmp/'
    $ tonodes foo.txt 'ws1105*' :/tmp/bar.txt

    $ tonodes -r /opt /bin/* -- 'ws[1101-1102].foo.com' 'bar.com' :/foo/bar/

    # use rsync instead of scp:
    $ tonodes foo.txt 'ws1105*' :/tmp/bar.txt -rsync

    # use rsync archive update compress
    $ tonodes foo.txt 'ws1105*' :/tmp/bar.txt -rsync -archive -update -compress


=head1 USAGE

    tonodes [OPTIONS] FILE... -- HOST_PATTERN... [OPTIONS]
    tonodes [OPTIONS] FILE HOST_PATTERN... [OPTIONS]

=head1 OPTIONS

    -c <num>      Set SSH concurrency limit. (default: 20)
    -b <num>      bandwidth limit in Kbits/sec.
    -g            Use glob to process the input files/directories.
    -h            Print this help.
    -l            List the hosts and do nothing else.
    -L            Use the line-mode output format, i.e., prefixing
                  every output line with the machine name.
                  (could be controlled by the env SSH_BATCH_LINE_MODE)
    -p <port>     Port for the remote SSH service.
    -r            Recurse into directories too.
    -rsync        Use "rsync" to transfer files.
    -archive      Enable rsync archive mode
    -update       Enable rsync update
    -compress     Enable rsync compress
    -t <timeout>  Specify timeout for net traffic.
    -u <user>     User account for SSH login.
    -v            Be verbose.
    -w            Prompt for password (used mostly for login and sudo,
                  could be provided by SSH_BATCH_PASSWORD).
    -P            Prompt for passphrase (used mostly for login,
                  could be provided by SSH_BATCH_PASSPHRASE).

=head1 DESCRIPTION

Please refer to L<SSH::Batch> for more documentation.

=head1 AUTHORS

=over

=item *

Zhang "agentzh" Yichun (章亦春) C<< <agentzh@gmail.com> >>

=item *

Liseen Wan (万珣新) C<< <liseen.wan@gmail.com> >>

=back

=head1 COPYRIGHT & LICENSE

This module as well as its programs are licensed under the BSD License.

Copyright (c) 2009, Yahoo! China EEEE Works, Alibaba Inc. All rights reserved.

Copyright (C) 2009, 2010, 2011, Zhang "agentzh" Yichun. All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

=over

=item *

Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

=item *

Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

=item *

Neither the name of the Yahoo! China EEEE Works, Alibaba Inc. nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

=back

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

=head1 SEE ALSO

L<fornodes>, L<atnodes>, L<SSH::Batch>, L<key2nodes>, L<Net::OpenSSH>.