# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Bio::Grid::Run::SGE::Master;

use Mouse;

use warnings;
use strict;
use Storable qw/nstore retrieve/;
use File::Temp qw/tempdir/;
use File::Spec;
use Config::Auto;
use Data::Dumper;
use Bio::Grid::Run::SGE::Index;
use Bio::Grid::Run::SGE::Iterator;
use Bio::Grid::Run::SGE::Util qw/my_glob my_sys expand_path my_mkdir expand_path_rel/;
use Cwd qw/fastcwd/;
use Clone qw/clone/;
use Data::Printer colored => 1, use_prototypes => 0;
use Bio::Gonzales::Util::Cerial;
use Config;
use FindBin;

# Module version. NOTE: written as a bare numeric literal, so the underscore
# is only a digit separator and the value is the number 0.0101.
our $VERSION = 0.01_01;
# Per-user configuration file; BUILDARGS merges it into the constructor
# arguments when the file exists.
# NOTE(review): $ENV{HOME} is interpolated unconditionally - confirm HOME is
# always set wherever this module gets loaded.
our $RC_FILE = "$ENV{HOME}/.bio-grid-run-sge.conf";

# Assemble the constructor arguments, filling gaps from configuration files.
#
# Merge order (earlier entries win over later ones):
#   1. the arguments handed to the constructor,
#   2. the per-user rc file $RC_FILE, if it exists,
#   3. the file named by the 'config' argument, if present.
# NOTE(review): this means the rc file takes precedence over an explicitly
# given 'config' file; that order is kept as found - confirm it is intended.
#
# A config file that cannot be parsed, or that does not yield a hash, is
# reported with a warning and skipped.
#
# The deprecated option 'method' is mapped to 'mode' (unless 'mode' is given).
#
# Returns the merged argument hash ref.
override 'BUILDARGS' => sub {
  my ($class) = @_;
  my $args = super();

  # Parse one config file; returns the settings hash ref, or nothing after
  # emitting a warning.
  my $load_config = sub {
    my ($file) = @_;

    my $parsed;

    # the trailing 1 makes success detectable without inspecting $@
    unless ( eval { $parsed = Config::Auto::parse($file); 1 } ) {
      warn "Could not parse config file '$file': " . ( $@ || 'unknown error' ) . "\n";
      return;
    }
    unless ( ref $parsed eq 'HASH' ) {
      warn "Ignoring config file '$file': it does not contain key/value settings\n";
      return;
    }
    return $parsed;
  };

  if ( $RC_FILE && -f $RC_FILE ) {
    if ( my $rc = $load_config->($RC_FILE) ) {
      print STDERR "Using config from " . $RC_FILE . "\n";
      $args = { %{$rc}, %{$args} };
    }
  }

  if ( exists( $args->{'config'} ) ) {
    if ( my $cfg = $load_config->( $args->{'config'} ) ) {
      print STDERR "Using config from " . $args->{'config'} . "\n";
      $args = { %{$cfg}, %{$args} };
    }
  }

  if ( exists( $args->{method} ) && !exists( $args->{mode} ) ) {
    warn "The configuration option 'method' is DEPRECATED, use 'mode' instead.";
    $args->{mode}   = $args->{method};
    $args->{method} = "DEPRECATED: The configuration option 'method' is DEPRECATED, use 'mode' instead.";
  }
  return $args;
};

# Command (list of strings) placed on the submit command line right after the
# worker environment script (see cache_config and queue_post_task).
has 'cmd' => ( is => 'rw', required => 1, isa => 'ArrayRef[Str]' );
# NOTE(review): not read anywhere in this file - presumably evaluated by a
# caller or by the worker; verify.
has 'no_post_task' => ( is => 'rw' );

