# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id$
#
# Copyright (c) 2007 Daisuke Maki <daisuke@endeworks.jp>
# All rights reserved.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

package POE::Component::MessageQueue::Statistics;
use strict;
use warnings;

# Constructor: builds a collector whose global counters start at zero and
# which has no per-queue entries, no pending store records and no
# publishers yet.
#
# Returns the newly blessed object.
sub new
{
	my ($class) = @_;

	# The name of the version variable is assembled at runtime and then
	# string-eval'd, so the literal never appears in this file
	# (hide from PAUSE).
	my $version = eval join('::', '$POE', 'Component', 'MessageQueue', 'VERSION');

	my %statistics = (
		ID            => sprintf("POE::Component::MessageQueue version %s (PID: $$)", $version),
		total_stored  => 0,
		total_sent    => 0,
		subscriptions => 0,
		queues        => {},
	);

	return bless {
		statistics => \%statistics,
		store_info => {},
		publishers => [],
	}, $class;
}

# Registers this collector with a message queue for every event it tracks.
#
#   $mq - object providing register_event($event_name, $observer)
sub register
{
	my ($self, $mq) = @_;

	my @events = qw(store dispatch remove recv subscribe unsubscribe);
	foreach my $event (@events)
	{
		$mq->register_event($event, $self);
	}
}

# Remembers a publisher so that shutdown() can stop it later.
#
# Returns push's result, i.e. the number of publishers now registered.
sub add_publisher {
  my $self      = shift;
  my $publisher = shift;
  return push @{ $self->{publishers} }, $publisher;
}

# Returns the statistics record (hash reference) for the queue $name.
#
# On first use the record is created with all of its counters and
# averages set to 0 and stored under statistics->queues->$name; later
# calls return that same reference.
sub get_queue
{
	my ($self, $name) = @_;

	my $existing = $self->{statistics}{queues}{$name};
	return $existing if $existing;

	my %fresh = map { $_ => 0 } qw(
		sent          stored
		total_stored  total_sent
		total_recvd   avg_secs_stored  avg_size_recvd
	);

	return $self->{statistics}{queues}{$name} = \%fresh;
}

# Returns the statistics record (hash reference) for the topic $name.
#
# On first use the record is created with its counters and average set to
# 0 and stored under statistics->topics->$name (the "topics" hash itself
# springs into existence on first access); later calls return that same
# reference.
sub get_topic
{
	my ($self, $name) = @_;

	my $existing = $self->{statistics}{topics}{$name};
	return $existing if $existing;

	my %fresh = map { $_ => 0 } qw(sent total_sent total_recvd avg_size_recvd);

	return $self->{statistics}{topics}{$name} = \%fresh;
}

# Stops every publisher added through add_publisher by calling its own
# shutdown() method, in the order the publishers were added.
sub shutdown
{
	my ($self) = @_;

	foreach my $publisher (@{ $self->{publishers} })
	{
		$publisher->shutdown();
	}
}

# Dispatches an event to the method named "notify_<event>" on this object,
# passing $data through, and returns whatever that handler returns.
#
# Dies when the object has no handler for $event.
sub notify
{
	my ($self, $event, $data) = @_;

	my $handler = $self->can("notify_$event")
		or die "Tried to notify $event, which has no handler.";

	return $handler->($self, $data);
}

# Handles the "store" event for a message.
#
# Only destinations containing "/queue/<name>" are tracked; anything else
# is ignored.  For a tracked message this records when it was stored
# (keyed by message id, for use when it is removed later) and bumps the
# global and per-queue stored counters.
sub notify_store
{
	my ($self, $message) = @_;

	# The capture is the queue name; no match means not a queue.
	my ($qname) = $message->destination =~ m(/queue/(.*))
		or return;

	$self->{store_info}->{$message->id} = {
		qname     => $qname,
		timestamp => time(),
	};

	$self->{statistics}{total_stored}++;

	my $queue_stats = $self->get_queue($qname);
	$queue_stats->{$_}++ for qw(stored total_stored);
	$queue_stats->{last_stored} = scalar localtime();
}

# Folds one new sample into a running average.
#
#   $total   - number of samples, including the new one
#   $average - the average before the new sample
#   $size    - the new sample
#
# Returns the updated average, or 0 when $total is not positive.
sub reaverage {
	my ($total, $average, $size) = @_;

	return $total <= 0
		? 0
		: ($average * ($total - 1) + $size) / $total;
}

