The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Parallel::Fork::BossWorker;

use 5.008008;
use strict;
use warnings;
use Carp;
use Data::Dumper qw(Dumper);
use IO::Handle;

# Perl module variables
# @ISA is left empty, so this package inherits from no parent class.
# $VERSION is the release number of this module.
our @ISA = qw();
our $VERSION = '0.03';

# Constructor.
#
# Accepts a flat list of key/value options:
#   work_handler   - (required) code ref run in a worker for each work item
#   result_handler - code ref run in the boss for each result a worker returns
#   worker_count   - number of workers to fork (default 10)
#   global_timeout - seconds allowed per work item, 0 for unlimited (default 0)
#   msg_delimiter  - string separating serialized messages on the pipes
#                    (default "\0\0\0")
#
# Croaks when work_handler is missing. Returns the blessed object.
sub new {
	my $class = shift;
	my %values = @_;

	# Honor a caller-supplied delimiter. An undefined or empty value falls
	# back to the default, because an empty record separator would switch the
	# pipe reads into paragraph mode.
	my $delimiter = $values{msg_delimiter};
	if (not defined $delimiter or not length $delimiter) {
		$delimiter = "\0\0\0";
	}

	my $self = {
		result_handler => $values{result_handler} || undef,  # Method for handling output of the workers
		worker_count   => $values{worker_count}   || 10,     # Number of workers
		global_timeout => $values{global_timeout} || 0,      # Number of seconds before the worker terminates the job, 0 for unlimited
		work_handler   => $values{work_handler},             # Handler which will process the data from the boss
		work_queue     => [],                                # Pending work items
		msg_delimiter  => $delimiter                         # Separator between serialized messages
	};
	bless $self, ref($class) || $class;

	# The work handler is required
	if (not defined $self->{work_handler}) {
		croak("Parameters \`work_handler' is required.");
	}

	return $self;
}

# Appends one work item to the end of the pending work list.
#
# Parameters:
#   $work - the item to enqueue (the POD documents a hash reference); it is
#           stored exactly as given.
#
# Returns the number of items now held in the list.
#
# Declared without a prototype on purpose: Perl ignores prototypes on method
# calls, which is how this sub is invoked.
sub add_work {
	my ($self, $work) = @_;

	return push(@{ $self->{work_queue} }, $work);
}

# Forks the workers and runs the boss loop until every worker has shut down.
#
# Each message read from the workers is a hash carrying the sender's pid and,
# once that worker has finished an item, a "data" entry holding the work
# handler's return value. A true "data" value is passed to result_handler
# when one is configured. The reporting worker is then handed the item at the
# end of the work list, or has its pipe closed when no work remains.
#
# Croaks if a worker exits while it is still registered, or on any other
# error raised inside the loop.
sub process {
	my $self = shift;

	# Provides WNOHANG for the non-blocking reap in the signal handler
	require POSIX;

	eval {

		# If a worker dies, there's a problem. Reap every child that has
		# already exited: several exits can be folded into one SIGCHLD, and a
		# blocking wait() could stall the boss when the signal came from a
		# child that something else has already collected.
		local $SIG{CHLD} = sub {
			while ((my $pid = waitpid(-1, POSIX::WNOHANG())) > 0) {
				if (defined $self->{workers}->{$pid}) {
					confess("Worker $pid died.");
				}
			}
		};

		# Start the workers
		$self->start();

		# Handy pipe reference
		my $from_workers = $self->{from_workers};

		# Read from the workers, loop until they all shut down
		while (my $result = $self->receive($from_workers)) {

			# Process the result handler
			if ($result->{data} && defined $self->{result_handler}) {
				$self->{result_handler}->( $result->{data} );
			}

			my $pid = $result->{pid};

			# If there's still work to be done, send it to the worker, otherwise shut it down
			if (@{ $self->{work_queue} }) {
				$self->send(
					$self->{workers}->{$pid}, # Worker's pipe
					pop(@{ $self->{work_queue} })
				);
			} else {
				# Unregister first so the worker's exit is not reported as a death
				my $fh = delete($self->{workers}->{$pid});
				close($fh);
			}
		}
	};

	# Re-raise anything caught above from the caller's point of view
	if ($@) {
		croak($@);
	}
}

# Creates the shared result pipe and forks worker_count worker processes.
#
# In the boss, each worker's pid is mapped in $self->{workers} to the write
# end of that worker's private instruction pipe, and the boss's copy of the
# result pipe's write end is closed before returning.
#
# In each child, handles the worker does not need are closed, STDIN is
# reopened on /dev/null, an initial {pid => $$} message is sent to request
# work, and worker() runs until it returns. The child then exits; it never
# returns from this method.
#
# Confesses when a pipe cannot be created or fork() fails.
sub start {
	my $self = shift();

	# Create a pipe for the workers to communicate to the boss
	pipe($self->{from_workers}, $self->{to_boss})
		or confess("Failed to create pipe: $!");

	# Create the workers
	foreach (1..$self->{worker_count}) {

		# Open a pipe for the worker
		pipe(my $from_boss, my $to_worker)
			or confess("Failed to create pipe: $!");

		# Fork off a worker
		my $pid = fork();

		# fork() returns undef on failure. Test that first: undef compares
		# numerically equal to 0, so it would otherwise be taken for the
		# child and the boss itself would start acting as a worker.
		if (not defined $pid) {
			confess("Failed to fork: $!");
		}

		if ($pid > 0) {

			# Boss
			$self->{workers}->{$pid} = $to_worker;
			next;
		}

		# Worker. Everything runs inside an eval so that an exception can
		# never unwind into the boss's code path in this child process.
		my $ok = eval {

			# Close unused pipes
			close($self->{from_workers});
			close($to_worker);

			# Drop the inherited write ends of the pipes to workers forked
			# earlier; while a copy stays open here those workers cannot see
			# end-of-file when the boss closes its own copy.
			foreach my $sibling (values %{ $self->{workers} || {} }) {
				close($sibling);
			}
			$self->{workers} = {};

			# Best effort: detach the worker from the boss's standard input
			open(STDIN, '<', '/dev/null');

			# Setup communication pipes
			my $to_boss = $self->{to_boss};

			# Send the initial request
			$self->send($to_boss, {pid => $$});

			# Start processing
			$self->worker($from_boss);

			1;
		};

		if (not $ok) {
			print STDERR $@;
			exit(1);
		}

		# When the worker subroutine completes, exit
		exit;
	}

	# Close unused pipes
	close($self->{to_boss});
	delete($self->{to_boss});
}

# Worker main loop, run inside a forked child.
#
# Parameters:
#   $from_boss - handle the boss writes work items to
#
# Reads work items until receive() returns a false value, which happens once
# the pipe reaches end-of-file. Each item is passed to work_handler under an
# alarm of global_timeout seconds (0 arms no alarm), and the handler's return
# value is sent back as {pid => $$, data => <result>} on $self->{to_boss}.
#
# Croaks when the handler dies or times out.
sub worker {
	my ($self, $from_boss) = @_;

	# Read instructions from the server
	while (my $instructions = $self->receive($from_boss)) {

		# If the handler's children die, that's not our business. This is
		# set again for every item in case a handler replaced it.
		$SIG{CHLD} = 'IGNORE';

		# Execute the handler with the given instructions
		my $result;
		eval {
			# Handle alarms
			local $SIG{ALRM} = sub {
				die "Work handler timed out."
			};

			# Set alarm
			alarm($self->{global_timeout});

			# Execute the handler and get its result
			if (defined $self->{work_handler}) {
				$result = $self->{work_handler}->($instructions);
			}

			# Disable alarm
			alarm(0);
		};
		my $error = $@;

		# The handler may have died before the alarm was disabled above;
		# make sure it cannot fire outside the eval.
		alarm(0);

		# A failed or timed-out handler is fatal for this worker
		if ($error) {
			croak("Worker $$ error: $error");
		}

		# Send the result to the server
		$self->send($self->{to_boss}, {pid => $$, data => $result});
	}
}

# Reads one serialized message from a file handle and rebuilds the data.
#
# Parameters:
#   $fh - handle to read from; messages are separated by
#         $self->{msg_delimiter}
#
# Returns the deserialized value, or undef at end-of-file or for an empty
# message. Confesses when the message text cannot be evaluated.
#
# NOTE: the message is executed as Perl code (Data::Dumper output), so this
# must only be used on handles written by this module's own processes.
sub receive {
	my ($self, $fh) = @_;

	# Get a value from the file handle
	local $/ = $self->{msg_delimiter};
	my $value = <$fh>;

	my $data;
	if (defined $value) {

		# Strip the delimiter so that only the serialized data is evaluated
		chomp($value);

		# Data::Dumper output assigns the root structure to $VAR1 and may
		# follow it with fix-up statements for cyclic references. The value
		# of the last statement can therefore be an inner node, so capture
		# the root through a lexical $VAR1 instead.
		my $VAR1;
		my $last;
		{
			no strict;
			no warnings;
			$last = eval($value);
		}

		if ($@) {
			confess("Failed to deserialize data: $@");
		}

		# Fall back to the evaluated value when the text did not use $VAR1
		$data = defined $VAR1 ? $VAR1 : $last;
	}

	return $data;
}

