@@ -1,5 +1,15 @@
Revision history for Perl extension Parallel::Prefork.
+0.17
+ - fix the broken $pm->wait_all_children with timeout
+
+0.16
+ - $pm->wait_all_children takes an optional argument specifying a timeout value (in seconds) (by karupanerura)
+
+0.15
+ - fix bug in Parallel::Prefork::SpareWorkers that did not spawn proceses up to the given maximum number
+ - fix crash in Parallel::Prefork::SpareWorkers when it receives a signal while reading the status file (by Perlover)
+
0.14
- fix doc issues
@@ -14,6 +14,8 @@ lib/Parallel/Prefork/SpareWorkers/Scoreboard.pm
Makefile.PL
MANIFEST This list of files
META.yml
+MYMETA.json
+MYMETA.yml
README
t/01-base.t
t/02-reconfigure.t
@@ -25,7 +25,8 @@ requires:
List::MoreUtils: 0
Proc::Wait3: 0.03
Scope::Guard: 0
+ Signal::Mask: 0
perl: 5.8.1
resources:
license: http://dev.perl.org/licenses/
-version: 0.14
+version: 0.17
@@ -0,0 +1,54 @@
+{
+ "abstract" : "A simple prefork server framework",
+ "author" : [
+ "Kazuho Oku"
+ ],
+ "dynamic_config" : 0,
+ "generated_by" : "Module::Install version 1.00, CPAN::Meta::Converter version 2.131560",
+ "license" : [
+ "perl_5"
+ ],
+ "meta-spec" : {
+ "url" : "http://search.cpan.org/perldoc?CPAN::Meta::Spec",
+ "version" : "2"
+ },
+ "name" : "Parallel-Prefork",
+ "no_index" : {
+ "directory" : [
+ "inc",
+ "t"
+ ]
+ },
+ "prereqs" : {
+ "build" : {
+ "requires" : {
+ "ExtUtils::MakeMaker" : "6.42",
+ "Test::Requires" : "0",
+ "Test::SharedFork" : "0"
+ }
+ },
+ "configure" : {
+ "requires" : {
+ "ExtUtils::MakeMaker" : "6.42"
+ }
+ },
+ "runtime" : {
+ "requires" : {
+ "Class::Accessor::Lite" : "0.04",
+ "List::MoreUtils" : "0",
+ "Proc::Wait3" : "0.03",
+ "Scope::Guard" : "0",
+ "Signal::Mask" : "0",
+ "perl" : "5.008001"
+ }
+ }
+ },
+ "release_status" : "stable",
+ "resources" : {
+ "license" : [
+ "http://dev.perl.org/licenses/"
+ ]
+ },
+ "version" : "0.16",
+ "x_module_name" : "Parallel::Prefork"
+}
@@ -0,0 +1,32 @@
+---
+abstract: 'A simple prefork server framework'
+author:
+ - 'Kazuho Oku'
+build_requires:
+ ExtUtils::MakeMaker: 6.42
+ Test::Requires: 0
+ Test::SharedFork: 0
+configure_requires:
+ ExtUtils::MakeMaker: 6.42
+dynamic_config: 0
+generated_by: 'Module::Install version 1.00, CPAN::Meta::Converter version 2.131560'
+license: perl
+meta-spec:
+ url: http://module-build.sourceforge.net/META-spec-v1.4.html
+ version: 1.4
+name: Parallel-Prefork
+no_index:
+ directory:
+ - inc
+ - t
+requires:
+ Class::Accessor::Lite: 0.04
+ List::MoreUtils: 0
+ Proc::Wait3: 0.03
+ Scope::Guard: 0
+ Signal::Mask: 0
+ perl: 5.008001
+resources:
+ license: http://dev.perl.org/licenses/
+version: 0.16
+x_module_name: Parallel::Prefork
@@ -7,6 +7,7 @@ requires 'Class::Accessor::Lite' => 0.04;
requires 'List::MoreUtils';
requires 'Proc::Wait3' => 0.03;
requires 'Scope::Guard';
+requires 'Signal::Mask';
test_requires 'Test::Requires';
test_requires 'Test::SharedFork';
@@ -83,9 +83,11 @@ METHODS
signal_all_children
Sends signal to all worker processes. Only usable from manager process.
- wait_all_children
- Blocks until all worker processes exit. Only usable from manager
- process.
+ wait_all_children()
+ wait_all_children($timeout)
+ Waits until all worker processes exit or timeout (given as an optional
+ argument in seconds) exceeds. The method returns the number of the
+ worker processes still running.
AUTHOR
Kazuho Oku
@@ -7,6 +7,7 @@ use Fcntl qw(:DEFAULT :flock);
use File::Temp qw();
use POSIX qw(SEEK_SET);
use Scope::Guard;
+use Signal::Mask;
use Parallel::Prefork::SpareWorkers qw(:status);
@@ -37,6 +38,8 @@ sub new {
}
sub get_statuses {
+ local ($Signal::Mask{CHLD}, $Signal::Mask{TERM}, $Signal::Mask{INT}) = (1, 1, 1);
+
my $self = shift;
sysseek $self->{fh}, 0, SEEK_SET
or die "seek failed:$!";
@@ -49,6 +52,8 @@ sub get_statuses {
}
sub clear_child {
+ local ($Signal::Mask{CHLD}, $Signal::Mask{TERM}, $Signal::Mask{INT}) = (1, 1, 1);
+
my ($self, $pid) = @_;
my $lock = $self->_lock_file;
sysseek $self->{fh}, 0, SEEK_SET
@@ -70,6 +75,8 @@ sub clear_child {
}
sub child_start {
+ local ($Signal::Mask{CHLD}, $Signal::Mask{TERM}, $Signal::Mask{INT}) = (1, 1, 1);
+
my $self = shift;
die "child_start cannot be called twite"
if defined $self->{slot};
@@ -18,7 +18,7 @@ our %EXPORT_TAGS = (
our @EXPORT_OK = uniq sort map { @$_ } values %EXPORT_TAGS;
$EXPORT_TAGS{all} = \@EXPORT_OK;
-__PACKAGE__->mk_accessors(qw/min_spare_workers max_spare_workers scoreboard/);
+__PACKAGE__->mk_accessors(qw/min_spare_workers max_spare_workers scoreboard heartbeat/);
sub new {
my $klass = shift;
@@ -26,6 +26,7 @@ sub new {
die "mandatory option min_spare_workers not set"
unless $self->{min_spare_workers};
$self->{max_spare_workers} ||= $self->max_workers;
+ $self->{heartbeat} ||= 0.25;
$self->{scoreboard} ||= do {
require 'Parallel/Prefork/SpareWorkers/Scoreboard.pm';
Parallel::Prefork::SpareWorkers::Scoreboard->new(
@@ -76,6 +77,11 @@ sub _on_child_reap {
$self->scoreboard->clear_child($exit_pid);
}
+sub _max_wait {
+ my $self = shift;
+ return $self->{heartbeat};
+}
+
1;
__END__
@@ -133,6 +139,10 @@ minimum number of spare workers (mandatory)
maxmum number of spare workers (default: max_workers)
+=head3 heartbeat
+
+a fractional period (in seconds) of child amount checking. Do not use very small numbers to avoid frequent use of CPU (default: 0.25)
+
=head3 scoreboard_file
filename of scoreboard. If not set, C<Parallel::Prefork::SpareWorkers> will create a temporary file.
@@ -14,7 +14,7 @@ use Class::Accessor::Lite (
rw => [ qw/max_workers spawn_interval err_respawn_interval trap_signals signal_received manager_pid on_child_reap before_fork after_fork/ ],
);
-our $VERSION = '0.14';
+our $VERSION = '0.17';
sub new {
my $klass = shift;
@@ -189,15 +189,37 @@ sub _action_for {
}
sub wait_all_children {
- my $self = shift;
+ my ($self, $timeout) = @_;
$self->{_no_adjust_until} = undef;
- while (%{$self->{worker_pids}}) {
- if (my ($pid) = $self->_wait(1)) {
- if (delete $self->{worker_pids}{$pid}) {
- $self->_on_child_reap($pid, $?);
+
+ my $wait_loop = sub {
+ while (%{$self->{worker_pids}}) {
+ if (my ($pid) = $self->_wait(1)) {
+ if (delete $self->{worker_pids}{$pid}) {
+ $self->_on_child_reap($pid, $?);
+ }
}
}
+ };
+
+ if ($timeout) {
+ local $@;
+ my $is_timeout = 0;
+ eval {
+ local $SIG{ALRM} = sub {
+ $is_timeout = 1;
+ die "timeout";
+ };
+ alarm($timeout);
+ $wait_loop->();
+ alarm(0);
+ };
+ die $@
+ if $@ && ! $is_timeout;
+ } else {
+ $wait_loop->();
}
+ return $self->num_workers;
}
sub _update_spawn_delay {
@@ -220,6 +242,7 @@ sub _wait {
my $sleep_secs = min grep { defined $_ } (
$delayed_task_sleep,
$delayed_fork_sleep,
+ $self->_max_wait(),
);
if (defined $sleep_secs) {
# wait max sleep_secs or until signalled
@@ -236,6 +259,10 @@ sub _wait {
}
}
+sub _max_wait {
+ return undef;
+}
+
1;
__END__
@@ -321,9 +348,12 @@ Child processes (when executed by a zero-argument call to C<start>) should call
Sends signal to all worker processes. Only usable from manager process.
-=head2 wait_all_children
+=head2 wait_all_children()
+
+=head2 wait_all_children($timeout)
-Blocks until all worker processes exit. Only usable from manager process.
+Waits until all worker processes exit or timeout (given as an optional argument in seconds) exceeds.
+The method returns the number of the worker processes still running.
=head1 AUTHOR
@@ -2,7 +2,6 @@ use strict;
use warnings;
use File::Temp qw();
-use Time::HiRes qw(sleep);
use Test::More tests => 8;
use_ok('Parallel::Prefork::SpareWorkers');
@@ -21,31 +20,37 @@ my $pm = Parallel::Prefork::SpareWorkers->new({
is $pm->num_active_workers, 0, 'no active workers';
my @tests = (
- wait_and_test(
- sub {
- is $pm->num_workers, 3, 'min_spare_workers';
- is $pm->num_active_workers, 0, 'no active workers';
- open my $fh, '>', "$tempdir/active"
- or die "failed to touch file $tempdir/active:$!";
- close $fh;
- },
- ),
- wait_and_test(
- sub {
- is $pm->num_workers, 10, 'max_workers';
- is $pm->num_active_workers, 10, 'all workers active';
- unlink "$tempdir/active"
- or die "failed to unlink file $tempdir/active:$!";
- },
- ),
- wait_and_test(
- sub {
- is $pm->num_workers, 5, 'max_spare_workers';
- is $pm->num_active_workers, 0, 'no active workers';
- },
- ),
+ sub {
+ is $pm->num_workers, 3, 'min_spare_workers';
+ is $pm->num_active_workers, 0, 'no active workers';
+ open my $fh, '>', "$tempdir/active"
+ or die "failed to touch file $tempdir/active:$!";
+ close $fh;
+ },
+ sub {
+ is $pm->num_workers, 10, 'max_workers';
+ is $pm->num_active_workers, 10, 'all workers active';
+ unlink "$tempdir/active"
+ or die "failed to unlink file $tempdir/active:$!";
+ },
+ sub {
+ is $pm->num_workers, 5, 'max_spare_workers';
+ is $pm->num_active_workers, 0, 'no active workers';
+ },
);
-next_test();
+
+my $SLEEP_SECS = 3; # 1 second until all clients update their state, plus 10 invocations to min/max the process, plus 1 second bonus
+
+$SIG{ALRM} = sub {
+ my $test = shift @tests;
+ $test->();
+ if (@tests) {
+ alarm $SLEEP_SECS;
+ } else {
+ $pm->signal_received('TERM');
+ }
+};
+alarm $SLEEP_SECS;
while ($pm->signal_received ne 'TERM') {
$pm->start and next;
@@ -59,24 +64,3 @@ while ($pm->signal_received ne 'TERM') {
}
$pm->wait_all_children;
-
-sub wait_and_test {
- my $check_func = shift;
- my $cnt = 0;
- return sub {
- sleep 0.1;
- $cnt++;
- return if $cnt < 30; # 1 second until all clients update their state, plus 10 invocations to min/max the process, plus 1 second bonus
- $check_func->();
- next_test();
- };
-}
-
-sub next_test {
- if (@tests) {
- $pm->{__dbg_callback} = shift @tests;
- } else {
- $pm->{__dbg_callback} = sub {};
- $pm->signal_received('TERM');
- }
-}