## The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
###############################################################################
## ----------------------------------------------------------------------------
## MCE::Grep - Parallel grep model similar to the native grep function.
##
###############################################################################

package MCE::Grep;

use strict;
use warnings;

## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
## no critic (TestingAndDebugging::ProhibitNoStrict)

use Scalar::Util qw( looks_like_number );

use MCE;

## Module version; the VERSION section of the POD states the same number.
our $VERSION  = '1.608';

## Consulted by Carp: packages listed here are treated as trusted, so Carp
## skips their frames when choosing the location to report.
our @CARP_NOT = qw( MCE );

###############################################################################
## ----------------------------------------------------------------------------
## Import routine.
##
###############################################################################

## Module-wide defaults. Both may be replaced by arguments given to import.
my $MAX_WORKERS = 'auto';
my $CHUNK_SIZE  = 'auto';

## State shared by the routines in this file.
my $_MCE;                  ## the MCE instance built by run
my $_loaded;               ## counts calls to import
my $_params;               ## options hash stored by init
my $_prev_c;               ## code block used to build the current instance
my $_tag = 'MCE::Grep';    ## prefix for error messages and the task name

## Called by "use MCE::Grep LIST". Exports mce_grep, mce_grep_f and
## mce_grep_s into the calling package and processes the module arguments.
##
## Recognised arguments (names are matched case-insensitively):
##    max_workers, chunk_size, freeze, thaw, sereal, tmp_dir
##
## The functions are exported on every call so that each package saying
## "use MCE::Grep" receives them. The module arguments are processed on the
## first call only; arguments given on later calls are ignored.
##
## Croaks on an unknown argument, an unusable tmp_dir, or an invalid
## max_workers / chunk_size value.

sub import {

   my $_class = shift;
   my $_pkg   = caller;

   ## Import functions. Done before the $_loaded check; otherwise only the
   ## first package to load this module would receive them.
   {
      no strict 'refs'; no warnings 'redefine';

      *{ $_pkg.'::mce_grep_f' } = \&run_file;
      *{ $_pkg.'::mce_grep_s' } = \&run_seq;
      *{ $_pkg.'::mce_grep'   } = \&run;
   }

   return if ($_loaded++);

   ## Process module arguments.
   while (my $_argument = shift) {
      my $_arg = lc $_argument;

      ## Store the value as given, even when false (e.g. 0). Validation
      ## below then reports the bad value instead of the argument name
      ## being reported as unknown.
      if ( $_arg eq 'max_workers' ) { $MAX_WORKERS = shift; next; }
      if ( $_arg eq 'chunk_size'  ) { $CHUNK_SIZE  = shift; next; }

      ## A false value for freeze/thaw falls through to the croak at the
      ## bottom of the loop.
      $MCE::FREEZE = $MCE::MCE->{freeze} = shift and next
         if ( $_arg eq 'freeze' );
      $MCE::THAW = $MCE::MCE->{thaw} = shift and next
         if ( $_arg eq 'thaw' );

      if ( $_arg eq 'sereal' ) {
         my $_enable = shift;

         if (defined $_enable && $_enable eq '1') {
            ## Sereal is optional; when it fails to load the serializer
            ## settings are left untouched.
            local $@; eval 'use Sereal qw(encode_sereal decode_sereal)';
            unless ($@) {
               $MCE::FREEZE = $MCE::MCE->{freeze} = \&encode_sereal;
               $MCE::THAW = $MCE::MCE->{thaw} = \&decode_sereal;
            }
         }
         next;
      }

      if ( $_arg eq 'tmp_dir' ) {
         $MCE::TMP_DIR = $MCE::MCE->{tmp_dir} = shift;
         my $_e1 = 'is not a directory or does not exist';
         my $_e2 = 'is not writeable';
         _croak($_tag."::import: ($MCE::TMP_DIR) $_e1") unless -d $MCE::TMP_DIR;
         _croak($_tag."::import: ($MCE::TMP_DIR) $_e2") unless -w $MCE::TMP_DIR;
         next;
      }

      _croak($_tag."::import: ($_argument) is not a valid module argument");
   }

   $MAX_WORKERS = MCE::Util::_parse_max_workers($MAX_WORKERS);
   _validate_number($MAX_WORKERS, 'MAX_WORKERS');

   ## 'auto' is resolved later, at run time; anything else must be a number.
   _validate_number($CHUNK_SIZE, 'CHUNK_SIZE')
      unless (defined $CHUNK_SIZE && $CHUNK_SIZE eq 'auto');

   return;
}

