The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
Changes 010
MANIFEST 02
META.yml 12
MYMETA.json 054
MYMETA.yml 032
Makefile.PL 01
README 35
lib/Parallel/Prefork/SpareWorkers/Scoreboard.pm 07
lib/Parallel/Prefork/SpareWorkers.pm 111
lib/Parallel/Prefork.pm 838
t/03-spareworkers.t 4630
11 files changed (This is a version diff) 59192
@@ -1,5 +1,15 @@
 Revision history for Perl extension Parallel::Prefork.
 
+0.17
+	- fix the broken $pm->wait_all_children with timeout
+
+0.16
+	- $pm->wait_all_children takes an optional argument specifying a timeout value (in seconds) (by karupanerura)
+
+0.15
+	- fix bug in Parallel::Prefork::SpareWorkers that did not spawn proceses up to the given maximum number
+	- fix crash in Parallel::Prefork::SpareWorkers when it receives a signal while reading the status file (by Perlover)
+
 0.14
 	- fix doc issues
 
@@ -14,6 +14,8 @@ lib/Parallel/Prefork/SpareWorkers/Scoreboard.pm
 Makefile.PL
 MANIFEST			This list of files
 META.yml
+MYMETA.json
+MYMETA.yml
 README
 t/01-base.t
 t/02-reconfigure.t
@@ -25,7 +25,8 @@ requires:
   List::MoreUtils: 0
   Proc::Wait3: 0.03
   Scope::Guard: 0
+  Signal::Mask: 0
   perl: 5.8.1
 resources:
   license: http://dev.perl.org/licenses/
-version: 0.14
+version: 0.17
@@ -0,0 +1,54 @@
+{
+   "abstract" : "A simple prefork server framework",
+   "author" : [
+      "Kazuho Oku"
+   ],
+   "dynamic_config" : 0,
+   "generated_by" : "Module::Install version 1.00, CPAN::Meta::Converter version 2.131560",
+   "license" : [
+      "perl_5"
+   ],
+   "meta-spec" : {
+      "url" : "http://search.cpan.org/perldoc?CPAN::Meta::Spec",
+      "version" : "2"
+   },
+   "name" : "Parallel-Prefork",
+   "no_index" : {
+      "directory" : [
+         "inc",
+         "t"
+      ]
+   },
+   "prereqs" : {
+      "build" : {
+         "requires" : {
+            "ExtUtils::MakeMaker" : "6.42",
+            "Test::Requires" : "0",
+            "Test::SharedFork" : "0"
+         }
+      },
+      "configure" : {
+         "requires" : {
+            "ExtUtils::MakeMaker" : "6.42"
+         }
+      },
+      "runtime" : {
+         "requires" : {
+            "Class::Accessor::Lite" : "0.04",
+            "List::MoreUtils" : "0",
+            "Proc::Wait3" : "0.03",
+            "Scope::Guard" : "0",
+            "Signal::Mask" : "0",
+            "perl" : "5.008001"
+         }
+      }
+   },
+   "release_status" : "stable",
+   "resources" : {
+      "license" : [
+         "http://dev.perl.org/licenses/"
+      ]
+   },
+   "version" : "0.16",
+   "x_module_name" : "Parallel::Prefork"
+}
@@ -0,0 +1,32 @@
+---
+abstract: 'A simple prefork server framework'
+author:
+  - 'Kazuho Oku'
+build_requires:
+  ExtUtils::MakeMaker: 6.42
+  Test::Requires: 0
+  Test::SharedFork: 0
+configure_requires:
+  ExtUtils::MakeMaker: 6.42
+dynamic_config: 0
+generated_by: 'Module::Install version 1.00, CPAN::Meta::Converter version 2.131560'
+license: perl
+meta-spec:
+  url: http://module-build.sourceforge.net/META-spec-v1.4.html
+  version: 1.4
+name: Parallel-Prefork
+no_index:
+  directory:
+    - inc
+    - t
+requires:
+  Class::Accessor::Lite: 0.04
+  List::MoreUtils: 0
+  Proc::Wait3: 0.03
+  Scope::Guard: 0
+  Signal::Mask: 0
+  perl: 5.008001
+resources:
+  license: http://dev.perl.org/licenses/
+version: 0.16
+x_module_name: Parallel::Prefork
@@ -7,6 +7,7 @@ requires 'Class::Accessor::Lite' => 0.04;
 requires 'List::MoreUtils';
 requires 'Proc::Wait3' => 0.03;
 requires 'Scope::Guard';
+requires 'Signal::Mask';
 test_requires 'Test::Requires';
 test_requires 'Test::SharedFork';
 
