The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: /mirror/perl/Swarmage/trunk/lib/Swarmage/ 39735 2008-01-23T02:57:57.044329Z daisuke  $
# Copyright (c) 2007-2008 Daisuke Maki <>
# All rights reserved.

package Swarmage::Drone;
use strict;
use warnings;
use base qw(Class::Accessor::Fast);
use DBI;
use Log::Dispatch;
use Log::Dispatch::Handle;
use IO::Handle;
use POE ;
use Swarmage::Queue::Generic;
use Swarmage::Queue::Local;
use Swarmage::Task;
use Swarmage::Util;
use Swarmage::Worker;

# Compile-time debug switch. setup_log consults it to choose the minimum
# level of the stderr log output ('debug' when true, 'info' otherwise).
use constant { DEBUG => 0 };

# Generate the read/write accessors for the per-object state in one call.
__PACKAGE__->mk_accessors(qw(
    config alias delay log queue local_queue workers
    max_tasks buffered_tasks task_types
));

# new(%args)
#
# Constructor. The caller's arguments are kept as the `config` hashref, and
# the bookkeeping slots (workers, max_tasks, buffered_tasks, task_types) are
# initialised empty. `delay` starts at 2; _poe_monitor passes it to
# delay_set when re-scheduling the 'monitor' event.
#
# Arguments (all optional in the visible code):
#   alias - name for this drone; generated when not supplied.
#
# Returns the new object.
#
# NOTE(review): this copy of the sub is visibly incomplete. The opening
# brace, the call that the `heap` / `object_states` arguments belong to
# (presumably POE::Session->create -- confirm against upstream), the list of
# state names after `qw(`, and every closing delimiter are missing.
sub new
    my $class = shift;
    my %args  = @_;

    # Use the caller's alias when given; otherwise build one from the
    # hostname, the process id and a random number. Dies if neither side
    # yields a true value.
    my $alias = $args{alias} || do {
        require Sys::Hostname;
        join('-', Sys::Hostname::hostname(), $$, rand())
    } || die;

    # workers, max_tasks and task_types are filled in by register_worker.
    my $self = bless {
        config         => \%args,
        alias          => $alias,
        workers        => [],
        max_tasks      => {},
        buffered_tasks => {},
        task_types     => {},
        delay          => 2,
    }, $class;

    # NOTE(review): the statement that receives the arguments below is
    # missing from this copy.
        heap => {
            shutdown => 0,
        object_states => [
            $self, {
                # Each state is handled by the method named "_poe_<state>".
                _start => '_poe_start',
                map { ($_ => "_poe_$_") } qw(
    return $self;

# setup_log()
#
# Builds a Log::Dispatch logger, attaches a Log::Dispatch::Handle output
# named 'stderr' to it, and stores the logger through the `log` accessor.
#
# The logger's callback rewrites every message to "[<level>:<pid>]: <text>"
# with a newline appended. The stderr output's minimum level is 'debug' when
# the DEBUG constant is true and 'info' otherwise.
#
# NOTE(review): this copy is incomplete -- the opening brace and the closing
# delimiters of the callback, of both constructor calls and of the `do`
# block are missing.
sub setup_log
    my $self = shift;
    my $log = Log::Dispatch->new(
        callbacks => sub {
            my %args = @_;
            my $message = $args{message};
            # NOTE(review): `(?!\n)\Z` also matches at the very end of a
            # message that already ends in "\n", so as written a newline is
            # appended unconditionally. `(?<!\n)\z` may have been intended --
            # confirm before changing.
            $message =~ s/(?!\n)\Z/\n/;
            $message = "[$args{level}:$$]: $message";
            return $message;
    my $stderr = Log::Dispatch::Handle->new(
        name      => 'stderr',
        min_level => DEBUG ? 'debug' : 'info',
        handle    => do {
            # Open an IO::Handle for writing on STDERR's file descriptor.
            my $io = IO::Handle->new;
            $io->fdopen(fileno(STDERR), "w");
    $log->add( $stderr );
    $self->log( $log );

# _poe_start
#
# POE handler for the session's `_start` state (see the state map in new).
# In the visible code it:
#   1. logs the drone's alias at debug level,
#   2. creates the local queue and stores it in `local_queue`,
#   3. obtains the external queue by calling the 'spawn_queue' state with
#      config->{queue}, dies if that returns a false value, and stores the
#      result in `queue`,
#   4. calls register_worker once per worker configuration found in
#      config->{workers}, collecting the task type names in @task_types.
#
# NOTE(review): the slice below requests four elements (OBJECT, KERNEL,
# SESSION, ARG0) but assigns to three variables, so the ARG0 value is
# discarded.
# NOTE(review): this copy is incomplete -- the opening brace, all closing
# braces, the statement that actually sets the alias (only the log line is
# present) and whatever consumes @task_types are missing.
sub _poe_start
    my ($self, $kernel, $session) = @_[OBJECT, KERNEL, SESSION, ARG0];
    if (my $alias = $self->alias) {
        $self->log->debug("Setting alias '$alias'");

    # Create a local queue
    my $local_queue = Swarmage::Queue::Local->new(unlink => 1);
    $self->local_queue( $local_queue );

    # Create external queue
    my $extern_queue = $kernel->call($session, 'spawn_queue', $self->config->{queue}) or die;
    $self->queue( $extern_queue );

    my @task_types;
    while (my ($task_type, $config) = each %{ $self->config->{workers} }) {
        # A task type may map to one worker config or to an array reference
        # of several; normalise to an array reference.
        if (ref $config ne 'ARRAY') {
            $config = [ $config ];

        foreach my $conf (@$config) {
            $self->register_worker( $task_type, $conf );
        push @task_types, $task_type;

# register_worker($task_type, $config)
#
# Creates a Swarmage::Worker for $task_type that uses this drone's local
# queue and has this drone as its parent, then records it:
#   - appended to the `workers` list,
#   - appended to the per-type list in `task_types`,
#   - `max_tasks` for the worker's type is raised by 15.
# Finally it registers for the worker's 'work_done' event, naming
# mark_worker_done as the method (presumably the method the worker invokes
# on this object -- verify against Swarmage::Worker).
#
# NOTE(review): $config is not used in the visible lines, and
# $max_per_worker is declared but the literal 15 is used instead.
# NOTE(review): this copy is incomplete -- the opening brace, the end of the
# Swarmage::Worker->new(...) call and the closing brace are missing.
sub register_worker
    my ($self, $task_type, $config) = @_;

    # XXX - Fix calling syntax for task_type
    my $worker = Swarmage::Worker->new(
        queue     => $self->local_queue,
        task_type => $task_type,
        parent    => $self
    push @{$self->workers}, $worker;

    $self->task_types->{ $worker->task_type } ||= [];
    push @{ $self->task_types->{ $worker->task_type } }, $worker;

    # The Drone can poll up to N number of jobs per worker. 
    # each worker is responsible for 1 task type, so we don't need to poll
    # task types for which the worker has been saturated.
    # To ease that process, we calculate the number of max jobs per
    # task here

    my $max_per_worker = 15;
    $self->max_tasks->{ $worker->task_type } ||= 0;
    $self->max_tasks->{ $worker->task_type } += 15;

    $worker->register_event('work_done', $self, { method => 'mark_worker_done' });

# mark_worker_done($event, $task)
#
# Callback registered for a worker's 'work_done' event (see
# register_worker). Decrements the buffered-task count for the task's type,
# calls dequeue on the external queue with the task's id, and yields a
# 'pump_worker_queue' event for that type.
#
# $event is accepted but not used in the visible code.
#
# NOTE(review): the opening brace and the closing braces are missing from
# this copy.
sub mark_worker_done
    my ($self, $event, $task) = @_;

    $self->buffered_tasks->{ $task->type }--;
    $self->queue->dequeue( $task->id );

    # If a worker is reported to be done, then there should be an empty
    # slot in the worker pool. pump the workers!

    # Only yield when no pump is already pending for this type; the pending
    # marker is deleted again in _poe_pump_worker_queue.
    if (! $self->{worker_pump_pending}{ $task->type }++) {
        $poe_kernel->yield('pump_worker_queue', $task->type);

# _poe_pump_worker_queue
#
# Handler for the 'pump_worker_queue' event yielded by mark_worker_done.
# ARG0 is the task type. Clears the pending marker for that type, then posts
# 'pump_queue' to the session of every worker registered for it, dying if a
# post returns a false value.
#
# NOTE(review): $kernel is unpacked but the global $poe_kernel is used for
# the post.
# NOTE(review): the opening brace and the closing braces are missing from
# this copy.
sub _poe_pump_worker_queue
    my ($self, $kernel, $task_type) = @_[ OBJECT, KERNEL, ARG0 ];

    # Allow mark_worker_done to schedule the next pump for this type.
    delete $self->{worker_pump_pending}{ $task_type };

    my $workers = $self->task_types->{ $task_type };
    foreach my $worker (@$workers) {
        $poe_kernel->post($worker->session_id, 'pump_queue') or die;

# postback
#
# NOTE(review): only the invocant unpacking survives in this copy; the rest
# of the body (and the braces) are missing, so the behaviour cannot be
# documented from here.
sub postback
    my $self = shift;

# _poe_monitor
#
# Presumably the handler for the 'monitor' state (per the "_poe_<state>"
# naming used in new). The visible code schedules another 'monitor' event
# via delay_set, using the object's `delay` value.
#
# NOTE(review): $heap is unpacked but unused in the visible lines; the
# braces and possibly other statements are missing from this copy.
sub _poe_monitor
    my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];

    $kernel->delay_set('monitor', $self->delay);

# monitor
#
# NOTE(review): only the invocant unpacking survives in this copy; the rest
# of the body (and the braces) are missing, so the behaviour cannot be
# documented from here.
sub monitor
    my $self = shift;

# _poe_pump_queue
#
# Presumably the handler for the 'pump_queue' state (per the "_poe_<state>"
# naming used in new). The visible code only removes the `pump_pending`
# entry from the session heap.
#
# NOTE(review): $kernel is unpacked but unused in the visible lines; the
# braces and possibly other statements are missing from this copy.
sub _poe_pump_queue
    my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];

    delete $heap->{pump_pending};

# pump_queue()
#
# For every task type that has a `max_tasks` entry, works out how many more
# tasks may be buffered (the maximum minus the current `buffered_tasks`
# count, treating a missing count as 0). When that number is positive it
# logs a debug line and issues a request carrying the event name
# 'buffer_work', the task type and the remaining capacity as `limit`.
#
# NOTE(review): the call that receives the event / task_types / limit
# arguments is missing from this copy, as are all braces, so the target of
# that request cannot be confirmed here.
sub pump_queue
    my $self = shift;
    my $buffered_tasks = $self->buffered_tasks;
    my $max_tasks = $self->max_tasks;
    foreach my $task_type (keys %{ $max_tasks }) {
        # Free capacity for this task type.
        my $diff = $max_tasks->{$task_type} - ($buffered_tasks->{$task_type} || 0);
        if ($diff > 0) {
            $self->log->debug("[DRONE] PUMP: $task_type");
            # NOTE(review): the statement these named arguments belong to is
            # missing here.
                event      => 'buffer_work',
                task_types => [ $task_type ],
                limit      => $diff,

# _poe_buffer_work
#
# Handler for the 'buffer_work' event named in pump_queue. ARG0 is a hash
# reference whose `result` entry, when present, is an array reference of
# tasks.
#
# Logs how many tasks arrived, counts them per task type (warning about and
# skipping any false entry), then posts 'pump_queue' to the session of every
# worker whose task type was seen, dying if a post returns a false value.
#
# NOTE(review): $heap is unpacked but unused in the visible lines. The
# visible lines only count the tasks; any step that stores them or updates
# `buffered_tasks` is not present in this copy, and all braces are missing.
sub _poe_buffer_work
    my ($self, $kernel, $heap, $ref) = @_[OBJECT, KERNEL, HEAP, ARG0];

    $self->log->debug("[DRONE] PUMPED " . scalar(@{ $ref->{result} || [] }) . " tasks");
    my %task_types;
    foreach my $task (@{ $ref->{result} || [] }) {
        # Parsed as `(warn ... and next) unless $task`.
        warn "?! Weird task received ?!" and next unless $task;
        $task_types{ $task->type }++;

    # Grrrrr, this is so inefficient
    # Every worker is scanned once for every task type that was seen.
    foreach my $task_type (keys %task_types) {
        foreach my $worker (@{ $self->workers }) {
            next unless $worker->task_type eq $task_type;
            $kernel->post($worker->session_id, 'pump_queue') or die;

# _poe_spawn_queue
#
# Handler for the 'spawn_queue' state that _poe_start invokes through
# $kernel->call. ARG0 is the queue configuration hash reference.
#
# Takes the `module` entry out of the configuration (defaulting to 'DBI'),
# resolves a queue package through Swarmage::Util::load_module, and builds
# the queue object:
#   - if the package reports is_async(), it is instantiated directly with
#     the entries of config->{config} plus the logger;
#   - otherwise it is wrapped in Swarmage::Queue::Generic, passing the
#     package name as `class`, `verbose => 1` and the logger.
#
# Returns the queue object.
#
# NOTE(review): `delete` removes `module` from the caller's hash.
# NOTE(review): the async branch dereferences $config->{config} without the
# `|| {}` fallback that the other branch has.
# NOTE(review): this copy is incomplete -- the arguments to load_module, the
# opening brace and the closing delimiters of both constructor calls are
# missing.
sub _poe_spawn_queue
    my ($self, $kernel, $session, $heap, $config) = @_[OBJECT, KERNEL, SESSION, HEAP, ARG0];

    my $module    = delete $config->{module} || 'DBI';
    my $queue_pkg = Swarmage::Util::load_module(

    # If it's async on its own, then it should be able to handle
    # things on its own
    $self->log->debug("Setting up queue $queue_pkg");
    my $queue;
    if ( $queue_pkg->is_async() ) {
        $queue = $queue_pkg->new(
            %{ $config->{config} },
            log => $self->log
    } else {
        $queue = Swarmage::Queue::Generic->new(
            %{ $config->{config} || {} },
            class   => $queue_pkg,
            verbose => 1,
            log     => $self->log,
    return $queue;



=head1 NAME

Swarmage::Drone - The Drone


=head1 SYNOPSIS

  use Swarmage;
    queues => [
        module => 'DBI::Generic',
        config => {
          connect_info => [
          task_types   => [ qw(foo bar) ],
    workers => {
        foo => {
            module => '+MyWorker',
            config => {
        # multiple workers
        bar => [
                module => '+MyWorker2',
                config => {
                module => '+MyWorker2',
                config => {


=head1 DESCRIPTION

The Drone is responsible for retrieving jobs from the system queue, then
launching and keeping track of workers. It's assumed that a Drone may contain
multiple types of Workers, and that these Workers coordinate with each other,
thus forming a small cluster of processes that implement a particular logic set.

The Drone uses two sets of queues. One is the Global Queue, which is the queue
that is shared amongst Drones. The other is the Local Queue, which is shared
between one Drone and one or more Workers belonging to that Drone.

The Global Queue may be implemented in terms of a database, a message queue,
or anything else that handles its own non-blocking logic to check for incoming
tasks. The Local Queue is implemented with a simple SQLite database.

When the Queue notifies the Drone of an incoming task, the Drone checks the
job's type and dispatches it to the appropriate Worker.

=head1 METHODS

=head2 new

=head2 postback

=head2 register_worker

=head2 mark_worker_done

=head2 setup_log

=head2 pump_queue

=head2 monitor
