The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/env perl
###############################################################################
##
## FASTA Reader; parallel callback mode using mce_flow_f.
##
## Synopsis
##   fasta_rdr3.pl [ /path/to/fastafile.fa [ trim_seq ] ]
##
##   CKSIZE=64m perl fasta_rdr3.pl hg19.fa   0   run with chunk_size => '64m'
##   CKSIZE=4m  perl fasta_rdr3.pl uniref.fa 1   run with chunk_size => '4m'
##
##   time NPROCS=1 CKSIZE=4m ./fasta_rdr3.pl uniref.fa 0 | wc -l
##   time NPROCS=8 CKSIZE=4m ./fasta_rdr3.pl uniref.fa 1 | wc -l
##
##   cat uniref100.fasta | NPROCS=8 CKSIZE=4m perl fasta_rdr3.pl - 0
##
###############################################################################

use strict;
use warnings;

use Cwd 'abs_path';
use lib abs_path($0 =~ m{^(.*)[\\/]} && $1 || abs_path) . '/../../lib';
use lib abs_path($0 =~ m{^(.*)[\\/]} && $1 || abs_path) . '/include';

use Time::HiRes qw( time );
use MCE::Flow;
use Fasta;

my $nprocs = $ENV{NPROCS} || 2;
my $cksize = $ENV{CKSIZE} || '2m';

## Process file.

my $file     = shift || \*STDIN;  $file = \*STDIN if $file eq '-';
my $trim_seq = shift || 0;
my $start    = time;

mce_flow_f {
   max_workers => $nprocs, chunk_size => $cksize,
   RS => "\n>", use_slurpio => 1,
   gather => output_iter(),

}, \&user_func, $file;

printf {*STDERR} "\n## Compute time: %0.03f\n\n", time - $start;

exit;

###############################################################################
##
## Manager function(s).
##
###############################################################################

{
   ## Iterator for preserving output order.
 
   sub output_iter {

      ## One can have this receive 2 arguments; $chunk_id and $chunk_data.
      ## However, MCE->freeze is called when more than 1 argument is sent.
      ## For performance, $chunk_id is attached to the end of $_[0].

      my %tmp; my $order_id = 1;
 
      return sub {
         my $chunk_id = substr($_[0], rindex($_[0], ':') + 1);
         my $chop_len = length($chunk_id) + 1;

         substr($_[0], -$chop_len, $chop_len, '');   ## trim out chunk_id

         if ($chunk_id == $order_id && keys %tmp == 0) {
            ## no need to save in cache if orderly
            print $_[0];
            $order_id++;
         }
         else {
            ## hold temporarily otherwise
            $tmp{$chunk_id} = $_[0];
            while (1) {
               last unless exists $tmp{$order_id};
               print delete $tmp{$order_id++};
            }
         }
 
         return;
      };
   }
}

###############################################################################
##
## Worker function(s).
##
###############################################################################

{
   use constant { ID => 0, DESC => 1, SEQ => 2 };

   ## Callback function for reader called once per each sequence.

   my $output = '';                                  ## unique, not shared

   sub reader_cb {
    # my ($id, $desc, $seq) = @_;                    ## this passes by value,
    # $output .= "$id\t$desc\n";                     ## consuming extra memory

    # $output .= ">$_[ID]\n". $_[SEQ] ."\n";         ## hash-like access, least
      $output .=  "$_[ID]\t$_[DESC]\n";              ## memory consumption

      return;
   }

   ## The user_func block is called once per each input_data chunk.

   sub user_func {
      my ($mce, $slurp_ref, $chunk_id) = @_;

      ## run, calls reader_cb for each sequence
      Fasta::Reader($slurp_ref, $trim_seq, \&reader_cb);

      ## send output to the manager process
      MCE->gather($output .':'. $chunk_id);          ## attach chunk_id value

      $output = '';                                  ## reset output buffer

      return;
   }
}