Mario Roy > MCE > MCE::Examples

Download:
MCE-1.514.tar.gz

Annotate this POD

Website

CPAN RT

Open  0
View/Report Bugs
Source  

NAME ^

MCE::Examples - A list of examples demonstrating Many-core Engine

VERSION ^

This document describes MCE::Examples version 1.514

DESCRIPTION ^

MCE comes with various examples showing real-world scenarios on parallelizing something as small as cat (try with -n) to searching for patterns and word count aggregation.

INCLUDED WITH THE DISTRIBUTION ^

   bin/mce_grep
             A wrapper script with support for the following C binaries.
                agrep, grep, egrep, fgrep, and tre-agrep

             Chunking may be applied either at the [file] level, for
             large file(s), or at the [list] level when parsing many
             files recursively.

             The gain in performance is noticeable for expensive
             patterns, especially with agrep and tre-agrep.

   barrier_sync.pl
             A barrier sync demonstration.

   cat.pl    Concatenation script, similar to the cat binary.
   egrep.pl  Egrep script, similar to the egrep binary.
   wc.pl     Word count script, similar to the wc binary.

   findnull.pl
             A parallel script for reporting lines containing null
             fields. It is many times faster than the egrep binary.
             Try this against a large file containing very long lines.

   flow_model.pl
             Demonstrates MCE::Flow, MCE::Queue, and MCE->gather.

   foreach.pl, forseq.pl, forchunk.pl
             These take the same sqrt example from Parallel::Loops and
             measures the overhead of the engine. The number indicates
             the size of @input which can be submitted and displayed
             in under 1 second.

             Parallel::Loops is based on Parallel::ForkManager. MCE
             utilizes a pool of workers which persist while running.

             Parallel::Loops:     600  Forking each @input is expensive
             MCE foreach....:  34,000  Sends result after each @input
             MCE forseq.....:  70,000  Loops through sequence of numbers
             MCE forchunk...: 480,000  Chunking reduces overhead

   interval.pl
             Demonstration of the interval option appearing in MCE 1.5.

   iterator.pl
             Similar to forseq.pl. Specifies an iterator for input_data.
             A factory function is called which returns a closure (the
             iterator itself).

   matmult/matmult_base.pl, matmult_mce.pl, strassen_mce.pl
             Various matrix multiplication demonstrations benchmarking
             PDL, PDL + MCE, as well as parallelizing Strassen's
             divide-and-conquer algorithm. Also included are 2 plain
             Perl examples.

   pipe1.pl, pipe2.pl
             Process STDIN or FILE in parallel. Processing is via Perl
             for pipe1.pl, whereas an external command for pipe2.pl.

   scaling_pings.pl
             Perform ping test and report back failed IPs to standard
             output.

   seq_demo.pl
             A demonstration of the new sequence option appearing
             in MCE 1.3. Run with seq_demo.pl | sort

   tbray/wf_mce1.pl, wf_mce2.pl, wf_mce3.pl
             An implementation of wide finder utilizing MCE.
             As fast as MMAP IO when file resides in OS FS cache.
             2x ~ 3x faster when reading directly from disk.

   utf8.pl
             A demonstration of MCE processing unicode input_data.

CHUNK_SIZE => 1 (in essence, wanting no chunking on input data) ^

Imagine a long running process and wanting to parallelize an array against a pool of workers. Note: The sequence option may be used if simply wanting to loop through a sequence of numbers instead.

