###############################################################################
## ----------------------------------------------------------------------------
## Core methods for the manager process.
##
## This package provides the loop and relevant methods used internally by the
## manager process.
##
## There is no public API.
##
###############################################################################
package MCE::Core::Manager;
use strict;
use warnings;
our $VERSION = '1.831';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (TestingAndDebugging::ProhibitNoStrict)
## Items below are folded into MCE.
package # hide from rpm
MCE;
no warnings qw( threads recursion uninitialized );
## WNOHANG flag passed to waitpid for a non-blocking reap (see the ALRM
## handler in _output_loop). The POSIX module has many symbols, so avoid
## loading it merely to obtain WNOHANG: POSIX's own value is used only when
## POSIX.pm has already been loaded by someone else; otherwise fall back to
## 64 on Solaris and 1 elsewhere, which covers most platforms.
use constant {
_WNOHANG => ( $INC{'POSIX.pm'} )
? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
};
use bytes;
###############################################################################
## ----------------------------------------------------------------------------
## Call on task_end after task completion.
##
###############################################################################
sub _task_end {
    ## Invoke the task_end callback, if any, for the task that just finished.
    ##
    ## Arguments: the MCE object and the numeric task id.
    ## The callback receives ($self, $_task_id, $_task_name).
    ## Nothing happens unless user_tasks is defined. Returns nothing.

    my ($self, $_task_id) = @_;
    @_ = ();

    return unless defined $self->{user_tasks};

    my $_tasks = $self->{user_tasks};

    ## Start from the object-wide handler; a key present on the task itself
    ## takes precedence (even when its value is undef).
    my $_handler = $self->{task_end};
    $_handler = $_tasks->[$_task_id]->{task_end}
        if exists $_tasks->[$_task_id]->{task_end};

    return unless defined $_handler;

    ## Same precedence rule for the task name handed to the callback.
    my $_name = $self->{task_name};
    $_name = $_tasks->[$_task_id]->{task_name}
        if exists $_tasks->[$_task_id]->{task_name};

    $_handler->($self, $_task_id, $_name);

    return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Process output.
##
## Awaits and processes events from workers. The sendto/do methods tag the
## output accordingly. The hash structure below is key-driven.
##
###############################################################################
sub _output_loop {
## Manager-side event loop. Each request consists of a function tag and a
## channel index read from the data socket; the tag is dispatched to the
## matching handler in %_core_output_function or, failing that, to the one
## in $_plugin_function. The loop ends once $self->{_total_running} drops to
## zero (or a short/invalid tag is read).
##
## Parameters:
##   $self               - the MCE object (hash ref)
##   $_input_data        - input source: an ARRAY ref, a HASH ref, or a code
##                         ref invoked as $_input_data->($_chunk_size)
##   $_input_glob        - file handle read by the OUTPUT_G_REF handler
##   $_plugin_function   - hash ref of extra handlers, keyed the same way as
##                         %_core_output_function
##   $_plugin_loop_begin - array ref of code refs, each called before the
##                         loop as $_p->($self, \$_DAU_R_SOCK)
##   $_plugin_loop_end   - array ref of code refs, each called after the
##                         loop as $_p->($self)
##
## Returns nothing.
##
## NOTE(review): $LF, WANTS_UNDEF, WANTS_ARRAY, MAX_RECS_SIZE, the OUTPUT_*
## tags, _croak and looks_like_number are not defined in this file; they are
## presumably supplied by the MCE package this code is folded into.
my ( $self, $_input_data, $_input_glob, $_plugin_function,
$_plugin_loop_begin, $_plugin_loop_end ) = @_;
@_ = ();
## State shared by all handler closures below. The closures capture these
## lexicals; their values are assigned further down, after the dispatch
## table has been built.
my (
$_aborted, $_eof_flag, $_max_retries, $_syn_flag, %_sendto_fhs,
$_cb, $_chunk_id, $_chunk_size, $_fd, $_file, $_flush_file, $_wa,
@_is_c_ref, @_is_h_ref, @_is_q_ref, $_on_post_exit, $_on_post_run,
$_has_user_tasks, $_sess_dir, $_task_id, $_user_error, $_user_output,
$_input_size, $_offset_pos, $_single_dim, @_gather, $_cs_one_flag,
$_exit_id, $_exit_pid, $_exit_status, $_exit_wid, $_len, $_sync_cnt,
$_BSB_W_SOCK, $_BSE_W_SOCK, $_DAT_R_SOCK, $_DAU_R_SOCK, $_MCE_STDERR,
$_I_FLG, $_O_FLG, $_I_SEP, $_O_SEP, $_RS, $_RS_FLG, $_MCE_STDOUT,
@_delay_wid, $_size_completed, $_win32_ipc
);
## -------------------------------------------------------------------------
## Callback return.
##
## Runs the callback held in $_cb with the given arguments while the
## caller's original $\ and $/ are restored. $_cb is read off the socket as
## a string, hence 'no strict refs' for the symbolic call. Depending on $_wa
## the result is discarded, or written to $_DAU_R_SOCK as
## "<length><flag>$LF<payload>", where the flag digit is 1 for a payload
## produced by $self->{freeze} and 0 for a plain string.
my $_cb_reply = sub {
local $\ = $_O_SEP if ($_O_FLG);
local $/ = $_I_SEP if ($_I_FLG);
no strict 'refs';
if ( $_wa == WANTS_UNDEF ) {
$_cb->(@_);
return;
}
elsif ( $_wa == WANTS_ARRAY ) {
my @_ret = $_cb->(@_);
my $_buf = $self->{freeze}(\@_ret);
return print {$_DAU_R_SOCK} length($_buf).'1'.$LF, $_buf;
}
## Scalar wanted: send a defined, non-reference, non-numeric value as is;
## everything else (refs, undef, numbers) goes through freeze.
my $_ret = $_cb->(@_);
return print {$_DAU_R_SOCK} length($_ret).'0'.$LF, $_ret
if ( !ref $_ret && defined $_ret && !looks_like_number $_ret );
my $_buf = $self->{freeze}([ $_ret ]);
return print {$_DAU_R_SOCK} length($_buf).'1'.$LF, $_buf;
};
## -------------------------------------------------------------------------
## Create hash structure containing various output functions.
## Keys are the request tag followed by $LF, i.e. the line exactly as it is
## read from the data socket by the readline-based loops.
my %_core_output_function = (
OUTPUT_W_ABT.$LF => sub { # Worker has aborted
## Once set, the input handlers stop handing out further chunks.
$_aborted = 1;
return;
},
OUTPUT_W_DNE.$LF => sub { # Worker has completed
chomp($_task_id = <$_DAU_R_SOCK>);
$self->{_total_running} -= 1;
if ($_has_user_tasks && $_task_id >= 0) {
$self->{_task}->[$_task_id]->{_total_running} -= 1;
}
my $_total_running = ($_has_user_tasks)
? $self->{_task}->[$_task_id]->{_total_running}
: $self->{_total_running};
## A barrier sync is pending for task 0: if every worker still running has
## already checked in, release them now, since this worker will not.
if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
if ($_sync_cnt == $_total_running) {
for my $_i (1 .. $_total_running) {
## Retry the write only while it fails with EINTR.
1 until syswrite($_BSB_W_SOCK, $LF) || ($! && !$!{'EINTR'});
}
undef $_syn_flag;
}
}
## Last worker of the task has finished: fire the task_end callback.
_task_end($self, $_task_id) unless ($_total_running);
return;
},
## ----------------------------------------------------------------------
OUTPUT_W_EXT.$LF => sub { # Worker has exited
chomp($_task_id = <$_DAU_R_SOCK>);
## Unlike OUTPUT_W_DNE, an exit also reduces the worker totals.
$self->{_total_exited} += 1;
$self->{_total_running} -= 1;
$self->{_total_workers} -= 1;
if ($_has_user_tasks && $_task_id >= 0) {
$self->{_task}->[$_task_id]->{_total_running} -= 1;
$self->{_task}->[$_task_id]->{_total_workers} -= 1;
}
my $_total_running = ($_has_user_tasks)
? $self->{_task}->[$_task_id]->{_total_running}
: $self->{_total_running};
## Same pending-barrier release as in OUTPUT_W_DNE.
if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
if ($_sync_cnt == $_total_running) {
for my $_i (1 .. $_total_running) {
1 until syswrite($_BSB_W_SOCK, $LF) || ($! && !$!{'EINTR'});
}
undef $_syn_flag;
}
}
## Exit record: worker id, pid/tid tag, status, id, then two
## length-prefixed buffers (exit message and frozen retry data).
my ($_exit_msg, $_retry_buf) = ('', '');
chomp($_exit_wid = <$_DAU_R_SOCK>),
chomp($_exit_pid = <$_DAU_R_SOCK>),
chomp($_exit_status = <$_DAU_R_SOCK>),
chomp($_exit_id = <$_DAU_R_SOCK>),
chomp($_len = <$_DAU_R_SOCK>);
read($_DAU_R_SOCK, $_exit_msg, $_len) if ($_len);
chomp($_len = <$_DAU_R_SOCK>);
read($_DAU_R_SOCK, $_retry_buf, $_len) if ($_len);
## Remember the exit status with the largest magnitude seen so far.
if (abs($_exit_status) > abs($self->{_wrk_status})) {
$self->{_wrk_status} = $_exit_status;
}
## Reap child/thread. Note: Win32 uses negative PIDs.
## NOTE(review): '0 .. @{ $_list }' runs one index past the last element in
## both loops below; the extra slot reads as undef and fails the truth
## test, so it is harmless, but '$#{ $_list }' looks like the intent.
if ($_exit_pid =~ /^PID_(-?\d+)/) {
my $_pid = $1; my $_list = $self->{_pids};
for my $i (0 .. @{ $_list }) {
if ($_list->[$i] && $_list->[$i] == $_pid) {
waitpid $_pid, 0;
$self->{_pids}->[$i] = undef;
last;
}
}
}
elsif ($_exit_pid =~ /^TID_(\d+)/) {
my $_tid = $1; my $_list = $self->{_tids};
for my $i (0 .. @{ $_list }) {
if ($_list->[$i] && $_list->[$i] == $_tid) {
$self->{_thrs}->[$i]->join();
$self->{_thrs}->[$i] = undef;
$self->{_tids}->[$i] = undef;
last;
}
}
}
## Call on_post_exit callback if defined. Otherwise, append status
## information if on_post_run is defined for later retrieval.
if (defined $_on_post_exit) {
## _exited_wid (and _retry, when present) are exposed on $self only for
## the duration of the callback.
$self->{_exited_wid} = $_exit_wid;
if (length($_retry_buf)) {
$self->{_retry} = $self->{thaw}($_retry_buf);
## presumably element [2] of the retry record is the number of retries
## still remaining - TODO confirm against the worker side.
my $_retry_cnt = $_max_retries - $self->{_retry}[2] - 1;
$_on_post_exit->($self, {
wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
msg => $_exit_msg, id => $_exit_id
}, $_retry_cnt);
delete $self->{_retry};
}
else {
$_on_post_exit->($self, {
wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
msg => $_exit_msg, id => $_exit_id
}, $_max_retries || 0 );
}
delete $self->{_exited_wid};
}
elsif (defined $_on_post_run) {
push @{ $self->{_status} }, {
wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
msg => $_exit_msg, id => $_exit_id
};
}
_task_end($self, $_task_id) unless ($_total_running);
return;
},
## ----------------------------------------------------------------------
## Input handlers. Each replies on $_DAU_R_SOCK with the next chunk as
## "<length>$LF<chunk_id>$LF<payload>", or with a terminating line ('0',
## or '-1' for the iterator variant) when there is nothing more to send.
OUTPUT_A_REF.$LF => sub { # Input << Array ref
my $_buf;
if ($_offset_pos >= $_input_size || $_aborted) {
local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} '0'.$LF;
return;
}
## One-dimensional input with a chunk size of one is sent raw; any other
## combination is sent as a frozen array slice.
if ($_single_dim && $_cs_one_flag) {
$_buf = $_input_data->[$_offset_pos];
}
else {
if ($_offset_pos + $_chunk_size - 1 < $_input_size) {
$_buf = $self->{freeze}( [ @{ $_input_data }[
$_offset_pos .. $_offset_pos + $_chunk_size - 1
] ] );
}
else {
## Final, possibly short, chunk.
$_buf = $self->{freeze}( [ @{ $_input_data }[
$_offset_pos .. $_input_size - 1
] ] );
}
}
$_len = length $_buf; local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
$_offset_pos += $_chunk_size;
return;
},
OUTPUT_G_REF.$LF => sub { # Input << Glob ref
my $_buf = '';
## The logic below honors ('Ctrl/Z' in Windows, 'Ctrl/D' in Unix)
## when reading from standard input. No output will be lost as
## far as what was previously read into the buffer.
if ($_eof_flag || $_aborted) {
local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} '0'.$LF;
return;
}
{
## Read using the user's record separator when it differs from $LF.
local $/ = $_RS if ($_RS_FLG);
## Up to MAX_RECS_SIZE the chunk size counts records; above it, bytes.
if ($_chunk_size <= MAX_RECS_SIZE) {
if ($_chunk_size == 1) {
$_buf = <$_input_glob>;
$_eof_flag = 1 unless (length $_buf);
}
else {
## Append records until the buffer stops growing (end of input).
my $_last_len = 0;
for (1 .. $_chunk_size) {
$_buf .= <$_input_glob>;
$_len = length $_buf;
if ($_len == $_last_len) {
$_eof_flag = 1;
last;
}
$_last_len = $_len;
}
}
}
else {
## Read a fixed number of bytes, then extend to the end of the current
## record. Nothing appended after a full read means end of input.
if (read($_input_glob, $_buf, $_chunk_size) == $_chunk_size) {
$_buf .= <$_input_glob>;
$_eof_flag = 1 if (length $_buf == $_chunk_size);
}
else {
$_eof_flag = 1;
}
}
}
$_len = length $_buf; local $\ = undef if (defined $\);
if ($_len) {
print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
} else {
print {$_DAU_R_SOCK} '0'.$LF;
}
return;
},
OUTPUT_H_REF.$LF => sub { # Input << Hash ref
my @_pairs;
if ($_offset_pos >= $_input_size || $_aborted) {
local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} '0'.$LF;
return;
}
## 'each' yields successive key/value pairs; the hash's own iterator
## carries the position from one request to the next, so the loop
## variable $_i only counts how many pairs to take.
if ($_offset_pos + $_chunk_size - 1 < $_input_size) {
for my $_i ($_offset_pos .. $_offset_pos + $_chunk_size - 1) {
push @_pairs, each %{ $_input_data };
}
}
else {
for my $_i ($_offset_pos .. $_input_size - 1) {
push @_pairs, each %{ $_input_data };
}
}
my $_buf = $self->{freeze}(\@_pairs);
$_len = length $_buf; local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
$_offset_pos += $_chunk_size;
return;
},
OUTPUT_I_REF.$LF => sub { # Input << Iter ref
my $_buf;
if ($_aborted) {
local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} '-1'.$LF;
return;
}
## Ask the user's iterator for the next chunk. Several values or a
## reference are frozen (flag digit 1); a single defined scalar is sent
## raw (flag digit 0).
my @_ret_a = $_input_data->($_chunk_size);
if (scalar @_ret_a > 1 || ref $_ret_a[0]) {
$_buf = $self->{freeze}([ @_ret_a ]);
$_len = length $_buf; local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} $_len.'1'.$LF . (++$_chunk_id).$LF, $_buf;
return;
}
elsif (defined $_ret_a[0]) {
$_len = length $_ret_a[0]; local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} $_len.'0'.$LF . (++$_chunk_id).$LF, $_ret_a[0];
return;
}
## Iterator returned nothing defined: signal the end and stop asking.
local $\ = undef if (defined $\);
print {$_DAU_R_SOCK} '-1'.$LF;
$_aborted = 1;
return;
},
## ----------------------------------------------------------------------
## Callback handlers. Each reads the wanted-context flag and the callback
## name, then any arguments, and delegates to $_cb_reply.
OUTPUT_A_CBK.$LF => sub { # Callback w/ multiple args
chomp($_wa = <$_DAU_R_SOCK>),
chomp($_cb = <$_DAU_R_SOCK>),
chomp($_len = <$_DAU_R_SOCK>);
read $_DAU_R_SOCK, my($_buf), $_len;
## Arguments arrive as a frozen array.
my $_data_ref = $self->{thaw}($_buf); undef $_buf;
return $_cb_reply->(@{ $_data_ref });
},
OUTPUT_S_CBK.$LF => sub { # Callback w/ 1 scalar arg
chomp($_wa = <$_DAU_R_SOCK>),
chomp($_cb = <$_DAU_R_SOCK>),
chomp($_len = <$_DAU_R_SOCK>);
read $_DAU_R_SOCK, my($_buf), $_len;
return $_cb_reply->($_buf);
},
OUTPUT_N_CBK.$LF => sub { # Callback w/ no args
chomp($_wa = <$_DAU_R_SOCK>),
chomp($_cb = <$_DAU_R_SOCK>);
return $_cb_reply->();
},
## ----------------------------------------------------------------------
## Gather handlers. The destination for a task is $_gather[$_task_id]: a
## code ref (called), a hash ref (keys set), a queue object (enqueue), or
## otherwise treated as an array ref (push).
OUTPUT_A_GTR.$LF => sub { # Gather array/ref
chomp($_task_id = <$_DAU_R_SOCK>),
chomp($_len = <$_DAU_R_SOCK>);
read $_DAU_R_SOCK, my($_buf), $_len;
if ($_is_c_ref[$_task_id]) {
local $_ = $self->{thaw}($_buf);
$_gather[$_task_id]->(@{ $_ });
}
elsif ($_is_h_ref[$_task_id]) {
## Consume the thawed list two items at a time as key, value.
local $_ = $self->{thaw}($_buf);
while (1) {
my $_key = shift @{ $_ }; my $_val = shift @{ $_ };
$_gather[$_task_id]->{$_key} = $_val;
last unless (@{ $_ });
}
}
elsif ($_is_q_ref[$_task_id]) {
$_gather[$_task_id]->enqueue(@{ $self->{thaw}($_buf) });
}
else {
push @{ $_gather[$_task_id] }, @{ $self->{thaw}($_buf) };
}
return;
},
OUTPUT_S_GTR.$LF => sub { # Gather scalar
local $_;
chomp($_task_id = <$_DAU_R_SOCK>),
chomp($_len = <$_DAU_R_SOCK>);
## A negative length skips the read, leaving $_ undefined.
read $_DAU_R_SOCK, $_, $_len if ($_len >= 0);
if ($_is_c_ref[$_task_id]) {
$_gather[$_task_id]->($_);
}
elsif ($_is_h_ref[$_task_id]) {
## A lone scalar becomes a key whose value is undef.
$_gather[$_task_id]->{$_} = undef;
}
elsif ($_is_q_ref[$_task_id]) {
$_gather[$_task_id]->enqueue($_);
}
else {
push @{ $_gather[$_task_id] }, $_;
}
return;
},
## ----------------------------------------------------------------------
## Send handlers. Each reads a length-prefixed buffer and forwards it.
OUTPUT_O_SND.$LF => sub { # Send >> STDOUT
chomp($_len = <$_DAU_R_SOCK>);
read $_DAU_R_SOCK, my($_buf), $_len;
## A user_output callback, when given, replaces the write to STDOUT.
if (defined $_user_output) {
$_user_output->($_buf);
} else {
print {$_MCE_STDOUT} $_buf;
}
return;
},
OUTPUT_E_SND.$LF => sub { # Send >> STDERR
chomp($_len = <$_DAU_R_SOCK>);
read $_DAU_R_SOCK, my($_buf), $_len;
## A user_error callback, when given, replaces the write to STDERR.
if (defined $_user_error) {
$_user_error->($_buf);
} else {
print {$_MCE_STDERR} $_buf;
}
return;
},
OUTPUT_F_SND.$LF => sub { # Send >> File
my ($_buf, $_OUT_FILE);
chomp($_file = <$_DAU_R_SOCK>),
chomp($_len = <$_DAU_R_SOCK>);
read $_DAU_R_SOCK, $_buf, $_len;
## Open each destination once, in append mode, and cache the handle in
## %_sendto_fhs; the handles are closed after the event loop ends.
unless (exists $_sendto_fhs{$_file}) {
open $_sendto_fhs{$_file}, '>>', $_file
or _croak "Cannot open file for writing ($_file): $!";
binmode $_sendto_fhs{$_file};
## Select new FH, turn on autoflush, restore the old FH.
if ($_flush_file) {
local $!;
# IO::Handle->autoflush not available in older Perl.
select(( select($_sendto_fhs{$_file}), $| = 1 )[0]);
}
}
$_OUT_FILE = $_sendto_fhs{$_file};
print {$_OUT_FILE} $_buf;
return;
},
OUTPUT_D_SND.$LF => sub { # Send >> File descriptor
my ($_buf, $_OUT_FILE);
chomp($_fd = <$_DAU_R_SOCK>),
chomp($_len = <$_DAU_R_SOCK>);
read $_DAU_R_SOCK, $_buf, $_len;
## Same caching as OUTPUT_F_SND, keyed by descriptor number; note that
## file names and descriptors share the one %_sendto_fhs hash.
unless (exists $_sendto_fhs{$_fd}) {
require IO::Handle unless $INC{'IO/Handle.pm'};
$_sendto_fhs{$_fd} = IO::Handle->new();
$_sendto_fhs{$_fd}->fdopen($_fd, 'w')
or _croak "Cannot open file descriptor ($_fd): $!";
binmode $_sendto_fhs{$_fd};
## Select new FH, turn on autoflush, restore the old FH.
if ($_flush_file) {
local $!;
# IO::Handle->autoflush not available in older Perl.
select(( select($_sendto_fhs{$_fd}), $| = 1 )[0]);
}
}
$_OUT_FILE = $_sendto_fhs{$_fd};
print {$_OUT_FILE} $_buf;
return;
},
## ----------------------------------------------------------------------
## Barrier synchronization. $_sync_cnt counts workers that have checked
## in; the running total used is that of task 0 when user_tasks are set.
OUTPUT_B_SYN.$LF => sub { # Barrier sync - begin
## First arrival of a new round: raise the pending flag, reset the count.
if (!defined $_sync_cnt || $_sync_cnt == 0) {
$_syn_flag = 1, $_sync_cnt = 0;
}
my $_total_running = ($_has_user_tasks)
? $self->{_task}->[0]->{_total_running}
: $self->{_total_running};
## Everyone has arrived: write one $LF per running worker to release them.
if (++$_sync_cnt == $_total_running) {
for my $_i (1 .. $_total_running) {
1 until syswrite($_BSB_W_SOCK, $LF) || ($! && !$!{'EINTR'});
}
undef $_syn_flag;
}
return;
},
OUTPUT_E_SYN.$LF => sub { # Barrier sync - end
## The last worker to leave triggers one $LF per running worker on the
## end-of-barrier socket.
if (--$_sync_cnt == 0) {
my $_total_running = ($_has_user_tasks)
? $self->{_task}->[0]->{_total_running}
: $self->{_total_running};
for my $_i (1 .. $_total_running) {
1 until syswrite($_BSE_W_SOCK, $LF) || ($! && !$!{'EINTR'});
}
}
return;
},
OUTPUT_S_IPC.$LF => sub { # Change to win32 IPC
## Acknowledge on the data socket, then, if the flag was not already
## set, set it and jump back to _LOOP so the loop variant is chosen
## again with $_win32_ipc in effect.
1 until syswrite($_DAT_R_SOCK, $LF) || ($! && !$!{'EINTR'});
$_win32_ipc = 1, goto _LOOP unless $_win32_ipc;
return;
},
OUTPUT_P_NFY.$LF => sub { # Progress notification
## Pass the running total of reported sizes to the progress callback.
chomp($_len = <$_DAU_R_SOCK>);
$self->{progress}->( $_size_completed += $_len );
return;
},
OUTPUT_S_DIR.$LF => sub { # Make/get sess_dir
print {$_DAU_R_SOCK} $self->sess_dir().$LF;
return;
},
OUTPUT_T_DIR.$LF => sub { # Make/get tmp_dir
print {$_DAU_R_SOCK} $self->tmp_dir().$LF;
return;
},
OUTPUT_I_DLY.$LF => sub { # Interval delay
## Replies with the number of seconds the requesting worker should wait,
## or '0' when no interval is configured or no wait is needed.
my $_tasks = $_has_user_tasks ? $self->{user_tasks} : undef;
chomp($_task_id = <$_DAU_R_SOCK>);
## A task-level interval, when the key exists, overrides the global one.
my $_interval = ($_tasks && exists $_tasks->[$_task_id]{interval})
? $_tasks->[$_task_id]{interval}
: $self->{interval};
if ( $_interval ) {
my $_max_workers = ($_tasks)
? $_tasks->[$_task_id]{max_workers}
: $self->{max_workers};
## Per-task slot counter cycling through 1 .. max_workers.
$_delay_wid[$_task_id] = 1
if (++$_delay_wid[$_task_id] > $_max_workers);
my $_nodes = $_interval->{max_nodes};
my $_id = $_interval->{node_id};
## Spacing between slots on this node, scaled by the number of nodes.
my $_delay = $_interval->{delay} * $_nodes;
## Length of one full cycle across all worker slots.
my $_app_tb = $_delay * $_max_workers;
## Start time for this node, offset by node_id times the base delay.
my $_app_st = $_interval->{_time} + ($_delay / $_nodes * $_id);
## Scheduled time for the current slot.
my $_wrk_st = ($_delay_wid[$_task_id] - 1) * $_delay + $_app_st;
$_delay = $_wrk_st - time;
## Scheduled time already passed: advance by whole cycles.
if ($_delay < 0.0 && $_app_tb) {
my $_count = int($_delay * -1 / $_app_tb + 0.5) + 1;
$_delay += $_app_tb * $_count;
## Re-base the reference time if the cycle count grows very large.
$_interval->{_time} = time if ($_count > 2e9);
}
($_delay > 0.0)
? print {$_DAU_R_SOCK} $_delay.$LF
: print {$_DAU_R_SOCK} '0'.$LF;
}
else {
print {$_DAU_R_SOCK} '0'.$LF;
}
return;
},
);
## -------------------------------------------------------------------------
## Initialize the shared state from the MCE object.
local ($!, $?, $_);
$_aborted = $_chunk_id = $_eof_flag = $_size_completed = 0;
$_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0;
$_cs_one_flag = ($self->{chunk_size} == 1) ? 1 : 0;
$_max_retries = $self->{max_retries} || 0;
$_on_post_exit = $self->{on_post_exit};
$_on_post_run = $self->{on_post_run};
$_chunk_size = $self->{chunk_size};
$_flush_file = $self->{flush_file};
$_user_output = $self->{user_output};
$_user_error = $self->{user_error};
$_single_dim = $self->{_single_dim};
$_sess_dir = $self->{_sess_dir};
## With max_retries set and no user on_post_exit, install a default that
## reports the failed chunk on STDERR and restarts the worker.
if ($_max_retries && !$_on_post_exit) {
$_on_post_exit = sub {
my ($self, $_e, $_retry_cnt) = @_;
my ($_cnt, $_msg) = ($_retry_cnt + 1, "Error: chunk $_e->{id} failed");
($_retry_cnt < $_max_retries)
? print {*STDERR} "$_msg, retrying chunk attempt # ${_cnt}\n"
: print {*STDERR} "$_msg\n";
$self->restart_worker;
};
}
## The chunk size of the first user task, when set, takes precedence.
if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size}) {
$_chunk_size = $self->{user_tasks}->[0]->{chunk_size};
}
## Resolve the gather destination for every task (task-level first, then
## global) and record its kind for the gather handlers.
if ($_has_user_tasks) {
for my $_i (0 .. @{ $self->{user_tasks} } - 1) {
$_gather[$_i] = (defined $self->{user_tasks}->[$_i]->{gather})
? $self->{user_tasks}->[$_i]->{gather} : $self->{gather};
$_is_c_ref[$_i] = ( ref $_gather[$_i] eq 'CODE' ) ? 1 : 0;
$_is_h_ref[$_i] = ( ref $_gather[$_i] eq 'HASH' ) ? 1 : 0;
$_is_q_ref[$_i] = (
ref $_gather[$_i] eq 'MCE::Queue' ||
ref $_gather[$_i] eq 'Thread::Queue' ) ? 1 : 0;
}
}
## A global gather, when defined, is always used for slot 0. This runs
## after the loop above and therefore replaces task 0's own setting.
if (defined $self->{gather}) {
$_gather[0] = $self->{gather};
$_is_c_ref[0] = ( ref $_gather[0] eq 'CODE' ) ? 1 : 0;
$_is_h_ref[0] = ( ref $_gather[0] eq 'HASH' ) ? 1 : 0;
$_is_q_ref[0] = (
ref $_gather[0] eq 'MCE::Queue' ||
ref $_gather[0] eq 'Thread::Queue' ) ? 1 : 0;
}
## Input size is known up front only for array and hash input.
if (defined $_input_data && ref $_input_data eq 'ARRAY') {
$_input_size = @{ $_input_data };
$_offset_pos = 0;
}
elsif (defined $_input_data && ref $_input_data eq 'HASH') {
## Note: 'keys' also resets the hash iterator used later by 'each'.
$_input_size = scalar( keys %{ $_input_data } );
$_offset_pos = 0;
}
else {
$_input_size = $_offset_pos = 0;
}
## Set STDOUT/STDERR to user parameters.
## A configured file is opened in append mode; otherwise the process's own
## STDOUT/STDERR is used. Both are switched to binmode.
if (defined $self->{stdout_file}) {
open $_MCE_STDOUT, '>>', $self->{stdout_file}
or die $self->{stdout_file} . ": $!\n";
binmode $_MCE_STDOUT;
}
else {
$_MCE_STDOUT = \*STDOUT;
binmode $_MCE_STDOUT;
}
if (defined $self->{stderr_file}) {
open $_MCE_STDERR, '>>', $self->{stderr_file}
or die $self->{stderr_file} . ": $!\n";
binmode $_MCE_STDERR;
}
else {
$_MCE_STDERR = \*STDERR;
binmode $_MCE_STDERR;
}
## Autoflush STDERR-STDOUT handles if requested.
## Make MCE_STDOUT the default handle.
## The previously selected handle is kept in $_old_hndl and restored once
## the loop has finished.
my $_old_hndl = select $_MCE_STDOUT;
{
local $!;
# IO::Handle->autoflush not available in older Perl.
select($_MCE_STDERR), $| = 1 if ($self->{flush_stderr});
select($_MCE_STDOUT), $| = 1 if ($self->{flush_stdout});
select($_MCE_STDOUT);
}
## -------------------------------------------------------------------------
## Output event loop.
## $_channels holds every data socket; index 0 is the control socket on
## which requests are read, and each request names the channel to use for
## its payload and reply ($_DAU_R_SOCK).
my $_func; my $_channels = $self->{_dat_r_sock};
$_win32_ipc = ( $ENV{'PERL_MCE_IPC'} eq 'win32' || $INC{'MCE/Hobo.pm'} );
$_BSB_W_SOCK = $self->{_bsb_w_sock};
$_BSE_W_SOCK = $self->{_bse_w_sock};
$_DAT_R_SOCK = $self->{_dat_r_sock}->[0];
## Save the caller's separators, then force protocol-safe values for the
## loop: no output record separator and $LF as input record separator.
## The saved values are re-applied around user callbacks and glob reads.
$_RS = $self->{RS} || $/;
$_O_SEP = $\; local $\ = undef;
$_I_SEP = $/; local $/ = $LF;
$_RS_FLG = (!$_RS || $_RS ne $LF) ? 1 : 0;
$_O_FLG = (defined $_O_SEP) ? 1 : 0;
$_I_FLG = (!$_I_SEP || $_I_SEP ne $LF) ? 1 : 0;
## Call module's loop_begin routine for modules plugged into MCE.
for my $_p (@{ $_plugin_loop_begin }) {
$_p->($self, \$_DAU_R_SOCK);
}
## Wait on requests *with* timeout capability. Exit loop when all workers
## have completed processing or exited prematurely.
## Used only with loop_timeout set, no threads, and not on Windows.
## _LOOP is also the target of the goto in the OUTPUT_S_IPC handler.
_LOOP:
if ($self->{loop_timeout} && @{ $self->{_tids} } == 0 && $^O ne 'MSWin32') {
my ($_list, $_timeout) = ($self->{_pids}, $self->{loop_timeout});
my ($_DAT_W_SOCK, $_pid) = ($self->{_dat_w_sock}->[0]);
## Enforce a minimum timeout of five seconds.
$_timeout = 5 if $_timeout < 5;
## On timeout, reap any child that has already exited without blocking
## and adjust the counters, then write a 'NOOP' request to the data
## socket so the pending read below returns and the loop condition is
## evaluated again.
## NOTE(review): '0 .. @{ $_list }' runs one index past the end here too;
## the extra slot is undef and is skipped by the assignment test.
local $SIG{ALRM} = sub {
alarm 0;
for my $i (0 .. @{ $_list }) {
if ($_pid = $_list->[$i]) {
if (waitpid($_pid, _WNOHANG)) {
$self->{_total_exited} += 1;
$self->{_total_running} -= 1;
$self->{_total_workers} -= 1;
$_list->[$i] = undef;
}
}
}
print {$_DAT_W_SOCK} 'NOOP'.$LF . '0'.$LF;
};
while ( $self->{_total_running} ) {
## The alarm covers only the wait for a request, not its handling.
alarm $_timeout;
$_func = <$_DAT_R_SOCK>;
$_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
alarm 0;
## Unknown tags (such as the NOOP above) match neither table and are
## silently ignored.
if (exists $_core_output_function{$_func}) {
$_core_output_function{$_func}();
} elsif (exists $_plugin_function->{$_func}) {
$_plugin_function->{$_func}();
}
}
}
## Wait on requests *without* timeout capability.
elsif ($^O eq 'MSWin32' && $_win32_ipc) {
# The normal loop hangs on Windows when processes/threads start/exit.
# Using ioctl() properly, http://www.perlmonks.org/?node_id=780083
## $_val_bytes receives the byte count from ioctl; $_ptr_bytes is the
## address of that buffer, passed as the ioctl argument.
my $_val_bytes = "\x00\x00\x00\x00";
my $_ptr_bytes = unpack( 'I', pack('P', $_val_bytes) );
my ($_done, $_count, $_nbytes, $_start) = (0);
while (!$_done) {
$_start = time, $_count = 1;
# MSWin32 FIONREAD
IOCTL: ioctl($_DAT_R_SOCK, 0x4004667f, $_ptr_bytes);
## Nothing to read yet: poll again, busy-spinning at first and then
## sleeping between polls.
## NOTE(review): the fractional sleep and time comparisons assume
## high-resolution sleep/time are in scope (e.g. Time::HiRes) - confirm.
unless ($_nbytes = unpack('I', $_val_bytes)) {
if ($_count) {
# delay after a while to not consume a CPU core
$_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.030;
} else {
sleep 0.030;
}
goto IOCTL;
}
## Requests are fixed 8-byte records here; the last two bytes are cut
## off and used as the channel index, leaving the tag key in $_func.
## Process as many complete records as were reported available.
do {
sysread($_DAT_R_SOCK, $_func, 8);
$_done = 1, last() unless length($_func) == 8;
$_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
if (exists $_core_output_function{$_func}) {
$_core_output_function{$_func}();
} elsif (exists $_plugin_function->{$_func}) {
$_plugin_function->{$_func}();
}
} while (($_nbytes -= 8) >= 8);
last unless $self->{_total_running};
}
}
elsif ($^O eq 'MSWin32') {
## Windows without the ioctl polling: blocking reads of the same fixed
## 8-byte records.
while ($self->{_total_running}) {
sysread($_DAT_R_SOCK, $_func, 8);
last() unless length($_func) == 8;
$_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
if (exists $_core_output_function{$_func}) {
$_core_output_function{$_func}();
} elsif (exists $_plugin_function->{$_func}) {
$_plugin_function->{$_func}();
}
}
}
else {
## Default loop: line-oriented reads. A tag line must be exactly six
## characters long (including the $LF); anything else ends the loop.
while ($self->{_total_running}) {
$_func = <$_DAT_R_SOCK>;
last() unless length($_func) == 6;
$_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
if (exists $_core_output_function{$_func}) {
$_core_output_function{$_func}();
} elsif (exists $_plugin_function->{$_func}) {
$_plugin_function->{$_func}();
}
}
}
## Call module's loop_end routine for modules plugged into MCE.
for my $_p (@{ $_plugin_loop_end }) {
$_p->($self);
}
## Call on_post_run callback.
## It receives the status records collected by OUTPUT_W_EXT.
$_on_post_run->($self, $self->{_status}) if (defined $_on_post_run);
## Close opened sendto file handles.
for my $_p (keys %_sendto_fhs) {
close $_sendto_fhs{$_p};
undef $_sendto_fhs{$_p};
delete $_sendto_fhs{$_p};
}
## Restore the default handle. Close MCE STDOUT/STDERR handles.
## Only handles with a descriptor above 2 are closed, so the process's
## standard streams stay open. Any error from the eval'd code is ignored.
select $_old_hndl;
eval q{
close $_MCE_STDOUT if (fileno $_MCE_STDOUT > 2);
close $_MCE_STDERR if (fileno $_MCE_STDERR > 2);
};
return;
}
1;