The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package App::Netdisco::Daemon::Worker::Manager;

use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';

use Net::Domain 'hostfqdn';
use Try::Tiny;

use Role::Tiny;
use namespace::clean;

my $fqdn = hostfqdn || 'localhost';

my $role_map = {
  (map {$_ => 'Poller'}
      qw/discoverall discover arpwalk arpnip macwalk macsuck/),
  (map {$_ => 'Interactive'}
      qw/location contact portcontrol portname vlan power/)
};

sub worker_begin {
  my $self = shift;
  my $wid = $self->wid;
  debug "entering Manager ($wid) worker_begin()";

  # requeue jobs locally
  debug "mgr ($wid): searching for jobs booked to this processing node";
  my $rs = schema('netdisco')->resultset('Admin')
    ->search({status => "queued-$fqdn"});

  my @jobs = map {{$_->get_columns}} $rs->all;

  if (scalar @jobs) {
      info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
      map { $_->{role} = $role_map->{$_->{action}} } @jobs;

      $self->do('add_jobs', \@jobs);
  }
}

sub worker_body {
  my $self = shift;
  my $wid = $self->wid;
  my $num_slots = $self->do('num_workers')
    or return debug "mgr ($wid): this node has no workers... quitting manager";

  # get some pending jobs
  my $rs = schema('netdisco')->resultset('Admin')
    ->search(
      {status => 'queued'},
      {order_by => 'random()', rows => $num_slots},
    );

  while (1) {
      debug "mgr ($wid): getting potential jobs for $num_slots workers";
      while (my $job = $rs->next) {
          my $jid = $job->job;

          # check for available local capacity
          next unless $self->do('capacity_for', $job->action);
          debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
            $wid, $jid, $job->action;

          # mark job as running
          next unless $self->lock_job($job);
          info sprintf "mgr (%s): job %s booked out for this processing node",
            $wid, $jid;

          my $local_job = { $job->get_columns };
          $local_job->{role} = $role_map->{$job->action};

          # copy job to local queue
          $self->do('add_jobs', [$local_job]);
      }

      # reset iterator so ->next() triggers another DB query
      $rs->reset;

      # TODO also check for stale jobs in Netdisco DB

      debug "mgr ($wid): sleeping now...";
      sleep( setting('workers')->{sleep_time} || 2 );
  }
}

sub lock_job {
  my ($self, $job) = @_;
  my $happy = 0;

  # lock db row and update to show job has been picked
  try {
      schema('netdisco')->txn_do(sub {
          schema('netdisco')->resultset('Admin')->find(
            {job => $job->job, status => 'queued'},
            {for => 'update'}
          )->update({ status => "queued-$fqdn" });
      });
      $happy = 1;
  };

  return $happy;
}

1;