# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Job::Machine::DB;
$Job::Machine::DB::VERSION = '0.26';
use strict;
use warnings;
use Carp qw/croak confess/;
use DBI;
use Data::Serializer;

# Prefixes prepended to a queue name to build the PostgreSQL LISTEN/NOTIFY
# channel name: QUEUE_PREFIX for task queues, RESPONSE_PREFIX for the
# matching reply queues.
use constant {
	QUEUE_PREFIX    => 'jm:',
	RESPONSE_PREFIX => 'jmr:',
};


# Construct a database helper.
#
# Arguments (hash):
#   dbh             - an existing DBI handle; it is marked dbh_inherited and
#                     is not disconnected by DESTROY
#   dsn             - DBI data source, used to connect when no dbh is given
#   user, password, db_attr - passed to DBI->connect together with dsn
#   queue           - a queue name or an arrayref of queue names
#   database_schema - schema holding the job tables (default 'jobmachine')
#
# Croaks when neither dbh nor dsn is supplied, when queue is a reference
# other than an arrayref, or when connecting via dsn fails.
sub new {
	my ($class, %args) = @_;
	croak "No connect information" unless $args{dbh} or $args{dsn};
	croak "invalid queue" if ref $args{queue} and ref $args{queue} ne 'ARRAY';

	$args{dbh_inherited} = 1 if $args{dbh};
	# Normalise false values (e.g. '') to undef before handing them to DBI.
	$args{user}     ||= undef;
	$args{password} ||= undef;
	$args{db_attr}  ||= undef;
	unless ($args{dbh}) {
		# Fail here with DBI's reason instead of storing an undef handle that
		# would only surface later as "No database handle".
		$args{dbh} = DBI->connect($args{dsn}, $args{user}, $args{password}, $args{db_attr})
			or croak "Could not connect to '$args{dsn}': " . (DBI->errstr || 'unknown error');
	}
	$args{database_schema} ||= 'jobmachine';
	return bless \%args, $class;
}


# Return the Data::Serializer instance used for task parameters and results,
# creating and caching it (in $self->{serialize}) on first use.
#
# Constructor options come from the 'serializer_args' hashref given to new().
# The backend comes from serializer_args, else the 'serializer' argument,
# else 'Sereal'. The caller's serializer_args hash is copied, never modified.
sub serializer {
	my ($self) = @_;
	return $self->{serialize} if $self->{serialize};

	my %args = %{ $self->{serializer_args} || {} };
	$args{serializer} ||= $self->{serializer} || 'Sereal';
	return $self->{serialize} = Data::Serializer->new(%args);
}


# Issue LISTEN for one queue name or an arrayref of queue names.
#
# Arguments (hash):
#   queue - a queue name or an arrayref of queue names (required)
#   reply - when true, listen on the reply channel (RESPONSE_PREFIX)
#           instead of the task channel (QUEUE_PREFIX)
# Returns undef (empty list in list context) immediately when no queue is given.
sub listen {
	my ($self, %args) = @_;
	my $queue = $args{queue} || return;

	my $dbh    = $self->{dbh};
	my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
	for my $q (ref $queue ? @$queue : ($queue)) {
		# quote_identifier doubles any embedded double quotes, so an arbitrary
		# queue name cannot break out of the channel identifier.
		$dbh->do('LISTEN ' . $dbh->quote_identifier($prefix . $q));
	}
}


# Issue UNLISTEN for one queue name or an arrayref of queue names.
#
# Arguments (hash):
#   queue - a queue name or an arrayref of queue names (required)
#   reply - when true, stop listening on the reply channel (RESPONSE_PREFIX)
#           instead of the task channel (QUEUE_PREFIX)
# Returns undef (empty list in list context) immediately when no queue is given.
sub unlisten {
	my ($self, %args) = @_;
	my $queue = $args{queue} || return;

	my $dbh    = $self->{dbh};
	my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
	for my $q (ref $queue ? @$queue : ($queue)) {
		# quote_identifier doubles any embedded double quotes, so an arbitrary
		# queue name cannot break out of the channel identifier.
		$dbh->do('UNLISTEN ' . $dbh->quote_identifier($prefix . $q));
	}
}


# Send a notification on the queue's channel via SELECT pg_notify(?,?).
#
# Arguments (hash):
#   queue   - queue name (required)
#   payload - optional payload, passed as the second pg_notify() argument
#   reply   - when true, notify the reply channel (RESPONSE_PREFIX) instead
#             of the task channel (QUEUE_PREFIX)
# Returns undef (empty list in list context) immediately when no queue is
# given; otherwise returns whatever select_first returns for the query.
sub notify {
	my ($self, %args) = @_;
	my $queue = $args{queue} || return;

	my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
	# The channel is a bound value here, so no identifier quoting is needed.
	return $self->select_first(
		sql  => qq{SELECT pg_notify(?,?)},
		data => [ $prefix . $queue, $args{payload} ],
	);
}


# Poll once, without waiting, for a pending asynchronous notification.
# Returns whatever the driver's pg_notifies function reports (the POD
# describes an arrayref of name, pid and payload). The $timeout argument
# is accepted for interface symmetry but not used.
sub get_notification {
	my ($self, $timeout) = @_;
	my $pending = $self->dbh->func('pg_notifies');
	return $pending;
}


# Wait up to $timeout seconds for a notification.
# A notification that is already pending is returned straight away.
# Otherwise this blocks in select() on the connection socket until it is
# readable or the timeout expires, then polls once more.
# Returns the notification, or [0,0] when none arrived.
sub set_listen {
	my ($self, $timeout) = @_;
	my $handle  = $self->dbh;
	my $pending = $handle->func('pg_notifies');
	return $pending if $pending;

	my $read_bits = '';
	vec($read_bits, $handle->{pg_socket}, 1) = 1;
	select($read_bits, undef, undef, $timeout);

	$pending = $handle->func('pg_notifies');
	return $pending || [0,0];
}


# Claim the oldest runnable task from this object's queue(s).
#
# Picks the lowest task_id with status 0 whose class name is one of the
# configured queues, and sets it to status 100 with a fresh modified stamp,
# in one UPDATE ... RETURNING statement. The outer "AND t.status=0" appears
# intended to make a concurrent claim of the same row return no row rather
# than claim it twice.
#
# Returns the task hashref, with the deserialized parameters in {data} and
# the raw {parameters} column removed, or nothing if no task was claimed.
# Also stores the task id in $self->{task_id}.
sub fetch_work_task {
	my $self = shift;
	my $queue = ref $self->{queue} ? $self->{queue} : [$self->{queue}];
	# "IN ()" is a SQL syntax error, and an empty list can never match anyway.
	return unless @$queue;

	my $schema = $self->{database_schema};
	$self->{current_table} = 'task';
	my $elems = join(',', ('?') x @$queue);
	# NOTE(review): the run_after test below selects tasks whose run_after is
	# in the future. If run_after means "do not run before", this should
	# probably be run_after <= now(). Confirm against the schema and clients.
	my $sql = qq{
		UPDATE "$schema".$self->{current_table} t
		SET status=100,
			modified=default
		FROM "$schema".class cx
		WHERE t.class_id = cx.class_id
		AND task_id = (
			SELECT min(task_id)
			FROM "$schema".$self->{current_table} t
			JOIN "$schema".class c USING (class_id)
			WHERE t.status=0
			AND c.name IN ($elems)
			AND (t.run_after IS NULL
			OR t.run_after > now())
		)
		AND t.status=0
		RETURNING *
		;
	};
	my $task = $self->select_first(
		sql  => $sql,
		data => $queue,
	) || return;

	$self->{task_id} = $task->{task_id};
	$task->{data} = $self->serializer->deserialize(delete $task->{parameters});
	return $task;
}


# Queue a new task.
# Looks up (or creates, via fetch_class) the class named $queue, serializes
# $data, and inserts a task row with status 0.
# Returns the new task_id (from RETURNING task_id).
sub insert_task {
	my ($self, $data, $queue) = @_;
	my $class_row = $self->fetch_class($queue);
	$self->{current_table} = 'task';
	my $payload = $self->serializer->serialize($data);
	my $sql = qq{
		INSERT INTO "$self->{database_schema}".$self->{current_table}
			(class_id,parameters,status)
		VALUES (?,?,?)
		RETURNING task_id
	};
	my @bind = ($class_row->{class_id}, $payload, 0);
	return $self->insert(sql => $sql, data => \@bind);
}


# Set the status column of the current task, i.e. the id returned by
# task_id(), which confesses when no task is set.
# Returns whatever update() returns (nothing).
sub set_task_status {
	my ($self, $status) = @_;
	my $current_id = $self->task_id;
	$self->{current_table} = 'task';
	my $sql = qq{
		UPDATE "$self->{database_schema}".$self->{current_table}
		SET status=?
		WHERE task_id=?
	};
	return $self->update(sql => $sql, data => [ $status, $current_id ]);
}


# Return the class row whose name is $queue.
# If the lookup finds nothing, the class is created with insert_class.
# Either way the result is a hashref containing class_id.
sub fetch_class {
	my ($self, $queue) = @_;
	$self->{current_table} = 'class';
	my $sql = qq{
		SELECT *
		FROM "$self->{database_schema}".$self->{current_table}
		WHERE name=?
	};
	my $existing = $self->select_first(sql => $sql, data => [$queue]);
	return $existing if $existing;
	return $self->insert_class($queue);
}


# Fetch task $id joined with its class name.
# The serialized parameters are kept in {frozen}, and {parameters} is
# replaced by the deserialized value.
# Returns the task hashref, or nothing when no task matches.
sub fetch_task {
	my ($self, $id) = @_;
	$self->{current_table} = 'task';
	my $sql = qq{
		SELECT t.*, c.name
		FROM "$self->{database_schema}".$self->{current_table} t
		JOIN "$self->{database_schema}".class c USING (class_id)
		WHERE task_id=?
	};
	my $task = $self->select_first(sql => $sql, data => [$id]);
	return unless $task;

	my $frozen = $task->{parameters};
	$task->{frozen}     = $frozen;
	$task->{parameters} = $self->serializer->deserialize($frozen);
	return $task;
}


# Insert a new row named $queue into the class table.
# Returns the row produced by RETURNING class_id (a hashref with class_id).
sub insert_class {
	my ($self,$queue) = @_;
	# Select the table explicitly rather than relying on the caller having
	# left current_table at 'class' (only fetch_class guaranteed that).
	$self->{current_table} = 'class';
	my $sql = qq{
		INSERT INTO "$self->{database_schema}".$self->{current_table}
			(name)
		VALUES (?)
		RETURNING class_id
	};
	return $self->select_first(sql => $sql,data => [$queue]);
}


# Insert a result row for the current task ($self->{task_id}).
#
# $data is either a plain value, which is serialized as the result, or a
# hashref. For a hashref, its 'type' entry goes into the resulttype column
# and the remaining entries are serialized as the result. The caller's
# hash is not modified.
# Returns the new result_id (from RETURNING result_id).
sub insert_result {
	my ($self,$data) = @_;
	$self->{current_table} = 'result';
	my @columns = qw/task_id result/;
	my @values = ($self->{task_id});
	if (ref $data eq 'HASH') {
		# Work on a shallow copy so the caller's hash keeps its 'type' key.
		my %result = %$data;
		my $type = delete $result{type};
		push @columns, 'resulttype';
		push @values, $self->serializer->serialize(\%result), $type;
	} else {
		push @values, $self->serializer->serialize($data);
	}
	my $columns = join ', ', @columns;
	my $qs = join(',', ('?') x @columns);
	my $sql = qq{
		INSERT INTO "$self->{database_schema}".$self->{current_table}
			($columns)
		VALUES ($qs)
		RETURNING result_id
	};
	return $self->insert(sql => $sql,data => \@values);
}


# Fetch result row $result_id and deserialize its {result} column in place.
# Returns the row hashref, or nothing when no row matches.
sub fetch_result {
	my ($self, $result_id) = @_;
	$self->{current_table} = 'result';
	my $sql = qq{
		SELECT *
		FROM "$self->{database_schema}".$self->{current_table}
		WHERE result_id=?
	};
	my $row = $self->select_first(sql => $sql, data => [$result_id]);
	return unless $row;

	$row->{result} = $self->serializer->deserialize($row->{result});
	return $row;
}


# Return the deserialized result of the most recent result row (highest
# result_id) for $task_id, or nothing when the task has no results.
# NOTE(review): despite the name, ORDER BY result_id DESC makes this the
# latest result, not the earliest. Kept as-is; confirm intent.
sub fetch_first_result {
	my ($self,$task_id) = @_;
	$self->{current_table} = 'result';
	# Only one row is used, so let the database stop after it.
	my $sql = qq{
		SELECT *
		FROM "$self->{database_schema}".$self->{current_table}
		WHERE task_id=?
		ORDER BY result_id DESC
		LIMIT 1
	};
	my $result = $self->select_first(sql => $sql,data => [$task_id]) || return;

	return $self->serializer->deserialize($result->{result});
}


# Fetch every result of task $id, newest (highest result_id) first.
# Returns an arrayref of { id, type, result } hashrefs with the result
# deserialized, or nothing if the query fails.
sub fetch_results {
	my ($self, $id) = @_;
	$self->{current_table} = 'result';
	my $sql = qq{
		SELECT *
		FROM "$self->{database_schema}".$self->{current_table}
		WHERE task_id=?
		ORDER BY result_id DESC
	};
	my $rows = $self->select_all(sql => $sql, data => [$id]);
	return unless $rows;

	my @results;
	for my $row (@$rows) {
		push @results, {
			id     => $row->{result_id},
			type   => $row->{resulttype},
			result => $self->serializer->deserialize($row->{result}),
		};
	}
	return \@results;
}


# List the distinct status values present in the task table.
# Returns an arrayref of { status => ... } hashrefs, or nothing if the
# query fails.
sub get_statuses {
	my ($self) = @_;
	$self->{current_table} = 'task';
	my $sql = qq{
		SELECT status
		FROM "$self->{database_schema}".$self->{current_table}
		GROUP BY status
	};
	my $statuses = $self->select_all(sql => $sql);
	return unless $statuses;
	return $statuses;
}


# List every row of the class table.
# Returns an arrayref of row hashrefs, or nothing if the query fails.
sub get_classes {
	my ($self) = @_;
	$self->{current_table} = 'class';
	my $sql = qq{
		SELECT *
		FROM "$self->{database_schema}".$self->{current_table}
	};
	my $classes = $self->select_all(sql => $sql);
	return unless $classes;
	return $classes;
}


# List tasks joined with their class name.
#
# Arguments (hash):
#   where    - hashref of column => value equality filters (see where_clause)
#   order_by - hashref whose keys are the columns to order by (see order_by)
# Returns an arrayref of row hashrefs, or nothing if the query fails.
sub get_tasks {
	my ($self, %args) = @_;
	$self->{current_table} = 'task';
	my ($where_clause, @where_args) = $self->where_clause($args{where});
	my $order_by = $self->order_by($args{order_by});
	my $sql = qq{
		SELECT t.*, c.name
		FROM "$self->{database_schema}".$self->{current_table} t
		JOIN "$self->{database_schema}".class c USING (class_id)
		$where_clause
		$order_by
	};
	my $tasks = $self->select_all(sql => $sql, data => \@where_args);
	return unless $tasks;
	return $tasks;
}


# Reset tasks stuck in status 100 (the status fetch_work_task sets when it
# claims a task) back to status 0 once their modified timestamp is more
# than $max seconds old, so they can be picked up again.
# Returns the number of rows updated, as reported by do().
sub revive_tasks {
	my ($self,$max) = @_;
	$self->{current_table} = 'task';
	my $status = 100;
	# The age is bound as an interval string instead of being spliced into
	# the SQL text, so the value of $max cannot alter the statement.
	my $sql = qq{
		UPDATE "$self->{database_schema}".$self->{current_table}
		SET status=0
		WHERE status=?
		AND modified < now() - CAST(? AS INTERVAL)
	};
	return $self->do(sql => $sql,data => [$status, "$max seconds"]);
}


# Mark as failed (status 900) the tasks that have more than $retries rows in
# the result table. At most 100 tasks are handled per call.
# Returns the number of tasks selected for failing (0 when none, or when the
# lookup query fails).
# NOTE(review): the lookup does not exclude tasks that are already failed or
# finished, so once more than 100 tasks qualify the same ones may be picked
# on every call. Confirm whether a status filter is wanted.
sub fail_tasks {
	my ($self,$retries) = @_;
	$self->{current_table} = 'result';
	my $limit = 100;
	my $sql = qq{
		SELECT task_id
		FROM "$self->{database_schema}".$self->{current_table}
		GROUP BY task_id
		HAVING count(*)>?
		LIMIT ?
	};
	my $result = $self->select_all(sql => $sql,data => [$retries,$limit]) || return 0;
	return 0 unless @$result;

	my @task_ids = map { $_->{task_id} } @$result;
	# Bind the ids as placeholders, as fetch_work_task does for queue names,
	# instead of splicing them into the SQL text.
	my $placeholders = join ',', ('?') x @task_ids;
	$self->{current_table} = 'task';
	my $status = 900;
	$sql = qq{
		UPDATE "$self->{database_schema}".$self->{current_table}
		SET status=?
		WHERE task_id IN ($placeholders)
	};
	$self->do(sql => $sql,data => [$status, @task_ids]);
	return scalar @task_ids;
}


# Delete tasks whose modified timestamp is more than $after days old.
# Returns 0 without touching the database when $after is false; otherwise
# returns the number of rows deleted, as reported by do().
# NOTE(review): tasks of every status are deleted, including ones that are
# still waiting to run. Confirm that is intended.
sub remove_tasks {
	my ($self,$after) = @_;
	return 0 unless $after;

	$self->{current_table} = 'task';
	# The age is bound as an interval string instead of being spliced into
	# the SQL text, so the value of $after cannot alter the statement.
	my $sql = qq{
		DELETE FROM "$self->{database_schema}".$self->{current_table}
		WHERE modified < now() - CAST(? AS INTERVAL)
	};
	return $self->do(sql => $sql,data => ["$after days"]);
}


# Prepare and execute $args{sql} with the bind values in $args{data}, and
# return the first row as a hashref (undef when there are no rows).
# Returns 0 if prepare or execute fails. An execute failure also prints the
# caller's file/line and the SQL to STDERR.
sub select_first {
	my ($self, %args) = @_;
	my $sth = $self->dbh->prepare($args{sql});
	return 0 unless $sth;

	if (!$sth->execute(@{$args{data}})) {
		my (undef, $file, $line) = caller;
		print STDERR "File: $file line $line\n";
		print STDERR "$args{sql}\n" if $args{sql};
		return 0;
	}

	my $row = $sth->fetchrow_hashref;
	$sth->finish;
	return $row;
}


# Prepare and execute $args{sql} and return every row as an arrayref of
# hashrefs. Elements of $args{data} may be [value, type] pairs (see
# set_bind_type). Returns 0 if prepare or execute fails. An execute failure
# also prints the caller's file/line and the SQL to STDERR.
sub select_all {
	my ($self, %args) = @_;
	my $sth = $self->dbh->prepare($args{sql});
	return 0 unless $sth;

	$self->set_bind_type($sth, $args{data} || []);
	if (!$sth->execute(@{$args{data}})) {
		my (undef, $file, $line) = caller;
		print STDERR "File: $file line $line\n";
		print STDERR "$args{sql}\n" if $args{sql};
		return 0;
	}

	my @rows;
	while (my $row = $sth->fetchrow_hashref) {
		push @rows, $row;
	}
	$sth->finish;
	return \@rows;
}

# For every element of @$data that is a [value, type] arrayref, declare the
# type for that placeholder via $sth->bind_param(position, undef, type).
# The element in @$data is then replaced, in place, by the bare value.
# Returns nothing.
sub set_bind_type {
	my ($self, $sth, $data) = @_;
	my @typed = grep { ref $data->[$_] } 0 .. $#$data;
	for my $idx (@typed) {
		my ($value, $type) = @{ $data->[$idx] };
		$sth->bind_param($idx + 1, undef, $type);
		$data->[$idx] = $value;
	}
	return;
}

# Prepare and execute a statement that returns no rows of interest
# (UPDATE/DELETE) and return the affected row count from $sth->rows.
# Returns 0 if prepare or execute fails. An execute failure prints the
# caller's file/line and the SQL to STDERR, matching select_first and
# select_all instead of being silently ignored.
sub do {
	my ($self, %args) = @_;
	my $sth = $self->dbh->prepare($args{sql}) || return 0;

	unless ($sth->execute(@{$args{data}})) {
		my @c = caller;
		print STDERR "File: $c[1] line $c[2]\n";
		print STDERR $args{sql}."\n" if($args{sql});
		return 0;
	}
	my $rows = $sth->rows;
	$sth->finish();
	return $rows;
}

# Prepare and execute an INSERT ... RETURNING statement and return the first
# column of the returned row (e.g. the new id).
# Returns 0 if prepare or execute fails (an execute failure prints the
# caller's file/line and the SQL to STDERR, like select_first). Returns
# undef when the statement produced no row, instead of dying on a
# dereference of undef.
sub insert {
	my ($self, %args) = @_;
	my $sth = $self->dbh->prepare($args{sql}) || return 0;

	unless ($sth->execute(@{$args{data}})) {
		my @c = caller;
		print STDERR "File: $c[1] line $c[2]\n";
		print STDERR $args{sql}."\n" if($args{sql});
		return 0;
	}
	my $row = $sth->fetch();
	$sth->finish();
	return unless $row;
	return $row->[0];
}

# Run an UPDATE through do(), discarding its row count. Returns nothing.
sub update {
	my ($self, @args) = @_;
	$self->do(@args);
	return;
}

# Return the DBI handle; confess (with a stack trace) when there is none.
sub dbh {
	my ($self) = @_;
	my $handle = $self->{dbh};
	confess "No database handle" unless $handle;
	return $handle;
}

# Return the id of the current task (set by fetch_work_task); confess when
# no task id is set.
sub task_id {
	my ($self) = @_;
	my $id = $self->{task_id};
	confess "No task id" unless $id;
	return $id;
}

# Disconnect the database handle if there is one, returning the result of
# its disconnect(). Without a handle, the (false) handle slot is returned.
sub disconnect {
	my ($self) = @_;
	my $handle = $self->{dbh};
	return $handle unless $handle;
	return $handle->disconnect;
}

# Destructor: disconnect the handle this object opened itself (from dsn).
# A dbh supplied by the caller (dbh_inherited) is left open.
sub DESTROY {
	my ($self) = @_;
	return if $self->{dbh_inherited};
	$self->disconnect();
	return;
}


# Build a "WHERE col = ? AND ..." clause from a hashref of column => value.
#
# Returns the clause followed by the bind values in the same column order.
# The clause is '' when $where is empty or undef. Columns are sorted so the
# SQL text is deterministic. Column names are interpolated verbatim, so they
# must come from trusted code, never from user input.
sub where_clause {
	my ($self, $where) = @_;
	my @columns = sort keys %{ $where || {} };
	return '' unless @columns;

	# map, not the repetition operator "x": each column needs its own
	# "col = ?" term.
	my $where_clause = 'WHERE ' . join(' AND ', map { "$_ = ?" } @columns);
	return $where_clause, @{$where}{@columns};
}


# Build an "ORDER BY col,col" clause from the keys of a hashref. The values
# are ignored. Keys are sorted so the SQL text is deterministic.
# Returns '' (rather than undef, so it interpolates cleanly into SQL) when
# $order is not a hashref or is empty. Column names are interpolated
# verbatim, so they must come from trusted code, never from user input.
sub order_by {
	my ($self, $order) = @_;
	return '' unless ref($order) eq 'HASH' && %$order;

	return 'ORDER BY ' . join(',', sort keys %$order);
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Job::Machine::DB

=head1 VERSION

version 0.26

=head1 NAME

Job::Machine::DB - Database class for Job::Machine

=head1 METHODS

=head2 new

  my $client = Job::Machine::DB->new(
	  dbh   => $dbh,
	  queue => 'queue.subqueue',

  );

  my $client = Job::Machine::DB->new(
	  dsn   => $dsn,
  );

=head2 serializer

Returns the Data::Serializer object used for task parameters and results,
creating and caching it on first use. The backend defaults to Sereal.

=head2 listen

Starts listening to the named queues. If 'reply' is
passed, we listen to the related reply queue instead of the task queue.

Return undef immediately if no queue is provided.

 $self->listen( queue => 'queue_name' );
 $self->listen( queue => \@queues, reply => 1  );

=head2 unlisten

Quit listening to the named queues. If 'reply' is passed, we unlisten
to the related reply queue instead of the task queue.

Return undef immediately if no queue is provided.

 $self->unlisten( queue => 'queue_name' );
 $self->unlisten( queue => \@queues, reply => 1  );

=head2 notify

Sends an asynchronous notification to the named queue, with an optional
payload. If 'reply' is true, the notification goes to the related reply
queue instead of the task queue.

Return undef immediately if no queue name is provided.

 $self->notify( queue => 'queue_name' );
 $self->notify( queue => 'queue_name', reply => 1, payload => $data  );

=head2 get_notification

Retrieves one pending notification, if there is one, without waiting.

 my $notifies = $self->get_notification();

The return value is false when nothing is pending, otherwise an arrayref
that unpacks like this:

 my ($name, $pid, $payload) = @$notify;

=head2 set_listen

Waits for a notification. The required timeout parameter is the maximum
number of seconds to wait. Returns the notification arrayref, or C<[0,0]>
if none arrived in time.

=head2 fetch_work_task

Fetch one work task from the task table

=head2 insert_task

Insert a row in the task table

=head2 set_task_status

Update the task with a new status

=head2 fetch_class

Fetch a class

=head2 fetch_task

Fetch a task

=head2 insert_class

Insert a row in the class table

=head2 insert_result

Insert a row in the result table

Argument

 data - either a scalar value that will be inserted as the result, or a hashref whose 'type' entry is stored as the result type and whose remaining entries are stored as the result

=head2 fetch_result

Fetch a result using the result id

=head2 fetch_first_result

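Fetch the result of a task, given the task id. When a task has several
results, the most recent one (highest result id) is returned.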

=head2 fetch_results

Fetch all results of a given task

=head2 get_statuses

Fetch all distinct statuses

=head2 get_classes

Fetch all classes

=head2 get_tasks

Fetch all tasks, joined with the class for a suitable name

=head2 revive_tasks

	1. Find started tasks that have passed the time limit, most probably because of a dead worker. (status 100, modified < now - max_runtime)
	2. Reset their status to 0 so the tasks can be tried again

=head2 fail_tasks

	1. Find tasks that have failed too many times (# of result rows > $retries)
	2. Fail them (set status 900)
	There's a hard limit (100) on how many tasks can be failed at one time, for
	performance reasons

=head2 remove_tasks

	- Find tasks that were last modified more than the given number of days ago
	- Delete them
	- Nothing is done when no number of days is given

=head2 select_first

Select the first row from the given sql statement

=head2 select_all

Select all rows from the given sql statement

=head2 where_clause

Very lightweight where clause builder

=head2 order_by

Very lightweight order-by builder

=head1 AUTHOR

Kaare Rasmussen <kaare@cpan.org>.

=head1 COPYRIGHT

Copyright (C) 2009,2015, Kaare Rasmussen

This module is free software; you can redistribute it or modify it
under the same terms as Perl itself.

=head1 AUTHOR

Kaare Rasmussen <kaare at cpan dot net>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2015 by Kaare Rasmussen.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut