package MooseX::Workers::Engine;
use Moose;
use POE qw(Wheel::Run);
# The object that receives the engine's callbacks (worker_stdout,
# worker_done, max_workers_reached, ...). It must consume the
# MooseX::Workers role.
has visitor => (
    does => 'MooseX::Workers',
    is   => 'ro',
);
# Upper bound on concurrently running workers; add_worker consults it
# before spawning. Defaults to 5.
has max_workers => (
    is      => 'rw',
    isa     => 'Int',
    default => 5,
);
# Processes currently running: maps a child's PID to the ID of the
# POE::Wheel::Run that owns it (filled in by add_worker, emptied by
# _sig_child).
has process_list => (
    isa     => 'HashRef',
    traits  => ['Hash'],
    default => sub { return {} },
    handles => {
        get_process    => 'get',
        set_process    => 'set',
        remove_process => 'delete',
        process_list   => 'kv',
    },
);
# Processes waiting to run: a FIFO of [ $job, $args ] pairs that add_worker
# parks here when the worker limit is reached and the caller asked for
# enqueueing. _worker_done drains it as slots free up.
has process_queue => (
    isa     => 'ArrayRef',
    traits  => ['Array'],
    default => sub { return [] },
    handles => {
        dequeue_process => 'shift',
        enqueue_process => 'push',
        process_queue   => 'elements',
    },
);
# Live POE::Wheel::Run objects, keyed by wheel ID.
# has_workers and num_workers both delegate to the hash's element count.
has workers => (
    is       => 'rw',
    isa      => 'HashRef',
    traits   => ['Hash'],
    required => 1,
    lazy     => 1,
    default  => sub { return {} },
    handles  => {
        get_worker     => 'get',
        set_worker     => 'set',
        remove_worker  => 'delete',
        get_worker_ids => 'keys',
        num_workers    => 'count',
        has_workers    => 'count',
    },
);
# MooseX::Workers::Job objects, keyed by the ID of the wheel running them.
# Only workers started from a Job object get an entry here.
has jobs => (
    is       => 'rw',
    isa      => 'HashRef',
    traits   => ['Hash'],
    required => 1,
    lazy     => 1,
    default  => sub { return {} },
    handles  => {
        get_job    => 'get',
        set_job    => 'set',
        remove_job => 'delete',
        num_jobs   => 'count',
        has_jobs   => 'count',
    },
);
# The POE session that manages the workers. It is built lazily on first
# access and maps each listed event name onto the engine method of the same
# name. has_manager reports whether it has been built; remove_manager
# clears it.
has session => (
    is        => 'ro',
    isa       => 'POE::Session',
    required  => 1,
    lazy      => 1,
    predicate => 'has_manager',
    clearer   => 'remove_manager',
    default   => sub {
        my ($engine) = @_;

        # Events handled by same-named methods on this object.
        my @event_names = qw(
            _start
            _stop
            _worker_stdout
            _worker_stderr
            _worker_error
            _worker_done
            _worker_started
            _sig_child
            add_worker
            _kill_worker
        );

        return POE::Session->create(
            object_states => [ $engine => \@event_names ],
        );
    },
);
# Post an event (name followed by its arguments) to the manager session.
# The event is queued by the POE kernel rather than run immediately.
sub yield {
    my ( $self, @event ) = @_;
    return $poe_kernel->post( $self->session, @event );
}
# Synchronously dispatch an event (name followed by its arguments) to the
# manager session and hand back whatever the handler returned.
sub call {
    my ( $self, @event ) = @_;
    return $poe_kernel->call( $self->session, @event );
}
# Forward the remaining arguments to the put() method of the wheel
# registered under $wheel_id.
#
#   $engine->put_worker( $wheel_id, @data );
sub put_worker {
    my $self     = shift;
    my $wheel_id = shift;

    my $wheel = $self->get_worker($wheel_id);
    return $wheel->put(@_);
}
# Signal the worker registered under $wheel_id and drop it from the worker
# table. Any further arguments are forwarded to the wheel's kill() method.
#
#   $engine->kill_worker( $wheel_id, @kill_args );
#
# Returns the value of remove_worker() for the removed entry, or nothing
# when no worker is registered under $wheel_id.
sub kill_worker {
    my ( $self, $wheel_id ) = splice @_, 0, 2;

    # The worker may already be gone (finished or killed earlier); treat
    # that as a no-op instead of calling a method on undef.
    my $wheel = $self->get_worker($wheel_id);
    return unless defined $wheel;

    $wheel->kill(@_);
    return $self->remove_worker($wheel_id);
}
# Ask the visitor which filter (if any) should be applied to worker STDOUT.
sub stdout_filter {
    my $engine  = $_[OBJECT];
    my $visitor = $engine->visitor;
    return $visitor->stdout_filter;
}
# Ask the visitor which filter (if any) should be applied to worker STDERR.
sub stderr_filter {
    my $engine  = $_[OBJECT];
    my $visitor = $engine->visitor;
    return $visitor->stderr_filter;
}
#
# EVENTS
#
# POE event handler: start a new worker.
#
# Event arguments:
#   ARG0 - the job: either a MooseX::Workers::Job object (its command and
#          args are used) or a value passed straight to POE::Wheel::Run as
#          Program.
#   ARG1 - optional. A hash ref is treated as options when the worker limit
#          has been reached (only "enqueue" is consulted). For a non-Job
#          ARG0 the value is also passed on as ProgramArgs, wrapped in an
#          array ref unless it already is one.
#
# Returns ( wheel ID => PID ) when a worker was spawned, and nothing when
# the job was queued or rejected because max_workers was reached.
sub add_worker {
    my ( $self, $job, $args, $kernel, $heap ) = @_[ OBJECT, ARG0, ARG1, KERNEL, HEAP ];

    # If we've reached the worker threshold, either park the job in the
    # queue or tell the visitor about it.
    if ( $self->num_workers >= $self->max_workers ) {

        # ARG1 may be undef, a plain scalar or an array ref of program
        # arguments; only a hash ref can carry the "enqueue" option, so
        # check the type before dereferencing.
        if ( ref $args eq 'HASH' && $args->{enqueue} ) {
            $self->enqueue_process( [ $job, $args ] );
        }
        else {
            $self->visitor->max_workers_reached($job);
        }
        return;
    }

    my $is_job = blessed($job) && $job->isa('MooseX::Workers::Job');

    my $command;
    if ($is_job) {
        $command = $job->command;
        $args    = $job->args;
    }
    else {
        $command = $job;
    }

    # Only pass filter options the visitor actually supplies.
    my @optional_io_filters;
    push @optional_io_filters, 'StdoutFilter', $self->stdout_filter if $self->stdout_filter;
    push @optional_io_filters, 'StderrFilter', $self->stderr_filter if $self->stderr_filter;

    # NOTE(review): for a non-Job command an options hash ref (or undef)
    # ends up as the single element of ProgramArgs here -- confirm that
    # this is intended.
    $args = [$args] unless ref $args eq 'ARRAY';

    my $wheel = POE::Wheel::Run->new(
        Program     => $command,
        ProgramArgs => $args,
        @optional_io_filters,
        StdoutEvent => '_worker_stdout',
        StderrEvent => '_worker_stderr',
        ErrorEvent  => '_worker_error',
        CloseEvent  => '_worker_done',
    );

    # Book-keeping: reap the child via _sig_child, and remember both the
    # wheel and the PID -> wheel ID mapping.
    $kernel->sig_child( $wheel->PID, "_sig_child" );
    $self->set_worker( $wheel->ID => $wheel );
    $self->set_process( $wheel->PID => $wheel->ID );

    if ($is_job) {
        $job->ID( $wheel->ID );
        $job->PID( $wheel->PID );
        $self->set_job( $wheel->ID => $job );

        # Arm the timeout alarm; _worker_done cancels it via the stored ID.
        if ( $job->timeout ) {
            $heap->{wheel_to_timer}{ $wheel->ID } =
                $kernel->delay_set( '_kill_worker', $job->timeout, $wheel->ID );
        }
    }

    $self->yield( '_worker_started' => $wheel->ID => $job );
    return ( $wheel->ID => $wheel->PID );
}
# POE event handler fired by the timeout alarm that add_worker sets for
# jobs with a timeout. ARG0 is the wheel ID.
#
# Notifies the visitor through worker_timeout($job), if it implements that
# method, and then kills the worker.
sub _kill_worker {
    my ( $self, $wheel_id ) = @_[ OBJECT, ARG0 ];

    # The alarm can outlive the wheel: kill_worker removes the wheel without
    # cancelling the timer. Nothing is left to time out in that case, so
    # bail out instead of calling kill() on undef.
    my $wheel = $self->get_worker($wheel_id);
    return unless defined $wheel;

    my $job = $self->get_job($wheel_id);
    $self->visitor->worker_timeout($job)
        if $self->visitor->can('worker_timeout');

    return $wheel->kill;
}
# POE _start handler for the manager session: notifies the visitor, pins
# the session with an alias and wires up the visitor's sig_* methods.
sub _start {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    my $visitor = $self->visitor;

    if ( $visitor->can('worker_manager_start') ) {
        $visitor->worker_manager_start();
    }

    # Set an alias to ensure our manager session is not cleaned up.
    $kernel->alias_set("manager");

    # Register the generic signal handler for every sig_* method the
    # visitor class provides.
    my @sig_methods =
        grep { /^sig_/ }
        map  { $_->name } $visitor->meta->get_all_methods;

    for my $sig_handler (@sig_methods) {
        my ($sig) = $sig_handler =~ /^sig_(.*)/;

        # SIGCHLD is handled separately by _sig_child.
        my $upper = uc $sig;
        next if $upper eq 'CHLD' or $upper eq 'CHILD';

        $poe_kernel->state( $sig_handler, $self, '_sig_handler' );
        $poe_kernel->sig( $sig => $sig_handler );
    }
}
# POE _stop handler for the manager session: notifies the visitor (when it
# implements worker_manager_stop) and clears the stored session.
sub _stop {
    my $self    = $_[OBJECT];
    my $visitor = $self->visitor;

    if ( $visitor->can('worker_manager_stop') ) {
        $visitor->worker_manager_stop();
    }

    $self->remove_manager;
}
# POE handler for the per-child signal registered in add_worker.
# ARG1 is used as the key into the process table and ARG2 is passed through
# to the visitor. The visitor's sig_child, when present, receives the wheel
# ID recorded for that key together with ARG2.
sub _sig_child {
    my ( $self, $kernel, $pid, $status ) = @_[ OBJECT, KERNEL, ARG1, ARG2 ];
    my $visitor = $self->visitor;

    if ( $visitor->can('sig_child') ) {
        $visitor->sig_child( $self->get_process($pid), $status );
    }

    $self->remove_process($pid);
    $kernel->sig_handled();
}
# Generic signal handler (used for everything except SIGCHLD).
# _start registers it under the name of each visitor sig_* method, so the
# current state name is also the visitor method to invoke.
sub _sig_handler {
    my ( $self, $kernel, $state ) = @_[ OBJECT, KERNEL, STATE ];

    my $visitor = $self->visitor;
    $visitor->$state( @_[ ARG0 .. ARG9 ] );

    $kernel->sig_handled();
}
# POE handler for a wheel's StdoutEvent. ARG0 is the input, ARG1 the wheel
# ID. The visitor's worker_stdout, when present, receives the input plus
# the job registered for that wheel, or the wheel ID when there is none.
sub _worker_stdout {
    my ( $self, $input, $wheel_id ) = @_[ OBJECT, ARG0, ARG1 ];

    my $job     = $self->get_job($wheel_id);
    my $visitor = $self->visitor;

    if ( $visitor->can('worker_stdout') ) {
        $visitor->worker_stdout( $input, $job || $wheel_id );
    }
}
# POE handler for a wheel's StderrEvent. ARG0 is the input, ARG1 the wheel
# ID. The visitor's worker_stderr, when present, receives the input plus
# the job registered for that wheel, or the wheel ID when there is none.
sub _worker_stderr {
    my ( $self, $input, $wheel_id ) = @_[ OBJECT, ARG0, ARG1 ];

    # Strip everything outside printable ASCII from the worker's output.
    # This has to act on the input, not on the wheel ID used for the job
    # lookup below. Undefined input and references (which a custom
    # stderr_filter could produce) are left untouched.
    $input =~ tr[ -~][]cd if defined $input && !ref $input;

    my $job = $self->get_job($wheel_id);
    $self->visitor->worker_stderr( $input, $job || $wheel_id )
        if $self->visitor->can('worker_stderr');
}
# POE handler for a wheel's ErrorEvent.
# ARG0..ARG3 are $operation, $errnum, $errstr and $wheel_id. A "read" with
# error number 0 is ignored; everything else is forwarded to the visitor's
# worker_error when it exists.
sub _worker_error {
    my ( $self, $operation, $errnum ) = @_[ OBJECT, ARG0, ARG1 ];

    return if $operation eq "read" && $errnum == 0;

    my $visitor = $self->visitor;
    if ( $visitor->can('worker_error') ) {
        $visitor->worker_error( @_[ ARG0 .. ARG3 ] );
    }
}
# POE handler for a wheel's CloseEvent. ARG0 is the wheel ID.
# Cancels the worker's timeout alarm (if one was set), notifies the visitor,
# forgets the wheel and then starts queued jobs while slots are free.
sub _worker_done {
    my ( $self, $kernel, $heap, $wheel_id ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];

    my $job = $self->get_job($wheel_id);

    # Drop the pending timeout alarm for this wheel, if any.
    if ( $heap->{wheel_to_timer}{$wheel_id} ) {
        $kernel->alarm_remove( delete $heap->{wheel_to_timer}{$wheel_id} );
    }

    # The visitor gets the job object when there is one, else the wheel ID.
    my $visitor = $self->visitor;
    if ( $visitor->can('worker_done') ) {
        $visitor->worker_done( $job ? $job : $wheel_id );
    }

    $self->delete_worker($wheel_id);

    # If we have free workers and processes in queue, then dequeue them.
    while ( $self->num_workers < $self->max_workers ) {
        my $queued = $self->dequeue_process or last;
        my ( $cmd, $args ) = @$queued;

        # This has to be call(), not yield(), so num_workers increments
        # before the loop condition is evaluated again.
        $self->call( add_worker => $cmd, $args );
    }
}
# Remove the worker registered under $wheelID from the worker table.
#
# Returns the value of remove_worker() for the removed entry, or nothing
# when no worker is registered under that ID.
sub delete_worker {
    my ( $self, $wheelID ) = @_;

    # The worker may already have been removed (e.g. by kill_worker);
    # removing it again is a no-op rather than a method call on undef.
    my $wheel = $self->get_worker($wheelID);
    return unless defined $wheel;

    return $self->remove_worker( $wheel->ID );
}
# POE handler for the _worker_started event posted by add_worker.
# ARG0 is the wheel ID, ARG1 the job or command that was started. The
# visitor's worker_started, when present, is called with the job object if
# one is registered for the wheel, and with ( wheel ID, command ) otherwise.
sub _worker_started {
    my ( $self, $wheel_id, $command ) = @_[ OBJECT, ARG0, ARG1 ];

    my $job     = $self->get_job($wheel_id);
    my $visitor = $self->visitor;

    if ( $visitor->can('worker_started') ) {
        if ($job) {
            $visitor->worker_started($job);
        }
        else {
            $visitor->worker_started( $wheel_id, $command );
        }
    }
}
no Moose;
1;
__END__
=head1 NAME
MooseX::Workers::Engine - Provide the workhorse to MooseX::Workers
=head1 SYNOPSIS
package MooseX::Workers;
has Engine => (
isa => 'MooseX::Workers::Engine',
is => 'ro',
lazy => 1,
required => 1,
default => sub { MooseX::Workers::Engine->new( visitor => $_[0] ) },
handles => [
qw(
max_workers
has_workers
num_workers
put_worker
kill_worker
)
],
);
=head1 DESCRIPTION
MooseX::Workers::Engine provides the main functionality
to MooseX::Workers. It wraps a POE::Session and as many POE::Wheel::Run
objects as it needs.
=head1 ATTRIBUTES
=over
=item visitor
Hold a reference to our main object so we can use the callbacks on it.
=item max_workers
An Integer specifying the maximum number of workers we have.
=item workers
A HashRef of POE::Wheel::Run objects that are our workers, keyed by wheel ID.
=item session
Contains the POE::Session that controls the workers.
=back
=head1 METHODS
=over
=item yield
Helper method to post events to our internal manager session.
=item call
Helper method to call events to our internal manager session.
This is synchronous and will block incoming data from the children
if it takes too long to return.
=item set_worker($key)
Set the worker at $key
=item get_worker($key)
Retrieve the worker at $key
=item delete_worker($key)
Remove the worker at $key
=item has_workers
Check to see if we have *any* workers currently. This is delegated to the MooseX::Workers::Engine object.
=item num_workers
Return the current number of workers. This is delegated to the MooseX::Workers::Engine object.
=item has_manager
Check to see if we have a manager session.
=item remove_manager
Remove the manager session.
=item meta
The Metaclass for MooseX::Workers::Engine see Moose's documentation.
=back
=head1 EVENTS
=over
=item add_worker ($command)
Create a POE::Wheel::Run object to handle $command. If $command holds a scalar, it will be executed as exec($scalar).
Shell metacharacters will be expanded in this form. If $command holds an array reference,
it will be executed as exec(@$array). This form of exec() doesn't expand shell metacharacters.
If $command holds a code reference, it will be called in the forked child process, and then
the child will exit.
See POE::Wheel::Run for more details.
=back
=head1 INTERFACE
MooseX::Workers::Engine fires the following callbacks to its visitor object:
=over
=item worker_manager_start
Called when the managing session is started.
=item worker_manager_stop
Called when the managing session stops.
=item max_workers_reached
Called when we reach the maximum number of workers.
=item worker_stdout
Called when a child prints to STDOUT.
=item worker_stderr
Called when a child prints to STDERR.
=item worker_error
Called when there is an error condition detected with the child.
=item worker_done
Called when a worker completes $command.
=item worker_started
Called when a worker starts $command.
=item sig_child($PID, $ret)
Called when the managing session receives a SIG CHLD event.
=item sig_*
Called when the underlying POE Kernel receives a signal; this is not limited to
OS signals (ie. what you'd usually handle in Perl's %SIG) so will also accept
arbitrary POE signals (sent via POE::Kernel->signal), but does exclude
SIGCHLD/SIGCHILD, which is instead handled by sig_child above.
These interface methods are automatically inserted when MooseX::Workers::Engine
detects that the visitor object contains any methods beginning with sig_.
Signals are case-sensitive, so if you wish to handle a TERM signal, you must
define a sig_TERM() method. Note also that this action is performed upon
MooseX::Workers::Engine startup, so any run-time modification of the visitor
object is not likely to be detected.
=back
=cut
1;