Below, a callback function is used for displaying results. The logic shows how one can output results immediately while still preserving order as if processing serially. The %tmp hash is a temporary cache for out-of-order results.

   my @input_data  = (0 .. 18000 - 1);

   ## Make an output iterator for gather. Output order is preserved.

   sub output_iterator {
      my %tmp; my $order_id = 1;

      return sub {
         my ($result, $chunk_id) = @_;
         $tmp{$chunk_id} = $result;

         while (1) {
            last unless (exists $tmp{$order_id});

            printf "i: %d sqrt(i): %f\n",
               $input_data[$order_id - 1], $tmp{$order_id};

            delete $tmp{$order_id++};
         }
      };
   }

   ## Use $chunk_ref->[0] or $_ to retrieve the element.

   my $mce = MCE->new(
      input_data => \@input_data, gather => output_iterator(),
      chunk_size => 1, max_workers => 3,

      user_func => sub {
         my ($mce, $chunk_ref, $chunk_id) = @_;
         my $result = sqrt($chunk_ref->[0]);

         MCE->gather($result, $chunk_id);
      }
   );

   MCE->run();

This does the same thing using the foreach "sugar" method.

   my $mce = MCE->new(
      chunk_size => 1, max_workers => 3,
      gather => output_iterator()
   );

   MCE->foreach( \@input_data, sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my $result = sqrt($chunk_ref->[0]);

      MCE->gather($result, $chunk_id);
   });

The two examples described above were done using the core API. MCE 1.5 comes with various models. The MCE::Loop model is used for the next demonstration.

   use MCE::Loop;

   MCE::Loop::init {
      chunk_size => 1, max_workers => 3,
      gather => output_iterator()
   };

   mce_loop {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my $result = sqrt($chunk_ref->[0]);

      MCE->gather($result, $chunk_id);

   } @input_data;

CHUNKING INPUT_DATA ^

Chunking has the effect of reducing the IPC overhead by many folds. A chunk containing up to $chunk_size items is sent to the next available worker.

   my @input_data  = (0 .. 6000 - 1);
   my $chunk_size  = 500;
   my $max_workers = 3;

   ## Make an output iterator for gather. Output order is preserved.

   sub output_iterator {
      my %tmp; my $order_id = 1;

      return sub {
         my ($result, $chunk_id) = @_;
         $tmp{$chunk_id} = $result;

         while (1) {
            last unless (exists $tmp{$order_id});
            my $i = ($order_id - 1) * $chunk_size;

            foreach ( @{ $tmp{$order_id} } ) {
               printf "i: %d sqrt(i): %f\n", $input_data[$i++], $_;
            }

            delete $tmp{$order_id++};
         }
      };
   }

   ## Use $chunk_ref or $_ to access the array reference.
   ## Chunking requires one to loop inside the code block.

   my $mce = MCE->new(
      input_data => \@input_data, gather => output_iterator(),
      chunk_size => $chunk_size, max_workers => $max_workers,

      user_func => sub {
         my ($mce, $chunk_ref, $chunk_id) = @_;
         my @result;

         foreach ( @{ $chunk_ref } ) {
            push @result, sqrt($_);
         }

         MCE->gather(\@result, $chunk_id);
      }
   );

   MCE->run();

This does the same thing using the forchunk "sugar" method.

   my $mce = MCE->new(
      chunk_size => $chunk_size, max_workers => $max_workers,
      gather => output_iterator()
   );

   MCE->forchunk( \@input_data, sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my @result;

      foreach ( @{ $chunk_ref } ) {
         push @result, sqrt($_);
      }

      MCE->gather(\@result, $chunk_id);
   });

The MCE::Loop model is shown next. Looping is required inside the code block, just like above.

   use MCE::Loop;

   MCE::Loop::init {
      chunk_size => $chunk_size, max_workers => $max_workers,
      gather => output_iterator()
   };

   mce_loop {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my @result;

      foreach ( @{ $chunk_ref } ) {
         push @result, sqrt($_);
      }

      MCE->gather(\@result, $chunk_id);

   } @input_data;

DEMO APPLYING SEQUENCES WITH USER_TASKS ^

The following is an extract from the seq_demo.pl example included with MCE. Think of having several MCEs running simultaneously in parallel. The sequence option including chunk_size may be specified individually under each task.

