The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Gearman::Driver::Job;

use Moose;
use Gearman::Driver::Adaptor;
use POE qw(Wheel::Run);

=head1 NAME

Gearman::Driver::Job - Handles the POE magic

=head1 DESCRIPTION

This class is responsible for starting/stopping processes as well as
handling all pipes (STDOUT/STDERR/STDIN) of the processes. All events
are written to a logfile. Possible events are:

=over 4

=item * Starting processes

=item * STDOUT of processes

=item * STDERR of processes

=item * Stopping processes

=back

The current interface may only be interesting for people subclassing
L<Gearman::Driver> or for people writing commands/extensions for
L<Gearman::Driver::Console>.

=head1 ATTRIBUTES

=head2 driver

Reference to the L<Gearman::Driver> instance.

=cut

# Back-reference to the owning Gearman::Driver. It is held as a weak
# reference, and log() on this object is delegated to the driver's log().
has driver => (
    is       => 'rw',
    isa      => 'Gearman::Driver',
    required => 1,
    weak_ref => 1,
    handles  => { 'log' => 'log' },
);

=head2 name

The job's name.

=cut

# Mandatory job name (a plain string). It is used in log lines and as the
# alias of this job's POE session.
has name => (
    isa      => 'Str',
    is       => 'rw',
    required => 1,
);

=head2 methods

ArrayRef of L<Gearman::Driver::Job::Method> objects.

=cut

# Mandatory list of Gearman::Driver::Job::Method objects; BUILD registers
# each one's name/wrapper pair with the Gearman adaptor.
has methods => (
    isa      => 'ArrayRef[Gearman::Driver::Job::Method]',
    is       => 'rw',
    required => 1,
);

=head2 max_processes

Maximum number of concurrent processes this job may have.

=cut

# Upper bound on concurrent processes for this job; defaults to 1.
has max_processes => (
    is       => 'rw',
    isa      => 'Int',
    required => 1,
    default  => 1,
);

=head2 min_processes

Minimum number of concurrent processes this job may have.

=cut

# Lower bound on concurrent processes for this job; defaults to 1.
has min_processes => (
    is       => 'rw',
    isa      => 'Int',
    required => 1,
    default  => 1,
);

=head2 processes

This attribute stores key/value pairs of the form:
C<$pid> => L<$wheel|POE::Wheel::Run>

It provides the following methods:

=over 4

=item * C<count_processes()>

=item * C<delete_process($pid)>

=item * C<get_process($pid)>

=item * C<get_processes()>

=item * C<get_pids()>

=item * C<set_process($pid => $wheel)>

=back

=cut

# Registry of running child processes, keyed by PID. The values are the
# objects handed to set_process() (the POE::Wheel::Run wheels created in
# _add_process). The native Hash trait supplies the delegated helpers.
has processes => (
    is      => 'ro',
    isa     => 'HashRef',
    traits  => ['Hash'],
    default => sub { return {}; },
    handles => {
        set_process     => 'set',
        get_process     => 'get',
        delete_process  => 'delete',
        get_pids        => 'keys',
        get_processes   => 'values',
        count_processes => 'count',
    },
);

=head2 gearman

Instance of L<Gearman::Driver::Adaptor>.

=cut

# Read-only handle on the Gearman::Driver::Adaptor; the slot is filled
# in by BUILD.
has gearman => (
    isa => 'Gearman::Driver::Adaptor',
    is  => 'ro',
);

=head2 session

Instance of L<POE::Session>.

=cut

# Read-only handle on this job's POE::Session; the slot is filled in by
# BUILD.
has session => (
    isa => 'POE::Session',
    is  => 'ro',
);

=head2 lastrun

Each time this job is called it stores C<time()> in this attribute.

=cut

# Integer timestamp of the most recent run; 0 until one is recorded.
has lastrun => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

=head2 lasterror

Each time this job fails it stores C<time()> in this attribute.

=cut

# Integer timestamp of the most recent failure; 0 until one is recorded.
has lasterror => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

=head2 lasterror_msg

Each time this job fails it stores the error message in this
attribute.

=cut

