The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
###############################################################################
## ----------------------------------------------------------------------------
## MCE::Core::Worker - Core methods for the worker process.
##
## This package provides main, loop, and relevant methods used internally by
## the worker process.
##
## There is no public API.
##
###############################################################################

package MCE::Core::Worker;

use strict;
use warnings;

our $VERSION = '1.600';

## Items below are folded into MCE.

package MCE;

use Time::HiRes qw( sleep time );
use bytes;

## Warnings are disabled to minimize bits of noise when user or OS signals
## the script to exit. e.g. MCE_script.pl < infile | head

no warnings 'threads';
no warnings 'uninitialized';

###############################################################################
## ----------------------------------------------------------------------------
## Internal do, gather and send related functions for serializing data to
## destination. User functions for handling gather, queue or void.
##
###############################################################################

{
   my (
      $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_tag, $_value, $_want_id,
      $_chn, $_dest, $_len, $_lock_chn, $_task_id, $_user_func
   );

   ## Create array structure containing various send functions.
   my @_dest_function = ();

   $_dest_function[SENDTO_FILEV2] = sub {         ## Content >> File

      return unless (defined $_value);
      local $\ = undef if (defined $\);

      if (length ${ $_[0] }) {
         flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
         print {$_DAT_W_SOCK} OUTPUT_F_SND . $LF . $_chn . $LF;
         print {$_DAU_W_SOCK} $_value . $LF . length(${ $_[0] }) . $LF;
         print {$_DAU_W_SOCK} ${ $_[0] };
         flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
      }

      return;
   };

   $_dest_function[SENDTO_FD] = sub {             ## Content >> File descriptor

      return unless (defined $_value);
      local $\ = undef if (defined $\);

      if (length ${ $_[0] }) {
         flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
         print {$_DAT_W_SOCK} OUTPUT_D_SND . $LF . $_chn . $LF;
         print {$_DAU_W_SOCK} $_value . $LF . length(${ $_[0] }) . $LF;
         print {$_DAU_W_SOCK} ${ $_[0] };
         flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
      }

      return;
   };

   $_dest_function[SENDTO_STDOUT] = sub {         ## Content >> STDOUT

      local $\ = undef if (defined $\);

      if (length ${ $_[0] }) {
         flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
         print {$_DAT_W_SOCK} OUTPUT_O_SND . $LF . $_chn . $LF;
         print {$_DAU_W_SOCK} length(${ $_[0] }) . $LF;
         print {$_DAU_W_SOCK} ${ $_[0] };
         flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
      }

      return;
   };

   $_dest_function[SENDTO_STDERR] = sub {         ## Content >> STDERR

      local $\ = undef if (defined $\);

      if (length ${ $_[0] }) {
         flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
         print {$_DAT_W_SOCK} OUTPUT_E_SND . $LF . $_chn . $LF;
         print {$_DAU_W_SOCK} length(${ $_[0] }) . $LF;
         print {$_DAU_W_SOCK} ${ $_[0] };
         flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
      }

      return;
   };

   ## -------------------------------------------------------------------------

   sub _do_callback {

      my $_buf; my $self = shift;

      $_value = shift;

      unless (defined wantarray) {
         $_want_id = WANTS_UNDEF;
      } elsif (wantarray) {
         $_want_id = WANTS_ARRAY;
      } else {
         $_want_id = WANTS_SCALAR;
      }

      ## Crossover: Send arguments

      if (scalar @_ > 0) {                        ## Multiple Args >> Callback
         if (scalar @_ > 1 || ref $_[0]) {
            $_tag = OUTPUT_A_CBK;
            $_buf = $self->{freeze}(\@_);
            $_len = length $_buf; local $\ = undef if (defined $\);

            flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
            print {$_DAT_W_SOCK} $_tag . $LF . $_chn . $LF;
            print {$_DAU_W_SOCK} $_want_id . $LF . $_value . $LF . $_len . $LF;
            print {$_DAU_W_SOCK} $_buf;

         }
         else {                                   ## Scalar >> Callback
            $_tag = OUTPUT_S_CBK;
            $_len = length $_[0]; local $\ = undef if (defined $\);

            flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
            print {$_DAT_W_SOCK} $_tag . $LF . $_chn . $LF;
            print {$_DAU_W_SOCK} $_want_id . $LF . $_value . $LF . $_len . $LF;
            print {$_DAU_W_SOCK} $_[0];
         }

         @_ = ();
      }
      else {                                      ## No Args >> Callback
         $_tag = OUTPUT_N_CBK;
         local $\ = undef if (defined $\);

         flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
         print {$_DAT_W_SOCK} $_tag . $LF . $_chn . $LF;
         print {$_DAU_W_SOCK} $_want_id . $LF . $_value . $LF;
      }

      ## Crossover: Receive return value

      if ($_want_id == WANTS_UNDEF) {
         flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
         return;
      }
      elsif ($_want_id == WANTS_ARRAY) {
         local $/ = $LF if (!$/ || $/ ne $LF);
         chomp($_len = <$_DAU_W_SOCK>);

         read($_DAU_W_SOCK, $_buf, $_len || 0);
         flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);

         return @{ $self->{thaw}($_buf) };
      }
      else {
         local $/ = $LF if (!$/ || $/ ne $LF);
         chomp($_want_id = <$_DAU_W_SOCK>);
         chomp($_len     = <$_DAU_W_SOCK>);

         if ($_len >= 0) {
            read($_DAU_W_SOCK, $_buf, $_len || 0);
            flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);

            return $_buf if ($_want_id == WANTS_SCALAR);
            return $self->{thaw}($_buf);
         }
         else {
            flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
            return;
         }
      }
   }

   ## -------------------------------------------------------------------------

   sub _do_gather {

      my $_buf; my $self = shift;

      return unless (scalar @_);

      if (scalar @_ > 1) {
         $_tag = OUTPUT_A_GTR;
         $_buf = $self->{freeze}(\@_);
         $_len = length $_buf;
      }
      elsif (ref $_[0]) {
         $_tag = OUTPUT_R_GTR;
         $_buf = $self->{freeze}($_[0]);
         $_len = length $_buf;
      }
      else {
         $_tag = OUTPUT_S_GTR;
         if (defined $_[0]) {
            $_len = length $_[0]; local $\ = undef if (defined $\);

            flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
            print {$_DAT_W_SOCK} $_tag . $LF . $_chn . $LF;
            print {$_DAU_W_SOCK} $_task_id . $LF . $_len . $LF;
            print {$_DAU_W_SOCK} $_[0];
            flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);

            return;
         }
         else {
            $_buf = '';
            $_len = -1;
         }
      }

      local $\ = undef if (defined $\);

      flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
      print {$_DAT_W_SOCK} $_tag . $LF . $_chn . $LF;
      print {$_DAU_W_SOCK} $_task_id . $LF . $_len . $LF;
      print {$_DAU_W_SOCK} $_buf if (length $_buf);
      flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);

      return;
   }

   ## -------------------------------------------------------------------------

   sub _do_send {

      my $_data_ref; my $self = shift;

      $_dest = shift; $_value = shift;

      if (scalar @_ > 1) {
         $_data_ref = \join('', @_);
      }
      elsif (my $_ref = ref $_[0]) {
         if ($_ref eq 'SCALAR') {
            $_data_ref = $_[0];
         }
         elsif ($_ref eq 'ARRAY') {
            $_data_ref = \join('', @{ $_[0] });
         }
         elsif ($_ref eq 'HASH') {
            $_data_ref = \join('', %{ $_[0] });
         }
         else {
            $_data_ref = \join('', @_);
         }
      }
      else {
         $_data_ref = \$_[0];
      }

      $_dest_function[$_dest]($_data_ref);

      return;
   }

   sub _do_send_glob {

      my ($self, $_glob, $_fd, $_data_ref) = @_;

      if ($self->{_wid} > 0) {
         if ($_fd == 1) {
            _do_send($self, SENDTO_STDOUT, undef, $_data_ref);
         }
         elsif ($_fd == 2) {
            _do_send($self, SENDTO_STDERR, undef, $_data_ref);
         }
         else {
            _do_send($self, SENDTO_FD, $_fd, $_data_ref);
         }
      }
      else {
         my $_fh = qualify_to_ref($_glob, caller);
         local $\ = undef if (defined $\);
         print {$_fh} ${ $_data_ref };
      }

      return;
   }

   sub _do_send_init {

      my ($self) = @_;

      die 'Private method called' unless (caller)[0]->isa( ref $self );

      $_chn        = $self->{_chn};
      $_DAT_LOCK   = $self->{_dat_lock};
      $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
      $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
      $_lock_chn   = $self->{_lock_chn};
      $_task_id    = $self->{_task_id};

      return;
   }

   ## -------------------------------------------------------------------------

   sub _do_user_func {

      my ($self, $_chunk, $_chunk_id) = @_;

      $self->{_chunk_id} = $_chunk_id;
      $_user_func->($self, $_chunk, $_chunk_id);

      return;
   }

   sub _do_user_func_init {

      my ($self) = @_;

      $_user_func = $self->{user_func};

      return;
   }
}