The input scalar $_ (not shown below) contains the same value as $seq_n.

   use MCE;

   ## Run with seq_demo.pl | sort

   sub user_func {
      my ($mce, $seq_n, $chunk_id) = @_;

      my $wid      = MCE->wid();
      my $task_id  = MCE->task_id();
      my $task_wid = MCE->task_wid();

      if (ref $seq_n eq 'ARRAY') {
         ## Received the next "chunked" sequence of numbers
         ## e.g. when chunk_size > 1, $seq_n will be an array ref above

         foreach (@{ $seq_n }) {
            printf(
               "task_id %d: seq_n %s: chunk_id %d: wid %d: task_wid %d\n",
               $task_id,    $_,       $chunk_id,   $wid,   $task_wid
            );
         }
      }
      else {
         printf(
            "task_id %d: seq_n %s: chunk_id %d: wid %d: task_wid %d\n",
            $task_id,    $seq_n,   $chunk_id,   $wid,   $task_wid
         );
      }
   }

   ## Each task is configured independently.

   my $mce = MCE->new(
      user_tasks => [{
         max_workers => 2,
         chunk_size  => 1,
         sequence    => { begin => 11, end => 19, step => 1 },
         user_func   => \&user_func
      },{
         max_workers => 2,
         chunk_size  => 5,
         sequence    => { begin => 21, end => 29, step => 1 },
         user_func   => \&user_func
      },{
         max_workers => 2,
         chunk_size  => 3,
         sequence    => { begin => 31, end => 39, step => 1 },
         user_func   => \&user_func
      }]
   );

   MCE->run();

   -- Output

   task_id 0: seq_n 11: chunk_id 1: wid 1: task_wid 1
   task_id 0: seq_n 12: chunk_id 2: wid 2: task_wid 2
   task_id 0: seq_n 13: chunk_id 3: wid 1: task_wid 1
   task_id 0: seq_n 14: chunk_id 4: wid 2: task_wid 2
   task_id 0: seq_n 15: chunk_id 5: wid 1: task_wid 1
   task_id 0: seq_n 16: chunk_id 6: wid 2: task_wid 2
   task_id 0: seq_n 17: chunk_id 7: wid 1: task_wid 1
   task_id 0: seq_n 18: chunk_id 8: wid 2: task_wid 2
   task_id 0: seq_n 19: chunk_id 9: wid 1: task_wid 1
   task_id 1: seq_n 21: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 22: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 23: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 24: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 25: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 26: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 27: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 28: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 29: chunk_id 2: wid 4: task_wid 2
   task_id 2: seq_n 31: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 32: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 33: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 34: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 35: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 36: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 37: chunk_id 3: wid 5: task_wid 1
   task_id 2: seq_n 38: chunk_id 3: wid 5: task_wid 1
   task_id 2: seq_n 39: chunk_id 3: wid 5: task_wid 1

GLOBALLY SCOPED VARIABLES AND MCE MODELS ^

It's possible that Perl may create a new code ref on subsequent runs causing MCE models to re-spawn. One solution to this is to declare global variables, referenced by workers, with "our" instead of "my".

Let's take a look. The $i variable is declared with my and being reference in both user_begin and mce_loop blocks. This will cause Perl to create a new code ref for mce_loop on subsequent runs.

   use MCE::Loop;

   my $i = 0;   ## <-- this is the reason, try our instead

   MCE::Loop::init {
      user_begin => sub {
         print "process_id: $$\n" if MCE->wid() == 1;
         $i++;
      },
      chunk_size => 1, max_workers => 'auto',
   };

   for (1..2) {
      ## Perl creates another code block ref causing workers
      ## to re-spawn on subsequent runs.
      print "\n"; mce_loop { print "$i: $_\n" } 1..4;
   }

   MCE::Loop::finish;

   -- Output

   process_id: 51380
   1: 1
   1: 2
   1: 3
   1: 4

   process_id: 51388
   1: 1
   1: 2
   1: 3
   1: 4

