package Makefile::Parallel;

use Makefile::Parallel::Grammar;
use Log::Log4perl;
use Proc::Simple;
use Clone qw(clone);
use Time::HiRes qw(gettimeofday tv_interval);
use Time::Interval;
use Time::Piece::ISO;
use GraphViz;
use Digest::MD5;
use Data::Dumper;

use warnings;
use strict;
our $VERSION = '0.09';

=encoding utf8

=head1 NAME

Makefile::Parallel - A distributed parallel makefile

=head1 SYNOPSIS

This module is not meant to be called directly. Please see the perldoc of
the C<pmake> program in the F<examples/> directory of this distribution.
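
The programmatic entry point is the exported C<process_makefile> function.
A minimal sketch of how a driver like C<pmake> might call it (the file name
here is hypothetical; the option names match the defaults set inside
C<process_makefile>):

    use Makefile::Parallel;

    # Run the makefile on the local machine with 2 CPUs,
    # polling the scheduler every 10 seconds
    process_makefile('pipeline.pmake', {
        scheduler => 'LOCAL',
        local     => 2,
        clock     => 10,
    });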

=cut

# Module Stuff
require Exporter;
our @ISA = qw(Exporter);
our %EXPORT_TAGS = ( 'all' => [ qw() ] );
our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );

our @EXPORT = qw( process_makefile );

my $logger;

my $queue;
my $running   = {}; # Holds the running IDs   (ID -> info)
my $finnished = {}; # Holds the finished IDs  (ID -> info)
my $scheduler;      # Holds the scheduler engine
my $counter   = 0;  # Holds the dispatch order of the executed processes
my $filename;       # Holds the filename of the makefile
my $debug;          # TRUE if debug is enabled

# This stuff deals with the interruption (Ctrl + C)
$SIG{INT}  = \&process_interrupt;
my $interrupted       = 0;

=head1 process_makefile

Main function. Accepts a file to parse and a 
hash reference with options.

TODO: Document options
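
Until then, here is a sketch of the keys this function recognises, with the
defaults it sets (C<mail>, not listed, has no default and is only handed to
the PBS scheduler):

    {
        scheduler => 'LOCAL', # 'LOCAL' or 'PBS'
        local     => 1,       # CPUs used by the local scheduler
        dump      => 0,       # dump the parsed queue and exit
        clean     => 0,       # clean PBS temporary files and exit
        clock     => 10,      # seconds to sleep between scheduler polls
        debug     => 0,       # DEBUG logging to screen and log/makefile.log
        continue  => 0,       # recover the journal of a previous run
    }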

=cut

sub process_makefile {
    my ($file, $options) = @_;

    # Set sensible defaults
    $options              ||= {};
    $options->{scheduler} ||= 'LOCAL';
    $options->{local}     ||= '1'; # Default number of CPUs in local mode
    $options->{dump}      ||= 0;
    $options->{clean}     ||= 0;
    $options->{clock}     ||= 10;
    $options->{debug}     ||= 0;
    $options->{continue}  ||= 0;

    # TODO: Give more flexibility
    if($options->{scheduler} eq 'PBS') {
        # require (not use) so the unused scheduler backend is never loaded
        require Makefile::Parallel::Scheduler::PBS;
        $scheduler = Makefile::Parallel::Scheduler::PBS->new();
        $scheduler->{mail} = $options->{mail} if $options->{mail};
    } else {
        require Makefile::Parallel::Scheduler::Local;
        $scheduler = Makefile::Parallel::Scheduler::Local->new({ max => $options->{local} });
    }

    # Debug settings
    if($options->{debug}) { 
        # Clean logs... ## FIXME - do not rely on OS.
        `rm -rf log/`; mkdir "log";

        my $conf = q(
            log4perl.category.PMake = DEBUG, Logfile, Screen

            log4perl.appender.Logfile = Log::Log4perl::Appender::File
            log4perl.appender.Logfile.filename = log/makefile.log
            log4perl.appender.Logfile.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Logfile.layout.ConversionPattern = [%d] [%p]	%F(%L) %m%n

            log4perl.appender.Screen = Log::Log4perl::Appender::Screen
            log4perl.appender.Screen.stderr = 0
            log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Screen.layout.ConversionPattern = [%d] %m%n
        );
        Log::Log4perl::init(\$conf);
        $debug = 1;
    }
    else {
        my $conf = q(
            log4perl.category.PMake = INFO, Screen

            log4perl.appender.Screen = Log::Log4perl::Appender::Screen
            log4perl.appender.Screen.stderr = 0
            log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Screen.layout.ConversionPattern = [%d] %m%n
        );
        Log::Log4perl::init(\$conf);
        $debug = 0;
    }
    $logger = Log::Log4perl::get_logger("PMake");

    # Parse the file
    $logger->info("Trying to parse \"$file\"");
    $queue = Makefile::Parallel::Grammar->parseFile($file);

    if($queue) { $logger->info("Parse ok.. proceeding to plan the scheduling"); }
    else {       $logger->error("Parse failed, aborting..."); return }
    $filename = $file;

    # Copy the shared Perl routines into every job that has a Perl action
    if(defined $queue->[-1]{perl}) {
        for my $job (@{$queue}) {
            if(defined $job->{action}[0]{perl}) {
                $job->{perl} = $queue->[-1]{perl};
            }
        }
        pop @{$queue}; # remove the entry that only carried the routines
    }

    # Dump the parsed queue if the user wants it
    die Dumper $queue if($options->{dump});

    # Clean the temporary files if we are PBS
    clean() if($options->{clean});

    # Recover the journal if the user wants to continue
    journal_recover() if ($options->{continue});

    # Enter the loop
    while(1) {
        # $logger->debug("New loop starting");
        loop();
        # $logger->debug("Loop processed, sleeping");
        sleep $options->{clock};
    }
}

=head1 journal_recover

Tries to recover the journal of the last makefile run.

=cut

sub journal_recover {
    my $journal = do "$filename.journal" or die "Can't open $filename.journal: $!";

    my $md5 = calc_makefile_md5();
    if($journal->{md5} ne $md5) {
        $logger->warn("MD5 Check Failed... The original Makefile was changed!! CONTINUE AT YOUR OWN RISK!");
    }

    # Restore the finnished list
    $finnished = $journal->{finnished};
    $counter   = $journal->{counter};

    # Ignore jobs that have already completed
    # First pass: evaluate the variables of finished jobs
    for my $job (@{$queue}) {
      next unless $job;
      if(is_finnished($job->{rule}{id})) {
        # If we got asShell to run, run it!
        find_and_run_asShell($job->{rule}{id});
        # If we got asPerl to run, run it!
        find_and_run_asPerl($job->{rule}{id});
      }
    }

    # Second pass: remove the jobs that have already been executed
    my $new_queue = [];
    for my $job (@{$queue}) {
      next unless $job;
      push @{$new_queue}, $job unless is_finnished($job->{rule}{id});
    }
    $queue = $new_queue;

    $logger->warn("Journal recovered.. Cross your fingers now..."); 
}

=head1 clean

This function is responsible for cleaning all the temporary files
created by the PBS system. It should only be used with the PBS
scheduler.

=cut

sub clean {
    $scheduler->clean($queue);    

    $logger->info("Temporary files cleaned");
    exit(0);
}

=head1 loop

Runs one iteration of the main loop: reap finished jobs, dispatch the
jobs that are ready to run, and write the journal.

=cut

sub loop {
    reap_dead_bodies();
    dispatch();
    write_journal();
}

=head1 reap_dead_bodies

This function is responsible for reaping the jobs that have
finished. If a job needs to run something at the end (for example,
find i <- grep | awk...), it is executed and the job queue is
expanded.

=cut

sub reap_dead_bodies {
    # Search all running procs for someone who died
    for my $runid (keys %{$running}) {
        if($scheduler->poll($running->{$runid}, $logger)) {
            # Still running
        } else {
            # Not running anymore; remove it from the running list and save

            # Save time stats
            my $t1 = [gettimeofday];
            my $elapsed = tv_interval($running->{$runid}->{starttime}, $t1);

            $running->{$runid}->{stoptime} = $t1;

            $elapsed = parseInterval(seconds => int($elapsed), Small => 1);
            $running->{$runid}->{elapsed}  = $elapsed;

            # Give user some feedback
            $logger->info("Process " . $scheduler->get_id($running->{$runid})
                        . " (" . $running->{$runid}->{rule}->{id} 
                        . ") has terminated [$elapsed]");
            $finnished->{$runid} = $running->{$runid};
            delete $running->{$runid};

            # Don't do anything more if it was interrupted
            next if($finnished->{$runid}{interrupted});

            # Verify the exit status
            $scheduler->get_dead_job_info($finnished->{$runid});
            if($finnished->{$runid}{exitstatus} && !$finnished->{$runid}{interrupted}) {
                # Boom! Cancel everything
                $logger->fatal("Process " . $scheduler->get_id($finnished->{$runid})
                             . " exited with exit status " . $finnished->{$runid}{exitstatus}
                             . "! Aborting the whole queue...");
                process_interrupt(1);            # Forced
                $finnished->{$runid}{fatal} = 1; # For graphviz later...
            }

            # If we got asShell to run, run it!
            find_and_run_asShell($runid);
            # If we got asPerl to run, run it!
            find_and_run_asPerl($runid);
        }
    }
}

=head1 find_and_run_asShell

This function goes through the finished job and tries to find
C<asShell> commands to run, performing all the necessary expansions.
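
As a sketch of the mechanism (the command and variable name below are
hypothetical): each line printed by the C<asShell> command becomes one value
of the declared variable, which is then used to expand the queue.

    # For an action such as { def => 'i', asShell => 'seq 1 3' },
    # the collected output is equivalent to:
    my $values = [];
    open(my $ph, '-|', 'seq 1 3') or die "Can't run the command: $!";
    while (<$ph>) {
        chomp;
        push @{$values}, $_;
    }
    close $ph;
    # $values is now [1, 2, 3]; Makefile::Parallel stores it in
    # $finnished->{__var__}{i} and calls expand_forks('i')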

=cut

sub find_and_run_asShell {
    my ($runid) = @_;

    for my $action (@{$finnished->{$runid}->{action}}) {
        if($action->{asShell} && !(defined $finnished->{__var__}->{$action->{def}})) {
            $logger->info("Running shell action $action->{asShell}");
            $finnished->{__var__}->{$action->{def}} = [];

            open(my $shell, '-|', $action->{asShell})
                or $logger->logdie("Can't run shell action $action->{asShell}: $!");
            while(<$shell>) {
                chomp;
                $logger->warn("Return value from the shell action is not an integer") unless /^\d+$/;
                push @{$finnished->{__var__}->{$action->{def}}}, $_;
            }
            close $shell;

            # Now expand the queue
            expand_forks($action->{def});
        }
    }
}

=head1 find_and_run_asPerl

This function goes through the finished job and tries to find
C<asPerl> commands to run, performing all the necessary expansions.

=cut

sub find_and_run_asPerl {
    my ($runid) = @_;

    for my $action (@{$finnished->{$runid}->{action}}) {
        if($action->{asPerl} && !(defined $finnished->{__var__}->{$action->{def}})) {
            $logger->info("Running perl action $action->{asPerl}");
            $finnished->{__var__}{$action->{def}} = paction_list($action->{asPerl});

            # Now expand the queue
            expand_forks($action->{def});
        }
    }
}

=head1 paction_list

This function evaluates a Perl action and returns a list of strings.
The action can:

 . return an ARRAY reference,
 . print a list of lines to STDOUT (to be split and chomped),
 . or return a string (to be split and chomped).
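
For illustration, all three of the following (hypothetical) actions yield
the same list, C<['a', 'b', 'c']>:

  [qw(a b c)]           # returns an ARRAY reference
  print "a\nb\nc\n"     # prints lines to STDOUT
  "a\nb\nc"             # returns a multi-line string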

=cut

sub paction_list {
  my $act   = shift;
  my $var   = "";
  my $final = [];

  # Capture anything the action prints (via the default filehandle) in $var
  open(A, '>', \$var);
  my $old = select A;
  my $res = eval( "package main; no strict; " . $act );
  die $@ if $@;
  close A;
  select $old;

  if   (ref($res) eq "ARRAY"){
      # The action returned an ARRAY reference: use it directly
      $final = $res; }
  elsif($var =~ /\S/) {
      # The action printed something: split the captured output into lines
      for(split("\n",$var)){ push (@$final, $_) if /\S/; } }
  else{
      # Otherwise split the returned string into lines
      for(split("\n",$res)){ push (@$final, $_) if /\S/; } }

  $final;
}

=head1 expand_forks

This function is responsible for expanding all the jobs that use a
variable once that variable has been evaluated. It expands both forks
and joins.
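
As a sketch (the rule name, variable, and command below are hypothetical):

    # Before expansion, a forked job might look like
    my $job = {
        rule   => { id => 'count', vars => ['$i'] },
        action => [ { shell => 'wc -l part$i > count$i' } ],
    };
    # After expand_forks('i') with $i evaluating to [1, 2], the queue
    # instead holds two clones with ids 'count1' and 'count2', whose shell
    # actions read 'wc -l part1 > count1' and 'wc -l part2 > count2'.
    # Jobs that depend on the forked rule get their dependency lists
    # expanded to 'count1' and 'count2' in the same pass.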

=cut

sub expand_forks {
    my ($var)  = @_;
    my $values = $finnished->{__var__}->{$var};

    # For every queue item that uses $var, expand it
    my $index = -1;
    for my $job (@{$queue}) {
        $index++;
        next unless $job;

        if($job->{rule}{vars} && (grep { $_ eq "\$$var" } @{$job->{rule}{vars}} )) {
            $logger->info("Found a fork on $job->{rule}->{id}. Expanding...");

            # Expand, expand, expand
            $job->{rule}{vars} = [ grep { $_ ne "\$$var" } @{$job->{rule}{vars}} ];
            delete $job->{rule}{vars} unless scalar @{$job->{rule}{vars}};

            delete $queue->[$index];

            my $count = 0;
            my @added_jobs = ();
            for my $index (@{$values}) {
               my $newjob = clone($job);
               $count++;

               # Update the id
               $newjob->{rule}{id} .= $index;

               # Update the command line to execute
               for my $act (@{$newjob->{action}}) {
                   if($act->{shell}){
                     $act->{shell} =~ s/\$$var\b/$index/g;
                   }
                   elsif($act->{perl}){
                     $act->{perl} =~ s/\$$var\b/$index/g;
                   }
               }

               # Expand pipelines
               for my $dep (@{$newjob->{depend_on}}) {
                   if ($dep->{vars} && (grep { $_ eq "\$$var"} @{$dep->{vars}} )) {
                      # Expand the dependency
                      $dep->{vars} = [ grep { $_ ne "\$$var" } @{$dep->{vars}} ];
                      delete $dep->{vars} unless scalar @{$dep->{vars}};
                      $dep->{id} .= $index;
                   }
               }
               push @{$queue}, $newjob;
               push @added_jobs, $newjob->{rule}{id};
            } 
            $logger->info("Expanded.. Created new $count jobs: @added_jobs");
        }

        # Find joiners
        my $pos = 0;
        for my $dep (@{$job->{depend_on}}) {
            if ($dep->{vars} && (grep { $_ eq "\$$var" } @{$dep->{vars}} )) {

               $dep->{vars} = [ grep { $_ ne "\$$var" } @{$dep->{vars}} ];

               # Expand the dependencies
               delete $job->{depend_on}->[$pos];
               for my $index (@{$values}) {
                    my @vars = (scalar @{$dep->{vars}}) ? (vars => $dep->{vars}) : ();
                    push @{$job->{depend_on}}, { @vars, 
                                                 id => $dep->{id} . $index }; 
               }
            }
            $pos++;
        }
    }

    # Now expand list-style constructs like @var
    for my $job (@{$queue}) {
        next unless $job;

        # Search on actions
        for my $action (@{$job->{action}}) {
            if($action->{shell} && $action->{shell} =~ /\@$var\b/) {
                my $string = join('', map { "$_ " } @{$values});
                $action->{shell} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been action expanded with $string");
            }
            elsif($action->{perl} && $action->{perl} =~ /\@$var\b/) {
                my $string = join(",", map { "q{$_}" } @{$values});
                $action->{perl} =~ s/\@$var\b/($string)/g;
                $logger->info("The job $job->{rule}->{id} has been action expanded with ($string)");
            }
        }

        # Search on asShell
        for my $action (@{$job->{action}}) {
            if($action->{asShell} && $action->{asShell} =~ /\@$var\b/) {
                my $string = join('', map { "$_ " } @{$values});
                $action->{asShell} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been shell expanded with $string");
            }
        }

        # Search on asPerl
        for my $action (@{$job->{action}}) {
            if($action->{asPerl} && $action->{asPerl} =~ /\@$var\b/) {
                my $string = 'qw/' . join('', map { "$_ " } @{$values}) . '/';

                $action->{asPerl} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been Perl expanded with $string");
            }
        }
    }
}

=head1 report

Writes an HTML report to F<$filename.html> listing the start time, end
time, and elapsed time of every finished job.

=cut

sub report {

    $logger->info("Creating HTML report");
    open REPORT, ">$filename.html" or die "Can't create $filename.html";

    print REPORT "<table>\n";
    print REPORT "<tr><td>ID</td><td>Start Time</td><td>End Time</td><td>Elapsed</td></tr>\n";

    my ($id,$start,$stop,$interval);
    for my $job (sort sortcallback keys %{$finnished}) {
       next unless $finnished->{$job}{rule};

       $id       = $finnished->{$job}{rule}{id};
       $start    = (localtime($finnished->{$job}{starttime}[0]))->iso;
       $stop     = (localtime($finnished->{$job}{stoptime}[0]))->iso;
       $interval = $finnished->{$job}{realtime} || $finnished->{$job}{elapsed};
       print REPORT "<tr><td>$id</td><td>$start</td><td>$stop</td><td>$interval</td></tr>\n";
    }

    print REPORT "</table>\n";
    close REPORT;
}

sub sortcallback {
    my $foo = $finnished->{$a};
    my $bar = $finnished->{$b};

    return  0 if(!$foo->{order} && !$bar->{order});
    return -1 unless $foo->{order};
    return  1 unless $bar->{order};

    return $foo->{order} <=> $bar->{order};
}

=head1 dispatch

This function is responsible for dispatching the jobs that can run.

=cut

sub dispatch {
    my $new_queue = [];

    # If nothing is running and (we were interrupted || the queue is empty), exit
    if((scalar keys %{$running}) == 0 && ($interrupted || (scalar @{$queue} == 0))) {
        $logger->info("Terminating the pipeline");
        at_exit();
    }

    # Don't dispatch anything if we have been interrupted
    return if $interrupted;

    for my $job (@{$queue}) {
        next unless $job;

        # Check whether the job's dependencies have finished
        if(can_run_job($job->{rule}->{id}, $job->{depend_on})) {

            $logger->info(Dumper($job)) unless $job->{rule}{id};

            $logger->info("The job \"" . $job->{rule}->{id} . "\" is ready to run. Launching");
            launch($job);
            $job->{starttime} = [gettimeofday];

            # Jump to the next job in queue
            next;
        } 

        # This job can't run yet.. add it to the new queue
        push @{$new_queue}, $job;
    } 

    # Return the new queue, the jobs that can't be dispatched yet
    $queue = $new_queue;
}

=head1 is_finnished

This function checks whether the specified job is already present in
the finished list.

=cut

sub is_finnished {
  my ($jobid) = @_;
  for my $job (keys %{$finnished}) {
    next unless $finnished->{$job}{rule};
    return 1 if($finnished->{$job}{rule}{id} eq $jobid);
  }
  return 0;
}

=head1 at_exit

This sub is called at program exit: it writes the GraphViz graph and
the HTML report, saves the journal, and exits.

=cut

sub at_exit {
    graphviz();
    report();
    write_journal();

    exit(0);
}

=head1 write_journal

Saves the scheduler state to disk so an interrupted run can be resumed
with the C<continue> option.
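
The journal is plain C<Data::Dumper> output (written to the makefile name
plus C<.journal>), so C<journal_recover> can read it back with C<do>. A
sketch of its layout (values abridged):

    $VAR1 = {
        'md5'       => '...',   # Base64 MD5 digest of the makefile
        'counter'   => 42,      # dispatch-order counter
        'finnished' => { ... }, # finished jobs, minus failed/interrupted ones
    };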

=cut

sub write_journal {
    my $journal = {};
    $journal->{md5} = calc_makefile_md5();

    # Drop interrupted and failed processes from the saved state so they are re-queued on recovery
    my $acabados = clone($finnished);
    for my $job (keys %{$acabados}) {
        next unless $acabados->{$job}{rule};

        if($acabados->{$job}{fatal} || $acabados->{$job}{interrupted}) {
            delete $acabados->{$job};
        }
    }
    delete $acabados->{__var__};

    $journal->{finnished} = $acabados;
    $journal->{counter}   = $counter;

    open F, ">$filename.journal";
    print F (Dumper $journal);
    close F;
}

=head1 calc_makefile_md5

Calculates the MD5 of the current makefile

=cut

sub calc_makefile_md5 {
    open F, "<$filename";
    my $ctx = Digest::MD5->new;
    $ctx->addfile(*F);
    close F;

    return $ctx->b64digest;
}

=head1 can_run_job

Determines whether a job can run: the scheduler must have capacity and
all of the job's dependencies must have finished.

=cut

sub can_run_job {
    my ($id, $deps) = @_;

    return 0 unless $scheduler->can_run();

    for my $dep (@{$deps}) {
        next unless $dep;
        next unless $dep->{id};
        return 0 unless defined $finnished->{$dep->{id}};
    }

    return 1;
}

=head1 launch

Launches the given job through the scheduler and records it in the
running list.

=cut

sub launch {
    my ($job) = @_;

    # Launch the process
    $scheduler->launch($job, $debug);
    $job->{order} = $counter++;

    # Save in the running list
    $running->{$job->{rule}->{id}} = $job;
    $logger->info("Launched \"" . $job->{rule}->{id} . "\" (" . $scheduler->get_id($job) . ")");
}

=head1 graphviz

Builds a pretty GraphViz graph after the execution of the makefile. Each
node shows the job id and its wall time; failed jobs are drawn in red and
interrupted ones in yellow.

=cut

sub graphviz {
    my $time_for = {}; # Holds the walltime for the job id

    my $g = GraphViz->new(rankdir => 1);

    $logger->info("Creating GraphViz nodes");
    # Create all nodes
    for my $job (keys %{$finnished}) {
        next unless $finnished->{$job}{rule};

        my $id = $finnished->{$job}{rule}{id};
        $time_for->{$id} = $finnished->{$job}{realtime} || $finnished->{$job}{elapsed};

        my $color = 'black';
        $color = 'red' if $finnished->{$job}{fatal};
        $color = 'yellow' if $finnished->{$job}{interrupted};

        $g->add_node($id, label => "$id\n$time_for->{$id}"
                        , shape => 'box', color => $color);
    }

    $logger->info("Creating GraphViz edges");
    # Create edges
    for my $job (keys %{$finnished}) {
        next unless $finnished->{$job}{rule};

        for my $dep (@{$finnished->{$job}{depend_on}}) {
            next unless $dep;

            $g->add_edge($dep->{id}, $finnished->{$job}{rule}{id});
        }
    }

    open F, ">$filename.ps";
    print F $g->as_ps;
    close F;

    open F, ">$filename.dot";
    print F $g->as_text;
    close F;

    $logger->info("GraphViz file created on $filename.ps");
}

=head1 process_interrupt

This function is called every time the user sends a SIGINT to this process.
The objective is to kill all the running processes and wait for them to die.

=cut

sub process_interrupt {
    my $forced = shift;
    $forced = 0 if $forced eq "INT"; # Hack O:-)

    if(!$interrupted || $forced) {
        if(!$forced) {
            $logger->warn("Interrupt pressed, enter QUIT to quit, other thing to continue");
            my $linha = <STDIN>;
            chomp($linha);

            if($linha ne 'QUIT') {
                $logger->info("Interrupt canceled... Keeping the loop");
                return;
            }
        }

        $interrupted = 1;
        $logger->info("Interrupt pressed, cleaning all the running processes");

        for my $runid (keys %{$running}) {
            $logger->info("Terminating job " . $scheduler->get_id($running->{$runid}));
            $running->{$runid}{interrupted} = 1;
            $scheduler->interrupt($running->{$runid});
        }
   } else {
      $logger->warn("Interrupt already called, please wait while cleaning");
   }
}


=head1 AUTHOR

Ruben Fonseca, C<< <root@cpan.org> >>

Alberto Simões C<< <ambs@cpan.org> >>

José João Almeida C<< <jj@di.uminho.pt> >>

=head1 BUGS

Please report any bugs or feature requests to
C<bug-makefile-parallel@rt.cpan.org>, or through the web interface at
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Makefile-Parallel>.
I will be notified, and then you'll automatically be notified of progress on
your bug as I make changes.

=head1 COPYRIGHT & LICENSE

Copyright 2006-2011 Ruben Fonseca, et al, all rights reserved.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut

1; # End of Makefile::Parallel