# Output directories. All are built lazily by the matching _build_* method;
# BUILD expands them via expand_path and creates the ones that are missing.
has 'tmp_dir'    => ( is => 'rw', lazy_build => 1 );
has 'stderr_dir' => ( is => 'rw', lazy_build => 1 );
has 'stdout_dir' => ( is => 'rw', lazy_build => 1 );
has 'result_dir' => ( is => 'rw', lazy_build => 1 );
has 'log_dir'    => ( is => 'rw', lazy_build => 1 );
has 'idx_dir'    => ( is => 'rw', lazy_build => 1 );
# When positive, cache_config may use it as upper bound of the task range.
has 'test'       => ( is => 'rw' );
# The following four are only declared here; this file does not read them.
has 'mail'       => ( is => 'rw' );
has 'smtp_server' => ( is => 'rw' );
has 'no_prompt'   => ( is => 'rw' );
has 'lib'         => ( is => 'rw' );
# Defaults to the value $FindBin::Bin has when this class is compiled.
has 'script_dir' => ( is => 'ro', default => $FindBin::Bin);

# List of input descriptions (hash refs). BUILD requires each one to carry a
# non-empty 'elements' list ('list' and 'files' are accepted as aliases).
has 'input' => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );

# Constructor arguments that do not correspond to an attribute end up here
# (see BUILD).
has 'extra' => ( is => 'rw', default => sub { {} } );

# one can supply parts or combinations per job
has 'parts' => ( is => 'rw', default => 0 );
has 'combinations_per_job' => ( is => 'rw' );

# Job name handed to the submit command via -N; also used in file names.
has 'job_name' => ( is => 'rw', default => 'cluster_job' );
# Set by run() from the output of the submit command.
has 'job_id' => ( is => 'rw' );

# Passed as 'mode' to Bio::Grid::Run::SGE::Iterator in _build_iterator.
has 'mode' => ( is => 'rw', default => 'None' );

# Paths of the serialized worker configuration and of the generated wrapper
# script; both are placed in tmp_dir by their builders.
has '_worker_config_file' => ( is => 'rw', lazy_build => 1 );
has '_worker_env_script'  => ( is => 'rw', lazy_build => 1 );
# Submit executable and additional parameters appended to its command line.
has 'submit_bin'          => ( is => 'rw', default    => 'qsub' );
has 'submit_params'       => ( is => 'rw', default    => sub { [] }, isa => 'ArrayRef[Str]' );
# Interpreter handed to the submit command via -S; BUILD passes it through
# expand_path_rel.
has 'perl_bin'            => ( is => 'rw', default    => $Config{perlpath} );
# Base directory; BUILD dies when it does not exist.
has 'working_dir'         => ( is => 'rw', default    => '.' );
# When true, the tmp and result directory names are prefixed with the
# stripped job name (see _build_tmp_dir and _build_result_dir).
has 'prefix_output_dirs' => ( is => 'rw' );

# arguments for the cluster script
has 'args' => ( is => 'rw', isa => 'ArrayRef[Str]', default => sub { [] } );

# Iterator over the input indices, created on first access by _build_iterator.
has 'iterator' => ( is => 'rw', lazy_build => 1 );

# Alias for the 'parts' accessor: forwards all arguments to it and returns
# whatever it returns.
sub num_slots {
  my ( $self, @accessor_args ) = @_;
  return $self->parts(@accessor_args);
}

# Lazy builder for 'log_dir': the entry 'log' inside tmp_dir.
sub _build_log_dir {
  my $self = shift;

  my $parent = $self->tmp_dir();
  return File::Spec->catfile( $parent, 'log' );
}

# Lazy builder for 'stderr_dir': the entry 'err' inside tmp_dir.
sub _build_stderr_dir {
  my $self = shift;

  my $parent = $self->tmp_dir;
  return File::Spec->catfile( $parent, 'err' );
}

# Lazy builder for 'stdout_dir': the entry 'out' inside tmp_dir.
sub _build_stdout_dir {
  my $self = shift;

  my $parent = $self->tmp_dir;
  return File::Spec->catfile( $parent, 'out' );
}

# Lazy builder for 'idx_dir': the entry 'idx' inside working_dir.
sub _build_idx_dir {
  my $self = shift;

  my $parent = $self->working_dir;
  return File::Spec->catfile( $parent, 'idx' );
}

# Lazy builder for 'tmp_dir': 'tmp' inside working_dir, or
# '<stripped job name>.tmp' when prefix_output_dirs is set.
sub _build_tmp_dir {
  my $self = shift;

  my $dir_name
    = $self->prefix_output_dirs
    ? join( ".", $self->_job_name_stripped, 'tmp' )
    : 'tmp';

  return File::Spec->catfile( $self->working_dir, $dir_name );
}