By making the one line change, we see that workers persist for the duration of the script.

   use MCE::Loop;

   our $i = 0;  ## <-- changed my to our

   MCE::Loop::init {
      user_begin => sub {
         print "process_id: $$\n" if MCE->wid() == 1;
         $i++;
      },
      chunk_size => 1, max_workers => 'auto',
   };

   for (1..2) {
      ## Workers persist between runs. No re-spawning.
      print "\n"; mce_loop { print "$i: $_\n" } 1..4;
   }

   -- Output

   process_id: 51457
   1: 1
   1: 2
   1: 4
   1: 3

   process_id: 51457
   2: 1
   2: 2
   2: 3
   2: 4

One may alternatively specify a code reference to existing routines for user_begin and mce_loop. Take notice of the comma after \&_func though.

   use MCE::Loop;

   my $i = 0;  ## my (ok)

   sub _begin {
      print "process_id: $$\n" if MCE->wid() == 1;
      $i++;
   }
   sub _func {
      print "$i: $_\n";
   }

   MCE::Loop::init {
      user_begin => \&_begin,
      chunk_size => 1, max_workers => 'auto',
   };

   for (1..2) {
      print "\n"; mce_loop \&_func, 1..4;
   }

   MCE::Loop::finish;

   -- Output

   process_id: 51626
   1: 1
   1: 2
   1: 3
   1: 4

   process_id: 51626
   2: 1
   2: 2
   2: 3
   2: 4

MONTE CARLO SIMULATION ^

There is an article on the web (search for comp.lang.perl.misc MCE) suggesting that MCE::Examples does not cover a simple simulation scenario. This section covers just that.

The serial code is based off the one by "gamo". A sleep is added to imitate extra CPU time. The while loop is wrapped within a for loop to run 10 times. The random number generator is seeded as well.

   use Time::HiRes qw(time sleep);

   srand(5906);

   my ($var, $foo, $bar) = (1, 2, 3);
   my ($r, $a, $b);

   my $start = time();

   for (1..10) {
      while (1) {
         $r = rand();

         $a = $r * ($var + $foo + $bar);
         $b = sqrt($var + $foo + $bar);

         last if ($a < $b + 0.001 && $a > $b - 0.001);
         sleep 0.002;
      }

      print "$r -> $a\n";
   }

   my $end = time();

   printf STDERR "\n## compute time: %0.03f secs\n\n", $end - $start;

   -- Output

   0.408246276657106 -> 2.44947765994264
   0.408099657137821 -> 2.44859794282693
   0.408285842931324 -> 2.44971505758794
   0.408342292008765 -> 2.45005375205259
   0.408333076522673 -> 2.44999845913604
   0.408344266898869 -> 2.45006560139321
   0.408084104120526 -> 2.44850462472316
   0.408197400014714 -> 2.44918440008828
   0.408344783704855 -> 2.45006870222913
   0.408248062985479 -> 2.44948837791287

   ## compute time: 82.355 secs

