The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#
# Copyright 2007 Paul Driver <frodwith@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

package POE::Component::MessageQueue::Storage::Double;
use Moose::Role;
use MooseX::MultiInitArg;

# These guys just call a method on both front and back stores and have a
# simple no-arg completion callback.  No reason to write them all!
foreach my $method (qw(empty disown_destination disown_all)) {
	__PACKAGE__->meta->add_method($method, sub {
		my $self = shift;
		my $last = pop;
		if(ref $last eq 'CODE')
		{
			my @args = @_;
			$self->front->$method(@args, sub {
				$self->back->$method(@args, $last);
			});
		}
		else
		{
			$self->front->$method(@_, $last);
			$self->back->$method(@_, $last);
		}
	});
}

with qw(POE::Component::MessageQueue::Storage);
use POE::Component::MessageQueue::Storage::BigMemory;

has front => (
	metaclass => 'MultiInitArg',
	init_args => ['front_store'],
	is        => 'ro',
	does      => qw(POE::Component::MessageQueue::Storage),
	default   => sub {POE::Component::MessageQueue::Storage::BigMemory->new()},
	required  => 1,
);

has back => (
	is        => 'ro',
	metaclass => 'MultiInitArg',
	init_args => [qw(back_store storage)],
	does      => qw(POE::Component::MessageQueue::Storage),
	required  => 1,
);

# Any true value for a given ID means the message is in the front store.
# (value may be useful data, like message size)
has front_info => (
	is => 'ro',
	isa => 'HashRef',
	default => sub { {} },
	traits  => ['Hash'],
	handles => {
		'in_front'     => 'exists',
		'get_front'    => 'get',
		'set_front'    => 'set',
		'clear_front'  => 'clear',
		'delete_front' => 'delete',
	},
);

after 'set_logger' => sub {
	my ($self, $logger) = @_;
	$self->front->set_logger($logger);
	$self->back->set_logger($logger);
};

sub in_back 
{
	my ($self, $id) = @_;
	return 1 unless $self->in_front($id);
	return $self->get_front($id)->{persisted};
}

sub _split_ids
{
	my ($self, $ids) = @_;
	my (@fids, @bids);
	foreach my $id (@$ids)
	{
		push (@fids, $id) if $self->in_front($id);
		push (@bids, $id) if $self->in_back($id);
	}
	return (\@fids, \@bids);
}

sub _doboth
{
	my ($self, $ids, $do_front, $do_back, $callback) = @_;
	my ($fids, $bids) = $self->_split_ids($ids);

	if (@$fids && @$bids)
	{
		$do_front->($fids, sub {$do_back->($bids, $callback)});
	}
	elsif(@$fids)
	{
		$do_front->($fids, $callback);
	}
	elsif(@$bids)
	{
		$do_back->($bids, $callback);
	}
	else
	{
		goto $callback;
	}
}

sub remove
{
	my ($self, $aref, $callback) = @_;
	$self->_doboth(
		$aref, 
		sub {
			my ($ids, $callback) = @_;
			$self->delete_front($ids);
			$self->front->remove($ids, $callback);
		},
		sub {
			my ($ids, $callback) = @_;
			$self->back->remove($ids, $callback);
		},
		$callback,
	);
}

sub claim
{
	my ($self, $aref, $client, $callback) = @_;
	$self->_doboth(
		$aref,
		sub {$self->front->claim($_[0], $client, $_[1])},
		sub {$self->back ->claim($_[0], $client, $_[1])},
	  $callback,
	);
}

sub get
{
	my ($self, $ids, $callback) = @_;
	my ($fids, $bids) = $self->_split_ids($ids);
	$self->front->get($fids, sub {
		goto $callback unless @$bids; # Avoid backstore call
		my $got_front = $_[0];
		$self->back->get($bids, sub {
			my $got_back = $_[0];
			push(@$got_back, @$got_front);
			goto $callback;
		});
	});
}

sub get_all
{
	my ($self, $callback) = @_;
	my %messages; # store in a hash to ensure uniqueness
	$self->front->get_all(sub {
		$messages{$_->id} = $_ foreach @{$_[0]};
		$self->back->get_all(sub {
			$messages{$_->id} = $_ foreach @{$_[0]};
			@_ = ([values %messages]);
			goto $callback;	
		});
	});
}

sub get_oldest
{
	my ($self, $callback) = @_;
	$self->front->get_oldest(sub {
		my $f = $_[0];
		$self->back->get_oldest(sub {
			my $b = $_[0];
			@_ = (
				($f && $b) ? 
				($f->timestamp < $b->timestamp ? $f : $b) :
				($f || $b)
			);
			goto $callback;
		});
	});
}

sub claim_and_retrieve
{
	my ($self, $destination, $client_id, $callback) = @_;

	$self->front->claim_and_retrieve($destination, $client_id, sub {
		if (my $msg = $_[0])
		{
			# We don't need to claim unless it's in the backstore already
			goto $callback unless ($self->in_back($msg->id));
			$self->back->claim($msg->id, $client_id, sub {
				@_ = ($msg);
				goto $callback;
			});
		}
		else
		{
			$self->back->claim_and_retrieve($destination, $client_id, sub {
				my $msg = $_[0];
				goto $callback
					if (not defined $msg or not $self->in_front($msg->id));

				$self->front->claim($msg->id, $client_id, sub {
					@_ = ($msg);
					goto $callback;
				});
			});
		}
	});
}

1;

__END__

=pod

=head1 NAME

POE::Component::MessageQueue::Storage::Double -- Stores composed of two other
stores.
 
=head1 DESCRIPTION

Refactor mercilessly, as they say.  They also say don't repeat yourself.  This
module contains functionality for any store that is a composition of two 
stores.  At least Throttled and Complex share this trait, and it doesn't make 
any sense to duplicate code between them.

=head1 CONSTRUCTOR PARAMETERS

=over 2

=item front => SCALAR

=item back => SCALAR

Takes a reference to a storage engine to use as the front store / back store.

=back

=head1 Unimplemented Methods

=over 2

=item store

This isn't implemented because Complex and Throttled differ here.  Perhaps
your storage differs here as well.  This is essentially where you specify
policy about what goes in which store.  Be sure you update the front_info hash
when you store something!

=item storage_shutdown

And this is where you specify policy about what happens when you die.  You
lucky person, you.

=back

=cut