The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package PJob::Server;
our $VERSION = '0.41';

our $ALIAS = "POE JOB SERVER, Version: $VERSION";

use Any::Moose;
use Data::Dumper;
use POSIX qw/strftime/;
use Scalar::Util qw/reftype/;
use List::Util qw/first/;
use List::MoreUtils qw/uniq/;
use POE qw/Component::Server::TCP Wheel::Run/;
#use Smart::Comments;

use constant {
    OUTPUT    => 'Out',
    ERROR     => 'Err',
    NOSUCHJOB => 'No Such A Job',
    NOMORECON => 'Sorry, no more connection on this server',
    NOTALLOWD => 'Sorry, you are not allowed on this server',
    NOCLIEJOB => 'Sorry, no job found for you on this server',
};

has 'jobs' => (
    is      => 'rw',
    isa     => 'HashRef',
    default => sub { {} },
);

has 'port' => (
    is      => 'rw',
    isa     => 'Int',
    default => '32080',
);

has 'logfile' => (
    is  => 'rw',
    isa => 'Str',
);

has 'log_commands' => (
    is      => 'rw',
    isa     => 'Bool',
    default => 0,
);

has 'job_table' => (
    is      => 'rw',
    isa     => 'HashRef',
    default => sub { {} },
);

has '_dispatched' => (
    is      => 'rw',
    isa     => 'Bool',
    default => 0,
);

has '_pid' => (
    is      => 'rw',
    default => sub { {} },
);

has 'allowed_hosts' => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { [] },
);

has 'max_connections' => (
    is      => 'rw',
    isa     => 'Int',
    default => '-1',
);

has '_interactive' => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { [] },
);

# add programs to the job server
sub add {
    my ($self, @programs) = @_;

    foreach my $p (@programs) {
        if (reftype $p && reftype $p eq 'HASH') {
            $self->jobs({%{$self->jobs}, %{$p}});
            next;
        }
        elsif (!reftype $p) {
            $self->jobs({%{$self->jobs}, $p => $p});
        }
    }
    return $self;
}

sub add_interactive {
    my ($self, @programs) = @_;
    foreach my $p (@programs) {
        if (reftype $p && reftype $p eq 'HASH') {
            $self->jobs({%{$self->jobs}, %{$p}});
            push @{$self->_interactive}, keys %{$p};
            next;
        }
        elsif (!reftype $p) {
            $self->jobs({%{$self->jobs}, $p => $p});
            push @{$self->_interactive}, $p;
        }
    }
    return $self;
}

# run the job server
sub run {
    my $self = shift;

    $self->_check_jobs;
    $self->_append_jobs;
    $self->_log_redirect;
    $self->{_clients} = 0;
    $self->{_session} = POE::Component::Server::TCP->new(
        Alias              => $ALIAS,
        Port               => $self->port,
        ClientInput        => sub { $self->_spawn(@_) },
        ClientConnected    => sub { $self->_client_connect(@_) },
        ClientDisconnected => sub { $self->_client_disconnected(@_) },
        InlineStates       => {
            job_stdout => sub { $self->send_to_client(OUTPUT, @_) },
            job_stderr => sub { $self->send_to_client(ERROR,  @_) },
            job_close  => sub { $self->_close(@_) },
            job_signal => sub { $self->_sigchld(@_) },
            usage      => sub { $self->_usage(@_) },
        }
    );
    $self->log(*STDOUT, "Started $ALIAS at Port: " . $self->port . "\n");
    POE::Kernel->run();
    return $self;
}

# print usage information
sub _usage {
    my $self         = shift;
    my $client       = $_[HEAP]->{client};
    my $remote_ip    = $_[HEAP]->{remote_ip};
    my $allowed_jobs = $self->job_table->{$remote_ip};    # jobs for this ip

    my $usage_str;

    #dispatched ? fetch from dispatched table, or else fetch from defined jobs
    if ($self->_dispatched) {
        if (@{$allowed_jobs}) {
            $usage_str = 'Usage: ' . join ' ', sort @{$allowed_jobs};
        }
        else {
            $usage_str = ERROR . "\t" . NOCLIEJOB;
            $client->put($usage_str);
            $_[KERNEL]->yield("shutdown");
        }
    }
    else {
        $usage_str = 'Usage: ' . join ' ', sort keys %{$self->jobs};
    }
    $client->put($usage_str);
    $client->put('.');
}

