package TheSchwartz::Moosified;
use Moose;
use Moose::Util::TypeConstraints;
use Carp;
use Scalar::Util qw( refaddr );
use List::Util qw( shuffle );
use File::Spec ();
use Storable ();
use TheSchwartz::Moosified::Utils qw/insert_id sql_for_unixtime bind_param_attr run_in_txn order_by_priority/;
use TheSchwartz::Moosified::Job;
use TheSchwartz::Moosified::JobHandle;
our $VERSION = '0.07'; # when bumping, also update t/00-load.t
our $AUTHORITY = 'cpan:FAYLAND';
## Number of jobs to fetch at a time in find_job_for_workers.
our $FIND_JOB_BATCH_SIZE = 50;
# subtype-s
my $sub_verbose = sub {
my $msg = shift;
$msg =~ s/\s+$//;
print STDERR "$msg\n";
};
subtype 'TheSchwartz.Moosified.Verbose'
=> as 'CodeRef'
=> where { 1; };
coerce 'TheSchwartz.Moosified.Verbose'
=> from 'Int'
=> via {
if ($_) {
return $sub_verbose;
} else {
return sub { 0 };
}
};
has 'verbose' => ( is => 'rw', isa => 'TheSchwartz.Moosified.Verbose', coerce => 1, default => 0 );
has 'prioritize' => ( is => 'rw', isa => 'Bool', default => 0 );
has 'retry_seconds' => (is => 'rw', isa => 'Int', default => 30);
has 'retry_at' => ( is => 'rw', isa => 'HashRef', default => sub { {} } );
has 'databases' => (
is => 'rw',
isa => 'ArrayRef',
default => sub { [] },
);
has 'all_abilities' => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );
has 'current_abilities' => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );
has 'current_job' => ( is => 'rw', isa => 'Object' );
has 'funcmap_cache' => ( is => 'rw', isa => 'HashRef', default => sub { {} } );
has 'scoreboard' => (
is => 'rw',
isa => 'Str',
trigger => \&_trigger_scoreboard,
);
has 'prefix' => ( is => 'rw', isa => 'Str', default => '' );
has 'error_length' => ( is => 'rw', isa => 'Int', default => 255 );
sub debug {
my $self = shift;
return unless $self->verbose;
$self->verbose->(@_);
}
sub shuffled_databases {
my $self = shift;
return shuffle( @{ $self->databases } );
}
sub _try_insert {
my $self = shift;
my $job = shift;
my $dbh = shift;
$job->funcid( $self->funcname_to_id( $dbh, $job->funcname ) );
run_in_txn {
$job->insert_time(time());
my $row = $job->as_hashref;
if ($dbh->{Driver}{Name} && $dbh->{Driver}{Name} eq 'Pg') {
delete $row->{jobid};
}
my @col = keys %$row;
my $table_job = $self->prefix . 'job';
my $sql = sprintf 'INSERT INTO %s (%s) VALUES (%s)',
$table_job, join( ", ", @col ), join( ", ", ("?") x @col );
my $sth = $dbh->prepare_cached($sql);
my $i = 1;
for my $col (@col) {
$sth->bind_param(
$i++,
$row->{$col},
bind_param_attr( $dbh, $col ),
);
}
$sth->execute();
my $jobid = insert_id( $dbh, $sth, $table_job, "jobid" );
$job->jobid($jobid);
} $dbh;
}
sub insert {
my $self = shift;
my $job;
if ( ref $_[0] eq 'TheSchwartz::Moosified::Job' ) {
$job = $_[0];
}
else {
$job = TheSchwartz::Moosified::Job->new(funcname => $_[0], arg => $_[1]);
}
$job->arg( Storable::nfreeze( $job->arg ) ) if ref $job->arg;
for my $dbh ( $self->shuffled_databases ) {
eval {
$self->_try_insert($job,$dbh);
};
$self->debug("insert failed: $@") if $@;
next unless $job->jobid;
## We inserted the job successfully!
## Attach a handle to the job, and return the handle.
my $handle = TheSchwartz::Moosified::JobHandle->new({
dbh => $dbh,
client => $self,
jobid => $job->jobid
});
$job->handle($handle);
return $handle;
}
return;
}
sub find_job_for_workers {
my $client = shift;
my ($worker_classes) = @_;
$worker_classes ||= $client->{current_abilities};
return unless (scalar @$worker_classes);
my $limit = $FIND_JOB_BATCH_SIZE;
for my $dbh ( $client->shuffled_databases ) {
my $unixtime = sql_for_unixtime($dbh);
my $order_by = $client->prioritize ? order_by_priority($dbh) : '';
my @jobs;
eval {
## Search for jobs in this database where:
## 1. funcname is in the list of abilities this $client supports;
## 2. the job is scheduled to be run (run_after is in the past);
## 3. no one else is working on the job (grabbed_until is in
## in the past).
my @ids = map { $client->funcname_to_id( $dbh, $_ ) }
@$worker_classes;
my $ids = join(',', @ids);
my $table_job = $client->prefix . 'job';
my $sql = qq~SELECT * FROM $table_job WHERE funcid IN ($ids) AND run_after <= $unixtime AND grabbed_until <= $unixtime $order_by LIMIT $limit~;
my $sth = $dbh->prepare_cached($sql);
$sth->execute();
while ( my $ref = $sth->fetchrow_hashref ) {
push @jobs, $ref;
}
$sth->finish;
};
# if ($@) {
#
# }
my $job = $client->_grab_a_job($dbh, @jobs);
return $job if $job;
}
}
sub get_server_time {
my ( $client, $dbh ) = @_;
my $unixtime_sql = sql_for_unixtime($dbh);
return $dbh->selectrow_array("SELECT $unixtime_sql");
}
sub _grab_a_job {
my ( $client, $dbh, @jobs ) = @_;
## Got some jobs! Randomize them to avoid contention between workers.
@jobs = shuffle(@jobs);
JOB:
while (my $ref = shift @jobs) {
my $job = TheSchwartz::Moosified::Job->new( $ref );
## Convert the funcid to a funcname, based on this database's map.
$job->funcname( $client->funcid_to_name($dbh, $job->funcid) );
## Update the job's grabbed_until column so that
## no one else takes it.
my $worker_class = $job->funcname;
my $old_grabbed_until = $job->grabbed_until;
my $server_time = $client->get_server_time($dbh)
or die "expected a server time";
my $new_grabbed_until = $server_time + ($worker_class->grab_for || 1);
# Prevent a condition that could result in more than one client
# grabbing the same job b/c it doesn't look grabbed
next JOB if ($new_grabbed_until == $old_grabbed_until);
## Update the job in the database, and end the transaction.
my $table_job = $client->prefix . 'job';
my $sql = qq~UPDATE $table_job SET grabbed_until = ? WHERE jobid = ? AND grabbed_until = ?~;
my $sth = $dbh->prepare($sql);
$sth->execute($new_grabbed_until, $job->jobid, $old_grabbed_until);
my $rows = $sth->rows;
next JOB unless $rows == 1;
$job->grabbed_until( $new_grabbed_until );
## Now prepare the job, and return it.
my $handle = TheSchwartz::Moosified::JobHandle->new({
dbh => $dbh,
client => $client,
jobid => $job->jobid,
});
$job->handle($handle);
return $job;
}
return undef;
}
sub list_jobs {
my ( $self, $arg ) = @_;
die "No funcname" unless exists $arg->{funcname};
my @options;
push @options, {
key => 'run_after',
op => '<=',
value => $arg->{run_after}
} if exists $arg->{run_after};
push @options, {
key => 'grabbed_until',
op => '<=',
value => $arg->{grabbed_until}
} if exists $arg->{grabbed_until};
if ( $arg->{coalesce} ) {
$arg->{coalesce_op} ||= '=';
push @options, {
key => 'coalesce',
op => $arg->{coalesce_op},
value => $arg->{coalesce}
};
}
my $limit = $arg->{limit} || $FIND_JOB_BATCH_SIZE;
my @jobs;
for my $dbh ( $self->shuffled_databases ) {
my $order_by = $self->prioritize ? order_by_priority($dbh) : '';
eval {
my ($funcid, $funcop);
if ( ref($arg->{funcname}) ) { # ARRAYREF
$funcid = '(' . join(',', map { $self->funcname_to_id($dbh, $_) } @{$arg->{funcname}}) . ')';
$funcop = 'IN';
} else {
$funcid = $self->funcname_to_id($dbh, $arg->{funcname});
$funcop = '=';
}
my $table_job = $self->prefix . 'job';
my $sql = qq~SELECT * FROM $table_job WHERE funcid $funcop $funcid~;
my @value = ();
for (@options) {
$sql .= " AND $_->{key} $_->{op} ?";
push @value, $_->{value};
}
$sql .= qq~ $order_by LIMIT $limit~;
my $sth = $dbh->prepare_cached($sql);
$sth->execute(@value);
while ( my $ref = $sth->fetchrow_hashref ) {
$ref->{funcname} = $self->funcid_to_name($dbh, $ref->{funcid});
my $job = TheSchwartz::Moosified::Job->new( $ref );
if ($arg->{want_handle}) {
my $handle = TheSchwartz::Moosified::JobHandle->new({
dbh => $dbh,
client => $self,
jobid => $job->jobid
});
$job->handle($handle);
}
push @jobs, $job;
}
$sth->finish;
};
}
return @jobs;
}
sub find_job_with_coalescing_prefix {
my $client = shift;
my ($funcname, $coval) = @_;
$coval .= "%";
return $client->_find_job_with_coalescing('LIKE', $funcname, $coval);
}
sub find_job_with_coalescing_value {
my $client = shift;
return $client->_find_job_with_coalescing('=', @_);
}
sub _find_job_with_coalescing {
my $client = shift;
my ($op, $funcname, $coval) = @_;
my $limit = $FIND_JOB_BATCH_SIZE;
my $order_by = $client->prioritize ? 'ORDER BY priority DESC' : '';
for my $dbh ( $client->shuffled_databases ) {
my $unixtime = sql_for_unixtime($dbh);
my @jobs;
eval {
## Search for jobs in this database where:
## 1. funcname is in the list of abilities this $client supports;
## 2. the job is scheduled to be run (run_after is in the past);
## 3. no one else is working on the job (grabbed_until is in
## in the past).
my $funcid = $client->funcname_to_id($dbh, $funcname);
my $table_job = $client->prefix . 'job';
my $sql = qq~SELECT * FROM $table_job WHERE funcid = ? AND run_after <= $unixtime AND grabbed_until <= $unixtime AND coalesce $op ? $order_by LIMIT $limit~;
my $sth = $dbh->prepare_cached($sql);
$sth->execute( $funcid, $coval );
while ( my $ref = $sth->fetchrow_hashref ) {
$ref->{funcname} = $funcname;
push @jobs, $ref;
}
$sth->finish;
};
# if ($@) {
#
# }
my $job = $client->_grab_a_job($dbh, @jobs);
return $job if $job;
}
}
sub can_do {
my $client = shift;
my ($class) = @_;
push @{ $client->{all_abilities} }, $class;
push @{ $client->{current_abilities} }, $class;
}
sub reset_abilities {
my $client = shift;
$client->{all_abilities} = [];
$client->{current_abilities} = [];
}
sub restore_full_abilities {
my $client = shift;
$client->{current_abilities} = [ @{ $client->{all_abilities} } ];
}
sub temporarily_remove_ability {
my $client = shift;
my ($class) = @_;
$client->{current_abilities} = [
grep { $_ ne $class } @{ $client->{current_abilities} }
];
if (!@{ $client->{current_abilities} }) {
$client->restore_full_abilities;
}
}
sub work {
my $client = shift;
my ($delay) = @_;
$delay ||= 5;
while (1) {
sleep $delay unless $client->work_once;
}
}
sub work_until_done {
my $client = shift;
while (1) {
$client->work_once or last;
}
}
## Returns true if it did something, false if no jobs were found
sub work_once {
my $client = shift;
my $job = shift; # optional specific job to work on
## Look for a job with our current set of abilities. Note that the
## list of current abilities may not be equal to the full set of
## abilities, to allow for even distribution between jobs.
$job ||= $client->find_job_for_workers;
## If we didn't find anything, restore our full abilities, and try
## again.
if (!$job &&
@{ $client->{current_abilities} } < @{ $client->{all_abilities} }) {
$client->restore_full_abilities;
$job = $client->find_job_for_workers;
}
my $class = $job ? $job->funcname : undef;
if ($job) {
my $priority = $job->priority ? ", priority " . $job->priority : "";
$job->debug("TheSchwartz::work_once got job of class '$class'$priority");
} else {
$client->debug("TheSchwartz::work_once found no jobs");
}
## If we still don't have anything, return.
return unless $job;
## Now that we found a job for this particular funcname, remove it
## from our list of current abilities. So the next time we look for a
## we'll find a job for a different funcname. This prevents starvation of
## high funcid values because of the way MySQL's indexes work.
$client->temporarily_remove_ability($class);
$class->work_safely($job);
## We got a job, so return 1 so work_until_done (which calls this method)
## knows to keep looking for jobs.
return 1;
}
sub funcid_to_name {
my ( $client, $dbh, $funcid ) = @_;
my $cache = $client->_funcmap_cache($dbh);
return $cache->{funcid2name}{$funcid};
}
sub funcname_to_id {
my ( $self, $dbh, $funcname ) = @_;
my $dbid = refaddr $dbh;
my $cache = $self->_funcmap_cache($dbh);
my $table_funcmap = $self->prefix . 'funcmap';
unless ( exists $cache->{funcname2id}{$funcname} ) {
my $id;
eval {
run_in_txn {
## This might fail in a race condition since funcname is UNIQUE
my $sth = $dbh->prepare_cached(
"INSERT INTO $table_funcmap (funcname) VALUES (?)");
$sth->execute($funcname);
$id = insert_id( $dbh, $sth, $table_funcmap, "funcid" );
} $dbh;
};
## If we got an exception, try to load the record again
if ($@) {
my $sth = $dbh->prepare_cached(
"SELECT funcid FROM $table_funcmap WHERE funcname = ?");
$sth->execute($funcname);
($id) = $sth->fetchrow_array;
$sth->finish;
croak "Can't find or create funcname $funcname: $@" unless $id;
}
$cache->{funcname2id}{ $funcname } = $id;
$cache->{funcid2name}{ $id } = $funcname;
$self->funcmap_cache->{$dbid} = $cache;
}
$cache->{funcname2id}{$funcname};
}
sub _funcmap_cache {
my ( $client, $dbh ) = @_;
my $dbid = refaddr $dbh;
my $table_funcmap = $client->prefix . 'funcmap';
unless ( exists $client->funcmap_cache->{$dbid} ) {
my $cache = { funcname2id => {}, funcid2name => {} };
my $sth = $dbh->prepare_cached("SELECT funcid, funcname FROM $table_funcmap");
$sth->execute;
while ( my $row = $sth->fetchrow_arrayref ) {
$cache->{funcname2id}{ $row->[1] } = $row->[0];
$cache->{funcid2name}{ $row->[0] } = $row->[1];
}
$sth->finish;
$client->funcmap_cache->{$dbid} = $cache;
}
return $client->funcmap_cache->{$dbid};
}
sub _trigger_scoreboard {
my ($self, $dir) = @_;
return unless $dir;
return if (-f $dir); # no endless loop
# They want the scoreboard but don't care where it goes
if (($dir eq '1') or ($dir eq 'on')) {
$dir = File::Spec->tmpdir();
}
$self->{scoreboard} = $dir."/theschwartz.scoreboard.$$";
}
sub start_scoreboard {
my $client = shift;
# Don't do anything if we're not configured to write to the scoreboard
my $scoreboard = $client->scoreboard;
return unless $scoreboard;
# Don't do anything of (for some reason) we don't have a current job
my $job = $client->current_job;
return unless $job;
my $class = $job->funcname;
open(my $sb, '>', $scoreboard)
or $job->debug("Could not write scoreboard '$scoreboard': $!");
print $sb join("\n", ("pid=$$",
'funcname='.($class||''),
'started='.($job->grabbed_until-($class->grab_for||1)),
'arg='._serialize_args($job->arg),
)
), "\n";
close($sb);
return;
}
# Quick and dirty serializer. Don't use Data::Dumper because we don't need to
# recurse indefinitely and we want to truncate the output produced
sub _serialize_args {
my ($args) = @_;
if (ref $args) {
if (ref $args eq 'HASH') {
return join ',',
map { ($_||'').'='.substr($args->{$_}||'', 0, 200) }
keys %$args;
} elsif (ref $args eq 'ARRAY') {
return join ',',
map { substr($_||'', 0, 200) }
@$args;
}
} else {
return $args;
}
}
sub end_scoreboard {
my $client = shift;
# Don't do anything if we're not configured to write to the scoreboard
my $scoreboard = $client->scoreboard;
return unless $scoreboard;
my $job = $client->current_job;
open(my $sb, '>>', $scoreboard)
or $client->debug("Could not append scoreboard '$scoreboard': $!");
print $sb "done=".time."\n";
close($sb);
return;
}
sub clean_scoreboard {
my $client = shift;
# Don't do anything if we're not configured to write to the scoreboard
my $scoreboard = $client->scoreboard;
return unless $scoreboard;
unlink($scoreboard);
}
sub DEMOLISH {
foreach my $arg (@_) {
# Call 'clean_scoreboard' on TheSchwartz objects
if (ref($arg) and $arg->isa('TheSchwartz::Moosified')) {
$arg->clean_scoreboard;
}
}
}
no Moose;
no Moose::Util::TypeConstraints;
1; # End of TheSchwartz::Moosified
__END__
=head1 NAME
TheSchwartz::Moosified - TheSchwartz based on Moose!
=head1 SYNOPSIS
use TheSchwartz::Moosified;
my $client = TheSchwartz::Moosified->new();
$client->databases([$dbh]);
# rest are the same as TheSchwartz
# in some place we insert job into TheSchwartz::Moosified
# in another place we run this job
# 1, insert job in cgi/Catalyst
use TheSchwartz::Moosified;
my $client = TheSchwartz::Moosified->new();
$client->databases([$dbh]);
$client->insert('My::Worker::A', { args1 => 1, args2 => 2 } );
# 2, defined the heavy things in My::Worker::A
package My::Worker::A;
use base 'TheSchwartz::Moosified::Worker';
sub work {
my ($class, $job) = @_;
# $job is an instance of TheSchwartz::Moosified::Job
my $args = $job->args;
# do heavy things like resize photos, add 1 to 2 etc.
$job->completed;
}
# 3, run the worker in a non-stop script
use TheSchwartz::Moosified;
my $client = TheSchwartz::Moosified->new();
$client->databases([$dbh]);
$client->can_do('My::Worker::A');
$client->work();
=head1 DESCRIPTION
L<TheSchwartz> is a powerful job queue. This module is a Moose implemention.
read more on L<TheSchwartz>
=head1 SETTING
=over 4
=item * C<databases>
Databases containing TheSchwartz jobs, shuffled before each use.
my $dbh1 = DBI->conncet(@dbi_info);
my $dbh2 = $schema->storage->dbh;
my $client = TheSchwartz::Moosified->new( databases => [ $dbh1, $dbh2 ] );
# or
my $client = TheSchwartz::Moosified->new();
$client->databases( [ $dbh1, $dbh2 ] );
=item * C<verbose>
controls debug logging.
my $client = TheSchwartz::Moosified->new( verbose => 1 );
# or
my $client = TheSchwartz::Moosified->new();
$client->verbose( 1 );
$client->verbose( sub {
my $msg = shift;
print STDERR "[INFO] $msg\n";
} );
=item * C<prefix>
optional prefix for tables. compatible with L<TheSchwartz::Simple>
my $client = TheSchwartz::Moosified->new( prefix => 'theschwartz_' );
=item * C<scoreboard>
save job info to file. by default, the file will be saved at $tmpdir/theschwartz/scoreboard.$$
my $client = TheSchwartz::Moosified->new( scoreboard => 1 );
# or
my $client = TheSchwartz::Moosified->new();
# be sure the file is there
$client->scoreboard( "/home/fayland/theschwartz/scoreboard.log" );
=item * C<error_length>
optional, defaults to 255. Messages logged to the C<failure_log> (the
C<error> table) are truncated to this length. Setting this to zero means no
truncation (although the database you are using may truncate this for you).
=back
=head1 POSTING JOBS
The methods of TheSchwartz clients used by applications posting jobs to the
queue are:
=head2 C<$client-E<gt>insert( $job )>
Adds the given C<TheSchwartz::Job> to one of the client's job databases.
=head2 C<$client-E<gt>insert( $funcname, $arg )>
Adds a new job with funcname C<$funcname> and arguments C<$arg> to the queue.
=head1 WORKING
The methods of TheSchwartz clients for use in worker processes are:
=head2 C<$client-E<gt>can_do( $ability )>
Adds C<$ability> to the list of abilities C<$client> is capable of performing.
Subsequent calls to that client's C<work> methods will find jobs requiring the
given ability.
=head2 C<$client-E<gt>work_once()>
Find and perform one job C<$client> can do.
=head2 C<$client-E<gt>work_until_done()>
Find and perform jobs C<$client> can do until no more such jobs are found in
any of the client's job databases.
=head2 C<$client-E<gt>work( [$delay] )>
Find and perform any jobs C<$client> can do, forever. When no job is available,
the working process will sleep for C<$delay> seconds (or 5, if not specified)
before looking again.
=head2 C<$client-E<gt>find_job_for_workers( [$abilities] )>
Returns a C<TheSchwartz::Job> for a random job that the client can do. If
specified, the job returned matches one of the abilities in the arrayref
C<$abilities>, rather than C<$client>'s abilities.
=head2 C<$client-E<gt>find_job_with_coalescing_value( $ability, $coval )>
Returns a C<TheSchwartz::Job> for a random job for a worker capable of
C<$ability> and with a coalescing value of C<$coval>.
=head2 C<$client-E<gt>find_job_with_coalescing_prefix( $ability, $coval )>
Returns a C<TheSchwartz::Job> for a random job for a worker capable of
C<$ability> and with a coalescing value beginning with C<$coval>.
Note the C<TheSchwartz> implementation of this function uses a C<LIKE> query to
find matching jobs, with all the attendant performance implications for your
job databases.
=head1 SEE ALSO
L<TheSchwartz>, L<TheSchwartz::Simple>
=head1 AUTHOR
Fayland Lam, C<< <fayland at gmail.com> >>
Jeremy Stashewsky, C<< <jstash+cpan at gmail.com> >>
Luke Closs C<< <cpan at 5thplane.com> >>
=head1 COPYRIGHT & LICENSE
Copyright 2008,2016 Fayland Lam, all rights reserved.
This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.
=cut