The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Qudo::Driver::Skinny;
use DBIx::Skinny;

sub init_driver {
    my ($class, $master) = @_;

    for my $database (@{$master->{databases}}) {
        my $connection = $class->new($database);
        $master->set_connection($database->{dsn}, $connection);
    }
}

sub lookup_job {
    my ($self, $job_id) = @_;

    my $rs = $self->_search_job_rs(limit => 1);

    $rs->add_where('job.id' => $job_id);

    my $itr = $rs->retrieve('job');

    return $self->_get_job_data($itr);
}

sub find_job {
    my ($self, $limit, $func_ids) = @_;

    my $rs = $self->_search_job_rs(limit => $limit);

    $rs->add_where('job.func_id' => $func_ids);

    my $servertime = $self->get_server_time;
    $rs->add_where('job.grabbed_until' => { '<=', => $servertime});
    $rs->add_where('job.run_after'     => { '<=', => $servertime});

    my $itr = $rs->retrieve('job');

    return $self->_get_job_data($itr);
}

sub _search_job_rs {
    my ($self, %args) = @_;

    my $rs = $self->resultset(
        {
            select => [qw/job.id job.arg job.uniqkey job.func_id job.grabbed_until job.retry_cnt job.priority/],
            from   => 'job',
            limit  => $args{limit},
        }
    );
    $rs->order({column => 'job.priority', desc => 'DESC'});

    return $rs;
}

sub _get_job_data {
    my ($self, $itr) = @_;
    sub {
        my $job = $itr->next or return;
        return +{
            job_id            => $job->id,
            job_arg           => $job->arg,
            job_uniqkey       => $job->uniqkey,
            job_grabbed_until => $job->grabbed_until,
            job_retry_cnt     => $job->retry_cnt,
            job_priority      => $job->priority,
            func_id           => $job->func_id,
        };
    };
}

sub grab_a_job {
    my ($self, %args) = @_;

    return $self->update('job',
        {
            grabbed_until => $args{grabbed_until},
        },
        {
            id            => $args{job_id},
            grabbed_until => $args{old_grabbed_until},
        }
    );

}

sub logging_exception {
    my ($self, $args) = @_;
    $self->insert('exception_log', $args);
    return;
}

sub set_job_status {
    my ($self, $args) = @_;
    $self->insert('job_status', $args);
    return;
}

sub get_server_time {
    my $self = shift;
    my $unixtime_sql = $self->dbd->sql_for_unixtime;
    return $self->dbh->selectrow_array("SELECT $unixtime_sql");
}

sub enqueue {
    my ($self, $args) = @_;
    my $job = $self->insert('job', $args);
    return $job ? $job->id : undef;
}

sub reenqueue {
    my ($self, $job_id, $args) = @_;
    $self->update('job', $args, {id => $job_id});
}

sub dequeue {
    my ($self, $args) = @_;
    $self->delete('job', $args);
}

sub func_from_name {
    my ($self, $funcname) = @_;
    my $row = $self->find_or_create('func',{ name => $funcname });
    return { id => $row->id, name => $row->name };
}

sub func_from_id {
    my ($self, $funcid) = @_;
    my $row = $self->single('func',{ id => $funcid });
    return { id => $row->id, name => $row->name };
}

sub retry_from_exception_log {
    my ($self, $exception_log_id) = @_;

    $self->update('exception_log',
        {
            retried => 1,
        },
        {
            id => $exception_log_id,
        },
    );
}

sub exception_list {
    my ($self, $args) = @_;

    my $rs = $self->resultset(
        {
            select => [qw/exception_log.id
                          exception_log.func_id
                          exception_log.exception_time
                          exception_log.message
                          exception_log.uniqkey
                          exception_log.arg
                          exception_log.retried
                      /],
            from   => [qw/exception_log/],
            limit  => $args->{limit},
            offset => $args->{offset},
        }
    );

    if ($args->{funcs}) {
        $rs->from([]);
        $rs->add_join(
            exception_log => {
                type      => 'inner',
                table     => 'func',
                condition => 'exception_log.func_id = func.id',
            }
        );
        $rs->add_where('func.name' => $args->{funcs});
    }
    my $itr = $rs->retrieve;

    my @exception_list;
    while (my $row = $itr->next) {
        push @exception_list, $row->get_columns;
    }
    return \@exception_list;
}

sub job_status_list {
    my ($self, $args) = @_;

    my $rs = $self->resultset(
        {
            select => [qw/job_status.id
                          job_status.func_id
                          job_status.arg
                          job_status.uniqkey
                          job_status.status
                          job_status.job_start_time
                          job_status.job_end_time
                      /],
            from   => [qw/job_status/],
            limit  => $args->{limit},
            offset => $args->{offset},
        }
    );

    if ($args->{funcs}) {
        $rs->from([]);
        $rs->add_join(
            job_status => {
                type      => 'inner',
                table     => 'func',
                condition => 'job_status.func_id = func.id',
            }
        );
        $rs->add_where('func.name' => $args->{funcs});
    }
    my $itr = $rs->retrieve;

    my @job_status_list;
    while (my $row = $itr->next) {
        push @job_status_list, $row->get_columns;
    }
    return \@job_status_list;
}

sub job_count {
    my ($self, $funcs) = @_;

    my $rs = $self->resultset(
        {
            from => [qw/job/],
        }
    );
    $rs->add_select('COUNT(job.id)' => 'count');

    if ($funcs) {
        $rs->from([]);
        $rs->add_join(
            job => {
                type      => 'inner',
                table     => 'func',
                condition => 'job.func_id = func.id',
            }
        );
        $rs->add_where('func.name' => $funcs);
    }

    return $rs->retrieve->first->count;
}

1;