# Finds the statistics record for the destination carried in an event.
#
#   $data - hash reference whose "destination" entry is an object with a
#           name() method
#
# The text after the last "/" of a name shaped like "/<type>/<name>" picks
# the record: a queue record when the destination isa
# POE::Component::MessageQueue::Queue, a topic record otherwise.
# Returns nothing when the name does not have that shape.
sub get_destination
{
	my ($self, $data) = @_;
	my $dest = $data->{destination};

	return unless $dest->name =~ m{/.*/(.*)};
	my $short_name = $1;

	return $self->get_queue($short_name)
		if $dest->isa('POE::Component::MessageQueue::Queue');

	return $self->get_topic($short_name);
}

# Handles the "recv" event: counts the received message against its
# destination and folds the message size into that destination's running
# average size.
#
#   $data - hash reference with "destination" (see get_destination) and
#           "message", an object with a size() method
sub notify_recv
{
	my ($self, $data) = @_;

	my $dest_stats = $self->get_destination($data);

	# The counter is bumped first so it already includes this message when
	# it is used as the sample count below.
	my $received = ++$dest_stats->{total_recvd};

	$dest_stats->{avg_size_recvd} = reaverage(
		$received,
		$dest_stats->{avg_size_recvd},
		$data->{message}->size,
	);
}

# Handles the "dispatch" event: bumps the global sent counter and the
# destination's sent counters, and stamps the destination with the local
# time of this dispatch.
sub notify_dispatch
{
	my ($self, $data) = @_;

	$self->{statistics}{total_sent}++;

	my $dest_stats = $self->get_destination($data);
	$dest_stats->{$_}++ for qw(total_sent sent);
	$dest_stats->{last_sent} = scalar localtime();
}

# Handles the "remove" event for a message id.
#
# Only messages previously recorded by notify_store are considered; ids
# without a store record are ignored.  For a recorded message the record
# is dropped, the queue's "stored" count is decremented, and the time the
# message spent stored is folded into the queue's avg_secs_stored.
sub notify_remove {
	my ($self, $id) = @_;

	if (my $store_info = delete $self->{store_info}->{$id})
	{
		my $qname = $store_info->{qname};
		my $stats = $self->get_queue($qname);
		$stats->{stored}--;

		# The running average must be weighted by how many messages have
		# been removed so far, not by how many were ever stored; otherwise
		# messages still waiting in the queue drag the average down.  The
		# removal count is kept outside the published statistics hash.
		my $removed = ++$self->{removed_count}{$qname};

		$stats->{avg_secs_stored} = reaverage(
			$removed,
			$stats->{avg_secs_stored},
			(time() - $store_info->{timestamp})
		);
	}
}

# Handles the "subscribe" event: adds one to the global subscription count
# and to the count kept on the destination's own record.
sub notify_subscribe
{
	my ($self, $data) = @_;

	# Across all destinations
	$self->{statistics}{subscriptions}++;

	# For this destination only
	my $dest_stats = $self->get_destination($data);
	$dest_stats->{subscriptions}++;
}

# Handles the "unsubscribe" event: takes one off the global subscription
# count and off the count kept on the destination's own record.
sub notify_unsubscribe
{
	my ($self, $data) = @_;

	# Across all destinations
	$self->{statistics}{subscriptions}--;

	# For this destination only
	my $dest_stats = $self->get_destination($data);
	$dest_stats->{subscriptions}--;
}

# Handler for the "pump" event: deliberately does nothing, so that
# notify() accepts the event instead of dying for lack of a handler.
sub notify_pump
{
	return;
}

1;

__END__

=head1 NAME

POE::Component::MessageQueue::Statistics - Gather MQ Usage Statistics 

=head1 SYNOPSIS

	my $statistics = POE::Component::MessageQueue::Statistics->new();
	$mq->register( $statistics );

=head1 DESCRIPTION

POE::Component::MessageQueue::Statistics is a simple observer that receives
events from the main POE::Component::MessageQueue object to collect usage
statistics.

By itself it will only I<gather> statistics, and will not output anything.

To enable outputs, you need to create a separate Publish object:

	POE::Component::MessageQueue::Statistics::Publish::YAML->new(
		output => \*STDERR,
		statistics => $statistics
	);

Please refer to L<POE::Component::MessageQueue::Statistics::Publish> for details
on how to enable output.

=head1 SEE ALSO

L<POE::Component::MessageQueue::Statistics::Publish>,
L<POE::Component::MessageQueue::Statistics::Publish::YAML>

=head1 AUTHOR

Daisuke Maki E<lt>daisuke@endeworks.jpE<gt>

=cut