The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
###############################################################################
## ----------------------------------------------------------------------------
## MCE::Core::Input::Request - Array_ref and Glob_ref input reader.
##
## This package provides the request chunk method used internally by the worker
## process. Distribution follows a bank-queuing model.
##
## There is no public API.
##
###############################################################################

package MCE::Core::Input::Request;

use strict;
use warnings;

our $VERSION = '1.522';

## Items below are folded into MCE.

package MCE;

## Warnings are disabled to minimize bits of noise when user or OS signals
## the script to exit. e.g. MCE_script.pl < infile | head

no warnings 'threads'; no warnings 'uninitialized';

###############################################################################
## ----------------------------------------------------------------------------
## Worker process -- Request chunk.
##
###############################################################################

sub _worker_request_chunk {

   my $self = $_[0]; my $_proc_type = $_[1];

   @_ = ();

   die 'Private method called' unless (caller)[0]->isa( ref $self );

   _croak('MCE::_worker_request_chunk: (user_func) is not specified')
      unless (defined $self->{user_func});

   my $_chn         = $self->{_chn};
   my $_DAT_LOCK    = $self->{_dat_lock};
   my $_DAT_W_SOCK  = $self->{_dat_w_sock}->[0];
   my $_DAU_W_SOCK  = $self->{_dat_w_sock}->[$_chn];
   my $_lock_chn    = $self->{_lock_chn};
   my $_single_dim  = $self->{_single_dim};
   my $_chunk_size  = $self->{chunk_size};
   my $_use_slurpio = $self->{use_slurpio};
   my $_RS          = $self->{RS} || $/;
   my $_RS_FLG      = (!$_RS || $_RS ne $LF);
   my $_I_FLG       = (!$/ || $/ ne $LF);
   my $_wuf         = $self->{_wuf};

   my ($_chunk_id, $_len, $_chunk_ref);
   my ($_output_tag, @_records);

   if ($_proc_type == REQUEST_ARRAY) {
      $_output_tag = OUTPUT_A_ARY;
   } else {
      $_output_tag = OUTPUT_S_GLB;
      @_records    = ();
   }

   ## -------------------------------------------------------------------------

   $self->{_next_jmp} = sub { goto _WORKER_REQUEST_CHUNK__NEXT; };
   $self->{_last_jmp} = sub { goto _WORKER_REQUEST_CHUNK__LAST; };

   _WORKER_REQUEST_CHUNK__NEXT:

   while (1) {

      ## Don't declare $_buffer with other vars above, instead it's done here.
      ## Doing so will fail with Perl 5.8.0 under Solaris 5.10 on large files.

      my $_buffer;

      ## Obtain the next chunk of data.
      {
         local $\ = undef if (defined $\); local $/ = $LF if ($_I_FLG);

         flock $_DAT_LOCK, LOCK_EX if ($_lock_chn);
         print {$_DAT_W_SOCK} $_output_tag . $LF . $_chn . $LF;
         chomp($_len = <$_DAU_W_SOCK>);

         unless ($_len) {
            flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
            return;
         }

         chomp($_chunk_id = <$_DAU_W_SOCK>);
         read $_DAU_W_SOCK, $_buffer, $_len;

         flock $_DAT_LOCK, LOCK_UN if ($_lock_chn);
      }

      ## Call user function.
      if ($_proc_type == REQUEST_ARRAY) {
         if ($_single_dim && $_chunk_size == 1) {
            local $_ = $_buffer;
            $_wuf->($self, [ $_buffer ], $_chunk_id);
         }
         else {
            $_chunk_ref = $self->{thaw}($_buffer); undef $_buffer;
            local $_ = ($_chunk_size == 1) ? $_chunk_ref->[0] : $_chunk_ref;
            $_wuf->($self, $_chunk_ref, $_chunk_id);
         }
      }
      else {
         if ($_use_slurpio) {
            local $_ = \$_buffer;
            $_wuf->($self, \$_buffer, $_chunk_id);
         }
         else {
            if ($_chunk_size == 1) {
               local $_ = $_buffer;
               $_wuf->($self, [ $_buffer ], $_chunk_id);
            }
            else {
               {
                  local $/ = $_RS if ($_RS_FLG);
                  _sync_buffer_to_array(\$_buffer, \@_records);
               }

               local $_ = \@_records;
               $_wuf->($self, \@_records, $_chunk_id);
            }
         }
      }
   }

   _WORKER_REQUEST_CHUNK__LAST:

   return;
}

1;