# run the program
sub _spawn {
    my $self = shift;
    my ($heap, $input) = @_[HEAP, ARG0];
    my $client      = $heap->{client};
    my $remote_ip   = $heap->{remote_ip};
    my $remote_port = $heap->{remote_port};

    if ($heap->{job}->{$client}) {
        $heap->{job}->{$client}->put($input);
        return;
    }
    if ($input =~ /^quit$/i) {
        $client->put("B'bye!");
        $_[KERNEL]->yield("shutdown");
        return;
    }

    if ($input =~ /^usage$/i) {
        $_[KERNEL]->yield('usage');
        return;
    }

  #dispatched, fetch jobs from dispatched job table, or else from defined jobs
    my $program;
    if ($self->_dispatched) {
        $program = first { $_ eq $input } $self->job_table->{$remote_ip};
    }
    else {
        $program = $self->jobs->{$input};
    }

    unless (defined $program) {
        $client->put(ERROR . "\t" . NOSUCHJOB);
        $_[KERNEL]->yield("usage");
        return;
    }

    my $interactive = 1 if first { $_ eq $input } @{$self->_interactive};

    $self->log(*STDOUT, "$remote_ip:$remote_port : $program  \n")
      if $self->log_commands;

    my $kid = POE::Wheel::Run->new(
        Program     => $program,
        StdoutEvent => 'job_stdout',
        StderrEvent => 'job_stderr',
        CloseEvent  => 'job_close',
        Conduit     => $interactive ? 'pty' : 'pipe',
    );

    $heap->{job}->{$client} = $kid;

    #just the program is enough right now. Feature can be added if necessary
    $self->_pid->{$kid->PID} = $program;
    $_[KERNEL]->sig_child($kid->PID, "job_signal");
    $client->put("Job $program :::" . $kid->PID . " started.");
}

# send information to client
sub send_to_client {
    my $self = shift;
    my $mark = shift;

    $_[HEAP]->{client}->put($mark . "\t" . $_[ARG0]);
}

# not sure if it is needed
sub error_event {
    my $self = shift;

#    my($oper,$errno,$errmsg) = @_[ARG0,ARG1,ARG2];
#    $_[HEAP]->{client}->put("Error: $oper failed, message-- $errmsg");
}

# _sigchld, delete Wheels stored in the HEAP and object
sub _sigchld {
    my $self = shift;
    my ($pid, $exit) = @_[ARG1, ARG2];
    my $program = $self->_pid->{$pid};

    if ($exit != 0) {
        $exit >>= 8;
    }
    $_[HEAP]->{client}->put("Job $program :::$pid exited with status $exit");
    $_[HEAP]->{client}->put('.');
    delete $_[HEAP]->{job}->{$_[HEAP]->{client}};
    delete $self->_pid->{$pid};
}

# not sure we need this or not
sub _close {

#    my $self = shift;

#    delete $_[HEAP]->{job};
}

# connected, check max connections, check allowed hosts, yield usage and log information
sub _client_connect {
    my $self = shift;
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    my $remote_ip   = $heap->{remote_ip};
    my $remote_port = $heap->{remote_port};
    my $allow_hosts = $self->allowed_hosts;

    # reached max connection
    if ($self->max_connections > 0) {
        if ($self->{_clients} >= $self->max_connections) {
            $self->send_to_client(ERROR, NOMORECON);
            $kernel->yield('shutdown');
            return;
        }
    }
    $self->{_clients}++;

    # not allowed on this server
    if (@{$allow_hosts} || $self->_dispatched) {
        if (!first { $remote_ip eq $_ } @{$allow_hosts},
            keys %{$self->job_table})
        {
            $heap->{client}->put(ERROR . "\t" . NOTALLOWD);
            $kernel->yield('shutdown');
            return;
        }
    }

    # allowed server
    $kernel->yield('usage');
    $self->log(*STDOUT,
        "CONNECTION FROM ${remote_ip}:${remote_port} ESTABLISHED\n");
}

# when disconnected, log it and reduce clients number by 1
sub _client_disconnected {
    my $self        = shift;
    my $remote_ip   = $_[HEAP]->{remote_ip};
    my $remote_port = $_[HEAP]->{remote_port};

    $self->log(*STDERR, "DISCONNECTED FORM ${remote_ip}:${remote_port} \n");
    $self->{_clients}--;
}