END {
   ## Skip the shutdown when an instance exists and reports a true wid;
   ## elsewhere in this file a true wid identifies a worker process.
   finish() unless (defined $_MCE && $_MCE->wid);
}

###############################################################################
## ----------------------------------------------------------------------------
## Gather callback for storing by chunk_id => chunk_ref into a hash.
##
###############################################################################

## $_total_chunks counts the chunks gathered during the current run;
## %_tmp holds the gathered array refs keyed by chunk_id.
my ($_total_chunks, %_tmp);

## Gather callback used for list-context runs.
##    $_[0]  chunk_id
##    $_[1]  reference to the items selected from that chunk
## Files the reference under its chunk_id and bumps the chunk counter.

sub _gather {

   my ($_id, $_items_ref) = @_;

   $_total_chunks++;
   $_tmp{$_id} = $_items_ref;

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Init and finish routines.
##
###############################################################################

## Stores the MCE options to use for subsequent runs. Accepts either a
## hash reference or a flat key/value list, and may be called as a
## function or as a class method. Any running instance is shut down first.
## Croaks when called from a worker process.

sub init (@) {

   ## Drop the class name when invoked as MCE::Grep->init(...).
   shift if (defined $_[0] && $_[0] eq 'MCE::Grep');

   if (MCE->wid) {
      @_ = ();
      _croak("$_tag: function cannot be called by the worker process");
   }

   finish();

   if (ref $_[0] eq 'HASH') {
      $_params = shift;
   }
   else {
      $_params = { @_ };
   }

   @_ = ();

   return;
}

## Shuts down the MCE instance when one has been spawned, then clears the
## per-run state (previous code block, chunk counter, gathered data).

sub finish () {

   my $_is_spawned = (defined $_MCE && $_MCE->{_spawned});

   if ($_is_spawned) {
      MCE::_save_state;
      $_MCE->shutdown();
      MCE::_restore_state;
   }

   undef %_tmp;
   $_total_chunks = undef;
   $_prev_c       = undef;

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel grep with MCE -- file.
##
###############################################################################

## Parallel grep over a file.
##    $_[0]  code block
##    $_[1]  path to a plain file, a GLOB or SCALAR reference, or an
##           object whose class name begins with IO::
## The file is stored under the _file key of the options hash, which run
## then takes as its input data. Returns whatever run returns.
## Croaks when the path does not exist, is not readable, is not a plain
## file, or when the argument is missing or of another type.

sub run_file (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Grep');

   my ($_code, $_file) = splice(@_, 0, 2);

   ## A file run must not inherit input_data or sequence from init.
   $_params = {} unless (defined $_params);
   delete @{ $_params }{ qw(input_data sequence) };

   my $_type = ref $_file;

   if ($_type eq 'GLOB' || $_type eq 'SCALAR' || $_type =~ /^IO::/) {
      $_params->{_file} = $_file;
   }
   elsif (defined $_file && $_type eq '' && $_file ne '') {
      -e $_file or _croak("$_tag: ($_file) does not exist");
      -r $_file or _croak("$_tag: ($_file) is not readable");
      -f $_file or _croak("$_tag: ($_file) is not a plain file");
      $_params->{_file} = $_file;
   }
   else {
      _croak("$_tag: (file) is not specified or valid");
   }

   @_ = ();

   return run($_code);
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel grep with MCE -- sequence.
##
###############################################################################

## Parallel grep over a numeric sequence.
##    $_[0]  code block
##    rest   the sequence, given as a hash reference (begin, end keys),
##           an array reference ([ begin, end, ... ]), or a plain list
## The sequence is stored in the options hash and flagged with
## sequence_run so that run removes it again afterwards.
## Croaks when the sequence is of another type or lacks begin or end.

sub run_seq (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Grep');

   my $_code = shift;

   ## A sequence run must not inherit input_data or _file.
   $_params = {} unless (defined $_params);
   delete @{ $_params }{ qw(input_data _file) };

   my $_seq  = $_[0];
   my $_type = ref $_seq;

   my ($_begin, $_end);

   if ($_type eq 'HASH') {
      ($_begin, $_end) = @{ $_seq }{ qw(begin end) };
      $_params->{sequence} = $_seq;
   }
   elsif ($_type eq 'ARRAY') {
      ($_begin, $_end) = @{ $_seq }[ 0, 1 ];
      $_params->{sequence} = $_seq;
   }
   elsif ($_type eq '') {
      ($_begin, $_end) = @_[ 0, 1 ];
      $_params->{sequence} = [ @_ ];
   }
   else {
      _croak("$_tag: (sequence) is not specified or valid");
   }

   if (!defined $_begin) {
      _croak("$_tag: (begin) is not specified for sequence");
   }
   if (!defined $_end) {
      _croak("$_tag: (end) is not specified for sequence");
   }

   $_params->{sequence_run} = 1;

   @_ = ();

   return run($_code);
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel grep with MCE.
##
###############################################################################

## Parallel grep.
##    $_[0]  code block; called with $_ set to each item
##    rest   input: an ARRAY, CODE, GLOB or SCALAR reference, an object
##           whose class name begins with IO::, or a plain list. With no
##           input given, the _file, input_data or sequence entry of the
##           stored options is used instead.
##
## Returns, depending on the calling context:
##    list    the selected items, ordered by chunk_id
##    scalar  the number of selected items
##    void    nothing
##
## Croaks when called from a worker process or when the stored options
## contain a key that is not a valid MCE constructor argument.

sub run (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Grep');

   ## Start every run with an empty gather state.
   my $_code = shift;   $_total_chunks = 0; undef %_tmp;

   if (MCE->wid) {
      @_ = (); _croak(
         "$_tag: function cannot be called by the worker process"
      );
   }

   my $_input_data; my $_max_workers = $MAX_WORKERS; my $_r = ref $_[0];

   if ($_r eq 'ARRAY' || $_r eq 'CODE' || $_r eq 'GLOB' || $_r eq 'SCALAR' || $_r =~ /^IO::/) {
      $_input_data = shift;
   }

   if (defined $_params) { my $_p = $_params;
      $_max_workers = MCE::Util::_parse_max_workers($_p->{max_workers})
         if (exists $_p->{max_workers});

      ## Explicit input wins over a stored sequence. The remaining keys are
      ## set by this module itself and may not be supplied by the user.
      delete $_p->{sequence}    if (defined $_input_data || scalar @_);
      delete $_p->{user_func}   if (exists $_p->{user_func});
      delete $_p->{user_tasks}  if (exists $_p->{user_tasks});
      delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
      delete $_p->{bounds_only} if (exists $_p->{bounds_only});
      delete $_p->{gather}      if (exists $_p->{gather});
   }

   my $_chunk_size = MCE::Util::_parse_chunk_size(
      $CHUNK_SIZE, $_max_workers, $_params, $_input_data, scalar @_
   );

   if (defined $_params) {
      if (exists $_params->{_file}) {
         $_input_data = delete $_params->{_file};
      }
      else {
         $_input_data = $_params->{input_data} if exists $_params->{input_data};
      }
   }

   MCE::_save_state;

   ## -------------------------------------------------------------------------

   ## Build a new MCE instance only when the code block differs from the one
   ## used for the existing instance; otherwise the instance is reused.

   if (!defined $_prev_c || $_prev_c != $_code) {
      $_MCE->shutdown() if (defined $_MCE);
      $_prev_c = $_code;

      my %_options = (
         max_workers => $_max_workers, task_name => $_tag,
         user_func => sub {

            my ($_mce, $_chunk_ref, $_chunk_id) = @_;
            my $_wantarray = $_mce->{user_args}[0];

            if ($_wantarray) {
               ## List context: send the selected items back by chunk_id.
               my @_a;

               if (ref $_chunk_ref eq 'SCALAR') {
                  ## The chunk is one string; read it record by record
                  ## through an in-memory file handle.
                  local $/ = $_mce->{RS} if defined $_mce->{RS};
                  open my  $_MEM_FH, '<', $_chunk_ref;
                  while ( <$_MEM_FH> ) { push (@_a, $_) if &{ $_code }; }
                  close    $_MEM_FH;
               }
               else {
                  if (ref $_chunk_ref) {
                     push @_a, grep { &{ $_code } } @{ $_chunk_ref };
                  } else {
                     push @_a, grep { &{ $_code } } $_chunk_ref;
                  }
               }

               MCE->gather($_chunk_id, \@_a);
            }
            else {
               ## Scalar or void context: only the count is of interest.
               my $_cnt = 0;

               if (ref $_chunk_ref eq 'SCALAR') {
                  local $/ = $_mce->{RS} if defined $_mce->{RS};
                  open my  $_MEM_FH, '<', $_chunk_ref;
                  while ( <$_MEM_FH> ) { $_cnt++ if &{ $_code }; }
                  close    $_MEM_FH;
               }
               else {
                  if (ref $_chunk_ref) {
                     $_cnt += grep { &{ $_code } } @{ $_chunk_ref };
                  } else {
                     $_cnt += grep { &{ $_code } } $_chunk_ref;
                  }
               }

               ## Nothing is gathered in void context.
               MCE->gather($_cnt) if defined $_wantarray;
            }
         },
      );

      if (defined $_params) {
         for my $_p (keys %{ $_params }) {
            ## These keys are consumed by this module, not by MCE->new.
            next if ($_p eq 'sequence_run');
            next if ($_p eq 'input_data');
            next if ($_p eq 'chunk_size');

            _croak("MCE::Grep: ($_p) is not a valid constructor argument")
               unless (exists $MCE::_valid_fields_new{$_p});

            $_options{$_p} = $_params->{$_p};
         }
      }

      $_MCE = MCE->new(%_options);
   }

   ## -------------------------------------------------------------------------

   my $_cnt = 0; my $_wantarray = wantarray;

   $_MCE->{use_slurpio} = ($_chunk_size > MCE::MAX_RECS_SIZE) ? 1 : 0;
   $_MCE->{user_args} = [ $_wantarray ];

   ## List context collects items per chunk; otherwise the counts are summed.
   $_MCE->{gather} = $_wantarray
      ? \&_gather : sub { $_cnt += $_[0]; return; };

   if (defined $_input_data) {
      @_ = ();
      $_MCE->process({ chunk_size => $_chunk_size }, $_input_data);
      delete $_MCE->{input_data};
   }
   elsif (scalar @_) {
      $_MCE->process({ chunk_size => $_chunk_size }, \@_);
      delete $_MCE->{input_data};
   }
   else {
      if (defined $_params && exists $_params->{sequence}) {
         $_MCE->run({
            chunk_size => $_chunk_size, sequence => $_params->{sequence}
         }, 0);
         ## A sequence set by run_seq applies to that one call only.
         if (exists $_params->{sequence_run}) {
            delete $_params->{sequence_run};
            delete $_params->{sequence};
         }
         delete $_MCE->{sequence};
      }
   }

   MCE::_restore_state;

   if (exists $_MCE->{_rla_return}) {
      $MCE::MCE->{_rla_return} = delete $_MCE->{_rla_return};
   }

   ## Collect the list-context result before a possible call to finish:
   ## finish clears both %_tmp and $_total_chunks, so collecting afterwards
   ## would return an empty list whenever the call is made inside an eval.
   my @_results;

   if ($_wantarray) {
      @_results = map { @{ $_ } } delete @_tmp{ 1 .. $_total_chunks };
   }

   finish() if ($^S);   ## shutdown if in eval state

   if ($_wantarray) {
      return @_results;
   }
   elsif (defined $_wantarray) {
      return $_cnt;
   }

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Private methods.
##
###############################################################################

## Hands control to MCE::_croak. The goto &NAME form passes the current @_
## along and replaces this subroutine's frame, so MCE::_croak runs as if it
## had been called directly by the caller of this wrapper.

sub _croak {

   goto &MCE::_croak;
}

## Croaks unless the value is a whole number of at least 1, optionally
## followed by a single K or M suffix (either case).
##    $_[0]  value to check
##    $_[1]  name used in the error message
## Returns nothing; the caller's value is not modified.

sub _validate_number {

   my ($_n, $_key) = @_;

   _croak("$_tag: ($_key) is not valid") if (!defined $_n);

   ## Strip at most one suffix. Two separate substitutions (K, then M)
   ## would also accept a doubled suffix such as "10MK".
   $_n =~ s/[KM]\z//i;

   if (!looks_like_number($_n) || int($_n) != $_n || $_n < 1) {
      _croak("$_tag: ($_key) is not valid");
   }

   return;
}

1;

__END__

###############################################################################
## ----------------------------------------------------------------------------
## Module usage.
##
###############################################################################

=head1 NAME

MCE::Grep - Parallel grep model similar to the native grep function

=head1 VERSION

This document describes MCE::Grep version 1.608

=head1 SYNOPSIS

   ## Exports mce_grep, mce_grep_f, and mce_grep_s
   use MCE::Grep;

   ## Array or array_ref
   my @a = mce_grep { $_ % 5 == 0 } 1..10000;
   my @b = mce_grep { $_ % 5 == 0 } [ 1..10000 ];

   ## File_path, glob_ref, or scalar_ref
   my @c = mce_grep_f { /pattern/ } "/path/to/file";
   my @d = mce_grep_f { /pattern/ } $file_handle;
   my @e = mce_grep_f { /pattern/ } \$scalar;

   ## Sequence of numbers (begin, end [, step, format])
   my @f = mce_grep_s { $_ % 3 == 0 } 1, 10000, 5;
   my @g = mce_grep_s { $_ % 3 == 0 } [ 1, 10000, 5 ];

   my @h = mce_grep_s { $_ % 3 == 0 } {
      begin => 1, end => 10000, step => 5, format => undef
   };

=head1 DESCRIPTION

This module provides a parallel grep implementation via Many-Core Engine.
MCE incurs a small overhead due to passing of data. A fast code block will
run faster natively. However, the overhead will likely diminish as the
complexity increases for the code.

   my @m1 =     grep { $_ % 5 == 0 } 1..1000000;          ## 0.065 secs
   my @m2 = mce_grep { $_ % 5 == 0 } 1..1000000;          ## 0.194 secs

Chunking, enabled by default, greatly reduces the overhead behind the scenes.
The time for mce_grep below also includes the time for data exchanges between
the manager and worker processes. More parallelization will be seen when the
code incurs additional CPU time.

   my @m1 =     grep { /[2357][1468][9]/ } 1..1000000;    ## 0.353 secs
   my @m2 = mce_grep { /[2357][1468][9]/ } 1..1000000;    ## 0.218 secs

Even faster is mce_grep_s; useful when input data is a range of numbers.
Workers generate sequences mathematically among themselves without any
interaction from the manager process. Two arguments are required for
mce_grep_s (begin, end). Step defaults to 1 if begin is smaller than end,
otherwise -1.

   my @m3 = mce_grep_s { /[2357][1468][9]/ } 1, 1000000;  ## 0.165 secs

Although this document is about MCE::Grep, the L<MCE::Stream|MCE::Stream>
module can write results immediately without waiting for all chunks to
complete. This is made possible by passing the reference to an array
(in this case @m4 and @m5).

   use MCE::Stream default_mode => 'grep';

   my @m4; mce_stream \@m4, sub { /[2357][1468][9]/ }, 1..1000000;

      ## Completed in 0.203 secs. This is amazing considering the
      ## overhead for passing data between the manager and workers.

   my @m5; mce_stream_s \@m5, sub { /[2357][1468][9]/ }, 1, 1000000;

      ## Completed in 0.120 secs. As with mce_grep_s, passing a
      ## sequence specification turns out to be faster due to less
      ## overhead for the manager process.

A common scenario is grepping for pattern(s) inside a massive log file.
Notice how parallelism increases as complexity increases for the pattern.
Testing was done against a 300 MB file containing 250k lines.

   use MCE::Grep;

   my @m; open my $LOG, "<", "/path/to/log/file" or die "$!\n";

   @m = grep { /pattern/ } <$LOG>;                      ##  0.756 secs
   @m = grep { /foobar|[2357][1468][9]/ } <$LOG>;       ## 24.681 secs

   ## Parallelism with mce_grep. This involves the manager process
   ## due to processing a file handle.

   @m = mce_grep { /pattern/ } <$LOG>;                  ##  0.997 secs
   @m = mce_grep { /foobar|[2357][1468][9]/ } <$LOG>;   ##  7.439 secs

   ## Even faster with mce_grep_f. Workers access the file directly
   ## with zero interaction from the manager process.

   my $LOG = "/path/to/file";
   @m = mce_grep_f { /pattern/ } $LOG;                  ##  0.112 secs
   @m = mce_grep_f { /foobar|[2357][1468][9]/ } $LOG;   ##  6.840 secs

=head1 OVERRIDING DEFAULTS

The following lists the 5 options which may be overridden when loading the module.

   use Sereal qw( encode_sereal decode_sereal );
   use CBOR::XS qw( encode_cbor decode_cbor );
   use JSON::XS qw( encode_json decode_json );

   use MCE::Grep
         max_workers => 4,               ## Default 'auto'
         chunk_size => 100,              ## Default 'auto'
         tmp_dir => "/path/to/app/tmp",  ## $MCE::Signal::tmp_dir
         freeze => \&encode_sereal,      ## \&Storable::freeze
         thaw => \&decode_sereal         ## \&Storable::thaw
   ;

There is a simpler way to enable Sereal with MCE 1.5. The following will
attempt to use Sereal if available, otherwise defaults to Storable for
serialization.

   use MCE::Grep Sereal => 1;

   ## Serialization is by the Sereal module if available.
   my @m2 = mce_grep { $_ % 5 == 0 } 1..10000;

=head1 CUSTOMIZING MCE

=over 3

=item MCE::Grep->init ( options )

=item MCE::Grep::init { options }

The init function accepts a hash of MCE options. The gather option, if
specified, is ignored due to being used internally by the module.

   use MCE::Grep;

   MCE::Grep::init {
      chunk_size => 1, max_workers => 4,

      user_begin => sub {
         print "## ", MCE->wid, " started\n";
      },

      user_end => sub {
         print "## ", MCE->wid, " completed\n";
      }
   };

   my @a = mce_grep { $_ % 5 == 0 } 1..100;

   print "\n", "@a", "\n";

   -- Output

   ## 2 started
   ## 3 started
   ## 1 started
   ## 4 started
   ## 3 completed
   ## 4 completed
   ## 1 completed
   ## 2 completed

   5 10 15 20 25 30 35 40 45 50 55 60 65 70 75 80 85 90 95 100

=back

=head1 API DOCUMENTATION

=over 3

=item MCE::Grep->run ( sub { code }, iterator )

=item mce_grep { code } iterator

An iterator reference can be specified for input_data. Iterators are described
under "SYNTAX for INPUT_DATA" at L<MCE::Core|MCE::Core>.

   my @a = mce_grep { $_ % 3 == 0 } make_iterator(10, 30, 2);

=item MCE::Grep->run ( sub { code }, list )

=item mce_grep { code } list

Input data can be defined using a list.

   my @a = mce_grep { /[2357]/ } 1..1000;
   my @b = mce_grep { /[2357]/ } [ 1..1000 ];

=item MCE::Grep->run_file ( sub { code }, file )

=item mce_grep_f { code } file

The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves without any interaction from the manager process.

   my @c = mce_grep_f { /pattern/ } "/path/to/file";
   my @d = mce_grep_f { /pattern/ } $file_handle;
   my @e = mce_grep_f { /pattern/ } \$scalar;

=item MCE::Grep->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )

=item mce_grep_s { code } $beg, $end [, $step, $fmt ]

Sequence can be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).

   my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

   my @f = mce_grep_s { /[1234]\.[5678]/ } $beg, $end, $step, $fmt;
   my @g = mce_grep_s { /[1234]\.[5678]/ } [ $beg, $end, $step, $fmt ];

   my @h = mce_grep_s { /[1234]\.[5678]/ } {
      begin => $beg, end => $end, step => $step, format => $fmt
   };

=back

=head1 MANUAL SHUTDOWN

=over 3

=item MCE::Grep->finish

=item MCE::Grep::finish

Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.

   use MCE::Grep;

   MCE::Grep::init {
      chunk_size => 20, max_workers => 'auto'
   };

   my @a = mce_grep { ... } 1..100;

   MCE::Grep::finish;

=back

=head1 INDEX

L<MCE|MCE>

=head1 AUTHOR

Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>

=cut