# $Id: /mirror/coderepos/lang/perl/Queue-Q4M/trunk/lib/Queue/Q4M.pm 103794 2009-04-13T11:38:30.159603Z daisuke $
#
# Copyright (c) 2008 Daisuke Maki <daisuke@endeworks.jp>
# All rights reserved.
package Queue::Q4M;
use Any::Moose;
use Any::Moose '::Util::TypeConstraints';
use Carp();
use DBI;
use SQL::Abstract;
use Queue::Q4M::Status;
class_type 'Queue::Q4M::Result';
has 'auto_reconnect' => (
is => 'rw',
isa => 'Bool',
required => 1,
default => 1,
);
has 'owner_mode' => (
is => 'rw',
isa => 'Bool',
default => 0
);
has '_connect_pid' => (
is => 'rw',
isa => 'Int'
);
has 'connect_info' => (
is => 'rw',
isa => 'ArrayRef',
required => 1,
);
has 'sql_maker' => (
is => 'rw',
isa => 'SQL::Abstract',
required => 1,
default => sub { SQL::Abstract->new }
);
has '_dbh' => (
is => 'rw',
);
has '__table' => (
is => 'rw',
);
has '__res' => (
is => 'rw',
# isa => 'Maybe[Queue::Q4M::Result]'
);
__PACKAGE__->meta->make_immutable;
no Any::Moose;
no Any::Moose '::Util::TypeConstraints';
our $AUTHORITY = 'cpan:DMAKI';
our $VERSION = '0.00019';
use constant Q4M_MINIMUM_VERSION => '0.8';
sub connect
{
my $self = shift;
if (! ref $self) {
$self = $self->new(@_);
}
if (my $old = $self->_dbh()) {
$old->disconnect();
}
my $dbh = $self->_connect();
$self->_dbh( $dbh );
# Make sure we have the minimum supported API version
# (or, a Q4M enabled mysql, for that matter)
my $version;
eval {
my $sth = $dbh->prepare(<<' EOSQL');
SELECT PLUGIN_VERSION from
information_schema.plugins
WHERE plugin_name = ?
EOSQL
$sth->execute('QUEUE');
$sth->bind_columns(\$version);
$sth->fetchrow_arrayref;
$sth->finish;
};
warn if $@;
if (! $version || $version < Q4M_MINIMUM_VERSION) {
Carp::confess( "Connected database does not meet the minimum required q4m version (" . Q4M_MINIMUM_VERSION . "). Got version " . (defined $version ? $version : '(undef)' ) );
}
$self;
}
sub _connect
{
my $self = shift;
return DBI->connect(@{ $self->connect_info });
}
sub dbh
{
my $self = shift;
my $dbh = $self->_dbh;
my $pid = $self->_connect_pid;
if ( ($pid || '') ne $$ || ! $dbh || ! $dbh->ping) {
$self->auto_reconnect or die "not connect";
$dbh = $self->_connect();
$self->_dbh( $dbh );
$self->_connect_pid($$);
}
return $dbh;
}
sub next
{
my $self = shift;
my @args = @_;
# First, undef any cached table name that we might have had
$self->__table(undef);
my @tables =
grep { !/^\d+$/ }
map {
(my $v = $_) =~ s/:.*$//;
$v
}
@args
;
# Cache this statement handler so we don't unnecessarily create
# string or handles
my $dbh = $self->dbh;
my $sql = sprintf(
"SELECT queue_wait(%s)",
join(',', (('?') x scalar(@args)))
);
my ($index) = $dbh->selectrow_array($sql, undef, @args);
my $table = defined $index && $index > 0 ? $tables[$index - 1] : undef;
my $res = Queue::Q4M::Result->new(
rv => defined $table,
table => $table,
on_release => sub { $self->__table(undef) }
);
if (defined $table) {
$self->__table($table);
}
$self->__res($res) if $res;
$self->owner_mode(1);
return $res;
}
*fetch = \&fetch_array;
BEGIN
{
foreach my $type qw(array arrayref hashref) {
eval sprintf( <<'EOSUB', $type, $type );
sub fetch_%s {
my $self = shift;
my $table = shift;
$table ||= $self->__table;
if (Scalar::Util::blessed $table &&
$table->isa('Queue::Q4M::Result'))
{
$table = $table->[1];
}
$table or die "no table";
my ($sql, @bind) = $self->sql_maker->select($table, @_);
my $dbh = $self->dbh;
$self->owner_mode(0);
return $dbh->selectrow_%s($sql, undef, @bind);
}
EOSUB
die if $@;
}
}
sub insert
{
my $self = shift;
my $table = shift;
my ($sql, @bind) = $self->sql_maker->insert($table, @_);
my $dbh = $self->dbh;
my $sth = $dbh->prepare($sql);
my $rv = $sth->execute(@bind);
$sth->finish;
return $rv;
}
sub disconnect
{
my $self = shift;
my $dbh = $self->dbh;
if ($dbh) {
$dbh->do("select queue_end()");
$dbh->disconnect;
$self->_dbh(undef);
}
}
sub clear
{
my ($self, $table) = @_;
return $self->dbh->do("DELETE FROM $table");
}
sub status {
return Queue::Q4M::Status->fetch( shift->dbh );
}
sub DEMOLISH
{
my $self = shift;
local $@;
eval {
$self->dbh->do("SELECT queue_abort()") if $self->owner_mode;
$self->disconnect;
};
}
package
Queue::Q4M::Result;
use overload
bool => \&as_bool,
'""' => \&as_string,
fallback => 1
;
use Scope::Guard;
sub new
{
my $class = shift;
my %args = @_;
return bless [ $args{rv}, $args{table}, Scope::Guard->new( $args{on_release} ) ], $class;
}
sub as_bool { $_[0]->[0] }
sub as_string { $_[0]->[1] }
sub DESTROY { $_[0]->[2]->dismiss(1) if $_[0]->[2] }
1;
__END__
=head1 NAME
Queue::Q4M - Simple Interface To q4m
=head1 SYNOPSIS
use Queue::Q4M;
my $q = Queue::Q4M->connect(
connect_info => [
'dbi:mysql:dbname=mydb',
$username,
$password
],
);
for (1..10) {
$q->insert($table, \%fieldvals);
}
while ($q->next($table)) {
my ($col1, $col2, $col3) = $q->fetch($table, \@fields);
print "col1 = $col1, col2 = $col2, col3 = $col3\n";
}
while ($q->next($table)) {
my $cols = $q->fetch_arrayref($table, \@fields);
print "col1 = $cols->[0], col2 = $cols->[1], col3 = $cols->[2]\n";
}
while ($q->next($table)) {
my $cols = $q->fetch_hashref($table, \@fields);
print "col1 = $cols->{col1}, col2 = $cols->{col2}, col3 = $cols->{col3}\n";
}
# to use queue_wait(table_cond1,table_cond2,timeout)
while (my $which = $q->next(@table_conds)) {
# $which contains the table name
}
$q->disconnect;
=head1 DESCRIPTION
Queue::Q4M is a simple wrapper to q4m, which is an implementation of a queue
using mysql.
=head1 METHODS
=head2 new
Creates a new Queue::Q4M instance. Normally you should use connect() instead.
=head2 connect
Connects to the target database.
my $q = Queue::Q4M->connect(
connect_info => [
'dbi:mysql:dbname=q4m',
]
);
=head2 next($table_cond1[, $table_cond2, $table_cond3, ..., $timeout])
Blocks until the next item is available. This is equivalent to calling
queue_wait() on the given table.
my $which = $q->next( $table_cond1, $table_cond2, $table_cond3 );
=head2 fetch
=head2 fetch_array
Fetches the next available row. Takes a table name and the list of columns to be fetched.
my ($col1, $col2, $col3) = $q->fetch( $table, [ qw(col1 col2 col3) ] );
=head2 fetch_arrayref
Same as fetch_array, but fetches using fetchrow_arrayref()
my $arrayref = $q->fetch_arrayref( $table, [ qw(col1 col2 col3) ] );
=head2 fetch_hashref
Same as fetch_array, but fetches using fetchrow_hashref()
my $hashref = $q->fetch_hashref( $table, [ qw(col1 col2 col3) ] );
=head2 insert($table, \%field)
Inserts into the queue. The first argument should be a scalar specifying
a table name. The second argument is a hashref that specifies the mapping
between column names and their respective values.
$q->insert($table, { col1 => $val1, col2 => $val2, col3 => $val3 });
For backwards compatibility, you may omit $table if you specified $table
in the constructor.
=head2 clear($table)
Deletes everything the specified queue. Be careful!
=head2 status()
Returns an instance of Queue::Q4M::Status (actually, a subclass there of).
=head2 dbh
Returns the database handle after making sure that it's connected.
=head2 disconnect
Disconnects.
=head2 BUILD
=head2 DEMOLISH
These are defined as part of Moose infrastructure
=head2 Q4M_MINIMUM_VERSION
The minimum version of q4m that Queue::Q4M supports
=head1 AUTHOR
Copyright (c) 2008 Daisuke Maki E<lt>daisuke@endeworks.jpE<gt>
=head1 CONTRIBUTOR
Taro Funaki
=head1 LICENSE
This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.
See http://www.perl.com/perl/misc/Artistic.html
=cut