The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: /mirror/perl/Swarmage/trunk/lib/Swarmage/Queue/DBI.pm 38128 2008-01-07T04:52:02.712309Z daisuke  $
#
# Copyright (c) 2007-2008 Daisuke Maki <daisuke@endeworks.jp>
# All right reserved.

package Swarmage::Queue::DBI;
use strict;
use warnings;
use base qw(Swarmage::Queue Class::Data::Inheritable);
use DBI;
use Time::HiRes();

__PACKAGE__->mk_accessors($_) for qw(connect_info dbh);
__PACKAGE__->mk_classdata(table_name => 'queues');

sub new
{
    my $class = shift;
    my %args  = @_;
    my $self  = bless { connect_info => $args{connect_info} }, $class;
    $self->prepare_db();
    return $self;
}

sub prepare_db
{
    my $self = shift;
# warn "Connecting to " . $self->connect_info->[0];
    my $dbh = DBI->connect(@{ $self->connect_info });
    $self->dbh( $dbh );
}

sub enqueue
{
    my $self = shift;
    my $task = shift;
    my $dbh  = $self->dbh;

    eval {
        my $table = $self->table_name;
# warn "$self enqueue $table";
        my $sth = $dbh->prepare(<<"        EOSQL");
            INSERT INTO $table
                (id, task_type, task_data, modified_on, inserted_on)
            VALUES
                (?, ?, ?, ?, ?)
        EOSQL
        my $rv = $sth->execute($task->id, $task->type, $task->serialize, Time::HiRes::time(), Time::HiRes::time());
        if (! $rv) {
            die "Could not insert task: $@";
        }
        $sth->finish;
    };
    warn if $@;
}

sub dequeue
{
    my $self = shift;
    my $id   = shift;
    my $dbh  = $self->dbh;

    my $table = $self->table_name;
    my $sth = $dbh->prepare_cached(<<"    EOSQL");
        DELETE FROM $table WHERE id = ?
    EOSQL
    $sth->execute($id);
    $sth->finish;
}

sub pump
{
    my $self       = shift;
    my %args       = @_;
    my $task_types = $args{task_types};
    my $limit      = $args{limit} || 10;

    my @tasks;
    eval {
        my $where  = sprintf(
            'taken_by is NULL AND task_type IN (%s)',
            join(', ', ('?') x scalar(@$task_types))
        );
        my $dbh = $self->dbh;

        my $table = $self->table_name;
        my $select_sth = $dbh->prepare(<<"        EOSQL");
            SELECT id, task_data, modified_on
            FROM $table
            WHERE $where
            ORDER BY inserted_on ASC
            LIMIT $limit
        EOSQL
        my $update_sth = $dbh->prepare_cached(<<"        EOSQL");
            UPDATE $table
            SET taken_by = ?,
                taken_on = ?,
                modified_on = ?
            WHERE
                id = ? AND modified_on = ?
        EOSQL

        if ( $dbh->{Driver}->{Name} =~ /^sqlite$/) {
            $self->_fetch_sqlite(\@tasks, $select_sth, $task_types, $update_sth);
        } else {
            $self->_fetch_other(\@tasks, $select_sth, $task_types, $update_sth);
        }
        $select_sth->finish;
        $update_sth->finish;
    };
# warn "$self Polling resulted in " . scalar(@tasks) . " tasks";

    return @tasks;
}

sub _fetch_sqlite
{
    my ($self, $tasks, $select_sth, $task_types, $update_sth) = @_;

    my @candidates;
    $select_sth->execute(@$task_types);

    my ($id, $task_data, $modified_on);
    $select_sth->bind_columns(\($id, $task_data, $modified_on));
    while ($select_sth->fetchrow_arrayref) {
        push @candidates, [ $id, $task_data, $modified_on ];
    }
    $select_sth->finish;

    foreach my $data (@candidates) {
        my $now = Time::HiRes::time();
        if ($update_sth->execute($$, $now, $now, $id, $modified_on) > 0) {
            my $task = Swarmage::Task->deserialize($task_data);
            push @$tasks, $task if $task;
        }
        $update_sth->finish;
    }
}

sub _fetch_other
{
    my ($self, $tasks, $select_sth, $task_types, $update_sth) = @_;

    $select_sth->execute(@$task_types);

    my ($id, $task_data, $modified_on);
    $select_sth->bind_columns(\($id, $task_data, $modified_on));
    while ($select_sth->fetchrow_arrayref) {
        my $now = Time::HiRes::time();
        if ($update_sth->execute($$, $now, $now, $id, $modified_on) > 0) {
            my $task = Swarmage::Task->deserialize($task_data);
            push @$tasks, $task if $task;
        }
    }
}

1;

__END__

=head1 NAME

Swarmage::Queue::DBI - DBI Based Queue For Swarmage

=head1 SYNOPSIS

  # To use from POE:
  use Swarmage::Queue::DBI;
  use Swarmage::Queue::DBI::Generic;

  my $queue = Swarmage::Queue::DBI::Generic->new(
    connect_info => [
      'dbi:Pg:dbname=swarmage',
      $username,
      $password,
      { RaiseError => 1, AutoCommit => 1 }
    ]
  );

=head1 METHODS

=head2 new

=head2 enqueue

=head2 dequeue

=head2 poll_wait

=head2 prepare_db

=head2 pump

=cut