The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
###############################################################################
## ----------------------------------------------------------------------------
## Core methods for the manager process.
##
## This package provides the loop and relevant methods used internally by the
## manager process.
##
## There is no public API.
##
###############################################################################

package MCE::Core::Manager;

use strict;
use warnings;

our $VERSION = '1.815';

## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (TestingAndDebugging::ProhibitNoStrict)

## Items below are folded into MCE.

package # hide from rpm
   MCE;

no warnings qw( threads recursion uninitialized );

## POSIX.pm is a big module. The following constant covers most platforms.
use constant { _WNOHANG => $^O eq 'solaris' ? 64 : 1 };

use bytes;

###############################################################################
## ----------------------------------------------------------------------------
## Call on task_end after task completion.
##
###############################################################################

sub _task_end {

   my ($self, $_task_id) = @_;

   @_ = ();

   if (defined $self->{user_tasks}) {
      my $_task_end = (exists $self->{user_tasks}->[$_task_id]->{task_end})
         ? $self->{user_tasks}->[$_task_id]->{task_end}
         : $self->{task_end};

      if (defined $_task_end) {
         my $_task_name = (exists $self->{user_tasks}->[$_task_id]->{task_name})
            ? $self->{user_tasks}->[$_task_id]->{task_name}
            : $self->{task_name};

         $_task_end->($self, $_task_id, $_task_name);
      }
   }

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Process output.
##
## Awaits and processes events from workers. The sendto/do methods tag the
## output accordingly. The hash structure below is key-driven.
##
###############################################################################

sub _output_loop {

   my ( $self, $_input_data, $_input_glob, $_plugin_function,
        $_plugin_loop_begin, $_plugin_loop_end ) = @_;

   @_ = ();

   my (
      $_aborted, $_eof_flag, $_max_retries, $_syn_flag, %_sendto_fhs,
      $_cb, $_chunk_id, $_chunk_size, $_fd, $_file, $_flush_file, $_wa,
      @_is_c_ref, @_is_h_ref, @_is_q_ref, $_on_post_exit, $_on_post_run,
      $_has_user_tasks, $_sess_dir, $_task_id, $_user_error, $_user_output,
      $_input_size, $_offset_pos, $_single_dim, @_gather, $_cs_one_flag,
      $_exit_id, $_exit_pid, $_exit_status, $_exit_wid, $_len, $_sync_cnt,
      $_BSB_W_SOCK, $_BSE_W_SOCK, $_DAT_R_SOCK, $_DAU_R_SOCK, $_MCE_STDERR,
      $_I_FLG, $_O_FLG, $_I_SEP, $_O_SEP, $_RS, $_RS_FLG, $_MCE_STDOUT,
      @_delay_wid, $_size_completed, $_win32_ipc
   );

   ## -------------------------------------------------------------------------
   ## Callback return.

   my $_cb_ret_a = sub {                          # CBK return array

      my $_buf = $self->{freeze}($_[0]);
         $_len = length $_buf; local $\ = undef if (defined $\);

      print {$_DAU_R_SOCK} $_len.$LF, $_buf;

      return;
   };

   my $_cb_ret_r = sub {                          # CBK return reference

      my $_buf = $self->{freeze}($_[0]);
         $_len = length $_buf; local $\ = undef if (defined $\);

      print {$_DAU_R_SOCK} WANTS_REF.$LF . $_len.$LF, $_buf;

      return;
   };

   my $_cb_ret_s = sub {                          # CBK return scalar

      $_len = (defined $_[0]) ? length $_[0] : -1;
      local $\ = undef if (defined $\);

      print {$_DAU_R_SOCK} WANTS_SCALAR.$LF . $_len.$LF, $_[0];

      return;
   };

   ## -------------------------------------------------------------------------
   ## Create hash structure containing various output functions.

   my %_core_output_function = (

      OUTPUT_W_ABT.$LF => sub {                   # Worker has aborted
         $_aborted = 1;
         return;
      },

      OUTPUT_W_DNE.$LF => sub {                   # Worker has completed
         chomp($_task_id = <$_DAU_R_SOCK>);
         $self->{_total_running} -= 1;

         if ($_has_user_tasks && $_task_id >= 0) {
            $self->{_task}->[$_task_id]->{_total_running} -= 1;
         }

         my $_total_running = ($_has_user_tasks)
            ? $self->{_task}->[$_task_id]->{_total_running}
            : $self->{_total_running};

         if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
            if ($_sync_cnt == $_total_running) {
               for (1 .. $_total_running) { syswrite $_BSB_W_SOCK, $LF }
               undef $_syn_flag;
            }
         }

         _task_end($self, $_task_id) unless ($_total_running);

         return;
      },

      ## ----------------------------------------------------------------------

      OUTPUT_W_EXT.$LF => sub {                   # Worker has exited
         chomp($_task_id = <$_DAU_R_SOCK>);

         $self->{_total_exited}  += 1;
         $self->{_total_running} -= 1;
         $self->{_total_workers} -= 1;

         if ($_has_user_tasks && $_task_id >= 0) {
            $self->{_task}->[$_task_id]->{_total_running} -= 1;
            $self->{_task}->[$_task_id]->{_total_workers} -= 1;
         }

         my $_total_running = ($_has_user_tasks)
            ? $self->{_task}->[$_task_id]->{_total_running}
            : $self->{_total_running};

         if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
            if ($_sync_cnt == $_total_running) {
               for (1 .. $_total_running) { syswrite $_BSB_W_SOCK, $LF }
               undef $_syn_flag;
            }
         }

         my ($_exit_msg, $_retry_buf) = ('', '');

         chomp($_exit_wid    = <$_DAU_R_SOCK>),
         chomp($_exit_pid    = <$_DAU_R_SOCK>),
         chomp($_exit_status = <$_DAU_R_SOCK>),
         chomp($_exit_id     = <$_DAU_R_SOCK>),
         chomp($_len         = <$_DAU_R_SOCK>);

         read($_DAU_R_SOCK, $_exit_msg, $_len) if ($_len);

         chomp($_len = <$_DAU_R_SOCK>);

         read($_DAU_R_SOCK, $_retry_buf, $_len) if ($_len);

         if (abs($_exit_status) > abs($self->{_wrk_status})) {
            $self->{_wrk_status} = $_exit_status;
         }

         ## Reap child/thread. Note: Win32 uses negative PIDs.

         if ($_exit_pid =~ /^PID_(-?\d+)/) {
            my $_pid = $1; my $_list = $self->{_pids};
            for my $i (0 .. @{ $_list }) {
               if ($_list->[$i] && $_list->[$i] == $_pid) {
                  waitpid $_pid, 0;
                  $self->{_pids}->[$i] = undef;
                  last;
               }
            }
         }
         elsif ($_exit_pid =~ /^TID_(\d+)/) {
            my $_tid = $1; my $_list = $self->{_tids};
            for my $i (0 .. @{ $_list }) {
               if ($_list->[$i] && $_list->[$i] == $_tid) {
                  $self->{_thrs}->[$i]->join();
                  $self->{_thrs}->[$i] = undef;
                  $self->{_tids}->[$i] = undef;
                  last;
               }
            }
         }

         ## Call on_post_exit callback if defined. Otherwise, append status
         ## information if on_post_run is defined for later retrieval.

         if (defined $_on_post_exit) {
            $self->{_exited_wid} = $_exit_wid;

            if (length($_retry_buf)) {
               $self->{_retry} = $self->{thaw}($_retry_buf);
               my $_retry_cnt  = $_max_retries - $self->{_retry}[2] - 1;

               $_on_post_exit->($self, {
                  wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
                  msg => $_exit_msg, id  => $_exit_id
               }, $_retry_cnt);

               delete $self->{_retry};
            }
            else {
               $_on_post_exit->($self, {
                  wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
                  msg => $_exit_msg, id  => $_exit_id
               }, $_max_retries || 0 );
            }

            delete $self->{_exited_wid};
         }
         elsif (defined $_on_post_run) {
            push @{ $self->{_status} }, {
               wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
               msg => $_exit_msg, id  => $_exit_id
            };
         }

         _task_end($self, $_task_id) unless ($_total_running);

         return;
      },

      ## ----------------------------------------------------------------------

      OUTPUT_A_ARY.$LF => sub {                   # Array << Array
         my $_buf;

         if ($_offset_pos >= $_input_size || $_aborted) {
            local $\ = undef if (defined $\);
            print {$_DAU_R_SOCK} '0'.$LF;

            return;
         }

         if ($_single_dim && $_cs_one_flag) {
            $_buf = $_input_data->[$_offset_pos];
         }
         else {
            if ($_offset_pos + $_chunk_size - 1 < $_input_size) {
               $_buf = $self->{freeze}( [ @{ $_input_data }[
                  $_offset_pos .. $_offset_pos + $_chunk_size - 1
               ] ] );
            }
            else {
               $_buf = $self->{freeze}( [ @{ $_input_data }[
                  $_offset_pos .. $_input_size - 1
               ] ] );
            }
         }

         $_len = length $_buf; local $\ = undef if (defined $\);
         print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
         $_offset_pos += $_chunk_size;

         return;
      },

      OUTPUT_S_GLB.$LF => sub {                   # Scalar << Glob FH
         my $_buf = '';

         ## The logic below honors ('Ctrl/Z' in Windows, 'Ctrl/D' in Unix)
         ## when reading from standard input. No output will be lost as
         ## far as what was previously read into the buffer.

         if ($_eof_flag || $_aborted) {
            local $\ = undef if (defined $\);
            print {$_DAU_R_SOCK} '0'.$LF;

            return;
         }

         {
            local $/ = $_RS if ($_RS_FLG);

            if ($_chunk_size <= MAX_RECS_SIZE) {
               if ($_chunk_size == 1) {
                  $_buf = <$_input_glob>;
                  $_eof_flag = 1 unless (length $_buf);
               }
               else {
                  my $_last_len = 0;
                  for (1 .. $_chunk_size) {
                     $_buf .= <$_input_glob>;
                     $_len  = length $_buf;
                     if ($_len == $_last_len) {
                        $_eof_flag = 1;
                        last;
                     }
                     $_last_len = $_len;
                  }
               }
            }
            else {
               if (read($_input_glob, $_buf, $_chunk_size) == $_chunk_size) {
                  $_buf .= <$_input_glob>;
                  $_eof_flag = 1 if (length $_buf == $_chunk_size);
               }
               else {
                  $_eof_flag = 1;
               }
            }
         }

         $_len = length $_buf; local $\ = undef if (defined $\);

         if ($_len) {
            print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
         } else {
            print {$_DAU_R_SOCK} '0'.$LF;
         }

         return;
      },

      OUTPUT_U_ITR.$LF => sub {                   # User << Iterator
         my $_buf;

         if ($_aborted) {
            local $\ = undef if (defined $\);
            print {$_DAU_R_SOCK} '-1'.$LF;

            return;
         }

         my @_ret_a = $_input_data->($_chunk_size);

         if (scalar @_ret_a > 1 || ref $_ret_a[0]) {
            $_buf = $self->{freeze}( [ @_ret_a ] );
            $_len = length $_buf; local $\ = undef if (defined $\);
            print {$_DAU_R_SOCK} $_len.'1'.$LF . (++$_chunk_id).$LF, $_buf;

            return;
         }
         elsif (defined $_ret_a[0]) {
            $_len = length $_ret_a[0]; local $\ = undef if (defined $\);
            print {$_DAU_R_SOCK} $_len.'0'.$LF . (++$_chunk_id).$LF, $_ret_a[0];

            return;
         }

         local $\ = undef if (defined $\);
         print {$_DAU_R_SOCK} '-1'.$LF;
         $_aborted = 1;

         return;
      },

      ## ----------------------------------------------------------------------

      OUTPUT_A_CBK.$LF => sub {                   # Callback w/ multiple args
         my ($_buf, $_data_ref);

         chomp($_wa  = <$_DAU_R_SOCK>),
         chomp($_cb  = <$_DAU_R_SOCK>),
         chomp($_len = <$_DAU_R_SOCK>);

         read $_DAU_R_SOCK, $_buf, $_len;
         $_data_ref = $self->{thaw}($_buf); undef $_buf;

         local $\ = $_O_SEP if ($_O_FLG); local $/ = $_I_SEP if ($_I_FLG);
         no strict 'refs';

         if ($_wa == WANTS_UNDEF) {
            $_cb->(@{ $_data_ref });
         }
         elsif ($_wa == WANTS_ARRAY) {
            my @_ret_a = $_cb->(@{ $_data_ref });
            $_cb_ret_a->(\@_ret_a);
         }
         else {
            my  $_ret_s = $_cb->(@{ $_data_ref });
            ref $_ret_s ? $_cb_ret_r->($_ret_s) : $_cb_ret_s->($_ret_s);
         }

         return;
      },

      OUTPUT_S_CBK.$LF => sub {                   # Callback w/ 1 scalar arg
         my $_buf;

         chomp($_wa  = <$_DAU_R_SOCK>),
         chomp($_cb  = <$_DAU_R_SOCK>),
         chomp($_len = <$_DAU_R_SOCK>);

         read $_DAU_R_SOCK, $_buf, $_len;

         local $\ = $_O_SEP if ($_O_FLG); local $/ = $_I_SEP if ($_I_FLG);
         no strict 'refs';

         if ($_wa == WANTS_UNDEF) {
            $_cb->($_buf);
         }
         elsif ($_wa == WANTS_ARRAY) {
            my @_ret_a = $_cb->($_buf);
            $_cb_ret_a->(\@_ret_a);
         }
         else {
            my  $_ret_s = $_cb->($_buf);
            ref $_ret_s ? $_cb_ret_r->($_ret_s) : $_cb_ret_s->($_ret_s);
         }

         return;
      },

      OUTPUT_N_CBK.$LF => sub {                   # Callback w/ no args

         chomp($_wa = <$_DAU_R_SOCK>),
         chomp($_cb = <$_DAU_R_SOCK>);

         local $\ = $_O_SEP if ($_O_FLG); local $/ = $_I_SEP if ($_I_FLG);
         no strict 'refs';

         if ($_wa == WANTS_UNDEF) {
            $_cb->();
         }
         elsif ($_wa == WANTS_ARRAY) {
            my @_ret_a = $_cb->();
            $_cb_ret_a->(\@_ret_a);
         }
         else {
            my  $_ret_s = $_cb->();
            ref $_ret_s ? $_cb_ret_r->($_ret_s) : $_cb_ret_s->($_ret_s);
         }

         return;
      },

      ## ----------------------------------------------------------------------

      OUTPUT_A_GTR.$LF => sub {                   # Gather array/ref
         my $_buf;

         chomp($_task_id = <$_DAU_R_SOCK>),
         chomp($_len     = <$_DAU_R_SOCK>);

         read $_DAU_R_SOCK, $_buf, $_len;

         if ($_is_c_ref[$_task_id]) {
            local $_ = $self->{thaw}($_buf);
            $_gather[$_task_id]->(@{ $_ });
         }
         elsif ($_is_h_ref[$_task_id]) {
            local $_ = $self->{thaw}($_buf);
            while (1) {
               my $_key = shift @{ $_ }; my $_val = shift @{ $_ };
               $_gather[$_task_id]->{$_key} = $_val;
               last unless (@{ $_ });
            }
         }
         elsif ($_is_q_ref[$_task_id]) {
            $_gather[$_task_id]->enqueue(@{ $self->{thaw}($_buf) });
         }
         else {
            push @{ $_gather[$_task_id] }, @{ $self->{thaw}($_buf) };
         }

         return;
      },

      OUTPUT_S_GTR.$LF => sub {                   # Gather scalar
         local $_;

         chomp($_task_id = <$_DAU_R_SOCK>),
         chomp($_len     = <$_DAU_R_SOCK>);

         read $_DAU_R_SOCK, $_, $_len if ($_len >= 0);

         if ($_is_c_ref[$_task_id]) {
            $_gather[$_task_id]->($_);
         }
         elsif ($_is_h_ref[$_task_id]) {
            $_gather[$_task_id]->{$_} = undef;
         }
         elsif ($_is_q_ref[$_task_id]) {
            $_gather[$_task_id]->enqueue($_);
         }
         else {
            push @{ $_gather[$_task_id] }, $_;
         }

         return;
      },

      ## ----------------------------------------------------------------------

      OUTPUT_O_SND.$LF => sub {                   # Send >> STDOUT
         my $_buf;

         chomp($_len = <$_DAU_R_SOCK>);
         read $_DAU_R_SOCK, $_buf, $_len;

         if (defined $_user_output) {
            $_user_output->($_buf);
         } else {
            print {$_MCE_STDOUT} $_buf;
         }

         return;
      },

      OUTPUT_E_SND.$LF => sub {                   # Send >> STDERR
         my $_buf;

         chomp($_len = <$_DAU_R_SOCK>);
         read $_DAU_R_SOCK, $_buf, $_len;

         if (defined $_user_error) {
            $_user_error->($_buf);
         } else {
            print {$_MCE_STDERR} $_buf;
         }

         return;
      },

      OUTPUT_F_SND.$LF => sub {                   # Send >> File
         my ($_buf, $_OUT_FILE);

         chomp($_file = <$_DAU_R_SOCK>),
         chomp($_len  = <$_DAU_R_SOCK>);

         read $_DAU_R_SOCK, $_buf, $_len;

         unless (exists $_sendto_fhs{$_file}) {
            open $_sendto_fhs{$_file}, '>>', $_file
               or _croak "Cannot open file for writing ($_file): $!";

            binmode $_sendto_fhs{$_file};

            ## Select new FH, turn on autoflush, restore the old FH.
            if ($_flush_file) {
               local $|; select((select($_sendto_fhs{$_file}), $| = 1)[0]);
            }
         }

         $_OUT_FILE = $_sendto_fhs{$_file};
         print {$_OUT_FILE} $_buf;

         return;
      },

      OUTPUT_D_SND.$LF => sub {                   # Send >> File descriptor
         my ($_buf, $_OUT_FILE);

         chomp($_fd  = <$_DAU_R_SOCK>),
         chomp($_len = <$_DAU_R_SOCK>);

         read $_DAU_R_SOCK, $_buf, $_len;

         unless (exists $_sendto_fhs{$_fd}) {
            require IO::Handle unless (defined $IO::Handle::VERSION);

            $_sendto_fhs{$_fd} = IO::Handle->new();
            $_sendto_fhs{$_fd}->fdopen($_fd, 'w')
               or _croak "Cannot open file descriptor ($_fd): $!";

            binmode $_sendto_fhs{$_fd};

            ## Select new FH, turn on autoflush, restore the old FH.
            if ($_flush_file) {
               local $|; select((select($_sendto_fhs{$_fd}), $| = 1)[0]);
            }
         }

         $_OUT_FILE = $_sendto_fhs{$_fd};
         print {$_OUT_FILE} $_buf;

         return;
      },

      ## ----------------------------------------------------------------------

      OUTPUT_B_SYN.$LF => sub {                   # Barrier sync - begin

         if (!defined $_sync_cnt || $_sync_cnt == 0) {
            $_syn_flag = 1, $_sync_cnt = 0;
         }

         my $_total_running = ($_has_user_tasks)
            ? $self->{_task}->[0]->{_total_running}
            : $self->{_total_running};

         if (++$_sync_cnt == $_total_running) {
            for (1 .. $_total_running) { syswrite $_BSB_W_SOCK, $LF }
            undef $_syn_flag;
         }

         return;
      },

      OUTPUT_E_SYN.$LF => sub {                   # Barrier sync - end

         if (--$_sync_cnt == 0) {
            my $_total_running = ($_has_user_tasks)
               ? $self->{_task}->[0]->{_total_running}
               : $self->{_total_running};

            for (1 .. $_total_running) { syswrite $_BSE_W_SOCK, $LF }
         }

         return;
      },

      OUTPUT_S_IPC.$LF => sub {                   # Change to win32 IPC

         syswrite $_DAT_R_SOCK, $LF;

         $_win32_ipc = 1, goto _LOOP unless $_win32_ipc;

         return;
      },

      OUTPUT_P_NFY.$LF => sub {                   # Progress notification

         chomp($_len = <$_DAU_R_SOCK>);

         $self->{progress}->( $_size_completed += $_len );

         return;
      },

      OUTPUT_I_DLY.$LF => sub {                   # Interval delay
         my $_tasks = $_has_user_tasks ? $self->{user_tasks} : undef;

         chomp($_task_id = <$_DAU_R_SOCK>);

         my $_interval = ($_tasks && exists $_tasks->[$_task_id]{interval})
            ? $_tasks->[$_task_id]{interval}
            : $self->{interval};

         if ( $_interval ) {
            my $_max_workers = ($_tasks)
               ? $_tasks->[$_task_id]{max_workers}
               : $self->{max_workers};

            $_delay_wid[$_task_id] = 1
               if (++$_delay_wid[$_task_id] > $_max_workers);

            my $_nodes  = $_interval->{max_nodes};
            my $_id     = $_interval->{node_id};
            my $_delay  = $_interval->{delay} * $_nodes;

            my $_app_tb = $_delay * $_max_workers;
            my $_app_st = $_interval->{_time} + ($_delay / $_nodes * $_id);
            my $_wrk_st = ($_delay_wid[$_task_id] - 1) * $_delay + $_app_st;

            $_delay = $_wrk_st - time;

            if ($_delay < 0.0 && $_app_tb) {
               my $_count = int($_delay * -1 / $_app_tb + 0.5) + 1;
               $_delay += $_app_tb * $_count;
               $_interval->{_time} = time if ($_count > 2e9);
            }

            ($_delay > 0.0)
               ? print {$_DAU_R_SOCK} $_delay.$LF
               : print {$_DAU_R_SOCK} '0'.$LF;
         }
         else {
            print {$_DAU_R_SOCK} '0'.$LF;
         }

         return;
      },

   );

   ## -------------------------------------------------------------------------

   local ($!, $?, $_);

   $_aborted = $_chunk_id = $_eof_flag = $_size_completed = 0;
   $_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0;
   $_cs_one_flag = ($self->{chunk_size} == 1) ? 1 : 0;

   $_max_retries  = $self->{max_retries};
   $_on_post_exit = $self->{on_post_exit};
   $_on_post_run  = $self->{on_post_run};
   $_chunk_size   = $self->{chunk_size};
   $_flush_file   = $self->{flush_file};
   $_user_output  = $self->{user_output};
   $_user_error   = $self->{user_error};
   $_single_dim   = $self->{_single_dim};
   $_sess_dir     = $self->{_sess_dir};

   if ($_max_retries && !$_on_post_exit) {
      $_on_post_exit = sub {
         my ($self, $_e, $_retry_cnt) = @_;
         my ($_cnt, $_msg) = ($_retry_cnt + 1, "Error: Chunk $_e->{id} failed");

         ($_retry_cnt < $_max_retries)
            ? print {*STDERR} "$_msg, retrying chunk attempt #${_cnt}\n"
            : print {*STDERR} "$_msg\n";

         $self->restart_worker;
      };
   }

   if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size}) {
      $_chunk_size = $self->{user_tasks}->[0]->{chunk_size};
   }

   if ($_has_user_tasks) {
      for my $_i (0 .. @{ $self->{user_tasks} } - 1) {
         $_gather[$_i] = (defined $self->{user_tasks}->[$_i]->{gather})
            ? $self->{user_tasks}->[$_i]->{gather} : $self->{gather};

         $_is_c_ref[$_i] = ( ref $_gather[$_i] eq 'CODE' ) ? 1 : 0;
         $_is_h_ref[$_i] = ( ref $_gather[$_i] eq 'HASH' ) ? 1 : 0;

         $_is_q_ref[$_i] = (
            ref $_gather[$_i] eq 'MCE::Queue' ||
            ref $_gather[$_i] eq 'Thread::Queue' ) ? 1 : 0;
      }
   }

   if (defined $self->{gather}) {
      $_gather[0] = $self->{gather};

      $_is_c_ref[0] = ( ref $_gather[0] eq 'CODE' ) ? 1 : 0;
      $_is_h_ref[0] = ( ref $_gather[0] eq 'HASH' ) ? 1 : 0;

      $_is_q_ref[0] = (
         ref $_gather[0] eq 'MCE::Queue' ||
         ref $_gather[0] eq 'Thread::Queue' ) ? 1 : 0;
   }

   if (defined $_input_data && ref $_input_data eq 'ARRAY') {
      $_input_size = @{ $_input_data };
      $_offset_pos = 0;
   } else {
      $_input_size = $_offset_pos = 0;
   }

   ## Set STDOUT/STDERR to user parameters.

   if (defined $self->{stdout_file}) {
      open $_MCE_STDOUT, '>>', $self->{stdout_file}
         or die $self->{stdout_file} . ": $!\n";
      binmode $_MCE_STDOUT;
   }
   else {
      $_MCE_STDOUT = \*STDOUT;
      binmode $_MCE_STDOUT;
   }

   if (defined $self->{stderr_file}) {
      open $_MCE_STDERR, '>>', $self->{stderr_file}
         or die $self->{stderr_file} . ": $!\n";
      binmode $_MCE_STDERR;
   }
   else {
      $_MCE_STDERR = \*STDERR;
      binmode $_MCE_STDERR;
   }

   ## Make MCE_STDOUT the default handle.
   ## Flush STDERR/STDOUT handles if requested.

   my $_old_hndl = select $_MCE_STDOUT;

   if ($self->{flush_stdout}) {
      local $|; select((select($_MCE_STDOUT), $| = 1)[0]);
   }
   if ($self->{flush_stderr}) {
      local $|; select((select($_MCE_STDERR), $| = 1)[0]);
   }

   ## -------------------------------------------------------------------------

   ## Output event loop.

   my $_func; my $_channels = $self->{_dat_r_sock};

   $_win32_ipc  = ( $ENV{'PERL_MCE_IPC'} eq 'win32' || $INC{'MCE/Hobo.pm'} );

   $_BSB_W_SOCK = $self->{_bsb_w_sock};
   $_BSE_W_SOCK = $self->{_bse_w_sock};
   $_DAT_R_SOCK = $self->{_dat_r_sock}->[0];

   $_RS     = $self->{RS} || $/;
   $_O_SEP  = $\; local $\ = undef;
   $_I_SEP  = $/; local $/ = $LF;

   $_RS_FLG = (!$_RS || $_RS ne $LF) ? 1 : 0;
   $_O_FLG  = (defined $_O_SEP) ? 1 : 0;
   $_I_FLG  = (!$_I_SEP || $_I_SEP ne $LF) ? 1 : 0;

   ## Call module's loop_begin routine for modules plugged into MCE.

   for my $_p (@{ $_plugin_loop_begin }) {
      $_p->($self, \$_DAU_R_SOCK);
   }

   ## Wait on requests *with* timeout capability. Exit loop when all workers
   ## have completed processing or exited prematurely.

   _LOOP:

   if ($self->{loop_timeout} && @{ $self->{_tids} } == 0 && $^O ne 'MSWin32') {
      my ($_list, $_timeout) = ($self->{_pids}, $self->{loop_timeout});
      my ($_DAT_W_SOCK, $_pid) = ($self->{_dat_w_sock}->[0]);

      $_timeout = 5 if $_timeout < 5;

      local $SIG{ALRM} = sub {
         alarm 0;
         for my $i (0 .. @{ $_list }) {
            if ($_pid = $_list->[$i]) {
               if (waitpid($_pid, _WNOHANG)) {
                  $self->{_total_exited}  += 1;
                  $self->{_total_running} -= 1;
                  $self->{_total_workers} -= 1;
                  $_list->[$i] = undef;
               }
            }
         }
         print {$_DAT_W_SOCK} 'NOOP'.$LF . '0'.$LF;
      };

      while ( $self->{_total_running} ) {
         alarm $_timeout;
         $_func = <$_DAT_R_SOCK>;
         $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];

         alarm 0;
         if (exists $_core_output_function{$_func}) {
            $_core_output_function{$_func}();
         } elsif (exists $_plugin_function->{$_func}) {
            $_plugin_function->{$_func}();
         }
      }
   }

   ## Wait on requests *without* timeout capability.

   elsif ($^O eq 'MSWin32' && $_win32_ipc) {
      # The normal loop hangs on Windows when processes/threads start/exit.
      # Using ioctl() properly, http://www.perlmonks.org/?node_id=780083

      my $_val_bytes = "\x00\x00\x00\x00";
      my $_ptr_bytes = unpack( 'I', pack('P', $_val_bytes) );
      my ($_count, $_done, $_nbytes, $_start) = (1, 0);

      while (!$_done) {
         $_start = time;

         # MSWin32 FIONREAD
         IOCTL: ioctl($_DAT_R_SOCK, 0x4004667f, $_ptr_bytes);

         unless ($_nbytes = unpack('I', $_val_bytes)) {
            if ($_count) {
                # delay after a while to not consume a CPU core
                $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.030;
            } else {
                sleep 0.030;
            }
            goto IOCTL;
         }

         $_count = 1;

         do {
            sysread($_DAT_R_SOCK, $_func, 8);
            $_done = 1, last() unless length($_func) == 8;
            $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];

            if (exists $_core_output_function{$_func}) {
               $_core_output_function{$_func}();
            } elsif (exists $_plugin_function->{$_func}) {
               $_plugin_function->{$_func}();
            }

         } while (($_nbytes -= 8) >= 8);

         last unless $self->{_total_running};
      }
   }

   elsif ($^O eq 'MSWin32') {
      while ($self->{_total_running}) {
         sysread($_DAT_R_SOCK, $_func, 8);
         last() unless length($_func) == 8;
         $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];

         if (exists $_core_output_function{$_func}) {
            $_core_output_function{$_func}();
         } elsif (exists $_plugin_function->{$_func}) {
            $_plugin_function->{$_func}();
         }
      }
   }

   else {
      while ($self->{_total_running}) {
         $_func = <$_DAT_R_SOCK>;
         last() unless length($_func) == 6;
         $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];

         if (exists $_core_output_function{$_func}) {
            $_core_output_function{$_func}();
         } elsif (exists $_plugin_function->{$_func}) {
            $_plugin_function->{$_func}();
         }
      }
   }

   ## Call module's loop_end routine for modules plugged into MCE.

   for my $_p (@{ $_plugin_loop_end }) {
      $_p->($self);
   }

   ## Call on_post_run callback.

   $_on_post_run->($self, $self->{_status}) if (defined $_on_post_run);

   ## Close opened sendto file handles.

   for my $_p (keys %_sendto_fhs) {
      close  $_sendto_fhs{$_p};
      undef  $_sendto_fhs{$_p};
      delete $_sendto_fhs{$_p};
   }

   ## Restore the default handle. Close MCE STDOUT/STDERR handles.

   select $_old_hndl;

   eval q{
      close $_MCE_STDOUT if (fileno $_MCE_STDOUT > 2);
      close $_MCE_STDERR if (fileno $_MCE_STDERR > 2);
   };

   return;
}

1;