###############################################################################
## ----------------------------------------------------------------------------
## MCE - Many-Core Engine for Perl providing parallel processing capabilities.
##
###############################################################################
package MCE;
use strict;
use warnings;
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (TestingAndDebugging::ProhibitNoStrict)
BEGIN {
## Forking is emulated under the Windows enviornment (excluding Cygwin).
## MCE 1.514+ will load the 'threads' module by default on Windows.
## Folks may specify use_threads => 0 if threads is not desired.
if ($^O eq 'MSWin32' && !defined $threads::VERSION) {
local $@; local $SIG{__DIE__} = \&_NOOP;
eval 'use threads; use threads::shared';
}
}
use Fcntl qw( :flock O_RDONLY );
use Socket qw( :crlf PF_UNIX PF_UNSPEC SOCK_STREAM );
use Symbol qw( qualify_to_ref );
use Storable qw( );
use Scalar::Util qw( looks_like_number );
use Time::HiRes qw( sleep time );
use MCE::Signal;
use bytes;
our $VERSION = '1.521';
our (%_valid_fields_new, %_params_allowed_args, %_valid_fields_task);
our ($_is_cygwin, $_is_mswin32, $_is_winenv);
our ($_que_read_size, $_que_template);
our $MCE; my $_prev_mce;
BEGIN {
## Configure pack/unpack template for writing to and reading from
## the queue. Each entry contains 2 positive numbers: chunk_id & msg_id.
## Attempt 64-bit size, otherwize fall back to host machine's word length.
{
local $@; local $SIG{__DIE__} = \&_NOOP;
eval { $_que_read_size = length pack('Q2', 0, 0); };
$_que_template = ($@) ? 'I2' : 'Q2';
$_que_read_size = length pack($_que_template, 0, 0);
}
## ** Attributes which are used internally.
## _abort_msg _chn _com_lock _dat_lock _i_app_st _i_app_tb _i_wrk_st _wuf
## _chunk_id _mce_sid _mce_tid _pids _run_mode _single_dim _thrs _tids _wid
## _exiting _exit_pid _total_exited _total_running _total_workers _task_wid
## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status
## _last_sref _init_total_workers
##
## _bsb_r_sock _bsb_w_sock _bse_r_sock _bse_w_sock _com_r_sock _com_w_sock
## _dat_r_sock _dat_w_sock _que_r_sock _que_w_sock _data_channels _lock_chn
%_valid_fields_new = map { $_ => 1 } qw(
max_workers tmp_dir use_threads user_tasks task_end task_name freeze thaw
chunk_size input_data sequence job_delay spawn_delay submit_delay RS
flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
interval user_args user_begin user_end user_func user_error user_output
bounds_only gather on_post_exit on_post_run parallel_io
);
%_params_allowed_args = map { $_ => 1 } qw(
chunk_size input_data sequence job_delay spawn_delay submit_delay RS
flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
interval user_args user_begin user_end user_func user_error user_output
bounds_only gather on_post_exit on_post_run parallel_io
);
%_valid_fields_task = map { $_ => 1 } qw(
max_workers chunk_size input_data interval sequence task_end task_name
bounds_only gather user_args user_begin user_end user_func use_threads
RS use_slurpio parallel_io
);
$_is_cygwin = ($^O eq 'cygwin');
$_is_mswin32 = ($^O eq 'MSWin32');
$_is_winenv = ($_is_cygwin || $_is_mswin32);
## Create accessor functions.
no strict 'refs'; no warnings 'redefine';
foreach my $_id (qw( chunk_size max_workers task_name tmp_dir user_args )) {
*{ $_id } = sub () {
my $x = shift; my $self = ref($x) ? $x : $MCE;
return $self->{$_id};
};
}
foreach my $_id (qw( chunk_id sess_dir task_id task_wid wid )) {
*{ $_id } = sub () {
my $x = shift; my $self = ref($x) ? $x : $MCE;
return $self->{"_$_id"};
};
}
foreach my $_id (qw( freeze thaw )) {
*{ $_id } = sub () {
my $x = shift; my $self = ref($x) ? $x : $MCE;
return $self->{$_id}(@_);
};
}
## PDL + MCE (spawning as threads) is not stable. Thanks to David Mertens
## for reporting on how he fixed it for his PDL::Parallel::threads module.
sub PDL::CLONE_SKIP { return 1; }
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Import routine.
##
###############################################################################
use constant { SELF => 0, CHUNK => 1, CID => 2 };
our $_MCE_LOCK : shared = 1;
our $_has_threads;
our $MAX_WORKERS = 1;
our $CHUNK_SIZE = 1;
our $TMP_DIR = $MCE::Signal::tmp_dir;
our $FREEZE = \&Storable::freeze;
our $THAW = \&Storable::thaw;
my $_loaded;
sub import {
my $_class = shift; return if ($_loaded++);
## Process module arguments.
while (my $_argument = shift) {
my $_arg = lc $_argument;
$MAX_WORKERS = shift and next if ( $_arg eq 'max_workers' );
$CHUNK_SIZE = shift and next if ( $_arg eq 'chunk_size' );
$TMP_DIR = shift and next if ( $_arg eq 'tmp_dir' );
$FREEZE = shift and next if ( $_arg eq 'freeze' );
$THAW = shift and next if ( $_arg eq 'thaw' );
if ( $_arg eq 'sereal' ) {
if (shift eq '1') {
local $@; eval 'use Sereal qw(encode_sereal decode_sereal)';
unless ($@) {
$MCE::FREEZE = \&encode_sereal;
$MCE::THAW = \&decode_sereal;
}
}
next;
}
if ( $_arg eq 'export_const' || $_arg eq 'const' ) {
if (shift eq '1') {
no strict 'refs'; no warnings 'redefine';
my $_package = caller;
*{ $_package . '::SELF' } = \&SELF;
*{ $_package . '::CHUNK' } = \&CHUNK;
*{ $_package . '::CID' } = \&CID;
}
next;
}
_croak("MCE::import: ($_argument) is not a valid module argument");
}
## Please include your threading library of choice prior to including
## the MCE library. This is only a requirement if wanting to use threads
## versus forking.
unless (defined $_has_threads) {
if (defined $threads::VERSION) {
unless (defined $threads::shared::VERSION) {
local $@; local $SIG{__DIE__} = \&_NOOP;
eval 'use threads::shared; threads::shared::share($_MCE_LOCK)';
}
$_has_threads = 1;
}
$_has_threads = $_has_threads || 0;
}
## Preload essential modules early on.
require MCE::Core::Validation;
require MCE::Core::Manager;
require MCE::Core::Worker;
require MCE::Util;
{
no strict 'refs'; no warnings 'redefine';
*{ 'MCE::_parse_max_workers' } = \&MCE::Util::_parse_max_workers;
}
## Instantiate a module-level instance.
$MCE = MCE->new( _module_instance => 1, max_workers => 0 );
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Define constants & variables.
##
###############################################################################
use constant {
DATA_CHANNELS => 8, ## Maximum IPC "DATA" channels
MAX_CHUNK_SIZE => 24 * 1024 * 1024, ## Maximum chunk size allowed
MAX_RECS_SIZE => 8192, ## Reads # of records if <= value
## Reads # of bytes if > value
OUTPUT_W_ABT => 'W~ABT', ## Worker has aborted
OUTPUT_W_DNE => 'W~DNE', ## Worker has completed
OUTPUT_W_EXT => 'W~EXT', ## Worker has exited
OUTPUT_A_ARY => 'A~ARY', ## Array << Array
OUTPUT_S_GLB => 'S~GLB', ## Scalar << Glob FH
OUTPUT_U_ITR => 'U~ITR', ## User << Iterator
OUTPUT_A_CBK => 'A~CBK', ## Callback w/ multiple args
OUTPUT_S_CBK => 'S~CBK', ## Callback w/ 1 scalar arg
OUTPUT_N_CBK => 'N~CBK', ## Callback w/ no args
OUTPUT_A_GTR => 'A~GTR', ## Gather w/ multiple args
OUTPUT_R_GTR => 'R~GTR', ## Gather w/ 1 reference arg
OUTPUT_S_GTR => 'S~GTR', ## Gather w/ 1 scalar arg
OUTPUT_O_SND => 'O~SND', ## Send >> STDOUT
OUTPUT_E_SND => 'E~SND', ## Send >> STDERR
OUTPUT_F_SND => 'F~SND', ## Send >> File
OUTPUT_D_SND => 'D~SND', ## Send >> File descriptor
OUTPUT_B_SYN => 'B~SYN', ## Barrier sync - begin
OUTPUT_E_SYN => 'E~SYN', ## Barrier sync - end
READ_FILE => 0, ## Worker reads file handle
READ_MEMORY => 1, ## Worker reads memory handle
REQUEST_ARRAY => 0, ## Worker requests next array chunk
REQUEST_GLOB => 1, ## Worker requests next glob chunk
SENDTO_FILEV1 => 0, ## Worker sends to 'file', $a, '/path'
SENDTO_FILEV2 => 1, ## Worker sends to 'file:/path', $a
SENDTO_STDOUT => 2, ## Worker sends to STDOUT
SENDTO_STDERR => 3, ## Worker sends to STDERR
SENDTO_FD => 4, ## Worker sends to file descriptor
WANTS_UNDEF => 0, ## Callee wants nothing
WANTS_ARRAY => 1, ## Callee wants list
WANTS_SCALAR => 2, ## Callee wants scalar
WANTS_REF => 3 ## Callee wants H/A/S ref
};
my $_mce_count = 0;
our %_mce_sess_dir = ();
our %_mce_spawned = ();
$MCE::Signal::mce_sess_dir_ref = \%_mce_sess_dir;
$MCE::Signal::mce_spawned_ref = \%_mce_spawned;
## Warnings are disabled to minimize bits of noise when user or OS signals
## the script to exit. e.g. MCE_script.pl < infile | head
no warnings 'threads'; no warnings 'uninitialized';
sub DESTROY { }
###############################################################################
## ----------------------------------------------------------------------------
## Plugin interface for external modules plugging into MCE, e.g. MCE::Queue.
##
###############################################################################
my (%_plugin_function, @_plugin_loop_begin, @_plugin_loop_end);
my (%_plugin_list, @_plugin_worker_init);
sub _attach_plugin {
my $_ext_module = caller;
unless (exists $_plugin_list{$_ext_module}) {
$_plugin_list{$_ext_module} = 1;
my $_ext_output_function = $_[0];
my $_ext_output_loop_begin = $_[1];
my $_ext_output_loop_end = $_[2];
my $_ext_worker_init = $_[3];
return unless (ref $_ext_output_function eq 'HASH');
return unless (ref $_ext_output_loop_begin eq 'CODE');
return unless (ref $_ext_output_loop_end eq 'CODE');
return unless (ref $_ext_worker_init eq 'CODE');
for (keys %{ $_ext_output_function }) {
$_plugin_function{$_} = $_ext_output_function->{$_}
unless (exists $_plugin_function{$_});
}
push @_plugin_loop_begin, $_ext_output_loop_begin;
push @_plugin_loop_end, $_ext_output_loop_end;
push @_plugin_worker_init, $_ext_worker_init;
}
@_ = ();
return;
}
## Functions for saving and restoring $MCE. This is mainly helpful for
## modules using MCE. e.g. MCE::Map.
sub _save_state {
$_prev_mce = $MCE;
return;
}
sub _restore_state {
$MCE = $_prev_mce; $_prev_mce = undef;
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## New instance instantiation.
##
###############################################################################
sub new {
my ($class, %argv) = @_;
@_ = ();
my $self = {}; bless($self, ref($class) || $class);
## Public options.
$self->{max_workers} = (exists $argv{max_workers})
? $argv{max_workers} : $MCE::MAX_WORKERS;
$self->{chunk_size} = $argv{chunk_size} || $MCE::CHUNK_SIZE;
$self->{tmp_dir} = $argv{tmp_dir} || $MCE::TMP_DIR;
$self->{freeze} = $argv{freeze} || $MCE::FREEZE;
$self->{thaw} = $argv{thaw} || $MCE::THAW;
$self->{task_name} = $argv{task_name} || 'MCE';
if (exists $argv{_module_instance}) {
$self->{_spawned} = $self->{_task_id} = $self->{_task_wid} =
$self->{_chunk_id} = $self->{_wid} = $self->{_wrk_status} = 0;
return $MCE = $self;
}
if (exists $argv{use_threads} and defined $argv{use_threads}) {
$self->{use_threads} = $argv{use_threads};
if (!$_has_threads && $argv{use_threads} ne '0') {
my $_msg = "\n";
$_msg .= "## Please include threads support prior to loading MCE\n";
$_msg .= "## when specifying use_threads => $argv{use_threads}\n";
$_msg .= "\n";
_croak($_msg);
}
}
else {
$self->{use_threads} = ($_has_threads) ? 1 : 0;
}
$MCE::Signal::has_threads = 1
if ($self->{use_threads} && !$MCE::Signal::has_threads);
$self->{gather} = $argv{gather} if (exists $argv{gather});
$self->{interval} = $argv{interval} if (exists $argv{interval});
$self->{input_data} = $argv{input_data} if (exists $argv{input_data});
$self->{sequence} = $argv{sequence} if (exists $argv{sequence});
$self->{bounds_only} = $argv{bounds_only} if (exists $argv{bounds_only});
$self->{job_delay} = $argv{job_delay} if (exists $argv{job_delay});
$self->{spawn_delay} = $argv{spawn_delay} if (exists $argv{spawn_delay});
$self->{submit_delay} = $argv{submit_delay} if (exists $argv{submit_delay});
$self->{on_post_exit} = $argv{on_post_exit} if (exists $argv{on_post_exit});
$self->{on_post_run} = $argv{on_post_run} if (exists $argv{on_post_run});
$self->{user_args} = $argv{user_args} if (exists $argv{user_args});
$self->{user_begin} = $argv{user_begin} if (exists $argv{user_begin});
$self->{user_func} = $argv{user_func} if (exists $argv{user_func});
$self->{user_end} = $argv{user_end} if (exists $argv{user_end});
$self->{user_error} = $argv{user_error} if (exists $argv{user_error});
$self->{user_output} = $argv{user_output} if (exists $argv{user_output});
$self->{stderr_file} = $argv{stderr_file} if (exists $argv{stderr_file});
$self->{stdout_file} = $argv{stdout_file} if (exists $argv{stdout_file});
$self->{user_tasks} = $argv{user_tasks} if (exists $argv{user_tasks});
$self->{task_end} = $argv{task_end} if (exists $argv{task_end});
$self->{RS} = $argv{RS} if (exists $argv{RS});
$self->{flush_file} = $argv{flush_file} || 0;
$self->{flush_stderr} = $argv{flush_stderr} || 0;
$self->{flush_stdout} = $argv{flush_stdout} || 0;
$self->{use_slurpio} = $argv{use_slurpio} || 0;
$self->{parallel_io} = $argv{parallel_io} || 0;
## -------------------------------------------------------------------------
## Validation.
for (keys %argv) {
_croak("MCE::new: ($_) is not a valid constructor argument")
unless (exists $_valid_fields_new{$_});
}
_croak("MCE::new: ($self->{tmp_dir}) is not a directory or does not exist")
unless (-d $self->{tmp_dir});
_croak("MCE::new: ($self->{tmp_dir}) is not writeable")
unless (-w $self->{tmp_dir});
if (defined $self->{user_tasks}) {
_croak('MCE::new: (user_tasks) is not an ARRAY reference')
unless (ref $self->{user_tasks} eq 'ARRAY');
$self->{max_workers} = _parse_max_workers($self->{max_workers});
for my $_task (@{ $self->{user_tasks} }) {
for (keys %{ $_task }) {
_croak("MCE::new: ($_) is not a valid task constructor argument")
unless (exists $_valid_fields_task{$_});
}
$_task->{max_workers} = $self->{max_workers}
unless (defined $_task->{max_workers});
$_task->{use_threads} = $self->{use_threads}
unless (defined $_task->{use_threads});
bless($_task, ref($self) || $self);
}
## File locking fails among children and threads under Cygwin.
## Must be all children or all threads, not intermixed.
my (%_values, $_value);
for my $_task (@{ $self->{user_tasks} }) {
$_value = (defined $_task->{use_threads})
? $_task->{use_threads} : $self->{use_threads};
$_values{$_value} = '';
}
_croak('MCE::new: (cannot mix) use_threads => 0/1 under Cygwin')
if ($_is_cygwin && keys %_values > 1);
}
_validate_args($self); %argv = ();
## -------------------------------------------------------------------------
## Private options. Limit chunk_size.
$self->{_chunk_id} = 0; ## Chunk ID
$self->{_send_cnt} = 0; ## Number of times data was sent via send
$self->{_spawned} = 0; ## Have workers been spawned
$self->{_task_id} = 0; ## Task ID, starts at 0 (array index)
$self->{_task_wid} = 0; ## Task Worker ID, starts at 1 per task
$self->{_wid} = 0; ## MCE Worker ID, starts at 1 per MCE instance
$self->{_wrk_status} = 0; ## For saving exit status when worker exits
$self->{chunk_size} = MAX_CHUNK_SIZE
if ($self->{chunk_size} > MAX_CHUNK_SIZE);
my $_total_workers = 0;
if (defined $self->{user_tasks}) {
$_total_workers += $_->{max_workers}
for (@{ $self->{user_tasks} });
}
else {
$_total_workers = $self->{max_workers};
}
$self->{_last_sref} = (ref $self->{input_data} eq 'SCALAR')
? $self->{input_data} : 0;
$self->{_data_channels} = ($_total_workers < DATA_CHANNELS)
? $_total_workers : DATA_CHANNELS;
$self->{_lock_chn} = ($_total_workers > DATA_CHANNELS)
? 1 : 0;
return $MCE = $self;
}
###############################################################################
## ----------------------------------------------------------------------------
## Spawn method.
##
###############################################################################
sub spawn {
my $x = shift; my $self = ref($x) ? $x : $MCE;
## To avoid leaking (Scalars leaked: 1) messages (fixed in Perl 5.12.x).
@_ = ();
_croak('MCE::spawn: method cannot be called by the worker process')
if ($self->{_wid});
## Return if workers have already been spawned.
return $self if ($self->{_spawned});
$MCE = undef;
lock $_MCE_LOCK if ($_has_threads); ## Obtain MCE lock.
my $_die_handler = $SIG{__DIE__}; $SIG{__DIE__} = \&_die;
my $_warn_handler = $SIG{__WARN__}; $SIG{__WARN__} = \&_warn;
## Configure tid/sid for this instance here, not in the new method above.
## We want the actual thread id in which spawn was called under.
unless ($self->{_mce_tid}) {
$self->{_mce_tid} = ($_has_threads) ? threads->tid() : '';
$self->{_mce_tid} = '' unless (defined $self->{_mce_tid});
$self->{_mce_sid} = $$ .'.'. $self->{_mce_tid} .'.'. (++$_mce_count);
}
my $_mce_sid = $self->{_mce_sid};
my $_sess_dir = $self->{_sess_dir};
my $_tmp_dir = $self->{tmp_dir};
## Create temp dir.
unless ($_sess_dir) {
_croak("MCE::spawn: ($_tmp_dir) is not defined")
if (!defined $_tmp_dir || $_tmp_dir eq '');
_croak("MCE::spawn: ($_tmp_dir) is not a directory or does not exist")
unless (-d $_tmp_dir);
_croak("MCE::spawn: ($_tmp_dir) is not writeable")
unless (-w $_tmp_dir);
my $_cnt = 0; $_sess_dir = $self->{_sess_dir} = "$_tmp_dir/$_mce_sid";
$_sess_dir = $self->{_sess_dir} = "$_tmp_dir/$_mce_sid." . (++$_cnt)
while ( !(mkdir $_sess_dir, 0770) );
$_mce_sess_dir{$_sess_dir} = 1;
}
## Obtain lock.
open my $_COM_LOCK, '+>>:raw:stdio', "$_sess_dir/_com.lock"
or die "(M) open error $_sess_dir/_com.lock: $!\n";
flock $_COM_LOCK, LOCK_EX;
## -------------------------------------------------------------------------
my $_data_channels = $self->{_data_channels};
my $_max_workers = $self->{max_workers};
my $_use_threads = $self->{use_threads};
## Create socket pairs for IPC.
if (exists $self->{_dat_r_sock}) {
@{ $self->{_dat_r_sock} } = (); @{ $self->{_dat_w_sock} } = ();
} else {
$self->{_dat_r_sock} = []; $self->{_dat_w_sock} = [];
}
_create_socket_pair($self, '_bsb_r_sock', '_bsb_w_sock'); ## Sync
_create_socket_pair($self, '_bse_r_sock', '_bse_w_sock'); ## Sync
_create_socket_pair($self, '_com_r_sock', '_com_w_sock'); ## Core
_create_socket_pair($self, '_que_r_sock', '_que_w_sock'); ## Core
_create_socket_pair($self, '_dat_r_sock', '_dat_w_sock', 0);
_create_socket_pair($self, '_dat_r_sock', '_dat_w_sock', $_)
for (1 .. $_data_channels);
## Place 1 char in one socket to ensure Perl loads the required modules
## prior to spawning. The last worker spawned will perform the read.
syswrite $self->{_que_w_sock}, $LF;
## Preload the input module if required.
if (!defined $self->{user_tasks}) {
if (defined $self->{input_data}) {
my $_ref_type = ref $self->{input_data};
if ($_ref_type eq '' || $_ref_type eq 'SCALAR') {
require MCE::Core::Input::Handle
unless (defined $MCE::Core::Input::Handle::VERSION);
}
elsif ($_ref_type eq 'CODE') {
require MCE::Core::Input::Iterator
unless (defined $MCE::Core::Input::Iterator::VERSION);
}
else {
require MCE::Core::Input::Request
unless (defined $MCE::Core::Input::Request::VERSION);
}
}
elsif (defined $self->{sequence}) {
require MCE::Core::Input::Sequence
unless (defined $MCE::Core::Input::Sequence::VERSION);
}
}
## -------------------------------------------------------------------------
## Spawn workers.
$_mce_spawned{$_mce_sid} = $self;
$self->{_pids} = []; $self->{_thrs} = []; $self->{_tids} = [];
$self->{_status} = []; $self->{_state} = []; $self->{_task} = [];
if (!defined $self->{user_tasks}) {
$self->{_total_workers} = $_max_workers;
$self->{_init_total_workers} = $_max_workers;
if (defined $_use_threads && $_use_threads == 1) {
_dispatch_thread($self, $_) for (1 .. $_max_workers);
} else {
_dispatch_child($self, $_) for (1 .. $_max_workers);
}
$self->{_task}->[0] = { _total_workers => $_max_workers };
for (1 .. $_max_workers) {
keys(%{ $self->{_state}->[$_] }) = 5;
$self->{_state}->[$_] = {
_task => undef, _task_id => undef, _task_wid => undef,
_params => undef, _chn => $_ % $_data_channels + 1
}
}
}
else {
my ($_task_id, $_wid);
$_task_id = $_wid = $self->{_total_workers} = 0;
$self->{_total_workers} += $_->{max_workers}
for (@{ $self->{user_tasks} });
$self->{_init_total_workers} = $self->{_total_workers};
for my $_task (@{ $self->{user_tasks} }) {
my $_tsk_use_threads = $_task->{use_threads};
if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
_dispatch_thread($self, ++$_wid, $_task, $_task_id, $_)
for (1 .. $_task->{max_workers});
} else {
_dispatch_child($self, ++$_wid, $_task, $_task_id, $_)
for (1 .. $_task->{max_workers});
}
$_task_id++;
}
$_task_id = $_wid = 0;
for my $_task (@{ $self->{user_tasks} }) {
$self->{_task}->[$_task_id] = {
_total_running => 0, _total_workers => $_task->{max_workers}
};
for (1 .. $_task->{max_workers}) {
keys(%{ $self->{_state}->[++$_wid] }) = 5;
$self->{_state}->[$_wid] = {
_task => $_task, _task_id => $_task_id, _task_wid => $_,
_params => undef, _chn => $_wid % $_data_channels + 1
}
}
$_task_id++;
}
}
## -------------------------------------------------------------------------
$self->{_com_lock} = $_COM_LOCK;
$self->{_send_cnt} = 0;
$self->{_spawned} = 1;
## Await reply from the last worker spawned.
if ($self->{_total_workers} > 0) {
local $/ = $LF; local $!;
my $_COM_R_SOCK = $self->{_com_r_sock};
<$_COM_R_SOCK>;
}
## Release lock.
flock $_COM_LOCK, LOCK_UN;
$SIG{__DIE__} = $_die_handler;
$SIG{__WARN__} = $_warn_handler;
$MCE = $self;
return $self;
}
###############################################################################
## ----------------------------------------------------------------------------
## Forchunk, foreach, and forseq methods.
##
###############################################################################
sub forchunk {
my $x = shift; my $self = ref($x) ? $x : $MCE;
my $_input_data = $_[0];
_validate_runstate($self, 'MCE::forchunk');
my ($_user_func, $_params_ref);
if (ref $_[1] eq 'HASH') {
$_user_func = $_[2]; $_params_ref = $_[1];
} else {
$_user_func = $_[1]; $_params_ref = {};
}
@_ = ();
_croak('MCE::forchunk: (input_data) is not specified')
unless (defined $_input_data);
_croak('MCE::forchunk: (code_block) is not specified')
unless (defined $_user_func);
$_params_ref->{input_data} = $_input_data;
$_params_ref->{user_func} = $_user_func;
$self->run(1, $_params_ref);
return $self;
}
sub foreach {
my $x = shift; my $self = ref($x) ? $x : $MCE;
my $_input_data = $_[0];
_validate_runstate($self, 'MCE::foreach');
my ($_user_func, $_params_ref);
if (ref $_[1] eq 'HASH') {
$_user_func = $_[2]; $_params_ref = $_[1];
} else {
$_user_func = $_[1]; $_params_ref = {};
}
@_ = ();
_croak('MCE::foreach: (input_data) is not specified')
unless (defined $_input_data);
_croak('MCE::foreach: (code_block) is not specified')
unless (defined $_user_func);
$_params_ref->{chunk_size} = 1;
$_params_ref->{input_data} = $_input_data;
$_params_ref->{user_func} = $_user_func;
$self->run(1, $_params_ref);
return $self;
}
sub forseq {
my $x = shift; my $self = ref($x) ? $x : $MCE;
my $_sequence = $_[0];
_validate_runstate($self, 'MCE::forseq');
my ($_user_func, $_params_ref);
if (ref $_[1] eq 'HASH') {
$_user_func = $_[2]; $_params_ref = $_[1];
} else {
$_user_func = $_[1]; $_params_ref = {};
}
@_ = ();
_croak('MCE::forseq: (sequence) is not specified')
unless (defined $_sequence);
_croak('MCE::forseq: (code_block) is not specified')
unless (defined $_user_func);
$_params_ref->{sequence} = $_sequence;
$_params_ref->{user_func} = $_user_func;
$self->run(1, $_params_ref);
return $self;
}
###############################################################################
## ----------------------------------------------------------------------------
## Process method.
##
###############################################################################
sub process {
my $x = shift; my $self = ref($x) ? $x : $MCE;
_validate_runstate($self, 'MCE::process');
my ($_input_data, $_params_ref);
if (ref $_[0] eq 'HASH') {
$_input_data = $_[1]; $_params_ref = $_[0];
} else {
$_input_data = $_[0]; $_params_ref = $_[1];
}
@_ = ();
## Set input data.
if (defined $_input_data) {
$_params_ref->{input_data} = $_input_data;
}
elsif ( !defined $_params_ref->{input_data} &&
!defined $_params_ref->{sequence} ) {
_croak('MCE::process: (input_data or sequence) is not specified');
}
## Pass 0 to "not" auto-shutdown after processing.
$self->run(0, $_params_ref);
return $self;
}
###############################################################################
## ----------------------------------------------------------------------------
## Restart worker method.
##
###############################################################################
sub restart_worker {
my $x = shift; my $self = ref($x) ? $x : $MCE;
@_ = ();
_croak('MCE::restart_worker: method cannot be called by the worker process')
if ($self->{_wid});
my $_wid = $self->{_exited_wid};
my $_params = $self->{_state}->[$_wid]->{_params};
my $_task_wid = $self->{_state}->[$_wid]->{_task_wid};
my $_task_id = $self->{_state}->[$_wid]->{_task_id};
my $_task = $self->{_state}->[$_wid]->{_task};
my $_chn = $self->{_state}->[$_wid]->{_chn};
$_params->{_chn} = $_chn;
my $_use_threads = (defined $_task_id)
? $_task->{use_threads} : $self->{use_threads};
$self->{_task}->[$_task_id]->{_total_running} += 1 if (defined $_task_id);
$self->{_task}->[$_task_id]->{_total_workers} += 1 if (defined $_task_id);
$self->{_total_running} += 1;
$self->{_total_workers} += 1;
if (defined $_use_threads && $_use_threads == 1) {
_dispatch_thread($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
} else {
_dispatch_child($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
}
sleep 0.001;
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Run method.
##
###############################################################################
sub run {
my $x = shift; my $self = ref($x) ? $x : $MCE;
_croak('MCE::run: method cannot be called by the worker process')
if ($self->{_wid});
my ($_auto_shutdown, $_params_ref);
if (ref $_[0] eq 'HASH') {
$_auto_shutdown = (defined $_[1]) ? $_[1] : 1;
$_params_ref = $_[0];
} else {
$_auto_shutdown = (defined $_[0]) ? $_[0] : 1;
$_params_ref = $_[1];
}
@_ = ();
my $_has_user_tasks = (defined $self->{user_tasks});
my $_requires_shutdown = 0;
## Unset params if workers have been sent user_data via send.
$_params_ref = undef if ($self->{_send_cnt});
## Set user_func to NOOP if not specified.
$self->{user_func} = \&_NOOP
if (!defined $self->{user_func} && !defined $_params_ref->{user_func});
## Set user specified params if specified.
if (defined $_params_ref && ref $_params_ref eq 'HASH') {
$_requires_shutdown = _sync_params($self, $_params_ref);
_validate_args($self);
}
## Shutdown workers if determined by _sync_params or if processing a
## scalar reference. Workers need to be restarted in order to pick up
## on the new code blocks and/or scalar reference.
if ($_has_user_tasks) {
$self->{input_data} = $self->{user_tasks}->[0]->{input_data}
if ($self->{user_tasks}->[0]->{input_data});
$self->{use_slurpio} = $self->{user_tasks}->[0]->{use_slurpio}
if ($self->{user_tasks}->[0]->{use_slurpio});
$self->{parallel_io} = $self->{user_tasks}->[0]->{parallel_io}
if ($self->{user_tasks}->[0]->{parallel_io});
$self->{RS} = $self->{user_tasks}->[0]->{RS}
if ($self->{user_tasks}->[0]->{RS});
}
$self->shutdown() if ($_requires_shutdown);
if (ref $self->{input_data} eq 'SCALAR') {
$self->shutdown()
unless $self->{_last_sref} == $self->{input_data};
$self->{_last_sref} = $self->{input_data};
}
## -------------------------------------------------------------------------
$self->{_wrk_status} = 0;
## Spawn workers.
$self->spawn() unless ($self->{_spawned});
return $self unless ($self->{_total_workers});
local $SIG{__DIE__} = \&_die;
local $SIG{__WARN__} = \&_warn;
$MCE = $self;
my ($_input_data, $_input_file, $_input_glob, $_seq);
my ($_abort_msg, $_first_msg, $_run_mode, $_single_dim);
my $_chunk_size = $self->{chunk_size};
$_seq = ($_has_user_tasks && $self->{user_tasks}->[0]->{sequence})
? $self->{user_tasks}->[0]->{sequence}
: $self->{sequence};
## Determine run mode for workers.
if (defined $_seq) {
my ($_begin, $_end, $_step, $_fmt) = (ref $_seq eq 'ARRAY')
? @{ $_seq } : ($_seq->{begin}, $_seq->{end}, $_seq->{step});
$_chunk_size = $self->{user_tasks}->[0]->{chunk_size}
if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size});
$_run_mode = 'sequence';
$_abort_msg = int(($_end - $_begin) / $_step / $_chunk_size) + 1;
$_first_msg = 0;
}
elsif (defined $self->{input_data}) {
my $_ref = ref $self->{input_data};
if ($_ref eq 'ARRAY') { ## Array mode.
$_run_mode = 'array';
$_input_data = $self->{input_data};
$_input_file = $_input_glob = undef;
$_single_dim = 1 if (ref $_input_data->[0] eq '');
$_abort_msg = 0; ## Flag: Has Data: No
$_first_msg = 1; ## Flag: Has Data: Yes
if (@{ $_input_data } == 0) {
return $self->shutdown() if ($_auto_shutdown == 1);
}
}
elsif ($_ref eq 'GLOB' || $_ref =~ /^IO::/) { ## Glob mode.
$_run_mode = 'glob';
$_input_glob = $self->{input_data};
$_input_data = $_input_file = undef;
$_abort_msg = 0; ## Flag: Has Data: No
$_first_msg = 1; ## Flag: Has Data: Yes
}
elsif ($_ref eq 'CODE') { ## Iterator mode.
$_run_mode = 'iterator';
$_input_data = $self->{input_data};
$_input_file = $_input_glob = undef;
$_abort_msg = 0; ## Flag: Has Data: No
$_first_msg = 1; ## Flag: Has Data: Yes
}
elsif ($_ref eq '') { ## File mode.
$_run_mode = 'file';
$_input_file = $self->{input_data};
$_input_data = $_input_glob = undef;
$_abort_msg = (-s $_input_file) + 1;
$_first_msg = 0; ## Begin at offset position
if ((-s $_input_file) == 0) {
return $self->shutdown() if ($_auto_shutdown == 1);
}
}
elsif ($_ref eq 'SCALAR') { ## Memory mode.
$_run_mode = 'memory';
$_input_data = $_input_file = $_input_glob = undef;
$_abort_msg = length(${ $self->{input_data} }) + 1;
$_first_msg = 0; ## Begin at offset position
if (length(${ $self->{input_data} }) == 0) {
return $self->shutdown() if ($_auto_shutdown == 1);
}
}
else {
_croak('MCE::run: (input_data) is not valid');
}
}
else { ## Nodata mode.
$_run_mode = 'nodata';
$_abort_msg = undef;
}
## -------------------------------------------------------------------------
my $_COM_LOCK = $self->{_com_lock};
my $_bounds_only = $self->{bounds_only};
my $_interval = $self->{interval};
my $_sequence = $self->{sequence};
my $_user_args = $self->{user_args};
my $_use_slurpio = $self->{use_slurpio};
my $_parallel_io = $self->{parallel_io};
my $_sess_dir = $self->{_sess_dir};
my $_total_workers = $self->{_total_workers};
my $_send_cnt = $self->{_send_cnt};
my $_RS = $self->{RS};
## Begin processing.
unless ($_send_cnt) {
my %_params = (
'_abort_msg' => $_abort_msg, '_run_mode' => $_run_mode,
'_chunk_size' => $_chunk_size, '_single_dim' => $_single_dim,
'_input_file' => $_input_file, '_interval' => $_interval,
'_sequence' => $_sequence, '_bounds_only' => $_bounds_only,
'_use_slurpio' => $_use_slurpio, '_parallel_io' => $_parallel_io,
'_user_args' => $_user_args, '_RS' => $_RS,
);
my %_params_nodata = (
'_abort_msg' => undef, '_run_mode' => 'nodata',
'_chunk_size' => $_chunk_size, '_single_dim' => $_single_dim,
'_input_file' => $_input_file, '_interval' => $_interval,
'_sequence' => $_sequence, '_bounds_only' => $_bounds_only,
'_use_slurpio' => $_use_slurpio, '_parallel_io' => $_parallel_io,
'_user_args' => $_user_args, '_RS' => $_RS,
);
local $\ = undef; local $/ = $LF;
lock $_MCE_LOCK if ($_has_threads); ## Obtain MCE lock.
my ($_wid, %_task0_wids);
my $_BSE_W_SOCK = $self->{_bse_w_sock};
my $_COM_R_SOCK = $self->{_com_r_sock};
my $_submit_delay = $self->{submit_delay};
my $_frozen_params = $self->{freeze}(\%_params);
my $_frozen_nodata;
$_frozen_nodata = $self->{freeze}(\%_params_nodata) if ($_has_user_tasks);
if ($_has_user_tasks) { for (1 .. @{ $self->{_state} } - 1) {
$_task0_wids{$_} = 1 unless ($self->{_state}->[$_]->{_task_id});
}}
## Insert the first message into the queue if defined.
if (defined $_first_msg) {
my $_QUE_W_SOCK = $self->{_que_w_sock};
syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_first_msg);
}
## Submit params data to workers.
for (1 .. $_total_workers) {
print {$_COM_R_SOCK} $_ . $LF;
chomp($_wid = <$_COM_R_SOCK>);
if (!$_has_user_tasks || exists $_task0_wids{$_wid}) {
print {$_COM_R_SOCK} length($_frozen_params) . $LF . $_frozen_params;
$self->{_state}->[$_wid]->{_params} = \%_params;
} else {
print {$_COM_R_SOCK} length($_frozen_nodata) . $LF . $_frozen_nodata;
$self->{_state}->[$_wid]->{_params} = \%_params_nodata;
}
<$_COM_R_SOCK>;
sleep 0.003 if ($_is_winenv);
if (defined $_submit_delay && $_submit_delay > 0.0) {
sleep $_submit_delay;
}
}
sleep 0.005 if ($_is_winenv);
## Obtain lock.
flock $_COM_LOCK, LOCK_EX;
syswrite $_BSE_W_SOCK, $LF for (1 .. $_total_workers);
if (($self->{_mce_tid} ne '' && $self->{_mce_tid} ne '0') || $_is_winenv) {
sleep 0.002;
}
}
## -------------------------------------------------------------------------
$self->{_total_exited} = 0;
if ($_send_cnt) {
$self->{_total_running} = $_send_cnt;
$self->{_task}->[0]->{_total_running} = $_send_cnt;
}
else {
$self->{_total_running} = $_total_workers;
if (defined $self->{user_tasks}) {
$_->{_total_running} = $_->{_total_workers} for (@{ $self->{_task} });
}
}
## Call the output function.
if ($self->{_total_running} > 0) {
$self->{_abort_msg} = $_abort_msg;
$self->{_run_mode} = $_run_mode;
$self->{_single_dim} = $_single_dim;
_output_loop( $self, $_input_data, $_input_glob,
\%_plugin_function, \@_plugin_loop_begin, \@_plugin_loop_end
);
undef $self->{_abort_msg};
undef $self->{_run_mode};
undef $self->{_single_dim};
}
unless ($_send_cnt) {
## Remove the last message from the queue.
unless ($_run_mode eq 'nodata') {
if (defined $self->{_que_r_sock}) {
my $_next; my $_QUE_R_SOCK = $self->{_que_r_sock};
sysread $_QUE_R_SOCK, $_next, $_que_read_size;
}
}
## Release lock.
flock $_COM_LOCK, LOCK_UN;
}
$self->{_send_cnt} = 0;
## Shutdown workers (shutdown as well, if any workers have exited).
$self->shutdown() if ($_auto_shutdown == 1 || $self->{_total_exited} > 0);
return $self;
}
###############################################################################
## ----------------------------------------------------------------------------
## Send method.
##
###############################################################################
sub send {
my $x = shift; my $self = ref($x) ? $x : $MCE;
_croak('MCE::send: method cannot be called by the worker process')
if ($self->{_wid});
_croak('MCE::send: method cannot be called while running')
if ($self->{_total_running});
_croak('MCE::send: method cannot be used with input_data or sequence')
if (defined $self->{input_data} || defined $self->{sequence});
_croak('MCE::send: method cannot be used with user_tasks')
if (defined $self->{user_tasks});
my $_data_ref;
if (ref $_[0] eq 'ARRAY' || ref $_[0] eq 'HASH' || ref $_[0] eq 'PDL') {
$_data_ref = $_[0];
} else {
_croak('MCE::send: ARRAY, HASH, or a PDL reference is not specified');
}
@_ = ();
$self->{_send_cnt} = 0 unless (defined $self->{_send_cnt});
## -------------------------------------------------------------------------
## Spawn workers.
$self->spawn() unless ($self->{_spawned});
_croak('MCE::send: Sending greater than # of workers is not allowed')
if ($self->{_send_cnt} >= $self->{_task}->[0]->{_total_workers});
local $SIG{__DIE__} = \&_die;
local $SIG{__WARN__} = \&_warn;
## Begin data submission.
{
local $\ = undef; local $/ = $LF;
my $_COM_R_SOCK = $self->{_com_r_sock};
my $_sess_dir = $self->{_sess_dir};
my $_submit_delay = $self->{submit_delay};
my $_frozen_data = $self->{freeze}($_data_ref);
## Submit data to worker.
print {$_COM_R_SOCK} '_data' . $LF;
<$_COM_R_SOCK>;
print {$_COM_R_SOCK} length($_frozen_data) . $LF . $_frozen_data;
<$_COM_R_SOCK>;
if (defined $_submit_delay && $_submit_delay > 0.0) {
sleep $_submit_delay;
}
sleep 0.002 if ($_is_cygwin);
}
$self->{_send_cnt} += 1;
return $self;
}
###############################################################################
## ----------------------------------------------------------------------------
## Shutdown method.
##
###############################################################################
sub shutdown {
my $x = shift; my $self = ref($x) ? $x : $MCE;
@_ = ();
_validate_runstate($self, 'MCE::shutdown');
## Return if workers have not been spawned or have already been shutdown.
return unless ($self->{_spawned});
## Wait for workers to complete processing before shutting down.
$self->run(0) if ($self->{_send_cnt});
local $SIG{__DIE__} = \&_die;
local $SIG{__WARN__} = \&_warn;
lock $_MCE_LOCK if ($_has_threads); ## Obtain MCE lock.
my $_is_mce_thr = ($self->{_mce_tid} ne '' && $self->{_mce_tid} ne '0');
my $_COM_R_SOCK = $self->{_com_r_sock};
my $_data_channels = $self->{_data_channels};
my $_mce_sid = $self->{_mce_sid};
my $_sess_dir = $self->{_sess_dir};
my $_total_workers = $self->{_total_workers};
my $_lock_chn = $self->{_lock_chn};
## Delete entry.
delete $_mce_spawned{$_mce_sid};
## Notify workers to exit loop.
local $\ = undef; local $/ = $LF; local $!; local $?;
for (1 .. $_total_workers) {
print {$_COM_R_SOCK} '_exit' . $LF;
<$_COM_R_SOCK>;
}
CORE::shutdown $self->{_bse_w_sock}, 2; ## Barrier end channels
CORE::shutdown $self->{_bse_r_sock}, 2;
## Reap children/threads.
if ( $self->{_pids} && @{ $self->{_pids} } > 0 ) {
my $_list = $self->{_pids};
for my $i (0 .. @{ $_list }) {
waitpid $_list->[$i], 0 if ($_list->[$i]);
}
}
elsif ( $self->{_thrs} && @{ $self->{_thrs} } > 0 ) {
my $_list = $self->{_thrs};
for my $i (0 .. @{ $_list }) {
${ $_list->[$i] }->join() if ($_list->[$i]);
}
}
close $self->{_com_lock}; undef $self->{_com_lock};
## -------------------------------------------------------------------------
## Close sockets.
CORE::shutdown $self->{_bsb_w_sock}, 2; ## Barrier begin channels
CORE::shutdown $self->{_bsb_r_sock}, 2;
CORE::shutdown $self->{_com_w_sock}, 2; ## Communication channels
CORE::shutdown $self->{_com_r_sock}, 2;
CORE::shutdown $self->{_que_w_sock}, 2; ## Queue channels
CORE::shutdown $self->{_que_r_sock}, 2;
CORE::shutdown $self->{_dat_w_sock}->[0], 2; ## Data channels
CORE::shutdown $self->{_dat_r_sock}->[0], 2;
for (1 .. $_data_channels) {
CORE::shutdown $self->{_dat_w_sock}->[$_], 2;
CORE::shutdown $self->{_dat_r_sock}->[$_], 2;
}
for (
qw( _bsb_w_sock _bsb_r_sock _bse_w_sock _bse_r_sock _com_w_sock
_com_r_sock _que_w_sock _que_r_sock _dat_w_sock _dat_r_sock )
) {
if (ref $self->{$_} eq 'ARRAY') {
for my $_s (@{ $self->{$_} }) {
close $_s; undef $_s;
}
} else {
close $self->{$_}; undef $self->{$_};
}
}
## -------------------------------------------------------------------------
## Remove the session directory.
if (defined $_sess_dir) {
unlink "$_sess_dir/_dat.lock.e"
if (-e "$_sess_dir/_dat.lock.e");
if ($_lock_chn) {
unlink "$_sess_dir/_dat.lock.$_" for (1 .. $_data_channels);
}
unlink "$_sess_dir/_com.lock";
rmdir "$_sess_dir";
delete $_mce_sess_dir{$_sess_dir};
}
## Reset instance.
@{$self->{_pids}} = (); @{$self->{_thrs}} = (); @{$self->{_tids}} = ();
@{$self->{_status}} = (); @{$self->{_state}} = (); @{$self->{_task}} = ();
$self->{_mce_sid} = $self->{_mce_tid} = $self->{_sess_dir} = undef;
$self->{_chunk_id} = $self->{_send_cnt} = $self->{_spawned} = 0;
sleep($_is_winenv ? 0.082 : 0.008) if ($_is_mce_thr);
$self->{_total_running} = $self->{_total_workers} = 0;
$self->{_total_exited} = $self->{_last_sref} = 0;
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Barrier sync method.
##
###############################################################################
sub sync {
my $x = shift; my $self = ref($x) ? $x : $MCE;
@_ = ();
_croak('MCE::sync: method cannot be called by the manager process')
unless ($self->{_wid});
## Barrier synchronization is supported for task 0 at this time.
## Note: Workers are assigned task_id 0 when omitting user_tasks.
return if ($self->{_task_id} > 0);
my $_chn = $self->{_chn};
my $_DAT_LOCK = $self->{_dat_lock};
my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
my $_BSB_R_SOCK = $self->{_bsb_r_sock};
my $_BSE_R_SOCK = $self->{_bse_r_sock};
my $_lock_chn = $self->{_lock_chn};
my $_buffer;
local $\ = undef if (defined $\); local $/ = $LF if (!$/ || $/ ne $LF);
## Notify the manager process (begin).
flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
print {$_DAT_W_SOCK} OUTPUT_B_SYN . $LF . $_chn . $LF;
flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
## Wait here until all workers (task_id 0) have synced.
sysread $_BSB_R_SOCK, $_buffer, 1;
## Notify the manager process (end).
flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
print {$_DAT_W_SOCK} OUTPUT_E_SYN . $LF . $_chn . $LF;
flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
## Wait here until all workers (task_id 0) have un-synced.
sysread $_BSE_R_SOCK, $_buffer, 1;
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Yield method.
##
###############################################################################
sub yield {
my $x = shift; my $self = ref($x) ? $x : $MCE;
@_ = ();
return unless ($self->{_i_wrk_st});
return unless ($self->{_task_wid});
my $_delay = $self->{_i_wrk_st} - time;
my $_count;
if ($_delay < 0.0) {
$_count = int($_delay * -1 / $self->{_i_app_tb} + 0.5) + 1;
$_delay += $self->{_i_app_tb} * $_count;
}
sleep $_delay if ($_delay > 0.0);
if ($_count && $_count > 2_000_000_000) {
$self->{_i_wrk_st} = time;
}
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Miscellaneous methods: abort exit last next status.
##
###############################################################################
## Abort current job.
sub abort {
my $x = shift; my $self = ref($x) ? $x : $MCE;
@_ = ();
my $_QUE_R_SOCK = $self->{_que_r_sock};
my $_QUE_W_SOCK = $self->{_que_w_sock};
my $_abort_msg = $self->{_abort_msg};
my $_lock_chn = $self->{_lock_chn};
if (defined $_abort_msg) {
local $\ = undef;
if ($_abort_msg > 0) {
my $_next; sysread $_QUE_R_SOCK, $_next, $_que_read_size;
syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg);
}
if ($self->{_wid} > 0) {
my $_chn = $self->{_chn};
my $_DAT_LOCK = $self->{_dat_lock};
my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
print {$_DAT_W_SOCK} OUTPUT_W_ABT . $LF . $_chn . $LF;
flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
}
}
return;
}
## Worker exits from MCE.
sub exit {
my $x = shift; my $self = ref($x) ? $x : $MCE;
my $_exit_status = (defined $_[0]) ? $_[0] : $?;
my $_exit_msg = (defined $_[1]) ? $_[1] : '';
my $_exit_id = (defined $_[2]) ? $_[2] : '';
@_ = ();
_croak('MCE::exit: method cannot be called by the manager process')
unless ($self->{_wid});
delete $_mce_spawned{ $self->{_mce_sid} };
my $_chn = $self->{_chn};
my $_COM_LOCK = $self->{_com_lock};
my $_DAT_LOCK = $self->{_dat_lock};
my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
my $_lock_chn = $self->{_lock_chn};
my $_task_id = $self->{_task_id};
my $_sess_dir = $self->{_sess_dir};
unless ($self->{_exiting}) {
$self->{_exiting} = 1;
local $\ = undef if (defined $\);
my $_len = length $_exit_msg;
$_exit_id =~ s/[\r\n][\r\n]*/ /mg;
open my $_DAE_LOCK, '+>>:raw:stdio', "$_sess_dir/_dat.lock.e"
or die "(W) open error $_sess_dir/_dat.lock.e: $!\n";
flock $_DAE_LOCK, LOCK_EX;
sleep 0.05 if ($_is_winenv);
flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
print {$_DAT_W_SOCK} OUTPUT_W_EXT . $LF . $_chn . $LF;
print {$_DAU_W_SOCK}
$_task_id . $LF . $self->{_wid} . $LF . $self->{_exit_pid} . $LF .
$_exit_status . $LF . $_exit_id . $LF . $_len . $LF . $_exit_msg
;
flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
flock $_DAE_LOCK, LOCK_UN;
close $_DAE_LOCK; undef $_DAE_LOCK;
}
## Exit thread/child process.
$SIG{__DIE__} = $SIG{__WARN__} = sub { };
select STDERR; $| = 1;
select STDOUT; $| = 1;
if ($_lock_chn) {
close $_DAT_LOCK; undef $_DAT_LOCK;
}
close $_COM_LOCK; undef $_COM_LOCK;
threads->exit($_exit_status)
if ($_has_threads && threads->can('exit'));
CORE::kill(9, $$) unless $_is_winenv;
CORE::exit($_exit_status);
return;
}
## Worker immediately exits the chunking loop.
sub last {
my $x = shift; my $self = ref($x) ? $x : $MCE;
@_ = ();
_croak('MCE::last: method cannot be called by the manager process')
unless ($self->{_wid});
$self->{_last_jmp}() if (defined $self->{_last_jmp});
return;
}
## Worker starts the next iteration of the chunking loop.
sub next {
my $x = shift; my $self = ref($x) ? $x : $MCE;
@_ = ();
_croak('MCE::next: method cannot be called by the manager process')
unless ($self->{_wid});
$self->{_next_jmp}() if (defined $self->{_next_jmp});
return;
}
## Return the exit status. "_wrk_status" holds the greatest exit status
## among workers exiting.
sub status {
my $x = shift; my $self = ref($x) ? $x : $MCE;
@_ = ();
_croak('MCE::status: method cannot be called by the worker process')
if ($self->{_wid});
return (defined $self->{_wrk_status}) ? $self->{_wrk_status} : 0;
}
###############################################################################
## ----------------------------------------------------------------------------
## Methods for serializing data from workers to the main thread.
##
###############################################################################
## Do method. Additional arguments are optional.
sub do {
my $x = shift; my $self = ref($x) ? $x : $MCE;
my $_callback = shift;
_croak('MCE::do: method cannot be called by the manager process')
unless ($self->{_wid});
_croak('MCE::do: (callback) is not specified')
unless (defined $_callback);
$_callback = "main::$_callback" if (index($_callback, ':') < 0);
return _do_callback($self, $_callback, \@_);
}
## Gather method.
sub gather {
my $x = shift; my $self = ref($x) ? $x : $MCE;
_croak('MCE::gather: method cannot be called by the manager process')
unless ($self->{_wid});
return _do_gather($self, \@_);
}
## Sendto method.
{
my %_sendto_lkup = (
'file' => SENDTO_FILEV1, 'FILE' => SENDTO_FILEV1,
'file:' => SENDTO_FILEV2, 'FILE:' => SENDTO_FILEV2,
'stdout' => SENDTO_STDOUT, 'STDOUT' => SENDTO_STDOUT,
'stderr' => SENDTO_STDERR, 'STDERR' => SENDTO_STDERR,
'fd:' => SENDTO_FD, 'FD:' => SENDTO_FD,
);
my $_v2_regx = qr/^([^:]+:)(.+)/;
sub sendto {
my $x = shift; my $self = ref($x) ? $x : $MCE;
my $_to = shift;
_croak('MCE::sendto: method cannot be called by the manager process')
unless ($self->{_wid});
return unless (defined $_[0]);
my ($_dest, $_value);
$_dest = (exists $_sendto_lkup{$_to}) ? $_sendto_lkup{$_to} : undef;
if (!defined $_dest) {
if (defined (my $_fd = fileno($_to))) {
my $_data = (scalar @_) ? join('', @_) : $_;
_do_send_glob($self, $_to, $_fd, \$_data);
return;
}
if (defined $_to && $_to =~ /$_v2_regx/o) {
$_dest = (exists $_sendto_lkup{$1}) ? $_sendto_lkup{$1} : undef;
$_value = $2;
}
if ( !defined $_dest || ( !defined $_value && (
$_dest == SENDTO_FILEV2 || $_dest == SENDTO_FD
))) {
my $_msg = "\n";
$_msg .= "MCE::sendto: improper use of method\n";
$_msg .= "\n";
$_msg .= "## usage:\n";
$_msg .= "## ->sendto(\"stderr\", ...);\n";
$_msg .= "## ->sendto(\"stdout\", ...);\n";
$_msg .= "## ->sendto(\"file:/path/to/file\", ...);\n";
$_msg .= "## ->sendto(\"fd:2\", ...);\n";
$_msg .= "\n";
_croak($_msg);
}
}
if ($_dest == SENDTO_FILEV1) { ## sendto 'file', $a, $path
return if (!defined $_[1] || @_ > 2); ## Please switch to using V2
$_value = $_[1]; delete $_[1]; ## sendto 'file:/path', $a
$_dest = SENDTO_FILEV2;
}
return _do_send($self, $_dest, $_value, @_);
}
}
###############################################################################
## ----------------------------------------------------------------------------
## Functions for serializing print, printf and say statements.
##
###############################################################################
sub print {
my $x = shift; my $self = ref($x) ? $x : $MCE;
if (defined (my $_fd = fileno($_[0]))) {
my $_glob = shift;
my $_data = (scalar @_) ? join('', @_) : $_;
_do_send_glob($self, $_glob, $_fd, \$_data);
}
else {
my $_data = (scalar @_) ? join('', @_) : $_;
if ($self->{_wid}) {
$self->_do_send(SENDTO_STDOUT, undef, \$_data);
}
else {
local $\ = undef if (defined $\);
print $_data;
}
}
@_ = ();
return;
}
sub printf {
my $x = shift; my $self = ref($x) ? $x : $MCE;
if (defined (my $_fd = fileno($_[0]))) {
my $_glob = shift; my $_fmt = shift || '%s';
my $_data = (scalar @_) ? sprintf($_fmt, @_) : sprintf($_fmt, $_);
_do_send_glob($self, $_glob, $_fd, \$_data);
}
else {
my $_fmt = shift || '%s';
my $_data = (scalar @_) ? sprintf($_fmt, @_) : sprintf($_fmt, $_);
if ($self->{_wid}) {
$self->_do_send(SENDTO_STDOUT, undef, \$_data);
}
else {
local $\ = undef if (defined $\);
print $_data;
}
}
@_ = ();
return;
}
sub say {
my $x = shift; my $self = ref($x) ? $x : $MCE;
if (defined (my $_fd = fileno($_[0]))) {
my $_glob = shift;
my $_data = (scalar @_) ? join("\n", @_) . "\n" : $_ . "\n";
_do_send_glob($self, $_glob, $_fd, \$_data);
}
else {
my $_data = (scalar @_) ? join("\n", @_) . "\n" : $_ . "\n";
if ($self->{_wid}) {
$self->_do_send(SENDTO_STDOUT, undef, \$_data);
}
else {
local $\ = undef if (defined $\);
print $_data;
}
}
@_ = ();
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Private methods.
##
###############################################################################
sub _die { return MCE::Signal->_die_handler(@_); }
sub _warn { return MCE::Signal->_warn_handler(@_); }
sub _croak {
$SIG{__DIE__} = \&MCE::_die;
$SIG{__WARN__} = \&MCE::_warn;
$\ = undef; require Carp;
goto &Carp::croak;
}
sub _NOOP { }
###############################################################################
## ----------------------------------------------------------------------------
## Create socket pair.
##
###############################################################################
sub _create_socket_pair {
my ($self, $_r_sock, $_w_sock, $_i) = @_;
@_ = (); local $!;
die 'Private method called' unless (caller)[0]->isa( ref $self );
if (defined $_i) {
socketpair( $self->{$_r_sock}->[$_i], $self->{$_w_sock}->[$_i],
PF_UNIX, SOCK_STREAM, PF_UNSPEC ) or die "socketpair: $!\n";
binmode $self->{$_r_sock}->[$_i];
binmode $self->{$_w_sock}->[$_i];
## Autoflush handles.
my $_old_hndl = select $self->{$_r_sock}->[$_i]; $| = 1;
select $self->{$_w_sock}->[$_i]; $| = 1;
select $_old_hndl;
}
else {
socketpair( $self->{$_r_sock}, $self->{$_w_sock},
PF_UNIX, SOCK_STREAM, PF_UNSPEC ) or die "socketpair: $!\n";
binmode $self->{$_r_sock};
binmode $self->{$_w_sock};
## Autoflush handles.
my $_old_hndl = select $self->{$_r_sock}; $| = 1;
select $self->{$_w_sock}; $| = 1;
select $_old_hndl;
}
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Sync methods.
##
###############################################################################
sub _sync_buffer_to_array {
my $_cnt = 0; my ($_buffer_ref, $_array_ref) = @_;
@_ = ();
open my $_MEM_FILE, '<', $_buffer_ref;
binmode $_MEM_FILE;
$_array_ref->[$_cnt++] = $_ while (<$_MEM_FILE>);
close $_MEM_FILE; undef $_MEM_FILE;
if ($_cnt < @{ $_array_ref }) {
delete @{ $_array_ref }[$_cnt .. @{ $_array_ref } - 1];
}
return;
}
sub _sync_params {
my ($self, $_params_ref) = @_;
@_ = ();
die 'Private method called' unless (caller)[0]->isa( ref $self );
my $_requires_shutdown = 0;
for (qw( user_begin user_func user_end )) {
if (defined $_params_ref->{$_}) {
$self->{$_} = $_params_ref->{$_};
delete $_params_ref->{$_};
$_requires_shutdown = 1;
}
}
for (keys %{ $_params_ref }) {
_croak("MCE::_sync_params: ($_) is not a valid params argument")
unless (exists $_params_allowed_args{$_});
$self->{$_} = $_params_ref->{$_};
}
return ($self->{_spawned}) ? $_requires_shutdown : 0;
}
###############################################################################
## ----------------------------------------------------------------------------
## Worker process -- Wrap.
##
###############################################################################
sub _worker_wrap {
$MCE = $_[0];
return _worker_main(@_, \@_plugin_worker_init);
}
###############################################################################
## ----------------------------------------------------------------------------
## Dispatch thread.
##
###############################################################################
sub _dispatch_thread {
my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;
@_ = ();
die 'Private method called' unless (caller)[0]->isa( ref $self );
my $_thr = threads->create( \&_worker_wrap,
$self, $_wid, $_task, $_task_id, $_task_wid, $_params
);
_croak("MCE::_dispatch_thread: Failed to spawn worker $_wid: $!")
unless (defined $_thr);
if (defined $_thr) {
## Store into an available slot, otherwise append to arrays.
if (defined $_params) { for (0 .. @{ $self->{_tids} } - 1) {
unless (defined $self->{_tids}->[$_]) {
$self->{_thrs}->[$_] = \$_thr;
$self->{_tids}->[$_] = $_thr->tid();
return;
}
}}
push @{ $self->{_thrs} }, \$_thr;
push @{ $self->{_tids} }, $_thr->tid();
}
if (defined $self->{spawn_delay} && $self->{spawn_delay} > 0.0) {
sleep $self->{spawn_delay};
}
return;
}
###############################################################################
## ----------------------------------------------------------------------------
## Dispatch child.
##
###############################################################################
sub _dispatch_child {
my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;
@_ = ();
die 'Private method called' unless (caller)[0]->isa( ref $self );
my $_pid = fork();
_croak("MCE::_dispatch_child: Failed to spawn worker $_wid: $!")
unless (defined $_pid);
unless ($_pid) {
_worker_wrap($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
CORE::kill(9, $$) unless $_is_winenv;
CORE::exit(0);
}
if (defined $_pid) {
## Store into an available slot, otherwise append to array.
if (defined $_params) { for (0 .. @{ $self->{_pids} } - 1) {
unless (defined $self->{_pids}->[$_]) {
$self->{_pids}->[$_] = $_pid;
return;
}
}}
push @{ $self->{_pids} }, $_pid;
}
if (defined $self->{spawn_delay} && $self->{spawn_delay} > 0.0) {
sleep $self->{spawn_delay};
}
return;
}
1;