The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

Coro::ProcessPool - An asynchronous pool of perl processes

VERSION

version 0.30

SYNOPSIS

  use Coro::ProcessPool;
  use Coro;

  my $pool = Coro::ProcessPool->new(
    max_procs => 4,
    max_reqs  => 100,
    include   => ['/path/to/my/task/classes', '/path/to/other/packages'],
  );

  my $double = sub { $_[0] * 2 };

  #-----------------------------------------------------------------------
  # Process in sequence, waiting for each result in turn
  #-----------------------------------------------------------------------
  my %result;
  foreach my $i (1 .. 1000) {
    $result{$i} = $pool->process($double, $i);
  }

  #-----------------------------------------------------------------------
  # Process as a batch
  #-----------------------------------------------------------------------
  my @results = $pool->map($double, 1 .. 1000);

  #-----------------------------------------------------------------------
  # Defer waiting for result
  #-----------------------------------------------------------------------
  my %deferred;

  $deferred{$_} = $pool->defer($double, $_)
    foreach 1 .. 1000;

  # Later
  foreach my $i (keys %deferred) {
    print "$i = " . $deferred{$i}->() . "\n";
  }

  #-----------------------------------------------------------------------
  # Use a "task class" implementing 'new' and 'run'
  #-----------------------------------------------------------------------
  my $result = $pool->process('Task::Doubler', 21);

  #-----------------------------------------------------------------------
  # Pipelines (work queues)
  #-----------------------------------------------------------------------
  my $pipe = $pool->pipeline;

  # Start producer thread to queue tasks
  my $producer = async {
    while (my $task = get_next_task()) {
      $pipe->queue('Some::TaskClass', $task);
    }

    # Let the pipeline know no more tasks are coming
    $pipe->shutdown;
  };

  # Collect the results of each task as they are received
  while (my $result = $pipe->next) {
    do_stuff_with($result);
  }

  $pool->shutdown;

DESCRIPTION

Processes tasks using a pool of external Perl processes.

CONSTRUCTOR

  my $pool = Coro::ProcessPool->new(
    max_procs => 4,
    max_reqs  => 100,
    include   => ['path/to/my/packages', 'some/more/packages'],
  );

max_procs

The maximum number of processes to run within the process pool. Defaults to the number of CPUs on the ssytem.

max_reqs

The maximum number of tasks a worker process may run before being terminated and replaced with a fresh process. This is useful for tasks that might leak memory over time.

include

An optional array ref of directory paths to prepend to the set of directories the worker process will use to find Perl packages.

METHODS

join

Cedes control to the event loop until the pool has completed all remaining tasks and woken up any threads watching them.

defer

Queues a task to be processed by the pool. Tasks may specified in either of two forms, as a code ref or the fully qualified name of a perl class which implements two methods, new and run. Any remaining arguments to defer are passed unchanged to the code ref or the new method of the task class.

defer will immediately return an "condvar" in AnyEvent that will wait for and return the result of the task (or croak if the task generated an error).

  # Using a code ref
  my $cv = $pool->defer(\&func, $arg1, $arg2, $arg3);
  my $result = $cv->recv;

  # With a task class
  my $cv = $pool->defer('Some::Task::Class', $arg1, $arg2, $arg3);
  my $result = $cv->recv;

process

Calls defer and immediately calls recv on the returned condvar, returning the result. This is useful if your workflow includes multiple threads which share the same pool. All arguments are passed unchanged to defer.

map

Like perl's map, applies a code ref to a list of arguments. This method will cede until all results have been returned by the pool, returning the result as a list. The order of arguments and results is preserved as expected.

  my @results = $pool->map(\&func, $arg1, $arg2, $arg3);

pipeline

Returns a Coro::ProcessPool::Pipeline object which can be used to pipe requests through to the process pool. Results then come out the other end of the pipe, not necessarily in the order in which they were queued. It is up to the calling code to perform task accounting (for example, by passing an id in as one of the arguments to the task class).

  my $pipe = $pool->pipeline;

  my $producer = async {
    foreach my $args (@tasks) {
      $pipe->queue('Some::Class', $args);
    }

    $pipe->shutdown;
  };

  while (my $result = $pipe->next) {
    ...
  }

All arguments to pipeline() are passed transparently to the constructor of Coro::ProcessPool::Pipeline. There is no limit to the number of pipelines which may be created for a pool.

A NOTE ABOUT IMPORTS AND CLOSURES

Code refs are serialized using Data::Dump::Streamer, allowing closed over variables to be available to the code being called in the sub-process. Mutated variables are not updated when the result is returned.

See "Caveats-Dumping-Closures-(CODE-Refs)" in Data::Dump::Streamer for important notes regarding closures.

Use versus require

The use pragma is run at compile time, whereas require is evaluated at runtime. Because of this, the use of use in code passed directly to the process method can fail in the worker process because the use statement has already been evaluated in the parent process when the calling code was compiled.

This will not work:

  $pool->process(sub {
    use Foo;
    my $foo = Foo->new();
  });

This will work:

  $pool->process(sub {
    require Foo;
    my $foo = Foo->new();
  });

If use is necessary (for example, to import a method or transform the calling code via import), it is recommended to move the code into its own module (or to expliticly call require and import in the subroutine), which can then be called in the anonymous routine:

  package Bar;

  use Foo;

  sub dostuff {
    ...
  }

Then, in your caller:

  $pool->process(sub {
    require Bar;
    Bar::dostuff();
  });

Alternately, a task class may be used if dependency management is causing a headaches:

  my $result = $pool->process('Task::Class', @args);

COMPATIBILITY

Coro::ProcessPool will likely break on Win32 due to missing support for non-blocking file descriptors (Win32 can only call select and poll on actual network sockets). Without rewriting this as a network server, which would impact performance and be really annoying, it is likely this module will not support Win32 in the near future.

The following modules will get you started if you wish to explore a synchronous process pool on Windows:

Win32::Process
Win32::IPC
Win32::Pipe

SEE ALSO

Coro
"condvar" in AnyEvent

AUTHOR

Jeff Ober <sysread@fastmail.fm>

COPYRIGHT AND LICENSE

This software is copyright (c) 2017 by Jeff Ober.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.