The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package BGS;

use strict;
use warnings;

use Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(bgs_call bgs_back bgs_wait bgs_break);

our $VERSION = '0.12';

use IO::Select;
use Scalar::Util qw(refaddr);
use Storable qw(freeze thaw);
use POSIX ":sys_wait_h";


our $limit = 0; 


my $sel = IO::Select->new();

my %fh2data   = (); 
my %vpid2data = (); 
my @to_call   = (); 



sub _call {
	my ($data) = @_;

	my $sub = delete $$data{sub};

	pipe my $from_kid_fh, my $to_parent_fh or die "pipe: $!";

	my $kid_pid = fork;
	defined $kid_pid or die "Can't fork: $!";

	if ($kid_pid) {
		$sel->add($from_kid_fh);

		my $vpid = $$data{vpid};

		$$data{fh}  = $from_kid_fh;
		$$data{pid} = $kid_pid;

		$fh2data{$from_kid_fh} = $data;
		$vpid2data{$vpid}      = $data;

	} else {
		%fh2data   = ();
		%vpid2data = ();
		@to_call   = ();

		binmode $to_parent_fh;
		print $to_parent_fh freeze \ scalar $sub->();
		close $to_parent_fh;
		exit;
	}

}


sub _bgs_call {
	my ($sub, $callback) = @_;

	my $data = { sub => $sub };
	my $vpid = $$data{vpid} = refaddr $data;

	$$data{callback} = $callback if $callback;

	if ($limit > 0 and keys %fh2data >= $limit) {
		push @to_call, $data;
	} else {
		_call($data);
	}

	return $data;
}

sub bgs_call(&$) {
	my ($sub, $callback) = @_;

	my $data = _bgs_call($sub, $callback);

	return $$data{vpid};
}

sub bgs_back(&) { shift }


sub bgs_wait(;$) {
	my ($waited) = @_;

	if ($waited and not exists $vpid2data{$waited} and not grep { $$_{vpid} eq $waited } @to_call) {
		return;
	}

	local $SIG{PIPE} = "IGNORE";
	my $buf;            
	my $blksize = 1024; 
	while ($sel->count()) {
		foreach my $fh ($sel->can_read()) {
			my $data = $fh2data{$fh};
			my $len = sysread $fh, $buf, $blksize;
			if ($len) {
				push @{$$data{from_kid}}, $buf; 
			} elsif (defined $len) { 
				$sel->remove($fh); 
				close $fh or warn "Kid is existed: $?";

				delete $$data{fh};
				my $pid = delete $$data{pid};
				my $callback = delete $$data{callback};
				
				unless ($$data{break}) {
					if (exists $$data{from_kid} and my $r = eval { thaw(join "", @{delete $$data{from_kid}}) }) {
						if ($callback) {
							$callback->($$r);
						} else {
							$$data{result} = $$r;
						}
					} else {
						if ($callback) {
							$callback->();
						} else {
							$$data{result} = undef;
						}
					}
				}

				my $vpid = $$data{vpid};
				delete $fh2data{$fh};
				delete $vpid2data{$vpid};

				waitpid($pid, 0);

				if (my $call = shift @to_call) {
					_call($call);
				}

				if ($waited and $waited == $vpid) {
					return;
				}

			} else {
				die "Can't read '$fh': $!";
			}
		}
	}
}



sub bgs_break(;$) {
	my ($vpid) = @_;
	if (defined $vpid) {
		my $data = $vpid2data{$vpid};
		defined $data or return;
		if (my $pid = $$data{pid}) {
			$$data{break} = 1;
			kill "TERM", $pid;
		}
		 @to_call = grep { $$_{vpid} ne $vpid } @to_call;
	} else {
		foreach my $data (values %vpid2data) {
			if (my $pid = $$data{pid}) {
				$$data{break} = 1;
				kill "TERM", $pid;
			}
		}
		@to_call = ();
	}
}


1;


__END__


=head1 NAME

BGS - Background execution of subroutines in child processes.

=head1 SYNOPSIS

  use BGS;
  # $BGS::limit = 0;

  my @foo;

  foreach my $i (1 .. 2) {
    bgs_call {
      # child process
      return "Start $i";
    } bgs_back {
      # callback subroutine
      my $r = shift;
      push @foo, "End $i. Result: '$r'.\n";
    };
  }

  bgs_wait();

  print foreach @foo;

=head1 MOTIVATION

The module was created when need to receive information from dozens of
database servers in the shortest time appeared.

=head1 DESCRIPTION

=head2 bgs_call

Child process is created for each subroutine, that is prescribed with
B<bgs_call>, and it executes within this child process.

The subroutine must return either a B<scalar> or a B<reference>!

The answer of the subroutine passes to the callback subroutine as an argument.
If a child process ended without bgs_call value returning, than bgs_back subprogram is called without argument.

bgs_call return vpid (virtual pid) of child process.

=head2 bgs_back

The callback subroutine is described in B<bgs_back> block.

The answer of B<bgs_call> subroutine passes to B<bgs_back> subroutine
as an argument.

=head2 bgs_wait

Call of bgs_wait() reduces to child processes answers wait and
callback subroutines execution.

Call bgs_wait($vpid) to wait specific process.

=head2 bgs_break

kill all or specific child processes.

Call bgs_break($vpid) to kill specific process.

=head2 $BGS::limit

Set $BGS::limit to limit child processes count. Default is 0 (unlimited).

=head1 AUTHOR

Nick Kostyria

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2011 by Nick Kostyria

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.

=cut