# open log file and redirect stdout/stdin to it
sub _log_redirect {
    my $self = shift;

    if ($self->logfile) {
        open STDOUT, '>>', $self->logfile or die $!;
        open STDERR, ">&STDOUT" or die $!;
    }
}

# simple log method
sub log {
    my $self = shift;
    my ($fh, $output) = @_;
    chomp $output;

    return unless $output;
    my $now = strftime "%y/%m/%d %H:%M:%S", localtime;
    print $fh "$now\t$output\n";
}

# disaptch jobs to job table
sub job_dispatch {
    my ($self, %table) = @_;

    $self->_dispatched(1);
    foreach my $host (keys %table) {
        foreach (@{$table{$host}}) {
            if (reftype $_ && reftype $_ eq 'HASH') {
                $self->add($_);
                push @{$self->job_table->{$host}}, keys %$_;
                next;
            }
            elsif (!reftype $_) {
                if (exists ${$self->jobs}{$_}) {
                    push @{$self->job_table->{$host}}, $_;
                }
                else {
                    $self->log(*STDERR, "no program '$_' found in the jobs");
                }
            }
        }
    }

    my $comm_jobs = $self->job_table->{'*'};
    return unless $comm_jobs;

    foreach my $key (keys %{$self->job_table}) {
        my @all = uniq @{$self->job_table->{$key}}, @{$comm_jobs};
        $self->job_table->{$key} = [@all];
    }
}

# Called before start the server. Dispatch the jobs for $self->allowed_hosts
sub _append_jobs {
    my $self = shift;

    my $comm_jobs = delete $self->job_table->{'*'};
    foreach my $host (@{$self->allowed_hosts}) {
        my @all = uniq @{$self->job_table->{$host}}, @{$comm_jobs};
        $self->job_table->{$host} = [@all];
    }
}

# don't set up any jobs like 'usage/quit'
sub _check_jobs {
    my $self = shift;

    if (my $c = first { $_ =~ /^usage|quit$/i } keys %{$self->jobs}) {
        $self->log(*STDERR, "'$c' is defined by default, choose another one");
        exit 1;
    }
}

#
#sub _ {
#    my $output = shift;
#    $output = '\.' if $output =~ /^\.$/;
#    return $output;
#}

no Any::Moose;
__PACKAGE__->meta->make_immutable;
1;
__END__

=pod

=head1 NAME

PJob::Server --- Simple POE Job Server 

=head1 VERSION

This document describes version 0.41 of PJob::Server

=head1 SYNOPSIS

    use PJob::Server;
    my $server = PJob::Server->new(port => '10086');
    $server->logfile('./.logfile');
    $server->log_commands(1);
    $server->add({ls => 'ls ~/', run => 'perl ~/test.pl'},'ls');
    $server->run();

=head1 DESCRIPTION
    
PJob::Server is a simple POE Job Server module, it provides you some api to write a job server very quickly.

=over 

=item B<new>

Create a PJob::Server object. The available arguments are:
    
    port            : port to listen. 32800 by default
    jobs            : available programs
    logfile         : log file name
    log_commands    : log the command runed by the client
    allowed_hosts   : hosts' ip that are allowed to run job on this server
    max_connections : default is set to -1, means no limit

=item B<add>

Add some programs, it receive both hashref and scalar. The key of the hashref is alias of the program. when scalar, the alias and the program have the same value.

=item B<add_interactive>

Add interactive programs. Use Conduit => 'pty' in POE::Wheel::Run to convince the programs that they are interacting with terminals.

=item B<job_dispatch>

    $ser->job_dispatch('127.0.0.1' => [qw/ls ps/,{cat => 'cat file'}], '*' => ['pwd']);
    
Dispatch available job to clients, receive client ip and its job table. * means common jobs which can be dispatched to all clients. Job table should be a arrayref. Jobs must be defined by $self->add except the situation that the element is a hashref, at this time, $self->add is called to add new job. 

Any hosts dispatched with job_dispatch is considered to be an allowed host.

=item B<run>

Run the server, no arguments needed.

=item B<quit/usage>

Use 'quit' to disconnect with the server. Use 'usage' to get avaiable commands

=back

=head1 SEE ALSO
    
L<PJob::Client>,L<POE::Component::Server::TCP>,L<POE::Wheel::Run>,L<Any::Moose>

=head1 AUTHOR

woosley.xu<woosley.xu@gmail.com>

=head1 COPYRIGHT & LICENSE

This software is copyright (c) 2009 by woosley.xu.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.