###############################################################################
## ----------------------------------------------------------------------------
## Worker process -- Do.
##
###############################################################################

sub _worker_do {

   my ($self, $_params_ref) = @_;

   @_ = ();

   die 'Private method called' unless (caller)[0]->isa( ref $self );

   ## Set options.
   $self->{_abort_msg}  = $_params_ref->{_abort_msg};
   $self->{_run_mode}   = $_params_ref->{_run_mode};
   $self->{_single_dim} = $_params_ref->{_single_dim};
   $self->{use_slurpio} = $_params_ref->{_use_slurpio};
   $self->{parallel_io} = $_params_ref->{_parallel_io};
   $self->{RS}          = $_params_ref->{_RS};

   _do_user_func_init($self);

   ## Init local vars.
   my $_chn        = $self->{_chn};
   my $_DAT_LOCK   = $self->{_dat_lock};
   my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
   my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
   my $_lock_chn   = $self->{_lock_chn};
   my $_run_mode   = $self->{_run_mode};
   my $_task_id    = $self->{_task_id};
   my $_task_name  = $self->{task_name};

   ## Do not override params if defined in user_tasks during instantiation.
   for (qw(bounds_only chunk_size interval sequence user_args)) {
      if (defined $_params_ref->{"_$_"}) {
         $self->{$_} = $_params_ref->{"_$_"}
            unless (defined $self->{_task}->{$_});
      }
   }

   ## Assign user function.
   $self->{_wuf} = \&_do_user_func;

   ## Set time_block & start_time values for interval.
   if (defined $self->{interval}) {
      my $_i     = $self->{interval};
      my $_delay = $_i->{delay} * $_i->{max_nodes};

      $self->{_i_app_tb} = $_delay * $self->{max_workers};

      $self->{_i_app_st} =
         $_i->{_time} + ($_delay / $_i->{max_nodes} * $_i->{node_id});

      $self->{_i_wrk_st} =
         ($self->{_task_wid} - 1) * $_delay + $self->{_i_app_st};
   }

   ## Call user_begin if defined.
   if (defined $self->{user_begin}) {
      $self->{user_begin}($self, $_task_id, $_task_name);
   }

   ## Call worker function.
   if ($_run_mode eq 'sequence') {
      require MCE::Core::Input::Sequence
         unless (defined $MCE::Core::Input::Sequence::VERSION);
      _worker_sequence_queue($self);
   }
   elsif (defined $self->{_task}->{sequence}) {
      require MCE::Core::Input::Generator
         unless (defined $MCE::Core::Input::Generator::VERSION);
      _worker_sequence_generator($self);
   }
   elsif ($_run_mode eq 'array') {
      require MCE::Core::Input::Request
         unless (defined $MCE::Core::Input::Request::VERSION);
      _worker_request_chunk($self, REQUEST_ARRAY);
   }
   elsif ($_run_mode eq 'glob') {
      require MCE::Core::Input::Request
         unless (defined $MCE::Core::Input::Request::VERSION);
      _worker_request_chunk($self, REQUEST_GLOB);
   }
   elsif ($_run_mode eq 'iterator') {
      require MCE::Core::Input::Iterator
         unless (defined $MCE::Core::Input::Iterator::VERSION);
      _worker_user_iterator($self);
   }
   elsif ($_run_mode eq 'file') {
      require MCE::Core::Input::Handle
         unless (defined $MCE::Core::Input::Handle::VERSION);
      _worker_read_handle($self, READ_FILE, $_params_ref->{_input_file});
   }
   elsif ($_run_mode eq 'memory') {
      require MCE::Core::Input::Handle
         unless (defined $MCE::Core::Input::Handle::VERSION);
      _worker_read_handle($self, READ_MEMORY, $self->{input_data});
   }
   elsif (defined $self->{user_func}) {
      $self->{_chunk_id} = $self->{_task_wid};
      $self->{user_func}->($self);
   }

   undef $self->{_next_jmp} if (defined $self->{_next_jmp});
   undef $self->{_last_jmp} if (defined $self->{_last_jmp});
   undef $self->{user_data} if (defined $self->{user_data});

   ## Call user_end if defined.
   if (defined $self->{user_end}) {
      $self->{user_end}($self, $_task_id, $_task_name);
   }

   ## Notify the main process a worker has completed.
   local $\ = undef if (defined $\);

   flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);

   if (exists $self->{_rla_return}) {
      print {$_DAT_W_SOCK} OUTPUT_W_RLA . $LF . $_chn . $LF;
      print {$_DAU_W_SOCK} (delete $self->{_rla_return}) . $LF;
   }

   print {$_DAT_W_SOCK} OUTPUT_W_DNE . $LF . $_chn . $LF;
   print {$_DAU_W_SOCK} $_task_id . $LF;

   flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Worker process -- Loop.