# Returns a copy of job_name reduced to the characters - 0-9 A-Z a-z _ and .
# The attribute itself is left untouched.
sub _job_name_stripped {
  my ($self) = @_;

  my $stripped = $self->job_name;

  # /c complements the search list and /d deletes every complemented
  # character that has no counterpart in the one-character replacement list,
  # so characters outside the allowed set are removed, not replaced
  $stripped =~ y/-0-9A-Za-z_./_/csd;

  return $stripped;
}

# Lazy builder for 'result_dir': 'result' inside working_dir, or
# '<stripped job name>.result' when prefix_output_dirs is set.
sub _build_result_dir {
  my $self = shift;

  my $dir_name
    = $self->prefix_output_dirs
    ? join( ".", $self->_job_name_stripped, 'result' )
    : 'result';

  return File::Spec->catfile( $self->working_dir, $dir_name );
}

# Post-construction hook.
#
#  * normalizes every input description so that its items live under the key
#    'elements' ('list' and 'files' are accepted as aliases) and dies when an
#    input has no elements,
#  * passes perl_bin through expand_path_rel,
#  * dies unless working_dir is an existing directory,
#  * expands all directory attributes and creates the missing ones,
#  * stores constructor arguments that are not attributes in 'extra'
#    (values already present in 'extra' win).
sub BUILD {
  my ( $self, $args ) = @_;

  # an empty input list is tolerated; only inputs without elements are fatal
  for my $in ( @{ $self->input } ) {

    # a non-empty alias replaces 'elements'; 'files' is checked last
    for my $alias (qw/list files/) {
      next unless ( exists( $in->{$alias} ) && @{ $in->{$alias} } > 0 );
      $in->{elements} = delete $in->{$alias};
    }

    unless ( exists( $in->{elements} ) && @{ $in->{elements} } ) {
      confess "No input given";
    }
  }

  $self->perl_bin( expand_path_rel( $self->perl_bin ) );
  unless ( -d $self->working_dir ) {
    confess "working dir not correct " . $self->working_dir;
  }

  # working_dir comes first so the lazily built directories derive from the
  # already expanded path
  my $start_dir = fastcwd;
  for my $dir_attr (qw/working_dir log_dir stderr_dir stdout_dir result_dir tmp_dir idx_dir/) {
    $self->$dir_attr( expand_path( $self->$dir_attr ) );
    my_mkdir( $self->$dir_attr ) unless ( -d $self->$dir_attr );
  }
  chdir $start_dir;

  # everything in the constructor arguments that is not an attribute
  my %is_attribute = map { $_->name => 1 } __PACKAGE__->meta->get_all_attributes;
  my %unknown = map { ( $_ => $args->{$_} ) } grep { !$is_attribute{$_} } keys %{$args};

  $self->extra( { %unknown, %{ $self->extra } } );
}

# Moves every key of the hash ref $c that is not the name of an attribute of
# this class into $c->{extra} (created when missing). The hash is modified in
# place and returned.
sub _unknown_attrs_to_extra {
  my ( $self, $c ) = @_;

  $c->{extra} //= {};

  my %is_attribute = map { $_->name => 1 } __PACKAGE__->meta->get_all_attributes;
  my @unknown_keys = grep { !exists( $is_attribute{$_} ) } keys %{$c};

  for my $key (@unknown_keys) {
    $c->{extra}{$key} = delete $c->{$key};
  }

  return $c;
}

# Returns a printable dump of the object's configuration, prefixed with
# "CONFIGURATION:\n". The iterator is left out, 'parts' shows the calculated
# number of parts, and element lists with more than 10 entries are shortened
# to the first five, '...' and the last five entries.
sub to_string {
  my ($self) = @_;

  $self->_prepare;

  my %snapshot = %{$self};
  delete $snapshot{iterator};

  # work on a deep copy so that shortening does not touch the real input
  $snapshot{input} = clone( $self->input );

  for my $in ( @{ $snapshot{input} } ) {
    next unless ( @{ $in->{elements} } > 10 );

    my @head = @{ $in->{elements} }[ 0 .. 4 ];
    my @tail = @{ $in->{elements} }[ -5 .. -1 ];
    $in->{elements} = [ @head, '...', @tail ];
  }

  $snapshot{parts} = $self->_calculate_number_of_parts;

  my $rendered = p \%snapshot;
  return "CONFIGURATION:\n" . $rendered;
}

