NAME

Mojo::IOLoop::HoboProcess - Subprocesses with MCE::Hobo

VERSION

This document describes Mojo::IOLoop::HoboProcess version 0.004.

SYNOPSIS

  use feature 'say';
  use Mojo::IOLoop::HoboProcess;

  # Operation that would block the event loop for 5 seconds
  my $subprocess = Mojo::IOLoop::HoboProcess->new;

  $subprocess->run(
    sub {
      my $subprocess = shift;
      sleep 5;
      return '♥', 'Mojolicious';
    },
    sub {
      my ($subprocess, $err, @results) = @_;
      say "Subprocess error: $err" and return if $err;
      say "I $results[0] $results[1]!";
    }
  );

  # Start event loop if necessary
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

DESCRIPTION

Like Mojo::IOLoop::Subprocess, but spawns subprocesses with MCE::Hobo instead.

Mojo::IOLoop::HoboProcess allows Mojo::IOLoop to perform computationally expensive operations in subprocesses, without blocking the event loop.

This module is supported on all platforms where MCE::Shared is supported.

ATTRIBUTES

Mojo::IOLoop::HoboProcess implements the following attributes.

timeout

  my $timeout = $subprocess->timeout;
  $subprocess->timeout(300);

Set to a non-zero value to enable a timeout for the subprocess. Defaults to 0, which disables the timeout.

ioloop

  my $loop    = $subprocess->ioloop;
  $subprocess = $subprocess->ioloop(Mojo::IOLoop->new);

Event loop object to control, defaults to the global Mojo::IOLoop singleton.

METHODS

Mojo::IOLoop::HoboProcess inherits all methods from Mojo::Base and implements the following new ones.

exit

  sub { $^O eq 'MSWin32' ? shift->exit(0) : exit(0) }, # not recommended
  sub { shift->exit(0) }, # do this instead or call MCE::Hobo->exit(0)

  sub { exit(0) }, # safe on Cygwin and UNIX platforms

Exit a thread (Windows) or process (other platforms).

pid

  my $pid = $subprocess->pid;

  $$.$tid  Windows (only), includes thread id
  $$       Cygwin and UNIX platforms

Process id of the spawned subprocess if available.

run

  $subprocess = $subprocess->run( sub {...}, sub {...} );

Execute the first callback in a child process and wait for it to return one or more values, without blocking "ioloop" in the parent process. Then execute the second callback in the parent process with the results. The return values of the first callback, and any exceptions it throws, are serialized automatically by MCE::Hobo, using Sereal 3.015+ if installed or Storable otherwise.
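
Because results cross a process (or thread) boundary, whatever the first callback returns has to survive serialization. The following is a minimal sketch of that constraint, not MCE::Hobo's internal code: it calls Storable (the documented fallback) directly, and round_trip is a hypothetical helper introduced only for illustration. Behaviour with Sereal may differ.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Hypothetical helper: approximate what a child-to-parent hand-off
# does to a list of return values (freeze on one side, thaw on the other).
sub round_trip {
    my @values = @_;
    return @{ thaw( freeze( \@values ) ) };
}

# Plain data structures come back intact
my @results = round_trip( 'ok', { count => 3 }, [ 1, 2, 3 ] );
print "$results[0] $results[1]{count} @{ $results[2] }\n";   # ok 3 1 2 3

# A code reference cannot be frozen with Storable's default settings,
# so freeze() dies and the eval returns false
my $ok = eval { round_trip( sub { 1 } ); 1 };
print "code ref survived: ", ( $ok ? 'yes' : 'no' ), "\n";   # no
```

In practice this suggests returning plain scalars, arrays, and hashes from the first callback rather than code references or other objects that cannot be serialized.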

hoboprocess

The hoboprocess method is injected into Mojo::IOLoop.

  use Mojo::IOLoop;
  use Mojo::IOLoop::HoboProcess;

  my $subprocess = Mojo::IOLoop->hoboprocess( sub {...}, sub {...} );
  my $subprocess = $loop->hoboprocess( sub {...}, sub {...} );

Build a Mojo::IOLoop::HoboProcess object to perform computationally expensive operations in subprocesses, without blocking the event loop. Callbacks will be passed along to "run" in Mojo::IOLoop::HoboProcess.

  # Concurrent subprocesses
  # (Mojo::IOLoop->delay is only available in Mojolicious releases before 9.0)
  my ($fail, $result) = ();

  Mojo::IOLoop->delay(
    sub {
      my $delay = shift;
      Mojo::IOLoop->hoboprocess( sub {1}, $delay->begin );
      Mojo::IOLoop->hoboprocess( sub {2}, $delay->begin );
    },
    sub {
      my ( $delay, $err1, $result1, $err2, $result2 ) = @_;
      $fail = $err1 || $err2;
      $result = [ $result1, $result2 ];
    }
  )->wait;

EXAMPLES

The following is a variation of the synopsis above for demonstrating data sharing between subprocesses.

  use feature 'say';
  use Mojo::Cache;
  use Mojo::IOLoop::HoboProcess;

  use MCE::Shared;

  # Construct a shared cache and counter variable
  my $cache   = MCE::Shared->share( Mojo::Cache->new( max_keys => 50 ) );
  my $counter = MCE::Shared->scalar(0);

  # Also, construct a shared file handle
  mce_open my $OUT, ">", \*STDOUT or die "$!";

  # Operations that would block the event loop for 3+ seconds
  my $subprocess1 = Mojo::IOLoop::HoboProcess->new;
  my $subprocess2 = Mojo::IOLoop::HoboProcess->new;

  $subprocess1->run(
    sub {
      my $subprocess = shift;
      say $OUT "Subprocess [$$] started";
      $cache->set('key1', 'foo');
      sleep 4;
      my $val = $cache->get('key2');
      return "♥", "Mojolicious lots: $val: " . $counter->incr;
    },
    sub {
      my ($subprocess, $err, @results) = @_;
      say $OUT "Subprocess error: $err" and return if $err;
      say $OUT "I $results[0] $results[1]!";
    }
  );

  $subprocess2->run(
    sub {
      my $subprocess = shift;
      say $OUT "Subprocess [$$] started";
      $cache->set('key2', 'baz');
      sleep 3;
      my $val = $cache->get('key1');
      return "♥", "Mojolicious more: $val: " . $counter->incr;
    },
    sub {
      my ($subprocess, $err, @results) = @_;
      say $OUT "Subprocess error: $err" and return if $err;
      say $OUT "I $results[0] $results[1]!";
    }
  );

  # Start event loop if necessary
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

  __END__
  Program output.

  Subprocess [7625] started
  Subprocess [7626] started
  I ♥ Mojolicious more: foo: 1!
  I ♥ Mojolicious lots: baz: 2!

A parallel demonstration of a recurring Monte Carlo calculation of pi.

  # Original source by Demian Riccardi and Joel Berger.
  # https://gist.github.com/dmr3/e69127ab449bdabd5af7b000c9b5b3b1
  #
  # Parallel demonstration by Mario Roy.

  use Mojolicious::Lite;

  use MCE::Hobo;
  use MCE::Shared;

  any '/' => 'index';

  websocket '/data' => sub {
    my $self = shift;
    my ($pi,$runs,$new_runs) = (0,0,1000000);

    my $timer = Mojo::IOLoop->recurring( 0.1 => sub {
      ($pi,$runs) = calc_pi($pi,$runs,$new_runs);
      $self->send({ json => [$runs,$pi] });
    });

    $self->on( finish => sub {
      Mojo::IOLoop->remove($timer);
    });
  };

  sub gen_data {
    my $x = shift;
    return [ $x, sin( $x + 2*rand() - 2*rand() ) ]
  }

  sub calc_pi {
    my ( $pi, $total_runs, $new_runs ) = @_;

    # shared counter, seeded with the number of misses so far
    # (points that fell outside the quarter circle in earlier runs)
    my $cnt_pi = MCE::Shared->scalar(
      -1 * ( ( $pi * $total_runs ) / 4 - $total_runs )
    );

    # shared sequence-generator
    my $seq = MCE::Shared->sequence(
      { chunk_size => 10000, bounds_only => 1 },
      1, $new_runs
    );

    # Run
    my $routine = sub {
      while ( my ($beg, $end) = $seq->next ) {
        my ($cnt, $x, $y) = (0);

        foreach ( $beg .. $end ) {
          $x = rand(1);
          $y = rand(1);
          $cnt++ if $x * $x + $y * $y > 1;
        }

        $cnt_pi->incrby($cnt);
      }
    };

    MCE::Hobo->create( $routine ) for ( 1 .. 3 );
    MCE::Hobo->waitall;

    $total_runs += $new_runs;

    $pi = 4 * ( $total_runs - $cnt_pi->get ) / $total_runs;

    return ($pi, $total_runs);
  }

  app->start;

  __DATA__

  @@ index.html.ep
  <!DOCTYPE html>
  <html>
    <head>
      <title><%= title %></title>
    </head>
    <body>
      %= content
    </body>
  </html>

  %= javascript 'https://ajax.googleapis.com/ajax/libs/jquery/3.1.1/jquery.min.js'
  %= javascript 'https://cdnjs.cloudflare.com/ajax/libs/flot/0.8.3/jquery.flot.js'

  <div id="plot" style="width:600px;height:300px">
  </div>

  %= javascript begin
    var data = [];
    var plot = $.plot($('#plot'), [ data ]);

    var url = '<%= url_for('data')->to_abs %>';
    var ws = new WebSocket( url );

    ws.onmessage = function(e){
      var point = JSON.parse(e.data);
      data.push(point);
      plot.setData([data]);
      plot.setupGrid();
      plot.draw();
    };
  % end
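
The arithmetic inside calc_pi can be easier to follow without the workers. The following is a sequential sketch under that assumption: calc_pi_serial is a hypothetical name, and it replaces the shared counter and sequence with a plain loop. It recovers the earlier miss count from the previous estimate, adds the new samples, and recomputes pi as 4 * hits / total.

```perl
use strict;
use warnings;

# Sequential sketch of the running estimate (no MCE::Hobo workers involved).
sub calc_pi_serial {
    my ( $pi, $total_runs, $new_runs ) = @_;

    # Recover the number of misses (points outside the quarter circle)
    # implied by the previous estimate: pi = 4 * (total - misses) / total
    my $misses = $total_runs - ( $pi * $total_runs ) / 4;

    for ( 1 .. $new_runs ) {
        my ( $x, $y ) = ( rand(1), rand(1) );
        $misses++ if $x * $x + $y * $y > 1;
    }

    $total_runs += $new_runs;

    return ( 4 * ( $total_runs - $misses ) / $total_runs, $total_runs );
}

# Three successive batches, each refining the previous estimate
my ( $pi, $runs ) = ( 0, 0 );
( $pi, $runs ) = calc_pi_serial( $pi, $runs, 100_000 ) for 1 .. 3;

printf "%d runs, pi is roughly %.3f\n", $runs, $pi;
```

The parallel version above splits the same loop into chunks handed out by the shared sequence, with each worker adding its local miss count to the shared counter once per chunk.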

ACKNOWLEDGMENTS

Mojo::IOLoop::Subprocess and Mojo::IOLoop::Subprocess::Sereal were used as templates in the making of this module.

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>

COPYRIGHT AND LICENSE

Copyright (C) 2016-2017 by Mario E. Roy

Mojo::IOLoop::HoboProcess is released under the same license as Perl.

See http://dev.perl.org/licenses/ for more information.

SEE ALSO

Mojolicious, Mojolicious::Guides, http://mojolicious.org

MCE::Shared