The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

package PBS::Build::Forked ;
use PBS::Debug ;

use 5.006 ;
use strict ;
use warnings ;

require Exporter ;
use AutoLoader qw(AUTOLOAD) ;

our @ISA = qw(Exporter) ;
our %EXPORT_TAGS = ('all' => [ qw() ]) ;
our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } ) ;
our @EXPORT = qw() ;

our $VERSION = '0.04' ;

use PBS::Output ;
use PBS::Constants ;
use PBS::Distributor ;
use PBS::Build::NodeBuilder ;
use PBS::Build::ForkedNodeBuilder ;
use Data::TreeDumper ;
use Time::HiRes qw(gettimeofday tv_interval) ;
use PBS::Build ;

use Socket;
use IO::Select ;
use PBS::ProgressBar ;

#-------------------------------------------------------------------------------

sub Build
{
=pod

The parallelisation algorithm is simple and effective enough as most dependency trees have many dependencies 
for each node making the graph look triangular to a very wide base triangular. Note that this is not the most
effective parallelisation algorithm. Building the nodes that have parents with few children first is more
effective as it maximizes tha number of build thread that are active. This means that we build hight first instead
for depth first. Since nodes have diffrent build time, the parallelisation algorith (in fact the prioritisation of the 
terminal nodes) should be dynamic to be optimal and in that case, should take into account the load on the cpu building
the node as build time is not only a factor of the CPU but also network and other I/O.

keeping previous build time to build the longest nodes to build first can also make the end of the build more effective as it
keeps the builder pool full.

=cut

my $t0 = [gettimeofday];

my $pbs_config      = shift ;
our $build_sequence = shift ;
my $inserted_nodes  = shift ;

my $build_queue = EnqueuTerminalNodes($build_sequence, $pbs_config) ;

my $number_of_nodes_to_build = scalar(@$build_sequence) - 1 ; # -1 as PBS root is never build
my $number_of_terminal_nodes = scalar(keys %$build_queue) ;

my $distributor        = PBS::Distributor::CreateDistributor($pbs_config, $build_sequence) ;
my $number_of_builders = GetNumberOfBuilders($number_of_terminal_nodes, $pbs_config, $distributor) ;
my $builders           = StartBuilders($number_of_builders, $pbs_config, $distributor, , $build_sequence, $inserted_nodes) ;

my $number_of_already_build_node = 0 ;
my $number_of_failed_builders = 0 ;
my $error_output = '' ;

my %builder_stats ;
my $builder_using_perl_time = 0 ;

my $progress_bar = CreateProgressBar($pbs_config, $number_of_nodes_to_build) ;
my $node_build_index = 0 ;

while(%$build_queue)
	{
	# start building a node if a process is free and no error occured
	unless($number_of_failed_builders)
		{
		my $started_builders = StartEnqueuedNodesBuild
					(
					  $pbs_config
					, $build_queue
					, $builders
					, $node_build_index
					, $number_of_nodes_to_build
					, \%builder_stats
					) ;
					
		$node_build_index += $started_builders ; 
		}
	
	my @built_nodes = WaitForBuilderToFinish($pbs_config, $builders) ;
	
	@built_nodes || last if $number_of_failed_builders ; # stop if nothing is building and an error occured
		
	for my $built_node_name (@built_nodes)
		{
		my ($build_result, $build_time, $node_error_output) = CollectNodeBuildResult($pbs_config, $built_node_name, $build_queue) ;
		
		$number_of_already_build_node++ ;
		
		if($build_result == BUILD_SUCCESS)
			{
			$progress_bar->update($number_of_already_build_node) if $progress_bar ;
			$builder_using_perl_time += $build_time if PBS::Build::NodeBuilderUsesPerlSubs($build_queue->{$built_node_name}) ;
			
			PBS::Depend::SynchronizeAfterBuild($build_queue->{$built_node_name}{NODE}) ;
			EnqueueNodeParents($pbs_config, $build_queue->{$built_node_name}{NODE}, $build_queue) ;
			}
		else
			{
			$error_output .= $node_error_output ;
			$number_of_failed_builders++ ;
			}
		
		delete $build_queue->{$built_node_name} ;
		}
	}

TerminateBuilders($builders) ;

if($number_of_failed_builders)
	{
	PrintError "** Failed build@{[$number_of_failed_builders > 1 ? 's' : '']} **\n" ;
	print $error_output ;
	}
	
if(defined $pbs_config->{DISPLAY_SHELL_INFO})
	{
	print WARNING DumpTree(\%builder_stats, '** Builder process statistics: **', DISPLAY_ADDRESS => 0) ;
	}
	
if($pbs_config->{DISPLAY_TOTAL_BUILD_TIME})
	{
	PrintInfo(sprintf("Total build time: %0.2f s. Perl subs time: %0.2f s.\n", tv_interval ($t0, [gettimeofday]), $builder_using_perl_time)) ;
	}

return(!$number_of_failed_builders) ;
}