##
###############################################################################

sub _worker_loop {

   my ($self) = @_;

   @_ = ();

   die 'Private method called' unless (caller)[0]->isa( ref $self );

   my ($_response, $_len, $_buf, $_params_ref);

   my $_COM_LOCK   = $self->{_com_lock};
   my $_COM_W_SOCK = $self->{_com_w_sock};
   my $_job_delay  = $self->{job_delay};
   my $_wid        = $self->{_wid};

   while (1) {

      {
         local $\ = undef; local $/ = $LF;
         flock $_COM_LOCK, LOCK_EX;

         ## Wait until next job request.
         $_response = <$_COM_W_SOCK>;
         print {$_COM_W_SOCK} $_wid . $LF;

         last unless (defined $_response);
         chomp $_response;

         ## End loop if an invalid reply.
         last if ($_response !~ /\A(?:\d+|_data|_exit)\z/);

         if ($_response eq '_data') {
            ## Acquire and process user data.
            chomp($_len = <$_COM_W_SOCK>);
            read $_COM_W_SOCK, $_buf, $_len;

            print {$_COM_W_SOCK} $_wid . $LF;
            flock $_COM_LOCK, LOCK_UN;

            $self->{user_data} = $self->{thaw}($_buf);
            undef $_buf;

            if (defined $_job_delay && $_job_delay > 0.0) {
               sleep $_job_delay * $_wid;
            }

            _worker_do($self, { });
         }
         else {
            ## Return to caller if instructed to exit.
            if ($_response eq '_exit') {
               flock $_COM_LOCK, LOCK_UN;
               return 0;
            }

            ## Retrieve params data.
            chomp($_len = <$_COM_W_SOCK>);
            read $_COM_W_SOCK, $_buf, $_len;

            print {$_COM_W_SOCK} $_wid . $LF;
            flock $_COM_LOCK, LOCK_UN;

            $_params_ref = $self->{thaw}($_buf);
            undef $_buf;
         }
      }

      ## Start over if the last response was for processing user data.
      next if ($_response eq '_data');

      ## Wait until MCE completes params submission to all workers.
      my $_c; sysread $self->{_bse_r_sock}, $_c, 1;

      if (defined $_job_delay && $_job_delay > 0.0) {
         sleep $_job_delay * $_wid;
      }

      _worker_do($self, $_params_ref);
      undef $_params_ref;
   }

   ## Notify the main process a worker has ended. The following is executed
   ## when an invalid reply was received above (not likely to occur).

   flock $_COM_LOCK, LOCK_UN;

   die "worker $self->{_wid} has ended prematurely";
}