# Touches the lazily built attributes _worker_config_file and iterator (in
# this order) so that both exist afterwards. Returns what the iterator
# accessor returns.
sub _prepare {
  my $self = shift;

  $self->_worker_config_file;
  return $self->iterator;
}

# Lazy builder for '_worker_config_file': '<job_name>.config.dat' in tmp_dir.
sub _build__worker_config_file {
  my ($self) = @_;

  my $file_name = $self->job_name . '.config.dat';
  return File::Spec->catfile( $self->tmp_dir, $file_name );
}

# Lazy builder for '_worker_env_script': 'env.<job_name>.pl' in tmp_dir.
sub _build__worker_env_script {
  my ($self) = @_;

  my $file_name = join( '.', 'env', $self->job_name, 'pl' );
  return File::Spec->catfile( $self->tmp_dir, $file_name );
}

# Returns the path of an index file: '<job_name>.<suffix>.idx' inside idx_dir.
sub generate_idx_file_name {
  my ( $self, $suffix ) = @_;

  my @name_parts = ( $self->job_name, $suffix, 'idx' );
  my $file_name = join( '.', @name_parts );

  return File::Spec->catfile( $self->idx_dir, $file_name );
}

# Lazy builder for 'iterator'.
#
# For every input description an index file name is generated (numbered from
# 0 in input order) and stored under 'idx_file' in the description; then a
# writeable Bio::Grid::Run::SGE::Index is constructed from the description
# and created from its elements. The indices are handed, together with the
# mode, to a new Bio::Grid::Run::SGE::Iterator, which is returned.
sub _build_iterator {
  my ($self) = @_;

  my $inputs = $self->input;
  my @indices;

  for my $pos ( 0 .. $#{$inputs} ) {
    my $in = $inputs->[$pos];

    $in->{idx_file} = $self->generate_idx_file_name($pos);

    my $index = Bio::Grid::Run::SGE::Index->new( %{$in}, writeable => 1 );
    push @indices, $index;
    $index->create( $in->{elements} );
  }

  return Bio::Grid::Run::SGE::Iterator->new( mode => $self->mode, indices => \@indices, );
}

# Submits the job.
#
# Builds and stores the worker configuration (cache_config), runs the submit
# command through the shell and extracts the job id from output of the form
# "Your job <id>" / "Your job-array <id>". The submitted command line is
# written to 'main.<job_name>.j<job_id>.cmd' in log_dir (job id -1 when none
# could be determined). When a job id was obtained, the post task is queued.
#
# Returns a hash ref with the keys 'config' (the stored configuration) and
# 'command' (array ref of the submit command line).
# Dies when the command log file cannot be written.
sub run {
  my ($self) = @_;

  $self->_prepare;

  my $config_file = $self->_worker_config_file;

  my ( $cmd_args, $c ) = $self->cache_config($config_file);

  # NOTE(review): the arguments are joined unquoted and run through the
  # shell, so arguments containing whitespace or shell metacharacters are not
  # protected.
  my $cmd = join ' ', @$cmd_args;

  # print with an explicit newline works whether or not the 'say' feature is
  # enabled (no 'use feature' is visible in this file)
  print STDERR "Running: " . $cmd . "\n";

  my $res = qx/$cmd/;

  # Only trust the capture of a match that succeeded; after a failed match
  # the capture variables still hold the values of an earlier match.
  my $job_id;
  if ( defined $res && $res =~ /^Your\s*job(?:-array)?\s*(\d+)/ ) {
    $job_id = $1;
  } else {
    warn "Could not determine the job id from the output of: $cmd\n";
  }
  $self->job_id($job_id);

  my $tmp_job_id = $job_id // -1;

  my $cmd_log = File::Spec->catfile( $self->log_dir, sprintf( "main.%s.j%s.cmd", $self->job_name, $tmp_job_id ) );
  open my $main_fh, '>', $cmd_log
    or confess "Can't open filehandle: $!";
  print $main_fh "cd '" . fastcwd . "' && " . $cmd, "\n";

  # buffered write errors only surface on close
  close $main_fh or confess "Can't close filehandle: $!";

  $self->queue_post_task($config_file) if ( $self->job_id );

  return { config => $c, command => $cmd_args };
}