Next, we'd do the same with MCE. This demonstration requires at least MCE 1.509 to run properly. Folks on prior releases (1.505 - 1.508) will not see output for the 2nd run and beyond.

   use Time::HiRes qw(time sleep);
   use MCE::Loop;

   ## Configure MCE. Move common variables inside the user_begin
   ## block when not needed by the manager process.

   MCE::Loop::init {
      user_begin => sub {
         use vars qw($var $foo $bar);
         our ($var, $foo, $bar) = (1, 2, 3);
      },
      chunk_size => 1, max_workers => 'auto',
      input_data => \&_input, gather => \&_gather
   };

   ## Callback functions.

   my ($done, $r, $a);

   sub _input {
      return if $done;
      return rand();
   }

   sub _gather {
      my ($_r, $_a, $_b) = @_;
      return if $done;

      if ($_a < $_b + 0.001 && $_a > $_b - 0.001) {
         ($done, $r, $a) = (1, $_r, $_a);
      }
      return;
   }

   ## Compute in parallel.

   my $start = time(); srand(5906);

   for (1..10) {
      $done = 0;      ## Reset $done before running

      mce_loop {
       # my ($mce, $chunk_ref, $chunk_id) = @_;
       # my $r = $chunk_ref->[0];

         my $r = $_;  ## Valid due to chunk_size => 1

         my $a = $r * ($var + $foo + $bar);
         my $b = sqrt($var + $foo + $bar);

         MCE->gather($r, $a, $b);
         sleep 0.002;
      };

      print "$r -> $a\n";
   }

   printf "\n## compute time: %0.03f secs\n\n", time() - $start;

   -- Output

   0.408246276657106 -> 2.44947765994264
   0.408099657137821 -> 2.44859794282693
   0.408285842931324 -> 2.44971505758794
   0.408342292008765 -> 2.45005375205259
   0.408333076522673 -> 2.44999845913604
   0.408344266898869 -> 2.45006560139321
   0.408084104120526 -> 2.44850462472316
   0.408197400014714 -> 2.44918440008828
   0.408344783704855 -> 2.45006870222913
   0.408248062985479 -> 2.44948837791287

   ## compute time: 11.290 secs

Well, there you have it. MCE is able to complete the same simulation many times faster.

MULTIPLE WORKERS RUNNING IN PARALLEL ^

There are occasions when one wants several workers to run in parallel without having to specify input_data or seqeunce. These two options are optional in MCE. The "do" and "sendto" methods, for sending data to the manager process, are also demonstrated below. Both are processed serially by the manager process on a first come, first serve basis.

   use MCE;

   sub report_stats {
      my ($wid, $msg, $h_ref) = @_;
      print "Worker $wid says $msg: ", $h_ref->{"counter"}, "\n";
   }

   my $mce = MCE->new(
      max_workers => 4,

      user_func => sub {
         my ($mce) = @_;
         my $wid = MCE->wid();

         if ($wid == 1) {
            my %h = ("counter" => 0);
            while (1) {
               $h{"counter"} += 1;
               MCE->do("report_stats", $wid, "Hey there", \%h);
               last if ($h{"counter"} == 4);
               sleep 2;
            }
         }
         else {
            my %h = ("counter" => 0);
            while (1) {
               $h{"counter"} += 1;
               MCE->do("report_stats", $wid, "Welcome..", \%h);
               last if ($h{"counter"} == 2);
               sleep 4;
            }
         }

         MCE->sendto("STDOUT", "Worker $wid is exiting\n");
      }
   );

   MCE->run;

   -- Output

   Note how worker 2 comes first in the 2nd run below.

   $ ./demo.pl
   Worker 1 says Hey there: 1
   Worker 2 says Welcome..: 1
   Worker 3 says Welcome..: 1
   Worker 4 says Welcome..: 1
   Worker 1 says Hey there: 2
   Worker 2 says Welcome..: 2
   Worker 3 says Welcome..: 2
   Worker 1 says Hey there: 3
   Worker 2 is exiting
   Worker 3 is exiting
   Worker 4 says Welcome..: 2
   Worker 4 is exiting
   Worker 1 says Hey there: 4
   Worker 1 is exiting

   $ ./demo.pl
   Worker 2 says Welcome..: 1
   Worker 1 says Hey there: 1
   Worker 4 says Welcome..: 1
   Worker 3 says Welcome..: 1
   Worker 1 says Hey there: 2
   Worker 2 says Welcome..: 2
   Worker 4 says Welcome..: 2
   Worker 3 says Welcome..: 2
   Worker 2 is exiting
   Worker 4 is exiting
   Worker 1 says Hey there: 3
   Worker 3 is exiting
   Worker 1 says Hey there: 4
   Worker 1 is exiting

INDEX ^

MCE

AUTHOR ^

Mario E. Roy, <marioeroy AT gmail DOT com>

LICENSE ^

This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.

syntax highlighting: