# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Schedule::LongSteps::Storage::DBIxClass;
$Schedule::LongSteps::Storage::DBIxClass::VERSION = '0.022';
use Moose;
extends qw/Schedule::LongSteps::Storage/;

use DateTime;
use Log::Any qw/$log/;
use Scope::Guard;
use Action::Retry;

# The DBIx::Class schema on which the process resultset is looked up. Mandatory.
has 'schema' => (
    is       => 'ro',
    isa      => 'DBIx::Class::Schema',
    required => 1,
);

# Name of the resultset (within 'schema') that stores the processes. Mandatory.
has 'resultset_name' => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

# Maximum number of due processes picked up by one call to
# prepare_due_processes (used there as the 'rows' limit).
has 'limit_per_tick' => (
    is      => 'ro',
    isa     => 'Int',
    default => 50,
);

# Private helper: returns the resultset named by the 'resultset_name'
# attribute, as looked up on the 'schema' attribute.
sub _get_resultset{
    my ($self) = @_;
    return $self->schema()->resultset($self->resultset_name());
}

# Method modifier around prepare_due_processes and create_process.
#
# Before calling the wrapped method, it aligns the DBIx::Class storage
# transaction_depth with the AutoCommit state of the underlying DBH.
# When the wrapper's scope is left (normally or through an exception),
# the transaction_depth that was in place beforehand is restored.
#
# Returns whatever the wrapped method returns.
around [ 'prepare_due_processes', 'create_process' ] => sub{
    my ($orig, $self, @rest ) = @_;

    # Transfer the current autocommit nature of the DBH
    # as a transaction might have been created on this DBH outside
    # of this schema. A transaction on DBI sets AutoCommit to false
    # on the DBH. transaction_depth is just a boolean on the storage.

    # First make sure the transaction depth is restored as it was.
    # Scope::Guard needs a code reference; it runs when $guard goes
    # out of scope, so $guard must stay alive until the end of this sub.
    my $pre_transaction_depth = $self->schema()->storage()->transaction_depth();
    my $guard = Scope::Guard->new(
        sub{
            $log->trace("Restoring transaction_depth = $pre_transaction_depth");
            $self->schema()->storage()->transaction_depth( $pre_transaction_depth );
        });

    # AutoCommit on  => no transaction in progress on the DBH => depth 0.
    # AutoCommit off => a transaction is in progress          => depth 1.
    my $current_transaction_depth = $self->schema()->storage()->dbh()->{AutoCommit} ? 0 : 1;
    $log->trace("Setting transaction_depth as NOT dbh AutoCommit = ".$current_transaction_depth);
    $self->schema()->storage()->transaction_depth( $current_transaction_depth );
    return $self->$orig( @rest );
};

=head1 NAME

Schedule::LongSteps::Storage::DBIxClass - DBIx::Class based storage.


=head1 SYNOPSIS

First instantiate a storage with your L<DBIx::Class::Schema> and the name
of the resultset that represents the stored processes:

  my $storage = Schedule::LongSteps::Storage::DBIxClass->new({
                   schema => $dbic_schema,
                   resultset_name => 'LongstepsProcess'
                });

Then build and use a L<Schedule::LongSteps> object:

  my $long_steps = Schedule::LongSteps->new({ storage => $storage });




=head1 ATTRIBUTES

=over

=item schema

Your DBIx::Class::Schema. Mandatory.

=item resultset_name

The name of the resultset holding the processes in your Schema. See section 'RESULTSET REQUIREMENTS'. Mandatory.

=item limit_per_tick

The maximum number of processes that will actually run each time you
call $longsteps->run_due_processes(). Use that to control how long it takes to run
a single call to $longsteps->run_due_processes().

Note that you can have an arbitrary number of processes all doing $longsteps->run_due_processes() AT THE SAME TIME.

This will ensure that no process step is run more than one time.

Defaults to 50.

=back



=head1 RESULTSET REQUIREMENTS

The resultset to use with this storage MUST contain the following columns, constraints and indices:

=over

=item id

A unique, auto-incrementing primary key identifier.

=item process_class

A VARCHAR long enough to hold your L<Schedule::LongSteps::Process> class names. NOT NULL.

=item what

A VARCHAR long enough to hold the name of one of your steps. Can be NULL.

=item status

A VARCHAR(50) NOT NULL, defaults to 'pending'.

=item run_at

A Datetime (or timestamp with timezone in PgSQL). Will hold a UTC Timezoned date of the next run. Defaults to NULL.

Please index this so it is fast to select a range.

=item run_id