# Determines into how many parts the combinations are split.
#
#  * 'parts', when it is set and does not exceed the number of combinations;
#  * otherwise the number of combinations divided by combinations_per_job,
#    rounded up, when combinations_per_job is greater than 1;
#  * otherwise the number of combinations.
sub _calculate_number_of_parts {
  my ($self) = @_;

  my $num_comb = $self->iterator->num_comb;

  # an explicit, satisfiable number of parts wins
  return $self->parts if ( $self->parts && $self->parts <= $num_comb );

  my $per_job = $self->combinations_per_job;
  return $num_comb unless ( $per_job && $per_job > 1 );

  my $parts = int( $num_comb / $per_job );

  # one more part for the remainder
  $parts++ if ( $parts * $per_job < $num_comb );

  return $parts;
}

# Builds the submit command line and stores the worker configuration.
#
# Sets 'parts' to the calculated number of parts, writes the worker
# environment script and serializes the configuration (the object's fields
# without the iterator, plus num_comb, extra, job_cmd and range) to
# $config_file with Storable::nstore.
#
# Parameters:
#   $config_file - path the configuration is stored to; it is also handed to
#                  the worker after '--worker'
#
# Returns a list of two references: the command line as array ref and the
# stored configuration as hash ref.
sub cache_config {
  my ( $self, $config_file ) = @_;

  $self->_prepare;

  my $iter = $self->iterator;

  $self->parts( $self->_calculate_number_of_parts );

  my %c = ( %{$self}, num_comb => $iter->num_comb, extra => $self->extra );
  delete $c{iterator};

  my ( $from, $to ) = ( 1, $self->parts );

  # Test mode shortens the task range. The limit is only applied when it is
  # below the number of parts, so the range can never run past the last part.
  # NOTE(review): the threshold of 7 parts is kept as found; its purpose is
  # not evident from this file.
  my $test_limit = $self->test;
  if ( $test_limit && $test_limit > 0 && $to > 7 && $test_limit < $to ) {
    $to = $test_limit;
  }

  my @cmd = ( $self->submit_bin );
  push @cmd, '-t', "$from-$to";
  push @cmd, '-S', $self->perl_bin;
  push @cmd, '-N', $self->job_name;
  push @cmd, '-e', $self->stderr_dir;
  push @cmd, '-o', $self->stdout_dir;
  push @cmd, @{ $self->submit_params };

  my $worker_env_script_cmd = $self->prepare_worker_env_script($config_file);
  push @cmd, $worker_env_script_cmd, @{ $self->cmd }, '--worker', $config_file;

  $c{job_cmd} = join ' ', @cmd;
  $c{range} = [ $from, $to ];

  nstore \%c, $config_file;

  return ( \@cmd, \%c );
}