#---------------------------------------------------------------------------------------------------------------

sub EnqueuTerminalNodes
{
my ($build_sequence, $pbs_config) = @_ ;
my %build_queue ;

for my $node (@$build_sequence)
	{
	# node in the build sequence might have been build already.
	# when a node is build, its __BUILD_DONE field is set
	
	for my $child (keys %$node)
		{
		next if $child =~ /^__/ ;
		
		if(exists $node->{$child}{__TRIGGERED} && defined $node->{$child}{__BUILD_DONE})
			{
			if(defined $pbs_config->{DISPLAY_JOBS_INFO})
				{
				PrintInfo("Removing '$node->{$child}{__NAME}' from parallel sequence dependency.\n") ;
				}
				
			$node->{__CHILDREN_TO_BUILD}-- ;
			}
		}
		
	#enque node if it's terminal
	if(! defined $node->{__CHILDREN_TO_BUILD} || 0 == $node->{__CHILDREN_TO_BUILD})
		{
		if(defined $pbs_config->{DISPLAY_JOBS_INFO})
			{
			PrintInfo "Enqueuing terminal node '$node->{__NAME}'.\n" ;
			}
			
		$build_queue{$node->{__NAME}} = {NODE => $node} ;
		}
	}
	
return(\%build_queue) ;
}

#----------------------------------------------------------------------------------------------------------------------

sub GetNumberOfBuilders
{
my ($number_of_terminal_nodes, $pbs_config, $distributor)  = @_ ;

my $number_of_builders = $pbs_config->{JOBS} ;

if($number_of_builders > 0)
	{
	$number_of_builders = $number_of_builders > $distributor->GetNumberOfShells() ? $distributor->GetNumberOfShells() : $number_of_builders ;
	}
else
	{
	# let distributor determine the number of build threads
	$number_of_builders = $pbs_config->{JOBS} = $distributor->GetNumberOfShells() ;
	}
	
if($number_of_builders > $number_of_terminal_nodes)
	{
	$number_of_builders = $number_of_terminal_nodes ;
	}

$number_of_builders ||= 1 ; #safeguard for user errors

my $builder_plural = '' ; $builder_plural = 'es' if($number_of_builders > 1) ;
my $build_process = "build process$builder_plural" ;

PrintInfo("Parallel build: using $number_of_builders $build_process out of maximum $pbs_config->{JOBS} for $number_of_terminal_nodes terminal nodes.\n") ;

return($number_of_builders ) ;
}

#----------------------------------------------------------------------------------------------------------------------

sub StartBuilders
{
my ($number_of_builders, $pbs_config, $distributor, $build_sequence, $inserted_nodes)  = @_ ;

my @builders ;
for my$builder_index (0 .. ($number_of_builders - 1))
	{
	my $shell = $distributor->GetShell($builder_index) ;
	
	my ($builder_channel) = StartBuilderProcess
				(
				  $pbs_config
				, $build_sequence
				, $inserted_nodes
				, $shell
				, "[$builder_index] " . __PACKAGE__ . ' ' . __FILE__ . ':' . __LINE__
				) ;
				
	unless(defined $builder_channel)
		{
		PrintError "Couldn't start a forked builder #$_!\n" ;
		die ;
		}
	
	print $builder_channel "GET_PROCESS_ID" . "__PBS_FORKED_BUILDER__" . "\n";
	
	my $child_pid = -1 ;
	while(<$builder_channel>)
		{
		last if /__PBS_FORKED_BUILDER__/ ;
		chomp ;
		$child_pid = $_ ;
		}
	
	$builders[$builder_index] = 
		{
		  PID              => $child_pid
		, BUILDER_CHANNEL  => $builder_channel
		, SHELL            => $shell
		, BUILDING         => 0
		} ;
	}

return(\@builders) ;
}