@@ -83,9 +83,11 @@ METHODS
   signal_all_children
     Sends signal to all worker processes. Only usable from manager process.
 
-  wait_all_children
-    Blocks until all worker processes exit. Only usable from manager
-    process.
+  wait_all_children()
+  wait_all_children($timeout)
+    Waits until all worker processes exit or timeout (given as an optional
+    argument in seconds) exceeds. The method returns the number of the
+    worker processes still running.
 
 AUTHOR
     Kazuho Oku
@@ -7,6 +7,7 @@ use Fcntl qw(:DEFAULT :flock);
 use File::Temp qw();
 use POSIX qw(SEEK_SET);
 use Scope::Guard;
+use Signal::Mask;
 
 use Parallel::Prefork::SpareWorkers qw(:status);
 
@@ -37,6 +38,8 @@ sub new {
 }
 
 sub get_statuses {
+    local ($Signal::Mask{CHLD}, $Signal::Mask{TERM}, $Signal::Mask{INT}) = (1, 1, 1);
+
     my $self = shift;
     sysseek $self->{fh}, 0, SEEK_SET
         or die "seek failed:$!";
@@ -49,6 +52,8 @@ sub get_statuses {
 }
 
 sub clear_child {
+    local ($Signal::Mask{CHLD}, $Signal::Mask{TERM}, $Signal::Mask{INT}) = (1, 1, 1);
+
     my ($self, $pid) = @_;
     my $lock = $self->_lock_file;
     sysseek $self->{fh}, 0, SEEK_SET
@@ -70,6 +75,8 @@ sub clear_child {
 }
 
 sub child_start {
+    local ($Signal::Mask{CHLD}, $Signal::Mask{TERM}, $Signal::Mask{INT}) = (1, 1, 1);
+
     my $self = shift;
     die "child_start cannot be called twite"
         if defined $self->{slot};
@@ -18,7 +18,7 @@ our %EXPORT_TAGS = (
 our @EXPORT_OK = uniq sort map { @$_ } values %EXPORT_TAGS;
 $EXPORT_TAGS{all} = \@EXPORT_OK;
 
-__PACKAGE__->mk_accessors(qw/min_spare_workers max_spare_workers scoreboard/);
+__PACKAGE__->mk_accessors(qw/min_spare_workers max_spare_workers scoreboard heartbeat/);
 
 sub new {
     my $klass = shift;
@@ -26,6 +26,7 @@ sub new {
     die "mandatory option min_spare_workers not set"
         unless $self->{min_spare_workers};
     $self->{max_spare_workers} ||= $self->max_workers;
+    $self->{heartbeat} ||= 0.25;
     $self->{scoreboard} ||= do {
         require 'Parallel/Prefork/SpareWorkers/Scoreboard.pm';
         Parallel::Prefork::SpareWorkers::Scoreboard->new(
@@ -76,6 +77,11 @@ sub _on_child_reap {
     $self->scoreboard->clear_child($exit_pid);
 }
 
+sub _max_wait {
+    my $self = shift;
+    return $self->{heartbeat};
+}
+
 1;
 __END__
 
@@ -133,6 +139,10 @@ minimum number of spare workers (mandatory)
 
 maxmum number of spare workers (default: max_workers)
 
+=head3 heartbeat
+
+a fractional period (in seconds) of child amount checking. Do not use very small numbers to avoid frequent use of CPU (default: 0.25)
+
 =head3 scoreboard_file
 
 filename of scoreboard.  If not set, C<Parallel::Prefork::SpareWorkers> will create a temporary file.
@@ -14,7 +14,7 @@ use Class::Accessor::Lite (
     rw => [ qw/max_workers spawn_interval err_respawn_interval trap_signals signal_received manager_pid on_child_reap before_fork after_fork/ ],
 );
 
-our $VERSION = '0.14';
+our $VERSION = '0.17';
 
 sub new {
     my $klass = shift;
@@ -189,15 +189,37 @@ sub _action_for {
 }
 
 sub wait_all_children {
-    my $self = shift;
+    my ($self, $timeout) = @_;
     $self->{_no_adjust_until} = undef;
-    while (%{$self->{worker_pids}}) {
-        if (my ($pid) = $self->_wait(1)) {
-            if (delete $self->{worker_pids}{$pid}) {
-                $self->_on_child_reap($pid, $?);
+
+    my $wait_loop = sub {
+        while (%{$self->{worker_pids}}) {
+            if (my ($pid) = $self->_wait(1)) {
+                if (delete $self->{worker_pids}{$pid}) {
+                    $self->_on_child_reap($pid, $?);
+                }
             }
         }
+    };
+
+    if ($timeout) {
+        local $@;
+        my $is_timeout = 0;
+        eval {
+            local $SIG{ALRM} = sub {
+                $is_timeout = 1;
+                die "timeout";
+            };
+            alarm($timeout);
+            $wait_loop->();
+            alarm(0);
+        };
+        die $@
+            if $@ && ! $is_timeout;
+    } else {
+        $wait_loop->();
     }
+    return $self->num_workers;
 }
 
 sub _update_spawn_delay {
@@ -220,6 +242,7 @@ sub _wait {
         my $sleep_secs = min grep { defined $_ } (
             $delayed_task_sleep,
             $delayed_fork_sleep,
+            $self->_max_wait(),
         );
         if (defined $sleep_secs) {
             # wait max sleep_secs or until signalled
@@ -236,6 +259,10 @@ sub _wait {
     }
 }
 
+sub _max_wait {
+    return undef;
+}
+
 1;
 
 __END__
@@ -321,9 +348,12 @@ Child processes (when executed by a zero-argument call to C<start>) should call
 
 Sends signal to all worker processes.  Only usable from manager process.
 
-=head2 wait_all_children
+=head2 wait_all_children()
+
+=head2 wait_all_children($timeout)
 
-Blocks until all worker processes exit.  Only usable from manager process.
+Waits until all worker processes exit or timeout (given as an optional argument in seconds) exceeds.
+The method returns the number of the worker processes still running.
 
 =head1 AUTHOR
 
@@ -2,7 +2,6 @@ use strict;
 use warnings;
 
 use File::Temp qw();
-use Time::HiRes qw(sleep);
 use Test::More tests => 8;
 
 use_ok('Parallel::Prefork::SpareWorkers');
@@ -21,31 +20,37 @@ my $pm = Parallel::Prefork::SpareWorkers->new({
 is $pm->num_active_workers, 0, 'no active workers';
 
 my @tests = (
-    wait_and_test(
-        sub {
-            is $pm->num_workers, 3, 'min_spare_workers';
-            is $pm->num_active_workers, 0, 'no active workers';
-            open my $fh, '>', "$tempdir/active"
-                    or die "failed to touch file $tempdir/active:$!";
-            close $fh;
-        },
-    ),
-    wait_and_test(
-        sub {
-            is $pm->num_workers, 10, 'max_workers';
-            is $pm->num_active_workers, 10, 'all workers active';
-            unlink "$tempdir/active"
-                or die "failed to unlink file $tempdir/active:$!";
-        },
-    ),
-    wait_and_test(
-        sub {
-            is $pm->num_workers, 5, 'max_spare_workers';
-            is $pm->num_active_workers, 0, 'no active workers';
-        },
-    ),
+    sub {
+        is $pm->num_workers, 3, 'min_spare_workers';
+        is $pm->num_active_workers, 0, 'no active workers';
+        open my $fh, '>', "$tempdir/active"
+                or die "failed to touch file $tempdir/active:$!";
+        close $fh;
+    },
+    sub {
+        is $pm->num_workers, 10, 'max_workers';
+        is $pm->num_active_workers, 10, 'all workers active';
+        unlink "$tempdir/active"
+            or die "failed to unlink file $tempdir/active:$!";
+    },
+    sub {
+        is $pm->num_workers, 5, 'max_spare_workers';
+        is $pm->num_active_workers, 0, 'no active workers';
+    },
 );
-next_test();
+
+my $SLEEP_SECS = 3; # 1 second until all clients update their state, plus 10 invocations to min/max the process, plus 1 second bonus
+
+$SIG{ALRM} = sub {
+    my $test = shift @tests;
+    $test->();
+    if (@tests) {
+	alarm $SLEEP_SECS;
+    } else {
+        $pm->signal_received('TERM');
+    }
+};
+alarm $SLEEP_SECS;
 
 while ($pm->signal_received ne 'TERM') {
     $pm->start and next;
@@ -59,24 +64,3 @@ while ($pm->signal_received ne 'TERM') {
 }
 
 $pm->wait_all_children;
-
-sub wait_and_test {
-    my $check_func = shift;
-    my $cnt = 0;
-    return sub {
-        sleep 0.1;
-        $cnt++;
-        return if $cnt < 30; # 1 second until all clients update their state, plus 10 invocations to min/max the process, plus 1 second bonus
-        $check_func->();
-        next_test();
-    };
-}
-
-sub next_test {
-    if (@tests) {
-        $pm->{__dbg_callback} = shift @tests;
-    } else {
-        $pm->{__dbg_callback} = sub {};
-        $pm->signal_received('TERM');
-    }
-}