Mario Roy > MCE-1.306 > MCE

Module Version: 1.306   Latest Release: MCE-1.515

NAME

MCE - Many-Core Engine for Perl. Provides parallel processing capabilities.

VERSION

This document describes MCE version 1.306

DESCRIPTION

Many-Core Engine (MCE) for Perl helps enable a new level of performance by making use of all available cores. MCE spawns a pool of workers and therefore does not fork a new process for each element of data. Instead, MCE follows a bank queuing model: imagine the line of customers as the data and the bank tellers as the parallel workers. MCE enhances that model by adding the ability to hand the next n elements (a chunk) of the input stream to the next available worker.

Both chunking and input are optional in MCE. One can simply use MCE to have many workers run in parallel.
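The chunking side of the bank queuing model can be pictured with a minimal serial sketch in plain Perl (no MCE involved). The helper make_chunk_iterator is hypothetical. Each call hands out the next n elements together with a chunk ID numbered from 1, which is roughly the pair a worker receives in user_func. In MCE the next chunk goes to whichever worker is free; here a single loop takes every chunk.

```perl
use strict;
use warnings;

# Hypothetical helper: each call returns the next $size elements and a
# chunk ID counted from 1; returns an empty list once input is exhausted.
sub make_chunk_iterator {
    my ($data, $size) = @_;
    my ($offset, $chunk_id) = (0, 0);
    return sub {
        return if $offset > $#$data;                  # input exhausted
        my $last = $offset + $size - 1;
        $last = $#$data if $last > $#$data;           # short final chunk
        my @chunk = @{$data}[$offset .. $last];
        $offset = $last + 1;
        return (\@chunk, ++$chunk_id);
    };
}

my @input = ('a' .. 'g');
my $next  = make_chunk_iterator(\@input, 3);
my @got;
while (my ($chunk_ref, $chunk_id) = $next->()) {
    push @got, "$chunk_id: @$chunk_ref";
}
print "$_\n" for @got;
```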

SYNOPSIS

new ( [ options ] )

   use MCE;

   ## A new instance shown with all available options.

   my $mce = MCE->new(

      tmp_dir      => $tmp_dir,

          ## Default is $MCE::Signal::tmp_dir which points to
          ## $ENV{TEMP} if defined. Otherwise, tmp_dir points
          ## to a location under /tmp.

      max_workers  => 8,                 ## Default is 1

          ## Number of workers to spawn.

      input_data   => $input_file,       ## Default is undef

          ## input_data => '/path/to/file' for input file
          ## input_data => \@array for input array
          ## input_data => \*FILE_HNDL for file handle
          ## input_data => \$scalar to treat like a file

          ## Use the sequence option if simply wanting to loop
          ## through a sequence of numbers instead.

      chunk_size   => 2000,              ## Default is 1

          ## A value up to 8192 is the number of records. A value
          ## greater than 8192 is the number of bytes; MCE then reads
          ## until the end of the current record before calling user_func.

          ## chunk_size =>     1,        ## Consists of 1 record
          ## chunk_size =>  1000,        ## Consists of 1000 records
          ## chunk_size => 16384,        ## Approximately 16384 bytes
          ## chunk_size => 50000,        ## Approximately 50000 bytes

          ## chunk_size is fixed at 1 for the foreach and forseq
          ## methods.

      use_slurpio  => 1,                 ## Default is 0
      use_threads  => 1,                 ## Default is 0 or 1

          ## Whether or not to enable slurpio when reading files
          ## (passes raw chunk to user function).

          ## By default MCE does forking (spawns child processes).
          ## MCE also supports threads via 2 threading libraries.
          ##
          ## The use of threads in MCE requires that you include
          ## threads support prior to loading MCE.
          ##
          ##    use threads;                  use forks;
          ##    use threads::shared;   (or)   use forks::shared;
          ##
          ##    use MCE;                      use MCE;

      spawn_delay  => 0.035,             ## Default is undef
      submit_delay => 0.002,             ## Default is undef
      job_delay    => 0.150,             ## Default is undef

          ## Time to wait, in fractional seconds, before spawning
          ## each worker (spawn_delay), before submitting parameters
          ## to each worker (submit_delay), and before each worker
          ## commences the job (job_delay, staggered per worker).
          ## For example, use job_delay to stagger many workers
          ## connecting to a database.

      on_post_exit => \&on_post_exit,    ## Default is undef
      on_post_run  => \&on_post_run,     ## Default is undef

          ## on_post_exit executes immediately after a worker exits
          ## (exit, $self->exit, or die). on_post_run executes after
          ## a job has run ($self->process or $self->run).

          ## One can take action immediately after a worker exits
          ## or wait until after the job has completed.

      user_begin   => \&user_begin,      ## Default is undef
      user_func    => \&user_func,       ## Default is undef
      user_end     => \&user_end,        ## Default is undef

          ## Think of user_begin, user_func, user_end like the awk
          ## scripting language:
          ##    awk 'BEGIN { ... } { ... } END { ... }'

          ## MCE workers call user_begin once per job, then
          ## call user_func repeatedly until no chunks remain.
          ## Afterwards, user_end is called.

      user_error   => \&user_error,      ## Default is undef
      user_output  => \&user_output,     ## Default is undef

          ## When workers call the following functions, MCE will
          ## pass the data to user_error/user_output if defined.
          ## $self->sendto('stderr', 'Sending to STDERR');
          ## $self->sendto('stdout', 'Sending to STDOUT');

      stderr_file  => 'err_file',        ## Default is STDERR
      stdout_file  => 'out_file',        ## Default is STDOUT

          ## Or to a file. If defined, user_error/user_output take precedence.

      flush_file   => 1,                 ## Default is 0
      flush_stderr => 1,                 ## Default is 0
      flush_stdout => 1,                 ## Default is 0

          ## Flush sendto file, standard error, or standard output.

      sequence     => {                  ## Default is undef
          begin => -1, end => 1 [, step => 0.1 [, format => "4.1f" ] ]
      },

          ## For looping through a sequence of numbers in parallel.
          ## STEP, if omitted, defaults to 1 if BEGIN is smaller than
          ## END or -1 if BEGIN is greater than END. The FORMAT string
          ## (without the %) is passed to sprintf behind the scenes,
          ## e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);

          ## Leave input_data set to undef when specifying the sequence
          ## option. One cannot specify both options together.

      user_tasks   => [                  ## Default is undef
         { ... },                        ## Options for task 0
         { ... },                        ## Options for task 1
         { ... },                        ## Options for task 2
      ],

          ## Takes a list of hash references, each allowing for 8
          ## options: max_workers, use_threads, user_begin, user_func,
          ## user_end, chunk_size, sequence, and task_end. Other options
          ## specified here are ignored. Input_data, if specified
          ## from the main MCE instance, is only applicable to task 0.

          ## Options not specified here (excluding task_end) will
          ## default to same option specified from the main MCE
          ## instance.

          ## The task_end, if specified, is called after the task
          ## has completed processing.

   );
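When chunk_size exceeds 8192, the chunk is measured in bytes and extended to the end of the current record. The sketch below mimics that rule in plain Perl with a hypothetical read_chunk helper. It reads from an in-memory scalar, much like input_data => \$scalar. It uses 32 bytes only to keep the example small, and it assumes newline-terminated records. It illustrates the documented behavior and is not MCE's own reader.

```perl
use strict;
use warnings;

# Hypothetical helper: read about $size bytes, then finish the current
# record so that no newline-terminated record is split across chunks.
sub read_chunk {
    my ($fh, $size) = @_;
    my $n = read($fh, my $buf, $size);
    return unless $n;                       # EOF (0) or error (undef)
    if (substr($buf, -1) ne "\n" && !eof($fh)) {
        my $rest = <$fh>;                   # remainder of the partial record
        $buf .= $rest if defined $rest;
    }
    return $buf;
}

my $data = join '', map { "record $_\n" } 1 .. 20;
open my $fh, '<', \$data or die "cannot open in-memory file: $!";

my @chunks;
while (defined(my $chunk = read_chunk($fh, 32))) {
    push @chunks, $chunk;
}
close $fh;
printf "%d chunks, %d bytes\n", scalar(@chunks), length(join('', @chunks));
```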

RUNNING

   ## Run calls spawn and kicks off the job; workers call user_begin,
   ## user_func, and user_end. Run shuts down the workers afterwards.

   $mce->run();

   ## OR, spawn workers early.

   $mce->spawn();

   ## Process data arrays and/or input files. The same pool of
   ## workers is used for each call.

   $mce->process(\@input_data_1);        ## Process arrays
   $mce->process(\@input_data_2);
   $mce->process(\@input_data_n);

   $mce->process('input_file_1');        ## Process files
   $mce->process('input_file_2');
   $mce->process('input_file_n');

   ## Shutdown workers afterwards.

   $mce->shutdown();

SEQUENCE

   ## Since the 1.300 release, workers can loop through a sequence of
   ## numbers in parallel. The distinct difference between specifying
   ## input_data and sequence is that input_data follows a bank queuing
   ## model when walking through the data. With sequence, workers walk
   ## through a sequence of numbers calculated mathematically, without
   ## the overhead of an array. Sequence can also be specified for
   ## each user_task; input_data applies to the first task only.

   ## See a demo at the end of this documentation applying sequences
   ## with the user_tasks option including chunking.

   use MCE;

   my $mce = MCE->new(
      max_workers => 3,

      sequence    => {
         begin => 10, end => 19, step => 0.7, format => "4.1f"
      },

      user_func   => sub {
         my ($self, $n, $chunk_id) = @_;
         print $n, " from ", $self->wid(), " id ", $chunk_id, "\n";
      }
   );

   $mce->run();

   -- Output (sorted afterwards, notice wid and chunk_id in output)

   10.0 from 1 id 1
   10.7 from 2 id 2
   11.4 from 3 id 3
   12.1 from 1 id 4
   12.8 from 2 id 5
   13.5 from 3 id 6
   14.2 from 1 id 7
   14.9 from 2 id 8
   15.6 from 3 id 9
   16.3 from 1 id 10
   17.0 from 2 id 11
   17.7 from 3 id 12
   18.4 from 1 id 13
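The documented sequence rules can be sketched in plain Perl. The default step is 1 or -1 depending on direction, and the format string is given without the % and passed to sprintf. The helper seq_values is hypothetical and not part of MCE, and it assumes Perl 5.10 or later for the // operator. It computes each value as begin + index * step instead of by repeated addition, to limit floating-point drift. That is a choice made here and not necessarily how MCE computes it. The second call in the test covers the descending range used in the forseq example later on (15 down to 10), relying on the default step of -1.

```perl
use strict;
use warnings;

# Hypothetical helper following the documented sequence rules.
sub seq_values {
    my (%spec) = @_;
    my ($begin, $end) = @spec{qw(begin end)};
    my $step = $spec{step} // ($begin <= $end ? 1 : -1);   # default step
    die "step must not be 0\n" if $step == 0;
    my $fmt = defined $spec{format} ? "%$spec{format}" : undef;

    my @out;
    for (my $k = 0; ; $k++) {
        my $n = $begin + $k * $step;       # begin + index * step
        last if $step > 0 ? $n > $end : $n < $end;
        push @out, defined $fmt ? sprintf($fmt, $n) : $n;
    }
    return @out;
}

my @seq = seq_values(begin => 10, end => 19, step => 0.7, format => '4.1f');
print "@seq\n";
```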

USER_TASKS

   ## The 1.2 release introduced the user_tasks option to allow for
   ## flexible workers. It takes an array of tasks. Each task allows
   ## for 7 options:
   ##   max_workers, use_threads, user_begin, user_func, user_end,
   ##   sequence, and chunk_size (the latter 2 added in 1.3)
   ##
   ## In addition, task_end can be specified for the manager process
   ## to run when the workers of that task have completed processing.
   ##
   ## Forking and threading can be intermixed among tasks except under
   ## Cygwin. Input data (if defined) can be processed by the first
   ## task only. The run method does not return until all workers
   ## have completed processing.
   ##
   ## Visit this URL for further reading on the user_tasks option.
   ## http://code.google.com/p/many-core-engine-perl/wiki/MCE_Tasks

   use MCE;

   my $mce = MCE->new(
      input_data => $list_file,

      user_tasks => [{
         max_workers => 2,
         user_func   => \&parallel_task1,
         use_threads => 0,
         task_end    => sub {
            print "Task 1 completed processing\n";
         }

      },{
         max_workers => 4,
         user_func   => \&parallel_task2,
         use_threads => 1

      }]
   );

   $mce->run();
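The rules for user_tasks options can be summarized in a small plain-Perl sketch:

- An option given in the task wins.
- An option left out falls back to the main instance.
- task_end is never inherited.
- input_data reaches task 0 only.

resolve_task and the sample values are hypothetical. The sketch models the documented behavior, not MCE's internal code.

```perl
use strict;
use warnings;

# Options a task may set; anything missing falls back to the main instance.
my @inheritable = qw(
    max_workers use_threads user_begin user_func user_end chunk_size sequence
);

sub resolve_task {
    my ($main, $task, $task_id) = @_;
    my %opts;
    for my $key (@inheritable) {
        $opts{$key} = exists $task->{$key} ? $task->{$key} : $main->{$key};
    }
    $opts{task_end}   = $task->{task_end};                   # never inherited
    $opts{input_data} = $task_id == 0 ? $main->{input_data} : undef;
    return \%opts;
}

# Hypothetical main-instance options and two tasks.
my %main  = (max_workers => 4, chunk_size => 100, input_data => 'list.txt');
my @tasks = (
    { max_workers => 2, task_end => sub { print "Task 0 done\n" } },
    { chunk_size  => 5 },
);

my @resolved = map { resolve_task(\%main, $tasks[$_], $_) } 0 .. $#tasks;
```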

SYNTAX for ON_POST_EXIT

   ## Executed following a worker exiting (exit, $self->exit, or die).
   ## e.g. $e->{pid} is PID_1234 for a process or THR_123 for a thread.

   sub on_post_exit {

      my ($self, $e) = @_;

      print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
   }

   my $mce = MCE->new(

      on_post_exit => \&on_post_exit,

      user_func => sub {

         my $self = $_[0]; $self->exit(0, 'ok', 'pebbles');
      }
   );

   $mce->run();

   -- Output

   2: PID_7625: 0: ok: pebbles

SYNTAX for ON_POST_RUN

   ## Executed after running ($self->process or $self->run). The
   ## on_post_exit above is called immediately after a worker exits,
   ## whereas on_post_run below is called after running. The method
   ## receives an array reference of hashes.

   sub on_post_run {

      my ($self, $status_ref) = @_;

      foreach my $e ( @{ $status_ref } ) {
         print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
      }
   }
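Because on_post_run receives all exit records at once, it is a convenient place to summarize a run. Here is a minimal sketch in plain Perl. summarize_run and the sample status data are hypothetical; only the hash keys (wid, pid, status, msg, id) come from the examples above. Inside on_post_run, one would call summarize_run($status_ref).

```perl
use strict;
use warnings;

# Hypothetical helper: count exit records and collect the wids of
# workers that exited with a non-zero status.
sub summarize_run {
    my ($status_ref) = @_;
    my @failed = grep { ($_->{status} // 0) != 0 } @{ $status_ref };
    return (scalar @{ $status_ref }, [ map { $_->{wid} } @failed ]);
}

# Hypothetical sample of what on_post_run might receive.
my $status_ref = [
    { wid => 1, pid => 'PID_101', status => 0, msg => 'ok',     id => 'a' },
    { wid => 2, pid => 'PID_102', status => 2, msg => 'failed', id => 'b' },
    { wid => 3, pid => 'PID_103', status => 0, msg => 'ok',     id => 'c' },
];

my ($total, $failed_wids) = summarize_run($status_ref);
printf "%d exits, failed wids: %s\n", $total, join(',', @{ $failed_wids });
```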

SYNTAX for USER_BEGIN and USER_END

   ## Both user_begin and user_end functions, if specified, behave
   ## similarly to awk 'BEGIN { ... } { ... } END { ... }'.

   ## Each worker calls this once prior to processing.

   sub user_begin {                   ## Optional via user_begin option

      my $self = shift;

      ## Prefix variables with wk_
      $self->{wk_total_rows} = 0;
   }

   ## And once after completion.

   sub user_end {                     ## Optional via user_end option

      my $self = shift;

      printf "## %d: Processed %d rows\n",
         $self->wid(), $self->{wk_total_rows};
   }

SYNTAX for USER_FUNC with USE_SLURPIO => 0

   ## MCE passes a reference to an array containing the chunk data.
   ## e.g. $chunk_ref = [ record1, record2, record3, ... ]

   sub user_func {

      my ($self, $chunk_ref, $chunk_id) = @_;

      foreach my $row ( @{ $chunk_ref } ) {
         print $row;
         $self->{wk_total_rows} += 1;
      }
   }

SYNTAX for USER_FUNC with USE_SLURPIO => 1

   ## MCE passes a reference to a scalar containing the raw chunk data.
   ## e.g. $chunk_ref = \$chunk_slurped_data

   sub user_func {

      my ($self, $chunk_ref, $chunk_id) = @_;

      my $count = () = $$chunk_ref =~ /abc/g;
   }
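With use_slurpio => 1 the chunk is a single scalar, so counting matches needs the /g modifier together with list-context assignment. Without /g, the match returns at most one element and the count is 1. The plain-Perl sketch below uses made-up data. It shows both counts and one way to split a slurped chunk back into records.

```perl
use strict;
use warnings;

# Made-up slurped chunk: several complete records in one scalar.
my $chunk     = "abc\nxyz abc\nabcabc\n";
my $chunk_ref = \$chunk;

my $once  = () = $$chunk_ref =~ /abc/;     # no /g: at most 1
my $count = () = $$chunk_ref =~ /abc/g;    # every occurrence

# Split into records; each record keeps its trailing newline.
my @records = split /^/m, $$chunk_ref;

print "once=$once count=$count records=", scalar(@records), "\n";
```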

SYNTAX for USER_ERROR and USER_OUTPUT

   ## MCE will direct $self->sendto('stderr/out', ...) calls to these
   ## functions in a serialized fashion. This is handy if one wants to
   ## filter, modify, and/or direct the data elsewhere.

   sub user_error {                   ## Optional via user_error option

      my $error = shift;

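      ## LOGERR is assumed to be opened beforehand by the main process.
      print LOGERR $error;
   }

   sub user_output {                  ## Optional via user_output option

      my $output = shift;

      ## LOGOUT is assumed to be opened beforehand by the main process.
      print LOGOUT $output;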
   }
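Below is a minimal sketch of a filtering user_output that uses a lexical filehandle instead of the bareword LOGOUT. It writes to an in-memory scalar so it can run standalone. In real use the handle would be opened on a log file by the main process, which is where MCE calls user_output. The filter rule (skip blank lines, tag the rest) is only an example.

```perl
use strict;
use warnings;

# In-memory log so the sketch runs standalone; in practice this would
# be a log file opened by the main process.
my $log = '';
open my $logout, '>', \$log or die "cannot open log: $!";

sub user_output {
    my $output = shift;
    return if $output =~ /^\s*$/;        # example filter: skip blank lines
    print {$logout} "[out] $output";     # tag everything else
}

user_output("first line\n");
user_output("\n");
user_output("second line\n");
close $logout;
```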

METHODS for MANAGER PROCESS and WORKERS

Methods listed below are callable by the main process and workers.

abort ( void )

   ## Notifies workers to abort after processing the current chunk.
   ## The abort method is applicable when processing input_data only.
   ##
   ## Workers write the next offset position to the queue socket for
   ## the next available worker. In essence, the abort method writes
   ## the last offset position. Workers, on requesting the next offset
   ## position, will think the end of input_data has been reached and
   ## leave the chunking loop.

   $self->abort();
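The offset hand-off described above can be modeled serially in plain Perl. In this sketch a hypothetical $next_off variable stands in for the queue socket. Each request takes the current offset and advances it. The hypothetical simulate_abort writes the end-of-input offset, so the next request sees no more data. The sketch illustrates the described mechanism only; it is not MCE's code.

```perl
use strict;
use warnings;

my @input      = (1 .. 100);
my $chunk_size = 10;
my $next_off   = 0;                  # stands in for the queue socket

sub next_chunk {                     # a worker requesting the next offset
    return if $next_off >= @input;   # end of input_data reached
    my $off = $next_off;
    $next_off += $chunk_size;        # pass the next offset along
    my $last = $off + $chunk_size - 1;
    $last = $#input if $last > $#input;
    return [ @input[$off .. $last] ];
}

sub simulate_abort { $next_off = scalar @input }   # write the last offset

my @seen;
while (my $chunk = next_chunk()) {
    push @seen, @{$chunk};
    simulate_abort() if $chunk->[-1] == 30;    # abort after the third chunk
}
print scalar(@seen), " records processed\n";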

chunk_size ( void )

   ## Returns the chunk_size used by MCE.

   my $chunk_size = $self->chunk_size();

task_id ( void )

   ## Returns the task ID. Applies to user_tasks option.
   ## Starts at 0 (array index for user_tasks).

   my $task_id = $self->task_id();

task_wid ( void )

   ## Returns the task worker ID. Applies to user_tasks option.
   ## Starts at 1 per each task configured with user_tasks.
   ## The value is set to 0 for the manager process.

   my $task_wid = $self->task_wid();

tmp_dir ( void )

   ## Returns the temporary directory used by MCE.

   my $tmp_dir = $self->tmp_dir();

wid ( void )

   ## Returns the MCE worker ID. Starts at 1 per MCE instance.
   ## The value is set to 0 for the manager process.

   my $wid = $self->wid();
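With user_tasks, wid keeps counting across tasks while task_wid restarts at 1 for each task. In the sequence demo near the end of this document, two workers per task give wids 1-2, 3-4, and 5-6. In the restart_worker example, wids 6-9 belong to the second task. The hypothetical task_of_wid helper below, in plain Perl, derives task_id and task_wid from a wid. It assumes wids are numbered sequentially in task order, as those outputs suggest.

```perl
use strict;
use warnings;

# Hypothetical helper: map an MCE-wide wid to (task_id, task_wid),
# given max_workers for each entry of user_tasks, in order.
sub task_of_wid {
    my ($wid, @max_workers) = @_;
    my $first = 1;                                 # first wid of this task
    for my $task_id (0 .. $#max_workers) {
        my $last = $first + $max_workers[$task_id] - 1;
        return ($task_id, $wid - $first + 1) if $wid >= $first && $wid <= $last;
        $first = $last + 1;
    }
    return;                                        # wid out of range
}

for my $wid (1 .. 6) {
    my ($task_id, $task_wid) = task_of_wid($wid, 2, 2, 2);
    print "wid $wid: task_id $task_id: task_wid $task_wid\n";
}
```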

METHODS for MANAGER PROCESS only

Methods listed below are callable by the main process only.

forchunk ( $input_data [, { options } ], sub { ... } )

   ## Forchunk, foreach, and forseq are sugar methods. Workers are
   ## automatically spawned, the code block is executed in parallel,
   ## and workers are shut down afterwards. Do not use these methods
   ## if wanting workers to remain alive after processing.
   ##
   ## Specifying options is optional. Valid options are the same as
   ## for the process method.

   my $mce = MCE->new(
      chunk_size  => 20,
      max_workers => $max_workers
   );

   ## Arguments inside code block are the same as for user_func.

   $mce->forchunk(\@input_data, sub {
      my ($self, $chunk_ref, $chunk_id) = @_;

      foreach ( @{ $chunk_ref } ) {
         $self->sendto("stdout", "$chunk_id: $_\n");
      }
   });

   ## Passing chunk_size as an option.

   $mce->forchunk(\@input_data, { chunk_size => 30 }, sub {
      ...
   });

foreach ( $input_data [, { options } ], sub { ... } )

   ## Foreach implies chunk_size => 1 (cannot be overridden).

   my $mce = MCE->new(
      max_workers => $max_workers
   );

   ## Arguments inside code block are the same as for user_func.
   ## This holds true even though chunk_size is 1. MCE is a chunking
   ## engine and a parallel engine in one, so the arguments within
   ## the block are the same whether chunk_size is 1 or greater.

   $mce->foreach(\@input_data, sub {
      my ($self, $chunk_ref, $chunk_id) = @_;
      my $row = $chunk_ref->[0];
      $self->sendto("stdout", "$chunk_id: $row\n");
   });

   ## Passing an anonymous array as input data. For example,
   ## wanting to parallelize a serial for loop with MCE.

   for (my $i = 0; $i < $max; $i++) {
      ...  ## Runs serially
   }
   for my $i (0 .. $max - 1) {
      ...  ## Runs serially
   }

   $mce->foreach([ (0 .. $max - 1) ], sub {
      my ($self, $chunk_ref, $chunk_id) = @_;
      my $i = $chunk_ref->[0];      ## or: my $i = $chunk_id - 1;
      ...  ## Runs in parallel
   });

forseq ( $sequence_spec [, { options } ], sub { ... } )

   ## Forseq implies chunk_size => 1 (cannot be overridden).

   my $mce = MCE->new(
      max_workers => 3
   );

   $mce->forseq({ begin => 15, end => 10, step => -1 }, sub {
      my ($self, $n, $chunk_id) = @_;
      print $n, " from ", $self->wid(), "\n";
   });

   $mce->forseq({ begin => 20, end => 40 }, sub {
      my ($self, $n, $chunk_id) = @_;
      my $result = `ping -c 1 192.168.1.${n}`;   ## -c 1 (Unix): send one packet
      ...
   });

process ( $input_data [, { options } ] )

   ## The process method will spawn workers automatically if not already
   ## spawned. It sets input_data => $input_data and calls run(0) so that
   ## workers are not shut down automatically. Specifying options is optional.
   ##
   ## Allowable options { key => value, ... } are:
   ##   chunk_size input_data job_delay spawn_delay submit_delay
   ##   flush_file flush_stderr flush_stdout stderr_file stdout_file
   ##   on_post_exit on_post_run sequence user_begin user_end user_func
   ##   user_error user_output use_slurpio
   ##
   ## Options remain persistent going forward unless changed. Setting
   ## user_begin, user_end, or user_func causes already spawned workers
   ## to shut down and re-spawn automatically. Therefore, define these
   ## during instantiation if possible.

   my $mce = MCE->new( ... );

   $mce->spawn();
   $mce->process($array_ref);
   $mce->process($array_ref, { stdout_file => $output_file });
   $mce->shutdown();

restart_worker ( $wid )

   ## One can restart a worker that has died or exited. The job below
   ## never ends because each worker is restarted every time it exits.

   ## Pass the same wid from the worker which has exited.

   my $mce = MCE->new(

      on_post_exit => sub {
         my ($self, $e) = @_;
         print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";

         $self->restart_worker($e->{wid});
      },

      user_begin => sub {
         my $self = $_[0];
         ## Not interested in die messages going to STDERR.
         ## The die handler calls $self->exit(255, $_[0]).
         close STDERR;
      },

      user_tasks => [{
         max_workers => 5,
         user_func   => sub {
            my $self = $_[0]; sleep $self->wid();
            $self->exit(3, "exited from " . $self->wid() . "\n");
         }
      },{
         max_workers => 4,
         user_func   => sub {
            my $self = $_[0]; sleep $self->wid();
            die("died from " . $self->wid() . "\n");
         }
      }]
   );

   $mce->run();

   -- Output

   1: PID_85388: status 3: exited from 1
   2: PID_85389: status 3: exited from 2
   1: PID_85397: status 3: exited from 1
   3: PID_85390: status 3: exited from 3
   1: PID_85399: status 3: exited from 1
   4: PID_85391: status 3: exited from 4
   2: PID_85398: status 3: exited from 2
   1: PID_85401: status 3: exited from 1
   5: PID_85392: status 3: exited from 5
   1: PID_85404: status 3: exited from 1
   6: PID_85393: status 255: died from 6
   3: PID_85400: status 3: exited from 3
   2: PID_85403: status 3: exited from 2
   1: PID_85406: status 3: exited from 1
   7: PID_85394: status 255: died from 7
   1: PID_85410: status 3: exited from 1
   8: PID_85395: status 255: died from 8
   4: PID_85402: status 3: exited from 4
   2: PID_85409: status 3: exited from 2
   1: PID_85412: status 3: exited from 1
   9: PID_85396: status 255: died from 9
   3: PID_85408: status 3: exited from 3
   1: PID_85416: status 3: exited from 1

   ...

run ( [ $auto_shutdown [, { options } ] ] )

   ## The run method, by default, spawns workers, processes once,
   ## and shuts down workers. Set $auto_shutdown to 0 if not wanting
   ## to auto-shutdown workers after processing (default is 1).
   ##
   ## Specifying options is optional. Valid options are the same as
   ## for the process method.

   my $mce = MCE->new( ... );

   $mce->run(0);                         ## Disables auto-shutdown

shutdown ( void )

   ## The run method automatically spawns workers, runs once, and shuts
   ## down the workers. The process method leaves workers waiting for the
   ## next job after processing. Call shutdown after processing all jobs.

   my $mce = MCE->new( ... );

   $mce->spawn();

   $mce->process(\@input_data_1);        ## Processing multiple arrays
   $mce->process(\@input_data_2);
   $mce->process(\@input_data_n);

   $mce->process('input_file_1');        ## Processing multiple files
   $mce->process('input_file_2');
   $mce->process('input_file_n');

   $mce->shutdown();

spawn ( void )

   ## Workers are normally spawned automatically. The spawn method is
   ## beneficial when wanting to spawn workers early.

   my $mce = MCE->new( ... );

   $mce->spawn();

METHODS for WORKERS only

Methods listed below are callable by workers only.

do ( 'callback_func' [, $arg1, ... ] )

   ## MCE can serialize data transfers from worker processes via the
   ## helper methods do & sendto. The main MCE thread processes these
   ## in a serial fashion. This utilizes the Storable Perl module for
   ## passing data from a worker process to the main MCE thread.
   ## The callback function can optionally return a reply.

   [ $reply = ] $self->do('callback' [, $arg1, ... ]);

   ## Passing args to a callback function using references & scalar.

   sub callback {
      my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
      ...
   }

   $self->do('main::callback', \@a, \%h, \$s, 'hello');
   $self->do('callback', \@a, \%h, \$s, 'hello');

   ## MCE knows whether the caller wants a void, list, hash, or scalar return value.

   $self->do('callback' [, $arg1, ... ]);

   my @array  = $self->do('callback' [, $arg1, ... ]);
   my %hash   = $self->do('callback' [, $arg1, ... ]);
   my $scalar = $self->do('callback' [, $arg1, ... ]);
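Two details above can be illustrated in plain Perl. First, an unqualified name such as 'callback' refers to main::callback. Second, the callback sees the caller's context through wantarray. The call_by_name helper below is hypothetical and runs in-process. In MCE the call crosses from the worker to the main process, with data passed via Storable as noted above, so a callback presumably works on copies of the referenced data.

```perl
use strict;
use warnings;

sub callback {
    my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
    # List context: sizes of the array and hash; scalar context: a string.
    return wantarray ? (scalar @{$array_ref}, scalar keys %{$hash_ref})
                     : "${$scalar_ref} $scalar";
}

# Hypothetical dispatcher: 'callback' is treated as 'main::callback'.
sub call_by_name {
    my ($name, @args) = @_;
    $name = "main::$name" unless $name =~ /::/;
    my $code = do { no strict 'refs'; \&{$name} };
    die "undefined callback '$name'\n" unless defined &{$code};
    return $code->(@args);    # runs in call_by_name's calling context
}

my @nums  = (1, 2, 3);
my %opts  = (x => 1);
my $greet = 'hi';
my @sizes  = call_by_name('callback', \@nums, \%opts, \$greet, 'hello');
my $joined = call_by_name('main::callback', \@nums, \%opts, \$greet, 'hello');
print "@sizes | $joined\n";
```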

exit ( [ $status [, $message [, $id ] ] ] )

   ## Worker exits entirely from MCE. $id (optional) can be used to pass
   ## the primary key or a string along with the message. Please look at
   ## the on_post_exit and on_post_run options for callback support.

   $self->exit();          ## default is 0
   $self->exit(1);
   $self->exit(2, 'error: entire chunk failed', $chunk_id);
   $self->exit(0, 'ok', 'pebbles');

last ( void )

   ## Worker immediately leaves the chunking loop or user func. Can
   ## be called from inside foreach, forchunk, forseq, and user_func.

   my @list = (1 .. 80);

   $mce->forchunk(\@list, { chunk_size => 2 }, sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      $self->last if ($chunk_id > 4);

      my @output = ();

      foreach my $rec ( @{ $chunk_ref } ) {
         push @output, $rec, "\n";
      }

      $self->sendto('stdout', @output);
   });

   -- Output (each chunk above consists of 2 elements)

   1
   2
   3
   4
   5
   6
   7
   8

next ( void )

   ## Worker starts the next iteration of the chunking loop. Can be
   ## called from inside foreach, forchunk, forseq, and user_func.

   my @list = (1 .. 80);

   $mce->forchunk(\@list, { chunk_size => 4 }, sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      $self->next if ($chunk_id < 20);

      my @output = ();

      foreach my $rec ( @{ $chunk_ref } ) {
         push @output, $rec, "\n";
      }

      $self->sendto('stdout', @output);
   });

   -- Output (each chunk above consists of 4 elements)

   77
   78
   79
   80
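The loop below is a serial analogy only, in plain Perl with no workers. Each chunk is treated like one call of the code block: last leaves the chunk loop, and next skips to the following chunk. run_chunks is a hypothetical helper. With the same conditions as the two examples above, it yields the same records. Under MCE, records from different workers may print in any order.

```perl
use strict;
use warnings;

# Hypothetical helper: walk @$list in chunks, honoring stop/skip tests.
sub run_chunks {
    my ($list, $chunk_size, $skip_if, $stop_if) = @_;
    my ($chunk_id, @out) = (0);
    CHUNK:
    for (my $off = 0; $off < @$list; $off += $chunk_size) {
        $chunk_id++;
        last CHUNK if $stop_if->($chunk_id);     # like $self->last
        next CHUNK if $skip_if->($chunk_id);     # like $self->next
        my $end = $off + $chunk_size - 1;
        $end = $#$list if $end > $#$list;
        push @out, @{$list}[$off .. $end];
    }
    return @out;
}

my @list     = (1 .. 80);
my @via_last = run_chunks(\@list, 2, sub { 0 }, sub { $_[0] > 4 });
my @via_next = run_chunks(\@list, 4, sub { $_[0] < 20 }, sub { 0 });
print "@via_last\n@via_next\n";
```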

sendto ( 'to_string', $arg1, ... )

The sendto method is called by workers to serialize data to standard output, standard error, or the end of a file (append). The main process or thread performs the action.

Release 1.100 added the ability to pass multiple arguments.

syntax for 1.00x

   ## Release 1.00x supported only 1 data argument.
   ## /path/to/file is the 3rd argument for 'file'.

   $self->sendto('stdout', \@array);
   $self->sendto('stdout', \$scalar);
   $self->sendto('stdout', $scalar);

   $self->sendto('stderr', \@array);
   $self->sendto('stderr', \$scalar);
   $self->sendto('stderr', $scalar);

   $self->sendto('file', \@array, '/path/to/file');
   $self->sendto('file', \$scalar, '/path/to/file');
   $self->sendto('file', $scalar, '/path/to/file');

syntax for 1.100 and later releases

   ## Notice the syntax change for appending to a file.

   $self->sendto('stdout', $arg1 [, $arg2, ... ]);
   $self->sendto('stderr', $arg1 [, $arg2, ... ]);
   $self->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);

   ## Passing a reference is no longer necessary beginning with 1.100.

   $self->sendto("stdout", @a, "\n", %h, "\n", $s, "\n");

   ## To retain 1.00x compatibility, sendto outputs the content when a
   ## single data argument is specified and it is a reference.

   $self->sendto('stdout', \@array);
   $self->sendto('stderr', \$scalar);
   $self->sendto('file:/path/to/file', \@array);

   ## Otherwise, the reference for \@array and \$scalar is shown,
   ## not the content. Basically, output matches the print statement.
   ## Ex. print STDOUT "hello\n", \@array, \$scalar, "\n";

   $self->sendto('stdout', "hello\n", \@array, \$scalar, "\n");
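The compatibility rule can be expressed as a small plain-Perl function. sendto_text is hypothetical. It returns the text that, per the rules above, would be written: a lone array or scalar reference is dereferenced. Anything else is concatenated as print does with $, and $\ unset, so references stringify. Other reference types are not covered, since the rules above only mention \@array and \$scalar.

```perl
use strict;
use warnings;

# Hypothetical helper: the text sendto would emit under the rules above.
sub sendto_text {
    my @args = @_;
    if (@args == 1 && ref $args[0]) {             # 1.00x compatibility
        my $ref = $args[0];
        return join('', @{$ref}) if ref $ref eq 'ARRAY';
        return ${$ref}           if ref $ref eq 'SCALAR';
    }
    return join('', @args);    # like print: references stringify, e.g. ARRAY(0x...)
}

my @lines = ("one\n", "two\n");
print sendto_text(\@lines);                    # the two lines themselves
print sendto_text("hello\n", \@lines, "\n");   # hello, then ARRAY(0x...)
```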

EXAMPLES

MCE comes with various examples showing real-world use cases, from parallelizing something as small as cat (try with -n) to grepping for patterns and aggregating word counts.

INCLUDED with DISTRIBUTION

   cat.pl    Concatenation script, similar to the cat binary.
   egrep.pl  Egrep script, similar to the egrep binary.
   wc.pl     Word count script, similar to the wc binary.

   findnull.pl
             A parallel driven script to report lines containing
             null fields. It's many times faster than the binary
             egrep command. Try against a large file containing
             very long lines.

   scaling_pings.pl
             Perform ping test and report back failing IPs to
             standard output.

   seq_demo.pl
             A demonstration of the new sequence option appearing
             in MCE 1.300. Run with seq_demo.pl | sort

   tbray/wf_mce1.pl, wf_mce2.pl, wf_mce3.pl
             An implementation of wide finder utilizing MCE.
             As fast as MMAP IO when file resides in OS FS cache.
             2x ~ 3x faster when reading directly from disk.

   foreach.pl, forseq.pl, forchunk.pl
             These take the same sqrt example from Parallel::Loops
             and measure the overhead of the engine. The number
             indicates the size of @input which can be submitted
             and results displayed in under 1 second.

             Parallel::Loops is based on Parallel::ForkManager.
             MCE utilizes a pool of workers.

             Parallel::Loops:     600  Forking each @input is expensive
             MCE foreach....:  18,000  Sends result after each @input
             MCE forseq.....:  60,000  Loops through sequence of numbers
             MCE forchunk...: 385,000  Chunking reduces overhead

CHUNK_SIZE => 1 (in essence, wanting no chunking on input data)

   ## Imagine a long running process and wanting to parallelize an array
   ## against a pool of workers.

   ## Since 1.300, the sequence option can be used when the sequence of
   ## numbers is known up front: sequence => { begin => 0, end => 18000 - 1 }

   my @input_data  = (0 .. 18000 - 1);
   my $max_workers = 3;
   my $order_id    = 1;
   my %result;

   ## Callback function for displaying results. The logic below shows
   ## how one can display results immediately while still preserving
   ## output order. The %result hash is a temporary cache to store
   ## results for out-of-order replies.

   sub display_result {

      my ($wk_result, $chunk_id) = @_;
      $result{$chunk_id} = $wk_result;

      while (1) {
         last unless (exists $result{$order_id});

         printf "i: %d sqrt(i): %f\n",
            $input_data[$order_id - 1], $result{$order_id};

         delete $result{$order_id};
         $order_id++;
      }
   }

   ## Compute via MCE.

   my $mce = MCE->new(
      input_data  => \@input_data,
      max_workers => $max_workers,
      chunk_size  => 1,

      user_func => sub {

         my ($self, $chunk_ref, $chunk_id) = @_;
         my $wk_result = sqrt($chunk_ref->[0]);

         $self->do('display_result', $wk_result, $chunk_id);
      }
   );

   $mce->run();
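The reordering logic in display_result can be checked without MCE. The plain-Perl sketch below keeps the same cache-and-drain approach but collects lines into @printed instead of printing them. It feeds replies in a made-up out-of-order sequence to mimic workers finishing at different times.

```perl
use strict;
use warnings;

my @input_data = (0 .. 9);
my $order_id   = 1;
my %result;                 # cache for out-of-order replies
my @printed;

sub display_result {
    my ($wk_result, $chunk_id) = @_;
    $result{$chunk_id} = $wk_result;
    # Drain every reply that is now next in order.
    while (exists $result{$order_id}) {
        push @printed, sprintf "i: %d sqrt(i): %f",
            $input_data[$order_id - 1], delete $result{$order_id};
        $order_id++;
    }
}

# Made-up arrival order, as replies might come from parallel workers.
for my $chunk_id (3, 1, 2, 6, 4, 5, 10, 7, 9, 8) {
    display_result(sqrt($input_data[$chunk_id - 1]), $chunk_id);
}
print "$_\n" for @printed;
```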

FOREACH sugar METHOD

   ## Compute via MCE. Foreach implies chunk_size => 1.

   my $mce = MCE->new(
      max_workers => $max_workers
   );

   ## Worker calls code block passing a reference to an array containing
   ## one item. Use $chunk_ref->[0] to retrieve the single element.

   $mce->foreach(\@input_data, sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      my $wk_result = sqrt($chunk_ref->[0]);

      $self->do('display_result', $wk_result, $chunk_id);
   });

CHUNKING INPUT_DATA

   ## Chunking reduces overhead many-fold. Instead of passing a single
   ## item from @input_data, a chunk of $chunk_size items is sent to the
   ## next available worker.

   ## Since 1.300, the sequence option can be used when the sequence of
   ## numbers is known up front: sequence => { begin => 0, end => 385000 - 1 }

   my @input_data  = (0 .. 385000 - 1);
   my $max_workers = 3;
   my $chunk_size  = 500;
   my $order_id    = 1;
   my %result;

   ## Callback function for displaying results.

   sub display_result {

      my ($wk_result, $chunk_id) = @_;
      $result{$chunk_id} = $wk_result;

      while (1) {
         last unless (exists $result{$order_id});
         my $i = ($order_id - 1) * $chunk_size;

         foreach ( @{ $result{$order_id} } ) {
            printf "i: %d sqrt(i): %f\n", $input_data[$i++], $_;
         }

         delete $result{$order_id};
         $order_id++;
      }
   }

   ## Compute via MCE.

   my $mce = MCE->new(
      input_data  => \@input_data,
      max_workers => $max_workers,
      chunk_size  => $chunk_size,

      user_func => sub {

         my ($self, $chunk_ref, $chunk_id) = @_;
         my @wk_result;

         foreach ( @{ $chunk_ref } ) {
            push @wk_result, sqrt($_);
         }

         $self->do('display_result', \@wk_result, $chunk_id);
      }
   );

   $mce->run();

FORCHUNK sugar METHOD

   ## Compute via MCE.

   my $mce = MCE->new(
      max_workers => $max_workers,
      chunk_size  => $chunk_size
   );

   ## Below, $chunk_ref is a reference to an array containing the next
   ## $chunk_size items from @input_data.

   $mce->forchunk(\@input_data, sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      my @wk_result;

      foreach ( @{ $chunk_ref } ) {
         push @wk_result, sqrt($_);
      }

      $self->do('display_result', \@wk_result, $chunk_id);
   });

DEMO APPLYING SEQUENCES WITH USER_TASKS

One may specify the sequence option for each task. The following is taken directly from the seq_demo.pl example. Think of this demonstration as 3 mini-MCEs running simultaneously in parallel. Chunking can be configured independently per task as well.

   use MCE;

   ## Run with seq_demo.pl | sort

   sub user_func {
      my ($self, $seq_n, $chunk_id) = @_;

      my $wid      = $self->wid();
      my $task_id  = $self->task_id();
      my $task_wid = $self->task_wid();

      if (ref $seq_n eq 'ARRAY') {
         ## Received the next "chunked" sequence of numbers
         ## e.g. when chunk_size > 1, $seq_n will be an array ref above

         foreach (@{ $seq_n }) {
            printf(
               "task_id %d: seq_n %s: chunk_id %d: wid %d: task_wid %d\n",
               $task_id,    $_,       $chunk_id,   $wid,   $task_wid
            );
         }
      }
      else {
         printf(
            "task_id %d: seq_n %s: chunk_id %d: wid %d: task_wid %d\n",
            $task_id,    $seq_n,   $chunk_id,   $wid,   $task_wid
         );
      }
   }

   ## Each task can be configured independently.

   my $mce = MCE->new(
      user_tasks => [{
         max_workers => 2,
         chunk_size  => 1,
         sequence    => { begin => 11, end => 19, step => 1 },
         user_func   => \&user_func
      },{
         max_workers => 2,
         chunk_size  => 5,
         sequence    => { begin => 21, end => 29, step => 1 },
         user_func   => \&user_func
      },{
         max_workers => 2,
         chunk_size  => 3,
         sequence    => { begin => 31, end => 39, step => 1 },
         user_func   => \&user_func
      }]
   );

   $mce->run();

   -- Output

   task_id 0: seq_n 11: chunk_id 1: wid 1: task_wid 1
   task_id 0: seq_n 12: chunk_id 2: wid 2: task_wid 2
   task_id 0: seq_n 13: chunk_id 3: wid 1: task_wid 1
   task_id 0: seq_n 14: chunk_id 4: wid 2: task_wid 2
   task_id 0: seq_n 15: chunk_id 5: wid 1: task_wid 1
   task_id 0: seq_n 16: chunk_id 6: wid 2: task_wid 2
   task_id 0: seq_n 17: chunk_id 7: wid 1: task_wid 1
   task_id 0: seq_n 18: chunk_id 8: wid 2: task_wid 2
   task_id 0: seq_n 19: chunk_id 9: wid 1: task_wid 1
   task_id 1: seq_n 21: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 22: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 23: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 24: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 25: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 26: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 27: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 28: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 29: chunk_id 2: wid 4: task_wid 2
   task_id 2: seq_n 31: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 32: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 33: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 34: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 35: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 36: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 37: chunk_id 3: wid 5: task_wid 1
   task_id 2: seq_n 38: chunk_id 3: wid 5: task_wid 1
   task_id 2: seq_n 39: chunk_id 3: wid 5: task_wid 1
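The chunk boundaries above follow from splitting each task's sequence into runs of chunk_size numbers. Below is a plain-Perl sketch of that split. It is an illustration inferred from the output, not MCE's code. seq_chunks is a hypothetical name, and the sketch assumes a positive step. Unlike MCE, it always returns array refs, even when chunk_size is 1, whereas user_func above receives a plain scalar in that case.

```perl
use strict;
use warnings;

# Illustrative only (not MCE code): split a { begin, end, step } sequence
# into chunks of $chunk_size numbers. Assumes a positive step.
sub seq_chunks {
   my ($seq, $chunk_size) = @_;
   my (@chunks, @cur);

   for (my $n = $seq->{begin}; $n <= $seq->{end}; $n += $seq->{step}) {
      push @cur, $n;
      if (@cur == $chunk_size) {
         push @chunks, [ @cur ];
         @cur = ();
      }
   }
   push @chunks, [ @cur ] if @cur;    # final, possibly shorter, chunk

   return @chunks;
}

# Settings of task 1 and task 2 from the user_tasks demo: begin, end, chunk_size
for my $task ([ 21, 29, 5 ], [ 31, 39, 3 ]) {
   my ($begin, $end, $chunk_size) = @{$task};
   my @chunks = seq_chunks({ begin => $begin, end => $end, step => 1 }, $chunk_size);
   printf "chunk_id %d: %s\n", $_ + 1, join(' ', @{ $chunks[$_] }) for 0 .. $#chunks;
}
```

It prints chunk_id 1 and 2 for task 1 (21 to 25, then 26 to 29) and chunk_id 1 to 3 for task 2 (31 to 33, 34 to 36, 37 to 39), matching the demo output. Which worker receives each chunk is decided at run time, so the wid column can differ between runs.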

MULTIPLE WORKERS RUNNING IN PARALLEL

Both input_data and sequence options are optional in MCE. One can simply use MCE to run multiple workers in parallel. The "do" and "sendto" methods can be used to pass data back to the manager process, and a worker does not have to finish processing before doing so. The manager process handles "do" and "sendto" requests serially, on a first-come, first-served basis. All 4 workers run in parallel in the demonstration below.

   use MCE;

   sub report_stats {
      my ($wid, $msg, $hash_ref) = @_;
      print "Worker $wid says $msg: ", $hash_ref->{'counter'}, "\n";
   }

   my $mce = MCE->new(
      max_workers => 4,

      user_func => sub {
         my ($self) = @_;
         my $wid = $self->wid();

         if ($wid == 1) {
            my %hash = ('counter' => 0);
            while (1) {
               $hash{'counter'} += 1;
               $self->do('report_stats', $wid, 'Hello there', \%hash);
               last if ($hash{'counter'} == 4);
               sleep 2;
            }
         }

         else {
            my %hash = ('counter' => 0);
            while (1) {
               $hash{'counter'} += 1;
               $self->do('report_stats', $wid, 'Welcome ...', \%hash);
               last if ($hash{'counter'} == 2);
               sleep 4;
            }
         }

         $self->sendto('stdout', "Worker $wid is exiting\n");
      }
   );

   $mce->run;

Output from two runs is shown below. The order varies from run to run; in the second run, worker 2 gets there first.

   $ ./demo.pl
   Worker 1 says Hello there: 1
   Worker 2 says Welcome ...: 1
   Worker 3 says Welcome ...: 1
   Worker 4 says Welcome ...: 1
   Worker 1 says Hello there: 2
   Worker 2 says Welcome ...: 2
   Worker 3 says Welcome ...: 2
   Worker 1 says Hello there: 3
   Worker 2 is exiting
   Worker 3 is exiting
   Worker 4 says Welcome ...: 2
   Worker 4 is exiting
   Worker 1 says Hello there: 4
   Worker 1 is exiting

   $ ./demo.pl
   Worker 2 says Welcome ...: 1
   Worker 1 says Hello there: 1
   Worker 4 says Welcome ...: 1
   Worker 3 says Welcome ...: 1
   Worker 1 says Hello there: 2
   Worker 2 says Welcome ...: 2
   Worker 4 says Welcome ...: 2
   Worker 3 says Welcome ...: 2
   Worker 2 is exiting
   Worker 4 is exiting
   Worker 1 says Hello there: 3
   Worker 3 is exiting
   Worker 1 says Hello there: 4
   Worker 1 is exiting
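The serial, first-come, first-served handling of "do" requests can be pictured with a small fork-and-pipe program in plain Perl. This is a hypothetical model of the pattern, not how MCE implements do() or sendto(). Each child writes one tab-separated line naming a callback, and the parent reads the lines one at a time and dispatches them through a hash of code refs. %dispatch and the line format are assumptions. The model relies on POSIX pipe writes no larger than PIPE_BUF being atomic, so short messages from different children do not interleave.

```perl
use strict;
use warnings;

$| = 1;    # autoflush, so forked children never inherit unflushed output

# Hypothetical manager-side callback, looked up by name like do() does.
my @log;
sub report_stats {
   my ($wid, $msg) = @_;
   push @log, "Worker $wid says $msg";
   print "Worker $wid says $msg\n";
}
my %dispatch = (report_stats => \&report_stats);

pipe(my $reader, my $writer) or die "pipe: $!";

my @pids;
for my $wid (1 .. 4) {
   my $pid = fork();
   die "fork: $!" unless defined $pid;
   if ($pid == 0) {                      # worker process
      close $reader;
      # One short line per message: callback name, then tab-separated args.
      syswrite($writer, join("\t", 'report_stats', $wid, 'Hello there') . "\n");
      close $writer;
      exit 0;
   }
   push @pids, $pid;
}
close $writer;    # EOF arrives once every worker has closed its write end

# Manager: handle one message at a time, in arrival order.
while (my $line = <$reader>) {
   chomp $line;
   my ($func, @args) = split /\t/, $line;
   $dispatch{$func}->(@args);
}
close $reader;
waitpid($_, 0) for @pids;
```

As in the demo, the order of the four lines can vary between runs. Only the manager runs report_stats, and it handles one message at a time.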

REQUIREMENTS

Perl 5.8.0 or later

SEE ALSO

MCE::Signal

SOURCE

The source is hosted at: http://code.google.com/p/many-core-engine-perl/

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>

COPYRIGHT AND LICENSE

Copyright (C) 2012 by Mario E. Roy

MCE is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See http://dev.perl.org/licenses/.
