package MyCPAN::Indexer::Worker;
use strict;
use warnings;
use parent qw(MyCPAN::Indexer::Component);
use vars qw($VERSION $logger);
$VERSION = '1.28_12';
use Cwd;
use File::Basename;
use File::Spec::Functions qw(catfile);
use Log::Log4perl;
use MyCPAN::Indexer;
use Proc::ProcessTable;
use YAML;
=head1 NAME
MyCPAN::Indexer::Worker - Do the indexing
=head1 SYNOPSIS
Use this in backpan_indexer.pl by specifying it as the queue class:
# in backpan_indexer.config
worker_class MyCPAN::Indexer::Worker
=head1 DESCRIPTION
This class takes a distribution and analyses it. This is what the dispatcher
hands a disribution to for the actual indexing.
=head2 Methods
=over 4
=item get_task
C<get_task> sets the C<child_task> key in the notes. The
value is a code reference that takes a distribution path as its only
argument and indexes that distribution.
See L<MyCPAN::Indexer::Tutorial> for details about what C<get_task> expects
and should do.
=cut
BEGIN {
$logger = Log::Log4perl->get_logger( 'Worker' );
}
=item component_type
This is a worker component.
=cut
sub component_type { $_[0]->worker_type }
=item get_task
=cut
sub get_task
{
my( $self ) = @_;
my $config = $self->get_config;
my $coordinator = $self->get_coordinator;
my $indexer = $coordinator->get_component( 'indexer' );
$logger->debug( "Worker class is " . __PACKAGE__ );
$logger->debug( "Indexer class is " . $indexer->class );
my $child_task = sub {
my $dist = shift;
my $dist_basename = basename( $dist );
my $basename = $coordinator->get_reporter->check_for_previous_successful_result( $dist );
$logger->debug( "Found successful report for $dist_basename" ) unless $basename;
return bless {
dist_info => {
dist_path => $dist,
dist_basename => $dist_basename
},
skipped => 1,
}, $indexer->class unless $basename;
my $previous_error_basename = $coordinator->get_reporter->check_for_previous_error_result( $dist ) || '';
$logger->debug( "Error report returned [$previous_error_basename]" );
$logger->debug( "Found error report for $dist_basename" ) if $previous_error_basename;
# we used to handle this by just deleting all the old error
# reports in setup_dirs over in MyCPAN::App::BackPAN::Indexer
# deleting all the reports before we got started made it
# impossible to get a list of error reports to retry
if( $previous_error_basename and ! $config->retry_errors )
{
$logger->debug( "By config, skipping $dist because I'm not retrying errors" );
return bless {
dist_info => {
dist_path => $dist,
dist_basename => $dist_basename,
},
skip_error => 1,
}, $self->get_config->indexer_class;
}
elsif( $previous_error_basename and $config->retry_errors )
{
# if we are re-trying errors and there is already a report
# unlink the previous report
my $report_full_path = $coordinator->get_reporter->get_error_report_path( $dist );
$logger->debug( "Trying to unlink $report_full_path" );
my $rc = unlink $report_full_path;
$logger->debug( ($rc ? 'unlinked ' : 'failed to unlink ') . $report_full_path );
}
$logger->info( "Starting Worker for $dist_basename\n" );
my $starting_dir = cwd();
unless( chdir $config->temp_dir )
{
$logger->error( "Could not change to " . $config->temp_dir . " : $!\n" );
exit 255;
}
$logger->debug( sprintf "Setting alarm for %d seconds", $config->alarm );
local $SIG{ALRM} = sub {
$logger->info( "Alarm rang for $dist_basename in process $$!\n" );
$self->_cleanup_children;
$logger->info( "Cleaned up" );
die;
};
local $SIG{CHLD} = 'IGNORE';
alarm( $config->alarm || 15 );
$logger->debug( "Examining $dist_basename" );
my $info = do {
unless( -e $dist )
{
$logger->warn( "Dist $dist does not exist" );
undef;
}
elsif( ! -s $dist )
{
$logger->warn( "Dist $dist has zero size" );
my $info = bless {}, $self->get_config->indexer_class;
$info->setup_dist_info( $dist );
$info->set_dist_info( 'unindexable', 'zero size' );
$info->setup_run_info;
$info->set_run_info( qw(completed 1) );
$info;
}
else
{
$logger->warn( "Indexing $dist" );
eval { $indexer->run( $dist ) };
}
};
$logger->debug( "Done examining $dist_basename" );
my $at = $@; chomp $at;
alarm 0;
chdir $starting_dir;
unless( defined $info )
{
$logger->error( "Indexing failed for $dist_basename: $at" );
$info = bless {}, $self->get_config->indexer_class; # XXX TODO make this a real class
$info->setup_dist_info( $dist );
$info->setup_run_info;
$info->set_run_info( qw(completed 0) );
$info->set_run_info( error => $at );
}
elsif( ! eval { $info->run_info( 'completed' ) } )
{
$logger->error( "Indexing for $dist_basename did not complete" );
$self->_copy_bad_dist( $info ) if $config->copy_bad_dists;
}
$self->_add_run_info( $info );
$coordinator->get_note('reporter')->( $info );
$logger->debug( "Worker for $dist_basename done" );
# some things hang anyway, so just to be careful we'll cleanup
# everything here.
$self->_cleanup_children;
$logger->debug( "Cleaned up, returning..." );
$info;
};
$coordinator->set_note( 'child_task', $child_task );
1;
}
sub _cleanup_children
{
$logger->warn( "Cleaning up after $$" );
my %children =
map { $_->{pid}, 1 }
grep { $_->{'ppid'} == $$ }
@{ Proc::ProcessTable->new->table };
$logger->debug( "Child processes are @{[keys %children]}" );
my @grandchildren =
map { $_->{pid} }
grep { exists $children{ $_->{'ppid'} } }
@{ Proc::ProcessTable->new->table };
$logger->debug( "Grandchild processes are @grandchildren" );
my @processes = ( keys %children, @grandchildren );
$logger->debug( "There are " . @processes . " processes to clean up" );
return unless @processes;
$logger->debug( "Preparing to kill" );
kill 9, @processes;
return;
}
sub _copy_bad_dist
{
my( $self, $info ) = @_;
my $config = $self->get_config;
my $bad_dist_dir = $config->copy_bad_dists;
return unless $bad_dist_dir;
unless( -d $bad_dist_dir and mkdir $bad_dist_dir ) {
$logger->error( "Could not make dist dir [$bad_dist_dir]: $!" );
return;
}
my $dist_file = $info->dist_info( 'dist_file' );
my $basename = $info->dist_info( 'dist_basename' );
my $new_name = catfile( $bad_dist_dir, $basename );
unless( -e $new_name )
{
$logger->debug( "Copying bad dist" );
my( $in, $out );
unless( open $out, ">", $new_name )
{
$logger->fatal( "Could not copy bad dist to $new_name: $!" );
return;
}
unless( open $in, "<", $dist_file )
{
$logger->fatal( "Could not open bad dist to $dist_file: $!" );
return;
}
while( <$in> ) { print { $out } $_ }
close $in;
close $out;
}
}
sub _add_run_info
{
my( $self, $info ) = @_;
my $config = $self->get_config;
return unless eval { $info->can( 'set_run_info' ) };
$info->set_run_info( $_, $config->get( $_ ) )
foreach ( $config->directives );
$info->set_run_info( 'uuid', $self->get_note( 'UUID' ) );
$info->set_run_info( 'child_pid', $$ );
$info->set_run_info( 'parent_pid', eval { $config->indexer_class->getppid } );
$info->set_run_info( 'ENV', \%ENV );
return 1;
}
=back
=head1 SEE ALSO
MyCPAN::Indexer, MyCPAN::Indexer::Tutorial
=head1 SOURCE AVAILABILITY
This code is in Github:
git://github.com/briandfoy/mycpan-indexer.git
=head1 AUTHOR
brian d foy, C<< <bdfoy@cpan.org> >>
=head1 COPYRIGHT AND LICENSE
Copyright (c) 2008-2013, brian d foy, All Rights Reserved.
You may redistribute this under the same terms as Perl itself.
=cut
1;