###############################################################################
## ----------------------------------------------------------------------------
## Worker process -- Main.
##
###############################################################################

sub _worker_main {

   my ( $self, $_wid, $_task, $_task_id, $_task_wid, $_params,
        $_plugin_worker_init, $_has_threads, $_is_winenv ) = @_;

   @_ = ();

   ## Commented out -- fails with the 'forks' module under FreeBSD.
   ## die 'Private method called' unless (caller)[0]->isa( ref $self );

   if (exists $self->{input_data}) {
      my $_ref = ref $self->{input_data};
      delete $self->{input_data} if ($_ref && $_ref ne 'SCALAR');
   }

   ## Define status ID.
   my $_use_threads = (defined $_task->{use_threads})
      ? $_task->{use_threads} : $self->{use_threads};

   if ($_has_threads && $_use_threads) {
      $self->{_exit_pid} = 'TID_' . threads->tid();
   } else {
      $self->{_exit_pid} = 'PID_' . $$;
   }

   ## Define DIE handler.
   local $SIG{__DIE__} = sub {
      if (!defined $^S || $^S) {                          ## Perl state
         my $_lmsg = Carp::longmess();
         if ($_lmsg =~ /^[^\n]+\n\teval /) {              ## In eval?
            CORE::die(@_);
         }
      }
      my $_die_msg = (defined $_[0]) ? $_[0] : '';
      local $SIG{__DIE__} = sub { }; local $\ = undef;
      print {*STDERR} $_die_msg;
      $self->exit(255, $_die_msg);
   };

   ## Use options from user_tasks if defined.
   $self->{max_workers} = $_task->{max_workers} if ($_task->{max_workers});
   $self->{chunk_size}  = $_task->{chunk_size}  if ($_task->{chunk_size});
   $self->{gather}      = $_task->{gather}      if ($_task->{gather});
   $self->{interval}    = $_task->{interval}    if ($_task->{interval});
   $self->{sequence}    = $_task->{sequence}    if ($_task->{sequence});
   $self->{task_name}   = $_task->{task_name}   if ($_task->{task_name});
   $self->{user_args}   = $_task->{user_args}   if ($_task->{user_args});
   $self->{user_begin}  = $_task->{user_begin}  if ($_task->{user_begin});
   $self->{user_func}   = $_task->{user_func}   if ($_task->{user_func});
   $self->{user_end}    = $_task->{user_end}    if ($_task->{user_end});

   ## Init runtime vars. Obtain handle to lock files.
   my $_mce_sid  = $self->{_mce_sid};
   my $_sess_dir = $self->{_sess_dir};

   if (defined $_params && exists $_params->{_chn}) {
      $self->{_chn} = delete $_params->{_chn};
   } else {
      $self->{_chn} = $_wid % $self->{_data_channels} + 1;
   }

   $self->{_task_id}    = (defined $_task_id ) ? $_task_id  : 0;
   $self->{_task_wid}   = (defined $_task_wid) ? $_task_wid : $_wid;
   $self->{_task}       = $_task;
   $self->{_wid}        = $_wid;

   ## Unset the need for channel locking if only worker riding the channel.
   if ($self->{_init_total_workers} < DATA_CHANNELS * 2) {
      if ($_wid > $self->{_init_total_workers} % DATA_CHANNELS) {
         $self->{_lock_chn} = 0 if ($_wid <= DATA_CHANNELS);
      }
   }

   my ($_COM_LOCK, $_DAT_LOCK);
   my $_lock_chn = $self->{_lock_chn};
   my $_chn      = $self->{_chn};

   for (1 .. $self->{_data_channels}) {
      $self->{_dat_r_sock}->[$_] = $self->{_dat_w_sock}->[$_] = undef
         unless ($_ == $_chn);
   }

   if ($_lock_chn) {
      open $_DAT_LOCK, '+>>:raw:stdio', "$_sess_dir/_dat.lock.$_chn"
         or die "(W) open error $_sess_dir/_dat.lock.$_chn: $!\n";
   }

   open $_COM_LOCK, '+>>:raw:stdio', "$_sess_dir/_com.lock"
      or die "(W) open error $_sess_dir/_com.lock: $!\n";

   $self->{_dat_lock} = $_DAT_LOCK;
   $self->{_com_lock} = $_COM_LOCK;

   ## Delete attributes no longer required after being spawned.
   delete @{ $self }{ qw(
      flush_file flush_stderr flush_stdout stderr_file stdout_file
      on_post_exit on_post_run user_data user_error user_output
      _pids _state _status _thrs _tids
   ) };

   MCE::_clean_sessions($_mce_sid);

   ## Call module's worker_init routine for modules plugged into MCE.
   $_->($self) for (@{ $_plugin_worker_init });

   _do_send_init($self);

   ## Begin processing if worker was added during processing. Otherwise,
   ## respond back to the main process if the last worker spawned.
   if (defined $_params) {
      sleep 0.002;
      _worker_do($self, $_params);
      undef $_params;
   }
   elsif ($self->{_wid} == $self->{_total_workers}) {
      my $_buf; my $_COM_W_SOCK = $self->{_com_w_sock};
      sysread $self->{_que_r_sock}, $_buf, 1;
      local $\ = undef; print {$_COM_W_SOCK} $LF;
   }

   ## Enter worker loop.
   my $_status = _worker_loop($self);
   MCE::_clear_session($_mce_sid);

   ## Wait until MCE completes exit notification.
   $SIG{__DIE__} = $SIG{__WARN__} = sub { };

   select STDERR; $| = 1;
   select STDOUT; $| = 1;

   local $@; eval {
      my $_c; sysread $self->{_bse_r_sock}, $_c, 1;
   };

   sleep 0.005 if ($_is_winenv);

   if ($_lock_chn) {
      close $_DAT_LOCK; undef $_DAT_LOCK;
   }

   close $_COM_LOCK; undef $_COM_LOCK;

   return;
}

1;