NAME

Coro::ProcessPool - An asynchronous process pool

VERSION

version 0.26_005

SYNOPSIS

  use Coro::ProcessPool;
  use Coro;

  my $pool = Coro::ProcessPool->new(
    max_procs => 4,
    max_reqs  => 100,
    include   => ['/path/to/my/task/classes', '/path/to/other/packages'],
  );

  my $double = sub { $_[0] * 2 };

  #-----------------------------------------------------------------------
  # Process in sequence
  #-----------------------------------------------------------------------
  my %result;
  foreach my $i (1 .. 1000) {
    $result{$i} = $pool->process($double, [$i]);
  }

  #-----------------------------------------------------------------------
  # Process as a batch
  #-----------------------------------------------------------------------
  my @results = $pool->map($double, 1 .. 1000);

  #-----------------------------------------------------------------------
  # Defer waiting for result
  #-----------------------------------------------------------------------
  my %deferred = map { $_ => $pool->defer($double, [$_]) } 1 .. 1000;
  foreach my $i (keys %deferred) {
    print "$i = " . $deferred{$i}->() . "\n";
  }

  #-----------------------------------------------------------------------
  # Use a "task class", implementing 'new' and 'run'
  #-----------------------------------------------------------------------
  my $result = $pool->process('Task::Doubler', [21]);

  #-----------------------------------------------------------------------
  # Pipelines (work queues)
  #-----------------------------------------------------------------------
  my $pipe = $pool->pipeline;

  # Start producer thread to queue tasks
  my $producer = async {
    while (my $task = get_next_task()) {
      $pipe->queue('Some::TaskClass', $task);
    }

    # Let the pipeline know no more tasks are coming
    $pipe->shutdown;
  };

  # Collect the results of each task as they are received
  while (my $result = $pipe->next) {
    do_stuff_with($result);
  }

  $pool->shutdown;

DESCRIPTION

Processes tasks using a pool of external Perl processes.

ATTRIBUTES

max_procs

The maximum number of processes to run within the process pool. Defaults to the number of CPUs on the system.

max_reqs

The maximum number of tasks a worker process may run before being terminated and replaced with a fresh process. This is useful for tasks that might leak memory over time.

include

An optional array ref of directory paths to prepend to the set of directories the worker process will use to find Perl packages.

PRIVATE ATTRIBUTES

procs_lock

Semaphore used to control access to the worker processes. Starts incremented to the number of processes (max_procs).

num_procs

Running total of processes that are currently running.

procs

Array holding the Coro::ProcessPool::Process objects.

all_procs

is_running

Boolean which signals to the instance that the shutdown method has been called.

METHODS

capacity

Returns the number of free worker processes.

shutdown

Shuts down all processes and resets state on the process pool. After calling this method, the pool is effectively in a new state and may be used normally.

process($f, $args)

Processes code ref $f in a child process from the pool. If $args is provided, it is an array ref of arguments that will be passed to $f. Returns the result of calling $f->(@$args).

Alternately, $f may be the name of a class implementing the methods new and run, in which case the result is equivalent to calling $f->new(@$args)->run(). Note that the include path for worker processes is identical to that of the calling process.

This call will yield until the results become available. If all processes are busy, this method will block until one becomes available. Processes are spawned as needed, up to max_procs, from this method. Also note that the use of max_reqs can cause this method to yield while a new process is spawned.
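
As a minimal sketch of the task class form described above: the name Task::Doubler is taken from the synopsis, but its body here is an assumption, not code shipped with this module. The class only needs a new that accepts the arguments and a run that returns the result. The last lines call the class directly to show what the worker is described as doing with it.

```perl
use strict;
use warnings;

# Hypothetical task class. The name comes from the synopsis; the
# implementation is an illustration only.
package Task::Doubler;

sub new {
    my ($class, $n) = @_;
    return bless { n => $n }, $class;
}

sub run {
    my ($self) = @_;
    return $self->{n} * 2;
}

package main;

# Equivalent of what the documentation says happens in the worker for
# $pool->process('Task::Doubler', [21]):
my $doubled = Task::Doubler->new(21)->run;
print "$doubled\n";    # prints 42
```

In real use the class would normally live in its own .pm file, in a directory the worker process can find.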

map($f, @args)

Applies $f to each value in @args in turn and returns a list of the results. Although the order in which each argument is processed is not guaranteed, the results are guaranteed to be in the same order as @args, even if calling $f itself returns a list (in which case the results of that calculation are flattened into the list returned by map).
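
The flattening described above has the same shape as Perl's built-in map. The sketch below uses the built-in as a local stand-in, so it runs without a pool. The code ref $pair and the sample arguments are made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical task that returns a two-element list per argument.
my $pair = sub { my ($n) = @_; return ($n, $n * 2) };

my @args = (1, 2, 3);

# Built-in map as a stand-in. Per the description above, the call
# $pool->map($pair, @args) is expected to return a list of the same
# shape: one flat list, grouped in the order of @args.
my @results = map { $pair->($_) } @args;

print "@results\n";    # prints 1 2 2 4 3 6
```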

defer($f, $args)

Similar to "process", but returns immediately. The return value is a code reference that, when called, returns the results of calling $f->(@$args).

  my $deferred = $pool->defer($coderef, [ $x, $y, $z ]);
  my $result   = $deferred->();

pipeline

Returns a Coro::ProcessPool::Pipeline object which can be used to pipe requests through to the process pool. Results then come out the other end of the pipe. It is up to the calling code to perform task accounting (for example, by passing an id in as one of the arguments to the task class).

  my $pipe = $pool->pipeline;

  my $producer = async {
    foreach my $args (@tasks) {
      $pipe->queue('Some::Class', $args);
    }

    $pipe->shutdown;
  };

  while (my $result = $pipe->next) {
    ...
  }

All arguments to pipeline() are passed transparently to the constructor of Coro::ProcessPool::Pipeline. There is no limit to the number of pipelines which may be created for a pool.
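
One way to do the task accounting mentioned above is to have the task hand its id back alongside its result. The sketch below is an assumption-laden illustration: Task::Tagged is a hypothetical class, and the "results" are produced locally in a shuffled order to stand in for whatever order a pipeline would deliver them.

```perl
use strict;
use warnings;

# Hypothetical task class that carries an id through to its result.
package Task::Tagged;

sub new {
    my ($class, $id, $value) = @_;
    return bless { id => $id, value => $value }, $class;
}

sub run {
    my ($self) = @_;
    # Return the id with the computed value so the caller can match them up.
    return [ $self->{id}, $self->{value} ** 2 ];
}

package main;

my %input = (a => 2, b => 3, c => 4);

# Stand-in for results coming out of a pipeline in a different order
# than they were queued (here: reverse order).
my @arrived = map { Task::Tagged->new($_, $input{$_})->run }
              reverse sort keys %input;

# Re-associate each result with its input by id.
my %squared;
for my $result (@arrived) {
    my ($id, $value) = @$result;
    $squared{$id} = $value;
}

print "$_: $input{$_} -> $squared{$_}\n" for sort keys %squared;
```

With a real pipeline, the same class would be queued with something like $pipe->queue('Task::Tagged', [$id, $value]), assuming queue takes its arguments as an array ref in the same way process does.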

If the pool is shut down while the pipeline is active, any tasks pending in "next" in Coro::ProcessPool::Pipeline will fail and cause the next call to next() to croak.

A NOTE ABOUT IMPORTS AND CLOSURES

Code refs are serialized using Data::Dump::Streamer, allowing closed over variables to be available to the code being called in the sub-process. Note that mutated variables are not updated when the result is returned.

See "Caveats-Dumping-Closures-(CODE-Refs)" in Data::Dump::Streamer for important notes regarding closures.
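
The point about mutated variables can be seen with any child process. The sketch below does not use this module at all: it uses a plain fork and a pipe on a Unix-like system as a stand-in for a pool worker, and run_in_child is a hypothetical helper. The closure changes $counter in the child, but the parent's copy is untouched. Only the returned value travels back.

```perl
use strict;
use warnings;
use POSIX ();

my $counter = 0;

# Hypothetical stand-in for a pool worker: run $code in a forked child
# and read its return value back over a pipe.
sub run_in_child {
    my ($code) = @_;
    pipe(my $reader, my $writer) or die "pipe failed: $!";
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: run the closure, send back its result, and leave.
        close $reader;
        print {$writer} $code->();
        close $writer;
        POSIX::_exit(0);
    }

    # Parent: collect the child's output and reap it.
    close $writer;
    my $result = do { local $/; <$reader> };
    close $reader;
    waitpid($pid, 0);
    return $result;
}

my $result = run_in_child(sub { $counter += 10; return $counter });

print "child returned $result; parent still sees $counter\n";
# prints: child returned 10; parent still sees 0
```

If the caller needs the new value, return it from the code ref and assign it in the calling process.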

Use versus require

The use statement is run at compile time, whereas require is evaluated at runtime. Because of this, a use statement in code passed directly to the process method can fail: it was already evaluated when the calling code was compiled, so it does not run again in the worker process.

This will not work:

  $pool->process(sub {
    use Foo;
    my $foo = Foo->new();
  });

This will work:

  $pool->process(sub {
    require Foo;
    my $foo = Foo->new();
  });
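
The compile-time versus runtime difference can be observed without a pool by checking %INC. This is a small sketch: the two subs are hypothetical, and Text::ParseWords and Text::Wrap are used only because they are core modules that are unlikely to be loaded already.

```perl
use strict;
use warnings;

# Never called, yet its use statement runs while this file is compiled.
sub never_called {
    use Text::ParseWords ();
    return;
}

# The require does nothing until the sub is actually called.
sub loads_on_call {
    require Text::Wrap;
    return;
}

# Record what is loaded before and after calling loads_on_call().
my $use_loaded     = exists $INC{'Text/ParseWords.pm'} ? 1 : 0;
my $require_before = exists $INC{'Text/Wrap.pm'}       ? 1 : 0;
loads_on_call();
my $require_after  = exists $INC{'Text/Wrap.pm'}       ? 1 : 0;

print "use loaded without a call: $use_loaded\n";
print "require loaded before call: $require_before, after call: $require_after\n";
```

Because the require runs each time the sub body runs, it is the form that still takes effect when the code ref is executed in a worker process.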

If use is necessary (for example, to import a method or transform the calling code via import), it is recommended to move the code into its own module, which can then be called in the anonymous routine:

  package Bar;

  use Foo;

  sub dostuff {
    ...
  }

Then, in your caller:

  $pool->process(sub {
    require Bar;
    Bar::dostuff();
  });

If it's a problem...

Use the task class method if the loading requirements are causing headaches:

  my $result = $pool->process('Task::Class', [@args]);

COMPATIBILITY

Coro::ProcessPool will likely break on Win32 due to missing support for non-blocking file descriptors (Win32 can only call select and poll on actual network sockets). Without rewriting this as a network server, which would impact performance and be really annoying, it is likely this module will not support Win32 in the near future.

The following modules will get you started if you wish to explore a synchronous process pool on Windows:

Win32::Process
Win32::IPC
Win32::Pipe

AUTHOR

Jeff Ober <sysread@fastmail.fm>

COPYRIGHT AND LICENSE

This software is copyright (c) 2017 by Jeff Ober.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.