# Writes the wrapper script that is submitted to the cluster.
#
# The generated script adds the directories of the current PERL5LIB (if set)
# via 'use lib', then runs the file given as its first argument with 'do' and
# warns when that fails.
#
# Parameters:
#   $config_file - accepted for interface compatibility; not used here
#
# Returns the path of the written script (_worker_env_script).
# Dies when the script cannot be opened or closed.
sub prepare_worker_env_script {
  my ( $self, $config_file ) = @_;

  my $script = $self->_worker_env_script;

  open my $fh, '>', $script or confess "Can't open filehandle: $!";
  print $fh <<'EOS';
#!/usr/bin/env perl
use warnings;
use strict;

EOS

  if ( exists $ENV{PERL5LIB} ) {
    my @inc_dirs = split( /\Q$Config{path_sep}\E/, $ENV{PERL5LIB} );

    # the directories end up inside single-quoted strings of the generated
    # script: protect backslashes and single quotes
    s/([\\'])/\\$1/g for @inc_dirs;

    print $fh "use lib ('" . join( "','", @inc_dirs ) . "');\n"
      if (@inc_dirs);
  }

  print $fh 'my $cmd = shift;',                        "\n";
  print $fh 'unless ( my $return = do $cmd ) {',       "\n";
  print $fh '  warn "could not parse $cmd $@" if $@;', "\n";
  print $fh '  warn "could not do $cmd $!" unless defined $return;', "\n";
  print $fh '  warn "could not run $cmd" unless $return;',           "\n";
  print $fh '}',                                                     "\n";
  print $fh 'exit;',                                                 "\n";

  # buffered write errors only surface on close
  close $fh or confess "Can't close filehandle: $!";

  return $script;
}

# Submits the post-processing task, held back until the main job is done.
#
# The task is submitted with '-hold_jid <job_id>' and runs the worker
# environment script with the configured command, '--post_task', the job id
# and $config_file. Before submitting, the configuration is saved
# (save_config) and the command line (without the hold argument) is written
# to the executable file 'post.<job_name>.j<job_id>.cmd' in tmp_dir.
# submit_params are deliberately not added to this command line.
#
# Parameters:
#   $config_file - path of the stored worker configuration
#
# Returns nothing. Dies when the command file cannot be written.
sub queue_post_task {
  my ( $self, $config_file ) = @_;

  my @cmd = ( $self->submit_bin );
  push @cmd, '-S', $self->perl_bin;
  push @cmd, '-N', join( '_', 'p' . $self->job_id, $self->job_name );
  push @cmd, '-e', $self->stderr_dir;
  push @cmd, '-o', $self->stdout_dir;

  my @hold_arg = ( '-hold_jid', $self->job_id );

  my @post_cmd = ( $self->_worker_env_script, @{ $self->cmd }, '--post_task', $self->job_id, $config_file );

  $self->save_config;

  # print with an explicit newline works whether or not the 'say' feature is
  # enabled (no 'use feature' is visible in this file)
  print STDERR "post processing: " . join( " ", @cmd, @hold_arg, @post_cmd ) . "\n";

  my $post_cmd_file
    = File::Spec->catfile( $self->tmp_dir, sprintf( "post.%s.j%s.cmd", $self->job_name, $self->job_id ) );

  open my $post_fh, '>', $post_cmd_file or confess "Can't open filehandle: $!";
  print $post_fh join( " ", "cd", "'" . fastcwd . "'", '&&', @cmd, @post_cmd ), "\n";

  # buffered write errors only surface on close
  close $post_fh or confess "Can't close filehandle: $!";

  # the command file is a convenience for manual re-runs; a failing chmod is
  # reported but does not stop the submission
  chmod 0755, $post_cmd_file or warn "Can't make $post_cmd_file executable: $!\n";

  my_sys( @cmd, @hold_arg, @post_cmd );

  return;
}

# Dumps the object with Data::Dumper to '<job_name>.j<job_id>.config' in
# result_dir and reports the file name on STDERR.
#
# Returns nothing. Dies when the file cannot be opened or closed.
sub save_config {
  my ($self) = @_;

  my $cfg_save
    = File::Spec->catfile( $self->result_dir, sprintf( "%s.j%s.config", $self->job_name, $self->job_id ) );

  # print with an explicit newline works whether or not the 'say' feature is
  # enabled (no 'use feature' is visible in this file)
  print STDERR "Saving config to " . $cfg_save . "\n";

  open my $cfg_fh, '>', $cfg_save
    or confess "Can't open filehandle: $!";
  print $cfg_fh Dumper($self);

  # buffered write errors only surface on close
  close $cfg_fh or confess "Can't close filehandle: $!";

  return;
}

1;

__END__

=head1 NAME

Bio::Grid::Run::SGE::Master - set up and submit array jobs through qsub

=head1 SYNOPSIS

  # if anything is exported, list it here in qw()

=head1 DESCRIPTION

=over 4

=item B<< combinations_per_job >>

=item B<< parts >>

=back


=head1 OPTIONS

=head1 SUBROUTINES

=head1 METHODS

=head1 SEE ALSO

=head1 AUTHOR

jw bargsten, C<< <jwb at cpan dot org> >>

=cut