# Message of the most recent failure; empty string until one is recorded.
has lasterror_msg => (
    is      => 'rw',
    isa     => 'Str',
    default => '',
);

=head2 worker

Reference to the worker object.

=cut

# Mandatory worker object (no type restriction). _add_process calls its
# process_name() method in the child process.
has worker => (
    isa      => 'Any',
    is       => 'rw',
    required => 1,
);

=head1 METHODS

=head2 add_process

Starts/forks/adds another process of this job.

=cut

# Asks this job's POE session to start one more process by posting the
# 'add_process' event to it; the work itself happens in _add_process.
sub add_process {
    my $self    = shift;
    my $session = $self->session;
    POE::Kernel->post( $session, 'add_process' );
}

=head2 remove_process

Removes/kills one process of this job.

=cut

# Asks this job's POE session to stop one process by posting the
# 'remove_process' event to it; the work itself happens in
# _remove_process.
sub remove_process {
    my $self    = shift;
    my $session = $self->session;
    POE::Kernel->post( $session, 'remove_process' );
}

# Moose post-construction hook. Creates the Gearman adaptor for the
# driver's server, registers every method's wrapper with it, and then
# creates the POE session whose events are dispatched to this object's
# private handler methods.
sub BUILD {
    my $self = shift;

    # 'gearman' and 'session' are read-only attributes, so their slots
    # are written directly instead of going through an accessor.
    my $adaptor = Gearman::Driver::Adaptor->new( server => $self->driver->server );
    $self->{gearman} = $adaptor;

    for my $job_method ( @{ $self->methods } ) {
        $self->gearman->add_function( $job_method->name => $job_method->wrapper );
    }

    # POE event name => name of the method on $self that handles it.
    my %handler_for = (
        _start             => '_start',
        add_process        => '_add_process',
        remove_process     => '_remove_process',
        got_process_stdout => '_on_process_stdout',
        got_process_stderr => '_on_process_stderr',
        got_process_close  => '_on_process_close',
        got_process_signal => '_on_process_signal',
    );

    $self->{session} = POE::Session->create(
        object_states => [ $self => \%handler_for ],
    );
}

# POE '_start' handler: registers the job's name as the session alias.
sub _start {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    $kernel->alias_set( $self->name );
}

# POE handler for 'add_process': starts one more child process through
# POE::Wheel::Run.
#
# In the child, the inherited POE kernel is stopped, $0 is replaced when
# the worker's process_name() hook returns a true value, and then work()
# is called on the Gearman adaptor. In the parent, a child-signal watcher
# is registered for the new PID, the wheel is stored in the session heap
# (keyed by wheel ID) and in the 'processes' attribute (keyed by PID),
# and the start is logged.
sub _add_process {
    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];

    my $child_main = sub {
        POE::Kernel->stop();

        my $title = $self->worker->process_name( $0, $self->name );
        $0 = $title if $title;

        $self->gearman->work;
    };

    my $wheel = POE::Wheel::Run->new(
        Program     => $child_main,
        StdoutEvent => 'got_process_stdout',
        StderrEvent => 'got_process_stderr',
        CloseEvent  => 'got_process_close',
    );
    my $child_pid = $wheel->PID;

    $kernel->sig_child( $child_pid, 'got_process_signal' );

    # Wheel events carry the wheel's ID, so the heap is keyed by it.
    $heap->{wheels}{ $wheel->ID } = $wheel;

    my $message = sprintf '(%d) [%s] Process started', $child_pid, $self->name;
    $self->log->info($message);

    $self->set_process( $child_pid => $wheel );
}

# POE handler for 'remove_process': picks the first PID returned by
# get_pids(), removes it from the 'processes' registry, kills that
# process and logs the fact. Does nothing when no process is registered.
sub _remove_process {
    my $self = $_[OBJECT];

    my @pids       = $self->get_pids;
    my $victim_pid = $pids[0];
    return unless $victim_pid;

    my $wheel = $self->delete_process($victim_pid);
    $wheel->kill();

    my $message = sprintf '(%d) [%s] Process killed', $wheel->PID, $self->name;
    $self->log->info($message);
}

