The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#
# Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

package POE::Component::MessageQueue::Queue;

use POE;
use POE::Session;
use Moose;

with qw(POE::Component::MessageQueue::Destination);

sub flag { has $_[0] => (is => 'rw', default => 0) }
flag 'pumping';
flag 'pump_pending';
flag 'shutting_down';

sub stash {
	my $n = $_[0];
	has $n => (
		is      => 'rw',
		isa     => 'HashRef',
		default => sub { {} },
		traits  => ['Hash'],
		handles => {
			"set_$n"    => 'set',
			"del_$n"    => 'delete',
			"${n}_keys" => 'keys',
		}
	);
}

stash 'waiting';
stash 'serviced';

sub next_subscriber 
{
	my $self = $_[0];
	if (my @keys = $self->waiting_keys) 
	{
		my $id = pop(@keys);
		my $s = $self->del_waiting($id);
		if($s->client)
		{
		  $self->set_serviced($id, $s);
			return $s;
		}
		else
		{
			$self->delete_subscription($id);
			return $self->next_subscriber;
		}
	}
	else 
	{
		my $serviced = $self->serviced;
		$self->serviced($self->waiting);
		$self->waiting($serviced);
		return (keys %$serviced) && $self->next_subscriber;
	}
}

sub next_ready
{
  my $self = $_[0];
	my ($s, %seen);

	while ($s = $self->next_subscriber and !exists $seen{$s})
	{
		return $s if $s->ready;
		$seen{$s} = 1;
	}
	return;
}

after set_subscription => __PACKAGE__->can('set_waiting');
after delete_subscription => sub {
	my ($self, @args) = @_;
	$self->del_waiting(@args);
	$self->del_serviced(@args);
};

__PACKAGE__->meta->make_immutable();

sub BUILD
{
	my ($self, $args) = @_;
	POE::Session->create(
		object_states => [ $self => [qw(_start _shutdown _pump_state _pump_timer)]],
	);
}

sub _start
{
	my ($self, $kernel) = @_[OBJECT, KERNEL];
	$kernel->alias_set($self->name);

	$kernel->delay(_pump_timer => $self->parent->pump_frequency)
		if ($self->parent->pump_frequency);
}

sub shutdown { 
	my $self = $_[0];
	$self->shutting_down(1);
	$poe_kernel->post($self->name, '_shutdown') 
}

sub _shutdown
{
	my ($self, $kernel) = @_[OBJECT, KERNEL];
	$kernel->alias_remove($self->name);
	$kernel->alarm_remove_all();
}

# This is the pumping philosophy:  When we receive a pump request, we will
# give everyone a chance to claim a message.  If any pumps are asked for while
# this is happening, we will remember and do another pump when this one is
# finished (just one).  

# This means we're serializing claim and retrieve requests.  More work needs
# to be done to determine whether this is good or necessary.

sub is_persistent { return 1 }

sub _pump_state
{
	my $self = $_[OBJECT];
	return if $self->shutting_down;

	if (my $s = $self->next_ready)
	{
		$s->ready(0);

		$self->storage->claim_and_retrieve($self->name, $s->client->id, sub {
			if (my $msg = $_[0])
			{
				$self->dispatch_message($msg, $s);
				$poe_kernel->post($self->name, '_pump_state');
			}
			else
			{
				$s->ready(1);
				$self->_done_pumping();
			}
		});
	}
	else
	{
		$self->_done_pumping();
	}
}

sub _done_pumping
{
	my $self = $_[0];
	$self->pumping(0);
	$self->pump() if $self->pump_pending;
}

sub _pump_timer
{
	my ($self, $kernel) = @_[OBJECT, KERNEL];
	return if $self->shutting_down;

	# pump (the 1 means that we won't set pump_pending if
	# we are already in a pumping state).
	$self->pump(1);

	$kernel->delay(_pump_timer => $self->parent->pump_frequency);
}

sub pump
{
	my ($self, $skip_pending) = @_;

	if($self->pumping and not $skip_pending)
	{
		$self->pump_pending(1);
	}
	else
	{
		$self->log(debug => ' -- PUMP QUEUE: '.$self->name.' -- ');
		$self->notify('pump');
		$self->pump_pending(0);
		$self->pumping(1);
		$poe_kernel->call($self->name, '_pump_state');
	}
}

sub send
{
	my ($self, $message) = @_;
	return if $self->shutting_down;

	# If we already have a ready subscriber, we'll dispatch before we
	# store to give the subscriber a headstart on processing.
	if (not $message->has_delay and my $s = $self->next_ready)
	{
		my $mid = $message->id;
		my $cid = $s->client->id;
		if ($s->client_ack) 
		{
			$message->claim($cid);
			$self->log(info => "QUEUE: Message $mid claimed by $cid during send");
			$self->storage->store($message);
			$self->notify(store => $message);
		}
		else
		{
			$self->log(info => "QUEUE: Message $mid not stored, sent to $cid");
		}
		$self->dispatch_message($message, $s);
	}
	else
	{
		$self->storage->store($message, sub {
			$self->notify(store => $message);
			$self->pump();
		});
	}
}

1;