The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Schedule::LongSteps::Storage::DBIxClass;
$Schedule::LongSteps::Storage::DBIxClass::VERSION = '0.022';
use Moose;
extends qw/Schedule::LongSteps::Storage/;

use DateTime;
use Log::Any qw/$log/;
use Scope::Guard;
use Action::Retry;

has 'schema' => ( is => 'ro', isa => 'DBIx::Class::Schema', required => 1);
has 'resultset_name' => ( is => 'ro', isa => 'Str', required => 1);

has 'limit_per_tick' => ( is => 'ro', isa => 'Int', default => 50 );

sub _get_resultset{
    my ($self) = @_;
    return $self->schema()->resultset($self->resultset_name());
}

around [ 'prepare_due_processes', 'create_process' ] => sub{
    my ($orig, $self, @rest ) = @_;

    # Transfer the current autocommit nature of the DBH
    # as a transation might have been created on this DBH outside
    # of this schema. A transaction on DBI sets AutoCommit to false
    # on the DBH. transaction_depth is just a boolean on the storage.

    # First restore transaction depth as it was.
    my $pre_transaction_depth = $self->schema()->storage()->transaction_depth();
    my $guard = Scope::Guard->new(
        sub{
            $log->trace("Restoring transaction_depth = $pre_transaction_depth");
            $self->schema()->storage()->transaction_depth( $pre_transaction_depth );
        });

    my $current_transaction_depth = $self->schema()->storage()->dbh()->{AutoCommit} ? 0 : 1;
    $log->trace("Setting transaction_depth as NOT dbh AutoCommit = ".$current_transaction_depth);
    $self->schema()->storage()->transaction_depth( $current_transaction_depth );
    return $self->$orig( @rest );
};


=head1 NAME

Schedule::LongSteps::Storage::DBIxClass - DBIx::Class based storage.

=head1 SYNOPSIS

First instantiate a storage with your L<DBIx::Class::Schema> and the name
of the resultset that represent the stored process:

  my $storage = Schedule::LongSteps::Storage::DBIxClass->new({
                   schema => $dbic_schema,
                   resultset_name => 'LongstepsProcess'
                });

Then build and use a L<Schedule::LongSteps> object:

  my $long_steps = Schedule::LongSteps->new({ storage => $storage });

  ...

=head1 ATTRIBUTES

=over

=item schema

You DBIx::Class::Schema. Mandatory.

=item resultset_name

The name of the resultset holding the processes in your Schema. See section 'RESULTSET REQUIREMENTS'. Mandatory.

=item limit_per_tick

The maximum number of processes that will actually run each time you
call $longsteps->run_due_processes(). Use that to control how long it takes to run
a single call to $longsteps->run_due_processes().

Note that you can have an arbitrary number of processes all doing $longsteps->run_due_processes() AT THE SAME TIME.

This will ensure that no process step is run more than one time.

Default to 50.

=back

=head1 RESULTSET REQUIREMENTS

The resultset to use with this storage MUST contain the following columns, constraints and indices:

=over

=item id

A unique primary key auto incrementable identifier

=item process_class

A VARCHAR long enough to hold  your L<Schedule::LongSteps::Process> class names. NOT NULL.

=item what

A VARCHAR long enough to hold the name of one of your steps. Can be NULL.

=item status

A VARCHAR(50) NOT NULL, defaults to 'pending'

=item run_at

A Datetime (or timestamp with timezone in PgSQL). Will hold a UTC Timezoned date of the next run. Default to NULL.

Please index this so it is fast to select a range.

=item run_id

A CHAR or VARCHAR (at least 36). Default to NULL.

Please index this so it is fast to select rows with a matching run_id

=item state

A Reasonably long TEXT field (or JSON field in supporting databases) capable of holding
a JSON dump of pure Perl data. NOT NULL.

You HAVE to implement inflating and deflating yourself. See L<DBIx::Class::InflateColumn::Serializer::JSON>
or similar techniques.

See t/fullblown.t for a full blown working example.

=item error

A reasonably long TEXT field capable of holding a full stack trace in case something goes wrong. Defaults to NULL.

=back

=cut

=head2 prepare_due_processes

See L<Schedule::LongSteps::Storage::DBIxClass>

=cut

sub prepare_due_processes{
    my ($self) = @_;

    my $now = DateTime->now();
    my $rs = $self->_get_resultset();
    my $dtf = $self->schema()->storage()->datetime_parser();

    my $uuid = $self->uuid()->create_str();
    $log->info("Creating batch ID $uuid");


    # Note that we do not use the SELECT FOR UPDATE technique here.
    # Instead this generates a single UPDATE statement like this one:
    # UPDATE longsteps_process SET run_id = ?, status = ? WHERE ( id IN ( SELECT me.id FROM longsteps_process me WHERE ( ( run_at <= ? AND run_id IS NULL ) ) LIMIT ? ) )
    my $stuff = sub{
        $rs->search({
            run_at => { '<=' => $dtf->format_datetime( $now ) },
            run_id => undef,
        }, {
            rows => $self->limit_per_tick(),
        } )
            ->update({
                run_id => $uuid,
                status => 'running'
            });
    };
    $stuff->();

    # And return them as individual results.
    return $rs->search({
        run_id => $uuid,
    })->all();
}

=head2 create_process

See L<Schedule::LongSteps::Storage>

This override adds retrying in case of deadlock detection.

=cut

sub create_process{
    my ($self, $process_properties) = @_;
    return $self->_retry_transaction(
        sub{
            return $self->_get_resultset()->create($process_properties);
        });
}

=head2 find_process

See L<Schedule::LongSteps::Storage>

=cut

sub find_process{
    my ($self, $process_id) = @_;
    return $self->_get_resultset()->find({ id => $process_id });
}

=head2 update_process

Overrides L<Schedule::LongSteps::Storage#update_process> to add
some retrying in case of DB deadlock detection.

=cut

override 'update_process' => sub{
    my ($self, $process, $properties) = @_;
    return $self->_retry_transaction( sub{
                                          $log->trace("Attempting to update process ".$process->id());
                                          $process->update( $properties );
                                      } );
};

sub _retry_transaction{
    my ($self, $code) = @_;

    my $retry = Action::Retry->new(
        attempt_code => $code,
        retry_if_code => sub{
            my $exception = $_[0];
            # The driver tells us to retry the transaction.
            # For instance: https://dev.mysql.com/doc/refman/5.6/en/innodb-deadlocks-handling.html

            # Note that if this is false, then the Action::Retry code
            # will set $@ to the last error and return whatever the code has returned (most
            # probably undef in this case.
            # This is managed by the error testing after the call to 'run'
            return !! ( ( $exception || '' )  =~ m/try restarting transaction/ );
        },
        strategy => 'Fibonacci',
    );
    my $ret = $retry->run();
    if( my $err = $@ ){
        confess($err);
    }
    return $ret;
}

__PACKAGE__->meta->make_immutable();