# POE handler for 'got_process_stdout'.
#
# A STDOUT line of the form "<attribute> <value>" coming from a child
# process updates the matching status attribute on this job object.
# NOTE(review): presumably these lines are emitted by the job method
# wrapper running in the child - confirm against Gearman::Driver::Job::Method.
#
# Only the status attributes (lastrun, lasterror, lasterror_msg) may be
# updated this way. Dispatching to whatever method name the child printed
# would let ordinary worker output such as "worker started" or "name foo"
# overwrite unrelated attributes, or invoke methods like add_process.
#
# Arguments (POE): OBJECT - this job, ARG0 - one line of child STDOUT.
# Lines that do not match, or that name any other attribute, are ignored.
sub _on_process_stdout {
    my ( $self, $stdout ) = @_[ OBJECT, ARG0 ];

    my ( $attr, $value ) = $stdout =~ /^(\w+) (.*?)$/;
    return if !defined $attr || !defined $value;

    my %is_status_attr = map { $_ => 1 } qw(lastrun lasterror lasterror_msg);
    return unless $is_status_attr{$attr};

    # A value that fails the attribute's type constraint (e.g. a
    # non-integer lastrun) makes the setter throw; trap that so bad child
    # output cannot raise an exception inside the POE event handler.
    my $updated = eval { $self->$attr($value); 1 };
    if ( !$updated ) {
        $self->log->info( sprintf '[%s] Ignoring invalid %s report: %s', $self->name, $attr, $@ );
    }

    return;
}

# POE handler for 'got_process_stderr': logs one line of child STDERR
# together with the child's PID and the job name.
#
# Arguments (POE): OBJECT - this job, HEAP - session heap holding the
# wheels, ARG0 - the STDERR line, ARG1 - ID of the wheel that produced it.
sub _on_process_stderr {
    my ( $self, $heap, $line, $wheel_id ) = @_[ OBJECT, HEAP, ARG0, ARG1 ];

    my $wheel   = $heap->{wheels}{$wheel_id};
    my $message = sprintf '(%d) [%s] STDERR: %s', $wheel->PID, $self->name, $line;

    $self->log->info($message);
}

# POE handler for 'got_process_close': forgets the wheel whose ID arrives
# in ARG0, dropping it from the session heap and from the 'processes'
# registry.
sub _on_process_close {
    my ( $self, $heap, $wheel_id ) = @_[ OBJECT, HEAP, ARG0 ];

    my $wheel = delete $heap->{wheels}{$wheel_id};

    # Nothing left to do if got_process_signal already reaped it.
    defined $wheel or return;

    $self->delete_process( $wheel->PID );
}

# POE handler for 'got_process_signal' (registered through sig_child in
# _add_process): logs the child's status and forgets its wheel.
#
# Arguments (POE): OBJECT - this job, HEAP - session heap holding the
# wheels, ARG1 - PID of the child, ARG2 - its status as delivered by POE.
sub _on_process_signal {
    my ( $self, $heap, $child_pid, $child_status ) = @_[ OBJECT, HEAP, ARG1, ARG2 ];

    my $wheel = $self->delete_process($child_pid);

    my $message = sprintf '(%d) [%s] Exited with status %s', $child_pid, $self->name, $child_status;
    $self->log->info($message);

    # Nothing left to do if got_process_close already reaped it.
    defined $wheel or return;

    delete $heap->{wheels}{ $wheel->ID };
}

# Finalize the class: freeze the metaclass and drop the imported Moose
# sugar (has, etc.) from this package's namespace.
__PACKAGE__->meta->make_immutable;

no Moose;

=head1 AUTHOR

See L<Gearman::Driver>.

=head1 COPYRIGHT AND LICENSE

See L<Gearman::Driver>.

=head1 SEE ALSO

=over 4

=item * L<Gearman::Driver>

=item * L<Gearman::Driver::Adaptor>

=item * L<Gearman::Driver::Console>

=item * L<Gearman::Driver::Console::Basic>

=item * L<Gearman::Driver::Console::Client>

=item * L<Gearman::Driver::Job::Method>

=item * L<Gearman::Driver::Loader>

=item * L<Gearman::Driver::Observer>

=item * L<Gearman::Driver::Worker>

=back

=cut

1;