The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Hopkins::Queue;

use strict;

=head1 NAME

Hopkins::Queue - hopkins queue states and methods

=head1 DESCRIPTION

Hopkins::Queue contains all of the POE event handlers and
supporting methods for the initialization and management of
each configured hopkins queue.

=cut

use POE;
use Class::Accessor::Fast;

use Cache::FileCache;
use DateTime::Format::ISO8601;
use Tie::IxHash;

use Hopkins::Constants;
use Hopkins::Worker;
use Hopkins::Work;

use base 'Class::Accessor::Fast';

__PACKAGE__->mk_accessors(qw(kernel config cache name alias onerror onfatal concurrency works halted frozen error));

=head1 STATES

=over 4

=item new

=cut

sub new
{
	my $self = shift->SUPER::new(@_);

	Hopkins->log_debug('creating queue ' . $self->name);

	$self->alias('queue.' . $self->name);
	$self->works(new Tie::IxHash);

	$self->onerror(undef)
		unless $self->onerror
		and grep { $self->onerror eq $_ } qw(halt freeze shutdown flush);

	$self->cache(new Cache::FileCache {
		cache_root		=> $self->config->fetch('state/root')->stringify,
		namespace		=> 'queue/' . $self->name,
		directory_umask	=> 0077
	});

	$self->read_state;
	$self->write_state;

	return $self;
}

sub read_state
{
	my $self = shift;

	$self->frozen($self->cache->get('frozen') ? 1 : 0);
	$self->halted($self->cache->get('halted') ? 1 : 0);
	$self->error($self->cache->get('error'));

	my $aref = $self->cache->get('works');

	return if not ref $aref eq 'ARRAY';

	foreach my $href (@$aref) {
		my $work = new Hopkins::Work;
		my $date = undef;

		$work->queue($self);
		$work->options($href->{options});

		if (my $id = $href->{id}) {
			$work->id($id);
		} else {
			Hopkins->log_error('unable to determine task ID when reading state');
			next;
		}

		if (my $date = Hopkins->parse_datetime($href->{date_enqueued})) {
			$work->date_enqueued($date);
		} else {
			Hopkins->log_error('unable to parse date/time information when reading state');
		}

		# FIXME: date_to_execute not yet implemented

		if (my $date = Hopkins->parse_datetime($href->{date_to_execute})) {
			$work->date_to_execute($date);
		} else {
			#Hopkins->log_error('unable to parse date/time information when reading state');
			$work->date_to_execute(DateTime->now(time_zone => 'local'));
		}

		if (my $val = $href->{date_started}) {
			if (my $date = Hopkins->parse_datetime($href->{date_started})) {
				$work->date_started($date);
			} else {
				Hopkins->log_error('unable to parse date/time information when reading state');
			}
		}

		# attempt to locate the referenced task.  if the
		# configuration has changed and we can't locate the
		# referenced task, we'll want to halt the queue
		# until an operator can take a look at it.

		if (my $task = $self->config->get_task_info($href->{task})) {
			$work->task($task);
		} else {
			Hopkins->log_error("unable to locate task '$href->{task}' when reading state");

			$self->kernel->post(store => notify => task_aborted => $work->serialize);
		}

		# if the task was already started or has an invalid
		# vconfiguration, mark it as orphaned.  otherwise,
		# go ahead and queue it up for execution.

		if ($work->date_started or not defined $work->task) {
			$self->kernel->post(store => notify => task_orphaned => $work->serialize);
		} else {
			$self->enqueue($work, write => 0);
		}
	}
}

sub spawn
{
	my $self = shift;

	Hopkins->log_debug('spawning queue ' . $self->name);

	# this passive queue will act as an on-demand task
	# execution queue, waiting for enqueue events to be
	# posted to the kernel.

	#POE::Component::JobQueue->spawn
	#(
	#	Alias		=> $self->alias,
	#	WorkerLimit	=> $self->concurrency,
	#	Worker		=> sub { $self->spawn_worker(@_) },
	#	Passive		=> { Prioritizer => \&Hopkins::Queue::prioritize },
	#);

	#foreach my $work ($self->tasks->Values) {
	#	$self->kernel->post($self->alias => enqueue => dequeue => $work);
	#}

	# this active queue will poll hopkins periodically,
	# checking the parent Hopkins::Queue object for new
	# tasks to spawn.

	POE::Component::JobQueue->spawn
	(
		Alias		=> $self->alias,
		WorkerLimit	=> $self->concurrency,
		Worker		=> sub { $self->fetch_and_spawn_worker(@_) },
		Active		=>
		{
			#PollInterval	=> $global->{poll},
			PollInterval	=> 15,
			AckAlias		=> 'manager',
			AckState		=> 'complete'
		}
	);
}

=item contents

=cut

sub contents
{
	my $self = shift;
	my $args = ref $_[0] eq 'HASH' ? shift : { @_ };

	my $date	= $args->{executing};
	my @works	= $self->works->Values;

	return $date ? grep { $_->date_to_execute < $date } @works : @works;
}

=item find

=cut

sub find
{
	my $self	= shift;
	my $id		= shift;

	return $self->works->FETCH($id);
}

=item enqueue

=cut

sub enqueue
{
	my $self = shift;
	my $work = shift;
	my $args = { @_ };

	$self->works->Push($work->id => $work);
	$self->write_state unless $args->{write} and $args->{write} == 0;
}

=item dequeue

=cut

