The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package App::Netdisco::Daemon::Worker::Manager;

use Dancer qw/:moose :syntax :script/;

use Role::Tiny;
use namespace::clean;

use List::Util 'sum';
use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_lock/;

sub worker_begin {
  my $self = shift;
  my $wid = $self->wid;
  debug "entering Manager ($wid) worker_begin()";

  if (setting('workers')->{'no_manager'}) {
      return debug "mgr ($wid): no need for manager... skip begin";
  }

  # requeue jobs locally
  debug "mgr ($wid): searching for jobs booked to this processing node";
  my @jobs = jq_locked;

  if (scalar @jobs) {
      info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
      $self->do('add_jobs', @jobs);
  }
}

sub worker_body {
  my $self = shift;
  my $wid = $self->wid;

  return debug "mgr ($wid): no need for manager... quitting"
    if setting('workers')->{'no_manager'};

  my $num_slots = sum( 0, map { setting('workers')->{$_} }
                              values %{setting('job_type_keys')} );

  while (1) {
      debug "mgr ($wid): getting potential jobs for $num_slots workers";

      # get some pending jobs
      # TODO also check for stale jobs in Netdisco DB
      foreach my $job ( jq_getsome($num_slots) ) {

          # check for available local capacity
          my $job_type = setting('job_types')->{$job->action};
          next unless $job_type and $self->do('capacity_for', $job_type);
          debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
            $wid, $job->id, $job->action;

          # mark job as running
          next unless jq_lock($job);
          info sprintf "mgr (%s): job %s booked out for this processing node",
            $wid, $job->id;

          # copy job to local queue
          $self->do('add_jobs', $job);
      }

      debug "mgr ($wid): sleeping now...";
      sleep( setting('workers')->{sleep_time} || 2 );
  }
}

1;