#----------------------------------------------------------------------------------------------------------------------

sub StartBuilderProcess
{
# all arguments are passed to PBS::Build::ForkedNodeBuilder::NodeBuilder
#my ($pbs_config, $build_sequence, $inserted_nodes, $shell, $builder_info) = @_ ;

# PBS sends the name of the node to build, the builder returns the build result
my ($to_child, $to_parent) ;
socketpair($to_child, $to_parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)  or  die "socketpair: $!";

my $pid = fork() ;
if($pid)
	{
	close($to_parent) ;
	$to_child->autoflush(1);
	
	return($to_child) ;
	}
else
	{
	# new process
	unless(defined $pid)
		{
		# couldn't fork
		close($to_child) ;
		close($to_parent) ;
		return ;
		}
		
	close($to_child) ;
	$to_parent->autoflush(1) ;
	
	PBS::Build::ForkedNodeBuilder::NodeBuilder($to_parent, @_) ; # waits for commands from parent
	}
}

#-------------------------------------------------------------------------------------------------------

sub CreateProgressBar
{
my ($pbs_config, $number_of_nodes_to_build) = @_ ;

if($pbs_config->{DISPLAY_PROGRESS_BAR})
	{
	return
		(
		PBS::ProgressBar->new
			({
			  count => $number_of_nodes_to_build 
			, ETA   => "linear", 
			#, pre_update_user_code => $PBS::Output::global_info_escape_code
			#, post_update_user_code => $PBS::Output::global_reset_escape_code
			})
		);
	}
}

#----------------------------------------------------------------------------------------------------------------------

sub WaitForBuilderToFinish
{
my ($pbs_config, $builders) = @_ ;
	
my $select_all = new IO::Select ;
my @waiting_for_messages ;

my $number_of_builders = @$builders ;

for (0 .. ($number_of_builders - 1))
	{
	if($builders->[$_]{BUILDING} == 1)
		{
		push @waiting_for_messages, "Waiting for '$builders->[$_]{NODE}' on '" . $builders->[$_]{SHELL}->GetInfo() . " '\n" ;
		$select_all->add($builders->[$_]{BUILDER_CHANNEL}) ;
		}
	}


my @build_nodes ;

if(@waiting_for_messages)
	{
	if(defined $pbs_config->{DISPLAY_JOBS_RUNNING})
		{
		PrintWarning "* * * * * *\n" ;
		PrintWarning $_ for(@waiting_for_messages) ;
		PrintWarning "* * * * * *\n" ;
		}
		
	# block till we get end of build from a builder thread
	my @sockets_ready = $select_all->can_read() ; 
	
	for (0 .. ($number_of_builders - 1))
		{
		for my $socket_ready (@sockets_ready)
			{
			if($socket_ready == $builders->[$_]{BUILDER_CHANNEL})
				{
				push @build_nodes, $builders->[$_]{NODE} ;
				}
			}
		}
	}

return(@build_nodes) ;
}

#----------------------------------------------------------------------------------------------------------------------

sub StartEnqueuedNodesBuild
{
my ($pbs_config, $build_queue, $builders, $node_build_index, $number_of_nodes_to_build, $builder_stats) = @_ ;

my $number_of_builders = @$builders ;
my $started_builders = 0 ;

for my $enqued_node (keys %$build_queue)
	{
	my $node_pid = $build_queue->{$enqued_node}{PID} ;
	
	next if defined $node_pid ; # node is under build
	
	my $pid = undef ;
	for (0 .. ($number_of_builders - 1))
		{
		if($builders->[$_]{BUILDING} == 0)
			{
			$pid = $builders->[$_] ;
			last ;
			}
		}
			
	if($pid)
		{
		$started_builders++ ;
		$build_queue->{$enqued_node}{PID} = $pid ;
		$pid->{BUILDING} = 1 ;
		$pid->{NODE} = $enqued_node ;
		
		# this info is for the root process. The modification we make here
		# are not seen in the builder processes
		$build_queue->{$enqued_node}{NODE}{__SHELL_ORIGIN} = __PACKAGE__ . __FILE__ . __LINE__ ;
		
		if(defined $pid->{SHELL})
			{
			$build_queue->{$enqued_node}{NODE}{__SHELL_INFO} = $pid->{SHELL}->GetInfo() ;
			}
			
		# keep some stats on which builder ran
		$builder_stats->{'PID ' . $pid->{PID}}{RUNS}++ ;
		$builder_stats->{'PID ' . $pid->{PID}}{NAME} = $pid->{SHELL}->GetInfo() ;
		
		unless(exists $build_queue->{$enqued_node}{NODE}{__SHELL_OVERRIDE})
			{
			push @{$builder_stats->{'PID ' . $pid->{PID}}{NODES}}, $enqued_node ;
			}
		else
			{
			push @{$builder_stats->{'PID ' . $pid->{PID}}{NODES}}, "$enqued_node [L]";
			}
			
		my $node_index = $started_builders + $node_build_index ;
		
		SendIpcToBuildNode($pbs_config, $build_queue->{$enqued_node}{NODE}, $node_index, $number_of_nodes_to_build, $pid) ;
		}
	else
		{
		last ;
		}
		
	}

return($started_builders) ;
}

#---------------------------------------------------------------------------------------------------------------

sub SendIpcToBuildNode
{
my ($pbs_config, $node, $node_index, $number_of_nodes_to_build, $pid) = @_ ;
my $node_name = $node->{__NAME} ; 

if(defined $pbs_config->{DISPLAY_JOBS_INFO})
	{
	my $percent_done = int(($node_index * 100) / $number_of_nodes_to_build) ;
	my $node_build_sequencer_info = "$node_index/$number_of_nodes_to_build, $percent_done%" ;
	
	PrintInfo "Starting build of '$node_name' ($node_build_sequencer_info) in '@{[$pid->{SHELL}->GetInfo()]}' pid: $pid->{PID}\n" ;
	}
	
# IPC start the build
my $percent_done = int(($node_index * 100) / $number_of_nodes_to_build ) ;
my $builder_channel = $pid->{BUILDER_CHANNEL} ;

print $builder_channel "BUILD_NODE" . "__PBS_FORKED_BUILDER__"
			. $node_name . "__PBS_FORKED_BUILDER__"
			. "$node_index/$number_of_nodes_to_build, $percent_done%\n" ;
}

#---------------------------------------------------------------------------------------------------------------

