The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: /mirror/perl/Swarmage/trunk/lib/Swarmage/Drone.pm 39735 2008-01-23T02:57:57.044329Z daisuke  $
#
# Copyright (c) 2007-2008 Daisuke Maki <daisuke@endeworks.jp>
# All rights reserved.

package Swarmage::Drone;
use strict;
use warnings;
use base qw(Class::Accessor::Fast);
use DBI;
use Log::Dispatch;
use Log::Dispatch::Handle;
use IO::Handle;
use POE ;
use Swarmage::Queue::Generic;
use Swarmage::Queue::Local;
use Swarmage::Task;
use Swarmage::Util;
use Swarmage::Worker;

# Compile-time switch: when true, the STDERR log also shows debug messages.
use constant DEBUG => 0;

# Generate plain get/set accessors for every field the Drone stores.
__PACKAGE__->mk_accessors(qw(
    config alias delay log queue local_queue workers
    max_tasks buffered_tasks task_types
));

sub new
{
    my ($class, %args) = @_;

    # Every Drone needs a session alias. When the caller does not supply
    # one, build a name from the host name, the PID and a random number.
    my $alias = $args{alias};
    unless ($alias) {
        require Sys::Hostname;
        $alias = join('-', Sys::Hostname::hostname(), $$, rand());
    }
    die unless $alias;

    my $self = bless {
        config         => \%args,
        alias          => $alias,
        workers        => [],
        max_tasks      => {},
        buffered_tasks => {},
        task_types     => {},
        delay          => 2,
    }, $class;
    $self->setup_log();

    # Map each POE event name to the "_poe_"-prefixed method handling it.
    my %handler_for = ( _start => '_poe_start' );
    for my $event (qw(spawn_queue pump_queue pump_worker_queue buffer_work monitor)) {
        $handler_for{$event} = "_poe_$event";
    }

    POE::Session->create(
        heap          => { shutdown => 0 },
        object_states => [ $self => \%handler_for ],
    );

    return $self;
}

sub setup_log
{
    my $self = shift;

    # Every message is formatted as "[level:pid]: message", terminated by
    # exactly one newline if it did not already end in one.
    my $log = Log::Dispatch->new(
        callbacks => sub {
            my %args = @_;
            my $message = $args{message};

            # Append a newline only when one is missing. The previous
            # substitution, s/(?!\n)\Z/\n/, also matched at the true end of
            # a string that already ended in "\n" (\Z matches both before a
            # final newline and at the very end), doubling the newline.
            $message .= "\n" unless defined $message && $message =~ /\n\z/;
            return "[$args{level}:$$]: $message";
        }
    );

    # Write log output to STDERR; debug-level messages are only emitted
    # when the DEBUG constant is true.
    my $stderr_handle = IO::Handle->new;
    $stderr_handle->fdopen(fileno(STDERR), "w")
        or die "Cannot fdopen STDERR: $!";
    $log->add(
        Log::Dispatch::Handle->new(
            name      => 'stderr',
            min_level => DEBUG ? 'debug' : 'info',
            handle    => $stderr_handle,
        )
    );

    $self->log( $log );
}

sub _poe_start
{
    my ($self, $kernel, $session) = @_[OBJECT, KERNEL, SESSION];

    # Register the session under the Drone's alias so it can be addressed
    # by name.
    if (my $alias = $self->alias) {
        $self->log->debug("Setting alias '$alias'");
        $kernel->alias_set($alias);
    }

    # Create the local queue shared between this Drone and its workers.
    my $local_queue = Swarmage::Queue::Local->new(unlink => 1);
    $self->local_queue( $local_queue );

    # Create the external (global) queue from the 'queue' configuration.
    # $kernel->call dispatches synchronously and returns the handler's value.
    my $extern_queue = $kernel->call($session, 'spawn_queue', $self->config->{queue})
        or die "Failed to spawn the external queue";
    $self->queue( $extern_queue );

    # 'workers' maps a task type to either a single worker configuration or
    # an arrayref of configurations (several workers for the same type).
    # keys() is used rather than each() so the hash iterator cannot be left
    # in a half-consumed state if register_worker dies.
    my $workers_for = $self->config->{workers} || {};
    foreach my $task_type (keys %$workers_for) {
        my $config  = $workers_for->{$task_type};
        my @configs = ref($config) eq 'ARRAY' ? @$config : ($config);
        foreach my $conf (@configs) {
            $self->register_worker( $task_type, $conf );
        }
    }

    # Kick off the periodic monitor loop.
    $kernel->yield('monitor');
}

sub register_worker
{
    my ($self, $task_type, $config) = @_;

    # XXX - Fix calling syntax for task_type
    my $worker = Swarmage::Worker->new(
        %{ $config || {} },
        queue     => $self->local_queue,
        task_type => $task_type,
        parent    => $self,
    );
    push @{ $self->workers }, $worker;

    # Index the worker by the task type it reports handling; pump handlers
    # use this index to find the workers for a type.
    my $type = $worker->task_type;
    push @{ $self->task_types->{$type} }, $worker;

    # The Drone can poll up to N jobs per worker. Each worker is responsible
    # for one task type, so the total capacity is tracked per task type and
    # saturated types need not be polled. N defaults to 15 and may be
    # overridden with the Drone's 'max_tasks_per_worker' option.
    my $drone_config   = $self->config || {};
    my $max_per_worker = $drone_config->{max_tasks_per_worker} || 15;
    $self->max_tasks->{$type} += $max_per_worker;

    # Be notified (via mark_worker_done) whenever this worker finishes a task.
    $worker->register_event('work_done', $self, { method => 'mark_worker_done' });
}

sub mark_worker_done
{
    my ($self, $event, $task) = @_;
    my $type = $task->type;

    # The finished task no longer occupies a local buffer slot, and it can
    # be removed from the external queue.
    $self->buffered_tasks->{$type}--;
    $self->queue->dequeue( $task->id );

    # A finished worker frees a slot in the worker pool. Schedule a pump of
    # this type's workers, but only once until that event has run (the
    # pump handler clears the flag).
    return if $self->{worker_pump_pending}{$type}++;
    $poe_kernel->yield('pump_worker_queue', $type);
}

sub _poe_pump_worker_queue
{
    my ($self, $kernel, $task_type) = @_[ OBJECT, KERNEL, ARG0 ];

    # Clear the "pump scheduled" flag set by mark_worker_done so the next
    # completed task of this type can schedule another pump.
    delete $self->{worker_pump_pending}{$task_type};

    # Ask every worker registered for this task type to pull from the
    # local queue.
    for my $worker (@{ $self->task_types->{$task_type} || [] }) {
        $kernel->post($worker->session_id, 'pump_queue') or die;
    }
}

sub postback
{
    my ($self, $task) = @_;

    # Hand the task to the external (global) queue and pass back whatever
    # the queue's enqueue returns.
    return $self->queue->enqueue($task);
}

sub _poe_monitor
{
    my ($self, $kernel) = @_[OBJECT, KERNEL];

    # Run one monitoring pass, then re-arm the timer so this event fires
    # again after $self->delay seconds.
    $self->monitor();
    return $kernel->delay_set('monitor', $self->delay);
}

sub monitor
{
    my ($self) = @_;

    # A monitoring pass currently consists solely of pumping the queue.
    return $self->pump_queue();
}

sub _poe_pump_queue
{
    my $self = $_[OBJECT];
    my $heap = $_[HEAP];

    # NOTE(review): nothing visible in this file sets $heap->{pump_pending};
    # the flag is cleared here anyway before pumping.
    delete $heap->{pump_pending};

    return $self->pump_queue();
}

sub pump_queue
{
    my $self = shift;

    my $buffered = $self->buffered_tasks;
    my $capacity = $self->max_tasks;

    # For each task type, request only enough tasks to fill the remaining
    # capacity (maximum minus what is already buffered locally). Results
    # arrive asynchronously as a 'buffer_work' event.
    for my $type (keys %$capacity) {
        my $free_slots = $capacity->{$type} - ($buffered->{$type} || 0);
        next unless $free_slots > 0;

        $self->log->debug("[DRONE] PUMP: $type");
        $self->queue->pump(
            event      => 'buffer_work',
            task_types => [ $type ],
            limit      => $free_slots,
        );
    }
}

sub _poe_buffer_work
{
    my ($self, $kernel, $ref) = @_[OBJECT, KERNEL, ARG0];

    # $ref->{result} holds the tasks pumped from the external queue
    # (possibly absent when nothing was fetched).
    my @tasks = @{ $ref->{result} || [] };
    $self->log->debug("[DRONE] PUMPED " . scalar(@tasks) . " tasks");

    # Move each task into the local queue, counting it as buffered.
    my %seen_type;
    foreach my $task (@tasks) {
        if (! $task) {
            warn "?! Weird task received ?!";
            next;
        }
        my $type = $task->type;
        $self->buffered_tasks->{$type}++;
        $self->local_queue->enqueue($task);
        $seen_type{$type}++;
    }

    # Wake only the workers registered for the task types just buffered.
    # register_worker indexes workers by type in task_types, so there is no
    # need to scan the whole worker list once per type.
    foreach my $type (keys %seen_type) {
        foreach my $worker (@{ $self->task_types->{$type} || [] }) {
            $kernel->post($worker->session_id, 'pump_queue')
                or die "Failed to post pump_queue to worker session";
        }
    }
}

sub _poe_spawn_queue
{
    my ($self, $config) = @_[OBJECT, ARG0];

    # $config is the Drone's 'queue' option: { module => ..., config => {...} }.
    # Read 'module' rather than deleting it so the caller's hash is not
    # mutated, and default a missing 'config' for both queue flavours.
    $config ||= {};
    my $module     = $config->{module} || 'DBI';
    my $queue_pkg  = Swarmage::Util::load_module(
        $module,
        'Swarmage::Queue'
    );
    my %queue_args = %{ $config->{config} || {} };

    $self->log->debug("Setting up queue $queue_pkg");

    # A queue class that reports itself as asynchronous is used directly
    # (presumably it drives its own non-blocking I/O); synchronous classes
    # are wrapped in Swarmage::Queue::Generic.
    if ( $queue_pkg->is_async() ) {
        return $queue_pkg->new(
            %queue_args,
            log => $self->log,
        );
    }

    return Swarmage::Queue::Generic->new(
        %queue_args,
        class   => $queue_pkg,
        verbose => 1,
        log     => $self->log,
    );
}

1;

__END__

=head1 NAME

Swarmage::Drone - The Drone

=head1 SYNOPSIS

  use Swarmage;
  Swarmage::Drone->new(
    queue => {
      module => 'DBI::Generic',
      config => {
        connect_info => [
          'dbi:Pg:dbname=swarmage',
          $username,
          $password,
          ...
        ],
        task_types   => [ qw(foo bar) ],
      },
    },
    workers => {
        foo => {
            module => '+MyWorker',
            config => {
                ....
            }
        },
        # multiple workers
        bar => [
            {
                module => '+MyWorker2',
                config => {
                    ...
                }
            },
            {
                module => '+MyWorker2',
                config => {
                    ...
                }
            },
        ]
    }
  );
  POE::Kernel->run();

=head1 DESCRIPTION

The Drone is responsible for retrieving jobs from the system queue, and for
launching and keeping track of workers. A Drone may contain multiple types of
Workers, and these Workers are assumed to coordinate with each other, thus
forming a small cluster of processes that implement a particular set of logic.

The Drone uses two sets of queues. One is the Global Queue, which is the queue
that is shared amongst Drones. The other is the Local Queue, which is shared
between one Drone and one or more Workers belonging to that Drone.

The Global Queue may be implemented in terms of a database, a message queue, 
or whatever that handles its own non-blocking logic to check for incoming tasks.
The Local Queue is implemented with a simple SQLite database.

When the Queue reports an incoming task, the Drone checks the task's type
and dispatches it to the appropriate Worker.

=head1 METHODS

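=head2 new

Creates a Drone, sets up its logger and creates the POE session that drives
it. Recognized arguments include C<alias> (the POE session alias; one is
generated when omitted), C<queue> (the external queue configuration) and
C<workers> (worker configurations keyed by task type).

=head2 postback

Enqueues the given task on the external (global) queue.

=head2 register_worker

Creates a L<Swarmage::Worker> for the given task type and configuration,
records it, raises the number of tasks that may be buffered for that type by
a per-worker allowance, and subscribes the Drone to the worker's
C<work_done> event.

=head2 mark_worker_done

Handler for a worker's C<work_done> event. It decrements the buffered-task
count for the task's type, removes the task from the external queue, and
schedules the workers for that type to pull more work.

=head2 setup_log

Creates the L<Log::Dispatch> logger, which writes to STDERR.

=head2 pump_queue

Requests more tasks from the external queue for every task type whose
buffered-task count is below its maximum.

=head2 monitor

Periodic hook, run every C<delay> seconds; currently it calls C<pump_queue>.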

=cut