# Serializes a value with Data::Dumper and writes it to a file handle,
# followed by $self->{msg_delimiter}.
#
# Parameters:
#   $fh    - handle to write to
#   $value - data to transmit
#
# Returns the result of flushing the handle. Confesses when the write or the
# flush fails.
sub send {
	my ($self, $fh, $value) = @_;

	# Pin the Data::Dumper settings so the output is always a single-line,
	# self-contained "$VAR1 = ...;" text, whatever the caller set globally.
	local $Data::Dumper::Deepcopy = 1;
	local $Data::Dumper::Indent = 0;
	local $Data::Dumper::Purity = 1;
	local $Data::Dumper::Terse = 0;
	local $Data::Dumper::Varname = 'VAR';

	# Print the value to the file handle
	print {$fh} Dumper($value) . $self->{msg_delimiter}
		or confess("Failed to write message: $!");

	# Force the file handle to flush so the receiver sees the message now
	return $fh->flush()
		|| confess("Failed to flush message: $!");
}

1;
__END__

=head1 NAME

Parallel::Fork::BossWorker - Perl extension for easily creating forking queue processing applications.

=head1 SYNOPSIS

The minimal usage of Parallel::Fork::BossWorker requires you supply
the work_handler argument which returns a hash reference.

	use Parallel::Fork::BossWorker;
	
	# Create new BossWorker instance
	my $bw = new Parallel::Fork::BossWorker(
		work_handler => sub {
				my $work = shift;
				... do work here ...
				return {};
			}
	);
	
	$bw->add_work({key=>"value"});
	$bw->process();

Additionally, you could specify the result_handler argument, which
is passed the hash reference returned from your work_handler.

	use Parallel::Fork::BossWorker;
	
	# Create new BossWorker instance
	my $bw = new Parallel::Fork::BossWorker(
		work_handler => sub {
				my $work = shift;
				... do work here ...
				return {result => "Looks good"};
			},
		result_handler => sub {
			my $result = shift;
			print "$result->{result}\n";
		}
	);

=head1 DESCRIPTION

Parallel::Fork::BossWorker makes creating multiprocess applications easy.

The module is designed to work in a queue style of setup; with the worker
processes requesting 'work' from the boss process. The boss process
transparently serializes and sends the work data to your work handler, to be
consumed and worked. The worker process then transparently serializes and sends
optional data back to the boss process to be handled in your result handler.

This process repeats until the work queue is empty.

=head1 METHODS

=head2 new(...)

Creates and returns a new Parallel::Fork::BossWorker object.

	my $bw = Parallel::Fork::BossWorker->new(work_handler => \&routine)

Parallel::Fork::BossWorker has options which allow you to customize
how exactly the queue is handled and what is done with the data.

=over 4

=item * C<< work_handler => \&routine >>

The work_handler argument is required. The sub is called with its first
and only argument being one of the values in the work queue. Each worker calls
this sub each time it receives work from the boss process. The handler may trap
$SIG{ALRM}, which may be called if global_timeout is specified.

The work_handler should clean up after itself, as the workers may call the
work_handler more than once.

=item * C<< result_handler => \&routine >>

The result_handler argument is optional. The sub is called with its first
and only argument being the return value of work_handler. The boss process
calls this sub each time a worker returns data. This subroutine is not affected
by the value of global_timeout.

=item * C<< global_timeout => $seconds >>

By default, a handler can execute forever. If global_timeout is specified, an
alarm is set up to terminate the work_handler so processing can continue.

=item * C<< worker_count => $count >>

By default, 10 workers are started to process the data queue. Specifying
worker_count can scale the worker count to any number of workers you wish.

Take care though, as too many workers can adversely impact performance, though
the optimal number of workers will depend on what your handlers do.

=item * C<< msg_delimiter => $delimiter >>

Sending messages to and from the child processes is accomplished using
Data::Dumper. When transmitting data, a delimiter must be used to identify the
breaks in messages. By default, this delimiter is "\0\0\0". The delimiter must
not appear in your data.

=back

=head2 add_work(\%work)

Adds work to the instance's queue.

	$bw->add_work({data => "my data"});

=head2 process()

Forks off the child processes and begins processing data.

	$bw->process();

=head1 REQUIREMENTS

This module depends on the following modules:

Carp

Data::Dumper

IO::Handle

=head1 BUGS

None I'm aware of yet, but I'm sure I'll receive reports :)

=head1 SEE ALSO

=head1 AUTHOR

Jeff Rodriguez, E<lt>jeff@jeffrodriguez.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (c) 2007, Jeff Rodriguez

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

   * Redistributions of source code must retain the above copyright notice,
     this list of conditions and the following disclaimer.

   * Redistributions in binary form must reproduce the above copyright notice,
     this list of conditions and the following disclaimer in the documentation
     and/or other materials provided with the distribution.

   * Neither the name of the <ORGANIZATION> nor the names of its contributors
     may be used to endorse or promote products derived from this software
     without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

=cut