A CHAR or VARCHAR (at least 36). Defaults to NULL.

Please index this so it is fast to select rows with a matching run_id.

=item state

A reasonably long TEXT field (or JSON field in supporting databases) capable of holding
a JSON dump of pure Perl data. NOT NULL.

You HAVE to implement inflating and deflating yourself. See L<DBIx::Class::InflateColumn::Serializer::JSON>
or similar techniques.

See t/fullblown.t for a full blown working example.

=item error

A reasonably long TEXT field capable of holding a full stack trace in case something goes wrong. Defaults to NULL.

=back



=head2 prepare_due_processes

See L<Schedule::LongSteps::Storage>

Marks at most C<limit_per_tick> processes whose C<run_at> is now or in the
past, and whose C<run_id> is NULL, with a freshly generated run_id and the
status 'running'. Returns a resultset of the processes carrying that run_id.

=cut

sub prepare_due_processes{
    my ($self) = @_;

    my $now = DateTime->now();
    my $rs = $self->_get_resultset();
    my $dtf = $self->schema()->storage()->datetime_parser();

    # One batch ID per call: it tags the rows claimed by this call.
    my $uuid = $self->uuid()->create_str();
    $log->info("Creating batch ID $uuid");

    # Note that we do not use the SELECT FOR UPDATE technique here.
    # Instead this generates a single UPDATE statement like this one:
    # UPDATE longsteps_process SET run_id = ?, status = ? WHERE ( id IN ( SELECT me.id FROM longsteps_process me WHERE ( ( run_at <= ? AND run_id IS NULL ) ) LIMIT ? ) )
    my $stuff = sub{
        $rs->search({
            run_at => { '<=' => $dtf->format_datetime( $now ) },
            run_id => undef,
        }, {
            rows => $self->limit_per_tick(),
        } )
            ->update({
                run_id => $uuid,
                status => 'running'
            });
    };
    # Run the claiming UPDATE, retrying when the database asks to
    # restart the transaction (see _retry_transaction).
    $self->_retry_transaction( $stuff );

    # And return them as individual results.
    return $rs->search({
        run_id => $uuid,
    });
}

=head2 create_process

See L<Schedule::LongSteps::Storage>

This override adds retrying in case of deadlock detection.

Takes a hash reference of process properties and returns whatever
the resultset's C<create> returns for them.

=cut

sub create_process{
    my ($self, $process_properties) = @_;
    # The creation is wrapped in a code reference so that
    # _retry_transaction can attempt it more than once.
    return $self->_retry_transaction(
        sub{
            return $self->_get_resultset()->create($process_properties);
        });
}

=head2 find_process

See L<Schedule::LongSteps::Storage>

Looks up a single process by its C<id> in the configured resultset and
returns whatever the resultset's C<find> returns.

=cut

sub find_process{
    my ($self, $process_id) = @_;
    return $self->_get_resultset()->find({ id => $process_id });
}

=head2 update_process

Overrides L<Schedule::LongSteps::Storage/update_process> to add
some retrying in case of DB deadlock detection.

Takes the process row and a hash reference of properties, and calls
C<update> on the row with these properties.

=cut

override 'update_process' => sub{
    my ($self, $process, $properties) = @_;
    # The update is wrapped in a code reference so that
    # _retry_transaction can attempt it more than once.
    return $self->_retry_transaction( sub{
                                          $log->trace("Attempting to update process ".$process->id());
                                          $process->update( $properties );
                                      } );
};

# Private helper: runs $code through Action::Retry, attempting it again
# (Fibonacci strategy) for as long as it fails with an error that
# contains 'try restarting transaction'.
#
# Returns what $retry->run() returns. Any other error left in $@ after
# the run is rethrown with confess (exported by Moose).
sub _retry_transaction{
    my ($self, $code) = @_;

    my $retry = Action::Retry->new(
        attempt_code => $code,
        retry_if_code => sub{
            my $exception = $_[0];
            # The driver tells us to retry the transaction.
            # For instance MySQL reports:
            # Deadlock found when trying to get lock; try restarting transaction

            # Note that if this is false, then the Action::Retry code
            # will set $@ to the last error and return whatever the code has returned (most
            # probably undef in this case).
            # This is managed by the error testing after the call to 'run'
            return !! ( ( $exception || '' )  =~ m/try restarting transaction/ );
        },
        strategy => 'Fibonacci',
    );
    my $ret = $retry->run();
    if( my $err = $@ ){
        # The last attempt failed with an error we do not retry on.
        confess($err);
    }
    return $ret;
}