sub dequeue
{
	my $self = shift;
	my $work = shift;
	my $args = { @_ };

	my $id = UNIVERSAL::isa($work, 'Hopkins::Work') ? $work->id : $work;

	$self->works->Delete($id);
	$self->write_state unless $args->{write} and $args->{write} == 0;
}

=item write_state

write the queue's state to disk.

=cut

sub write_state
{
	my $self = shift;

	$self->cache->set(frozen => $self->frozen);
	$self->cache->set(halted => $self->halted);
	$self->cache->set(error => $self->error);
	$self->cache->set(works => [ map { $_->serialize } $self->works->Values ]);
}

=item stop

stops the queue, shutting down the PoCo::JobQueue session
if running by sending a stop event to it.

=cut

sub stop
{
	my $self = shift;

	$self->kernel->post($self->alias => 'stop') if $self->kernel;
}

=item spawn_worker

=cut

sub spawn_worker
{
	my $self = shift;

	my $args =
	{
		postback	=> shift,
		work		=> shift,
		queue		=> $self
	};

	new Hopkins::Worker $args;
}

=item fetch_and_spawn_worker

=cut

sub fetch_and_spawn_worker
{
	my $self		= shift;
	my $postback	= shift;

	return if $self->halted;

	#Hopkins->log_debug('polling ' . $self->name . ' queue for tasks to execute');

	my $now		= DateTime->now(time_zone => 'local');
	my @work	=
		sort Hopkins::Queue::prioritize
		grep { not defined $_->worker and $_->date_to_execute < $now }
		$self->works->Values;

	if (my $work = shift @work) {
		my $args =
		{
			postback	=> $postback->($work),
			work		=> $work,
			queue		=> $self
		};

		new Hopkins::Worker $args;
	}
}

=item prioritize

=cut

sub prioritize
{
	my $apri = $a->priority;
	my $bpri = $b->priority;

	$apri = 1 if $apri < 1;
	$apri = 9 if $apri > 9;
	$bpri = 1 if $bpri < 1;
	$bpri = 9 if $bpri > 9;

	return $apri <=> $bpri;
}

=item is_running

=cut

sub is_running
{
	my $self = shift;
	my $name = shift;

	return Hopkins->is_session_active($self->alias);
}

=item status

=cut

sub status
{
	my $self = shift;

	return HOPKINS_QUEUE_STATUS_HALTED	if $self->halted;
	return HOPKINS_QUEUE_STATUS_RUNNING	if $self->works->Length > 0;

	return HOPKINS_QUEUE_STATUS_IDLE;
}

=item status_string

=cut

sub status_string
{
	my $self = shift;

	for ($self->status) {
		$_ == HOPKINS_QUEUE_STATUS_IDLE && return 'idle';

		#$_ == HOPKINS_QUEUE_STATUS_RUNNING && $self->frozen
		#	&& return 'running (frozen)';

		#$_ == HOPKINS_QUEUE_STATUS_HALTED && $self->frozen
		#	&& return 'halted (frozen)';

		$_ == HOPKINS_QUEUE_STATUS_RUNNING		&& return 'running';
		$_ == HOPKINS_QUEUE_STATUS_HALTED		&& return 'halted';
	}
}

=item num_queued

=cut

sub num_queued
{
	my $self = shift;
	my $task = shift;

	return $self->works->Length if not defined $task;

	if (not ref $task eq 'Hopkins::Task') {
		Hopkins->log_warn('Hopkins::Queue->num_queued called with argument that is not a Hopkins::Task object');
		return 0;
	}

	return scalar grep { $_->task->name eq $task->name } $self->works->Values
}

=item start

=cut

sub start
{
	my $self = shift;

	$self->error(undef);
	$self->halted(0);

	return HOPKINS_QUEUE_ALREADY_RUNNING if $self->is_running;

	$self->spawn;

	return HOPKINS_QUEUE_STARTED;
}

=item halt

halts the queue.  no tasks will be executed, although tasks
may still be enqueued.

=cut

sub halt
{
	my $self = shift;

	$self->halted(1);
}

=item continue

reverses the action of halt by starting the queue back up.
the existing state of the frozen flag will be preserved.

=cut

sub continue
{
	my $self = shift;

	$self->halted(0);
}

=item freeze

freezes the queue.  no more tasks will be enqueued, but
currently queued tasks will be allowed to execute.

=cut

sub freeze
{
	my $self = shift;

	$self->frozen(1);
}

=item thaw

reverses the action of freeze by unsetting the frozen flag.
tasks will not be queable.  the existing halt state will be
preserved.

=cut

sub thaw
{
	my $self = shift;

	$self->frozen(0);
}

=item shutdown

shuts the queue down.  this is basically a shortcut for the
freeze and halt actions.  no more tasks will be executed and
no further tasks may be enqueud.

=cut

sub shutdown
{
	my $self = shift;

	$self->freeze;
	$self->halt;
}

=item flush

flush the queue of any tasks waiting to execute.  stops the
PoCo::JobQueue session (if running) and clears the internal
list of tasks.  if the queue was running prior to the flush,
the PoCo::JobQueue session is spun back up.

=cut

sub flush
{
	my $self = shift;

	$self->stop;
	$self->works->Delete($self->works->Keys);
	$self->start if not $self->halted;
}

=item DESTROY

=cut

sub DESTROY { shift->shutdown }

=back

=head1 AUTHOR

Mike Eldridge <diz@cpan.org>

=head1 LICENSE

This program is free software; you may redistribute it
and/or modify it under the same terms as Perl itself.

=cut

1;