The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
###############################################################################
## ----------------------------------------------------------------------------
## MCE::Core::Input::Handle - File_path and Scalar_ref input reader.
##
## This package provides the read handle method used internally by the worker
## process. Distribution follows a bank-queuing model.
##
## There is no public API.
##
###############################################################################

package MCE::Core::Input::Handle;

our $VERSION = '1.516'; $VERSION = eval $VERSION;

## Items below are folded into MCE.

package MCE;

use strict;
use warnings;

use bytes;

## Warnings are disabled to minimize bits of noise when user or OS signals
## the script to exit. e.g. MCE_script.pl < infile | head

no warnings 'threads'; no warnings 'uninitialized';

my $_que_read_size = $MCE::_que_read_size;
my $_que_template  = $MCE::_que_template;

###############################################################################
## ----------------------------------------------------------------------------
## Worker process -- Read handle.
##
###############################################################################

sub _worker_read_handle {

   my MCE $self = $_[0]; my $_proc_type = $_[1]; my $_input_data = $_[2];

   @_ = ();

   die "Private method called" unless (caller)[0]->isa( ref($self) );

   _croak("MCE::_worker_read_handle: 'user_func' is not specified")
      unless (defined $self->{user_func});

   my $_QUE_R_SOCK  = $self->{_que_r_sock};
   my $_QUE_W_SOCK  = $self->{_que_w_sock};
   my $_chunk_size  = $self->{chunk_size};
   my $_use_slurpio = $self->{use_slurpio};
   my $_parallel_io = $self->{parallel_io};
   my $_RS          = $self->{RS} || $/;
   my $_RS_FLG      = (!$_RS || $_RS ne $LF);
   my $_wuf         = $self->{_wuf};

   my ($_data_size, $_next, $_chunk_id, $_offset_pos, $_IN_FILE, $_tmp_cs);
   my @_records = (); $_chunk_id = $_offset_pos = 0;

   $_data_size = ($_proc_type == READ_MEMORY)
      ? length $$_input_data : -s $_input_data;

   if ($_chunk_size <= MAX_RECS_SIZE || $_proc_type == READ_MEMORY) {
      open    $_IN_FILE, '<', $_input_data or die "$_input_data: $!\n";
      binmode $_IN_FILE;
   } else {
      sysopen $_IN_FILE, $_input_data, O_RDONLY or die "$_input_data: $!\n";
   }

   ## -------------------------------------------------------------------------

   $self->{_next_jmp} = sub { goto _WORKER_READ_HANDLE__NEXT; };
   $self->{_last_jmp} = sub { goto _WORKER_READ_HANDLE__LAST; };

   _WORKER_READ_HANDLE__NEXT:

   while (1) {

      ## Don't declare $_buffer with other vars above, instead it's done here.
      ## Doing so will fail with Perl 5.8.0 under Solaris 5.10 on large files.

      my $_buffer;

      ## Obtain the next chunk_id and offset position.
      sysread $_QUE_R_SOCK, $_next, $_que_read_size;
      ($_chunk_id, $_offset_pos) = unpack($_que_template, $_next);

      if ($_offset_pos >= $_data_size) {
         syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos);
         close $_IN_FILE; undef $_IN_FILE;
         return;
      }

      $_chunk_id++;

      ## Read data.
      if ($_chunk_size <= MAX_RECS_SIZE) {        ## One or many records.
         local $/ = $_RS if ($_RS_FLG);
         seek $_IN_FILE, $_offset_pos, 0;

         if ($_chunk_size == 1) {
            $_buffer = <$_IN_FILE>;
         }
         else {
            if ($_use_slurpio) {
               $_buffer .= <$_IN_FILE> for (0 .. $_chunk_size - 1);
            }
            else {
               for (0 .. $_chunk_size - 1) {
                  $_records[$_] = <$_IN_FILE>;
                  unless (defined $_records[$_]) {
                     delete @_records[$_ .. $_chunk_size - 1];
                     last;
                  }
               }
            }
         }

         syswrite $_QUE_W_SOCK,
            pack($_que_template, $_chunk_id, tell $_IN_FILE);
      }
      else {                                      ## Large chunk.
         local $/ = $_RS if ($_RS_FLG);

         if ($_proc_type == READ_MEMORY) {
            if ($_parallel_io) {
               syswrite $_QUE_W_SOCK,
                  pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size);

               $_tmp_cs = $_chunk_size;
               seek $_IN_FILE, $_offset_pos, 0;

               if ($_offset_pos) {
                  $_tmp_cs -= length <$_IN_FILE> || 0;
               }

               if (read($_IN_FILE, $_buffer, $_tmp_cs) == $_tmp_cs) {
                  $_buffer .= <$_IN_FILE>;
               }
            }
            else {
               seek $_IN_FILE, $_offset_pos, 0;

               if (read($_IN_FILE, $_buffer, $_chunk_size) == $_chunk_size) {
                  $_buffer .= <$_IN_FILE>;
               }

               syswrite $_QUE_W_SOCK,
                  pack($_que_template, $_chunk_id, tell $_IN_FILE);
            }
         }
         else {
            if ($_parallel_io) {
               syswrite $_QUE_W_SOCK,
                  pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size);

               $_tmp_cs = $_chunk_size;

               if ($_offset_pos) {
                  seek $_IN_FILE, $_offset_pos, 0;
                  $_tmp_cs -= length <$_IN_FILE> || 0;
                  sysseek $_IN_FILE, tell $_IN_FILE, 0;
               }
               else {
                  sysseek $_IN_FILE, $_offset_pos, 0;
               }

               if (sysread($_IN_FILE, $_buffer, $_tmp_cs) == $_tmp_cs) {
                  seek $_IN_FILE, sysseek($_IN_FILE, 0, 1), 0;
                  $_buffer .= <$_IN_FILE>;
               }
            }
            else {
               sysseek $_IN_FILE, $_offset_pos, 0;

               if (sysread($_IN_FILE, $_buffer, $_chunk_size) == $_chunk_size) {
                  seek $_IN_FILE, sysseek($_IN_FILE, 0, 1), 0;
                  $_buffer .= <$_IN_FILE>;
               }
               else {
                  seek $_IN_FILE, sysseek($_IN_FILE, 0, 1), 0;
               }

               syswrite $_QUE_W_SOCK,
                  pack($_que_template, $_chunk_id, tell $_IN_FILE);
            }
         }
      }

      ## Call user function.
      if ($_use_slurpio) {
         local $_ = \$_buffer;
         $_wuf->($self, \$_buffer, $_chunk_id);
      }
      else {
         if ($_chunk_size == 1) {
            local $_ = $_buffer;
            $_wuf->($self, [ $_buffer ], $_chunk_id);
         }
         else {
            if ($_chunk_size > MAX_RECS_SIZE) {
               local $/ = $_RS if ($_RS_FLG);
               _sync_buffer_to_array(\$_buffer, \@_records);
            }
            local $_ = \@_records;
            $_wuf->($self, \@_records, $_chunk_id);
         }
      }
   }

   _WORKER_READ_HANDLE__LAST:

   close $_IN_FILE; undef $_IN_FILE;

   return;
}

1;