sub CollectNodeBuildResult
{
my ($pbs_config, $built_node_name, $build_queue) = @_ ;
	
my $built_node = $build_queue->{$built_node_name}{NODE} ;

my $pid = $build_queue->{$built_node_name}{PID} ;
$pid->{BUILDING} = 0 ;

my $builder_channel = $pid->{BUILDER_CHANNEL} ;

my ($build_result,$build_message) = split /__PBS_FORKED_BUILDER__/, <$builder_channel> ;
$build_result = BUILD_FAILED unless defined $build_result ;

my $build_time = -1 ;
my $error_output = '' ;

print "\n" unless $build_result == BUILD_SUCCESS ;

if(@{$pbs_config->{DISPLAY_BUILD_INFO}})
	{
	PrintWarning("--bi defined, continuing.\n") ;
	print $builder_channel "GET_OUTPUT" . "__PBS_FORKED_BUILDER__" . "\n" ;
	
	while(<$builder_channel>)
		{
		last if /__PBS_FORKED_BUILDER__/ ;
		print $_ ;
		}
	}
else
	{
	print $builder_channel "GET_BUILD_TIME" . "__PBS_FORKED_BUILDER__" . "\n";
	
	while(<$builder_channel>)
		{
		last if /__PBS_FORKED_BUILDER__/ ;
		chomp ;
		$build_time = $_ ;
		}
	
	my $no_output = defined $PBS::Shell::silent_commands && defined $PBS::Shell::silent_commands_output ;
	
	if($build_result == BUILD_SUCCESS)
		{
		$built_node->{__BUILD_DONE} = "PBS::Build::Forked Done." ;
		print $builder_channel "GET_OUTPUT" . "__PBS_FORKED_BUILDER__" . "\n" ;
		
		# collect builder output and display it
		while(<$builder_channel>)
			{
			last if /__PBS_FORKED_BUILDER__/ ;
			print $_ unless $no_output ;
			}
		}
	else
		{
		# the build failed, save the builder output to display later and stop building
		PrintError "#------------------------------------------------------------------------------\n"
			  ."Error building node '$built_node_name'! Error will be reported bellow.\n" ;
			  
		#~ my $no_output = defined $PBS::Shell::silent_commands && defined $PBS::Shell::silent_commands_output ;
		#~ $no_output = 0 if($pbs_config->{BUILD_AND_DISPLAY_NODE_INFO} || scalar(@{$pbs_config->{DISPLAY_NODE_INFO}})) ;
		#~ $no_output = 1 if defined $pbs_config->{DISPLAY_NO_BUILD_HEADER} ;
		
		#~ if($no_output)
			#~ {
			#~ #add the missing header, the builder will add the output even if in the no_output mode
			#~ my $node_name = "Node '$built_node_name':" ;
			#~ my $columns = length($node_name) ;
			#~ my $separator = '#' . ('-' x ($columns - 1)) . "\n"  ;
			
			#~ $error_output  .= ERROR($separator . $node_name . "\n" . $separator) ;
			#~ }
			
		print $builder_channel "GET_OUTPUT" . "__PBS_FORKED_BUILDER__" . "\n" ;
		while(<$builder_channel>)
			{
			last if /__PBS_FORKED_BUILDER__/ ;
			$error_output  .= $_ ;
			}
		}
	}
	
# handle log
if(defined (my $lh = $pbs_config->{CREATE_LOG}))
	{
	print $builder_channel "GET_LOG" . "__PBS_FORKED_BUILDER__" . "\n" ;
	while(<$builder_channel>)
		{
		last if /__PBS_FORKED_BUILDER__/ ;
		print $lh $_ ;
		}
	}
	
if(defined $pbs_config->{DISPLAY_JOBS_INFO})
	{
	PrintInfo "'$built_node_name': $build_result,$build_message\n" ;
	}
	
return($build_result, $build_time, $error_output) ;
}

#---------------------------------------------------------------------------------------------------------------

sub EnqueueNodeParents
{
my ($pbs_config, $node, $build_queue) = @_ ;

# check if any node waiting for a child node to be build can be build
for my $parent (@{$node->{__PARENTS}})
	{
	$parent->{__CHILDREN_TO_BUILD}-- ;
	
	next if $parent->{__NAME} =~ /^__/ ;
	
	if(0 == $parent->{__CHILDREN_TO_BUILD})
		{
		if(defined $pbs_config->{DISPLAY_JOBS_INFO})
			{
			PrintInfo "Enqueuing parent node '$parent->{__NAME}'.\n" ;
			}
			
		$build_queue->{$parent->{__NAME}} = {NODE => $parent} ;
		}
	}
}

#---------------------------------------------------------------------------------------------------------------

sub TerminateBuilders
{
my ($builders) = @_;
my $number_of_builders = @$builders ;

PrintInfo "\n** Terminating Builders **\n" ;

for my $builder_index (0 .. ($number_of_builders - 1))
	{
	my $builder_channel = $builders->[$builder_index]{BUILDER_CHANNEL} ;
	
	if($builders->[$builder_index]{BUILDING})
		{
		# 20 feb 2005, I don't think this can happend any more NK.
		# happend 20 May 2005 :-)
		die ; 
		}
		
	print $builder_channel "STOP_PROCESS\n" ;
	}
	
for (0 .. ($number_of_builders - 1))
	{
	waitpid($builders->[$_], 0) ;
	}
}

#------------------------------------------------------------------------------------------------------------

1 ;

__END__

=head1 NAME

PBS::Build::Forked -

=head1 DESCRIPTION

=head1 AUTHOR

Khemir Nadim ibn Hamouda. nadim@khemir.net

=cut