The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/env perl

=head1 NAME

tstatd - Logs real-time accounting daemon

SYNOPSIS

tstatd [ options ] plugin [zone1:]wildcard1 .. [zoneN:]wildcardN

OPTIONS

=over

=item -a I<zone>, --agregate-zone=I<zone>

Agregate data from all anonymous logs (wildcards without explicit
zone specified) into I<zone>. Default behavior is to create new
zone for each anonymous log from its file name.

=item -b I<file>, --database-file=I<file>

Use I<file> as persistent storage to keep accumulated data across
daemon restarts. Default is auto generated from daemon name,
specified identity and '.db' suffix.

=item --basename

Use only base name (excluding directories and suffix) of anonymous log file
for auto-created zones.

=item -c I<dir>, --change-dir=I<dir>

Change current directory to I<dir> before wildcards expanding.

=item -d, --debug

Composition of options: C<--foreground> and C<--log-level=debug>.

=item -f, --foreground

Don't detach daemon from control terminal, logging to C<stderr> instead
log file or syslog.

=item --log-facility=I<name>

Use I<name> as C<facility> for syslog logging (see syslog (3) for list
of available values). Default is 'daemon'.

=item --log-level=I<level>

Set minimal logging level to I<level> (see syslog (3) for list of available
values). Default is 'notice'.

=item --log-file=I<file>

Use logging to I<file> instead of syslog logging (which is default).

=item -e I<num>, --expand-period=I<num>

Do wildcards re-expanding and checking for new and missed logs
every I<num> seconds. Default is '60'.

=item -h, --help

Print brief help message about available options.

=item -i I<string>, --identity=I<string>

Just a string used in title of daemon process, syslog ident (see syslog(3)),
C<--database-file> and C<--pid-file>. Idea behind this options - multiple
C<tstatd> instances running simultaneosly.

=item -l [I<address>:]I<port>, --listen=[I<address>:]I<port>

Specify I<address> and I<port> for TCP listen socket binding.
Default is '127.0.0.1:3638'.

=item --multiple

With this option specified same log file could be included into several
zones (if log name satisifies several wildcards). Default behavior is to
include log file only in first satisified zone.

=item -n I<num>, --windows-num=I<num>

Set number of sliding-windows to I<num>. Default is '60'.

=item -o I<string>, --options=I<string>

Comma-separated plugin supported options (like a mount (8) options).

=item --override-from=I<file>

Load content of I<file> into plugin package namespace.
This is way to easy customize plugin behavior without creating
another plugin.

=item -p I<file>, --pid-file=I<file>

Use I<file> to keep daemon process id. Default is auto generated
from daemon name, specified identity and '.pid' suffix.

=item --parse-error=I<level>

Do logging with I<level> (see syslog (3) for available values) about
all unparsed log lines. Hint: use 'none' for ignoring such lines.
Default is defining by plugin and usually is 'debug'.

=item -r I<pattern>, --regex=I<pattern>

Use I<pattern> instead of plugin default regular expression for
matching log lines.

=item --regex-from=I<file>

Load regular expression from I<file> and use instead of plugin default
regular expression for matching log lines.

=item -s I<num>, --store-period=I<num>

Store accumulated data in a persistent storage every I<num> seconds.
Default is '60'.

=item --timer=I<zone>:I<timer>:I<num>

Create named I<timer> firing every I<num> seconds for I<zone>.

=item -u <user>, --user=I<user>

Change effective privileges of daemon process to I<user>.

=item -v, --version

Print version information of C<tstatd> and exit.

=item -w I<num>, --window-size=<num>

Set size (duration) of sliding window to I<num> seconds.
Default is '10'.

=back


=head1 SEE ALSO

L<Tail::Stat>


=head1 AUTHOR

Oleg A. Mamontov, C<< <oleg@mamontov.net> >>


=head1 COPYRIGHT & LICENSE

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.


=cut

use strict;
use warnings qw(all);

use Cwd qw(getcwd realpath);
use DateTime;
use File::Basename qw(fileparse);
use FindBin;
use Getopt::Long qw(:config no_auto_abbrev bundling);
use JSON::XS;
use List::Util qw(min);
use Log::Dispatch;
use Log::Dispatch::File;
use Log::Dispatch::Screen;
use Log::Dispatch::Syslog;
use Pid::File::Flock;
use POE qw(Wheel::FollowTail Wheel::ListenAccept Wheel::ReadWrite);
use POSIX qw(setsid setuid strftime);
use Socket;
use Tail::Stat;
use Tie::Hash::Indexed;

# parse command line
my %opts;
GetOptions(\%opts, qw/
	agregate-zone|a=s
	basename
	database-file|b=s
	change-dir|c=s
	debug|d
	foreground|f
	log-facility=s
	log-file=s
	log-level=s
	expand-period|e=i
	help|h
	identity|i=s
	listen|l=s
	windows-num|n=i
	multiple
	options|o=s
	override-from=s@
	parse-error=s
	pid-file|p=s
	regex|r=s
	regex-from=s
	timer=s@
	store-period|s=i
	user|u=s
	version|v
	window-size|w=i
/) or die usage();

# explicitly requested help
die usage() if $opts{help};

# version requested
print version() and exit if $opts{version};

# no arguments
die usage() if @ARGV < 2;

# try to load requested plugin
my $pname  = shift @ARGV;
my $pclass = "Tail::Stat::Plugin::$pname";
eval "require $pclass" or die "can't load plugin '$pname': $@\n";


# parameters defaults & validation
if (exists $opts{'agregate-zone'}) {
	die "invalid zone: '$opts{'agregate-zone'}'\n"
		if $opts{'agregate-zone'} =~ /[^a-z0-9_-]/;
}

if (exists $opts{identity}) {
	die "invalid identity: '$opts{identity}'\n" if $opts{identity} =~ /[^\w]/;
}

$opts{'database-file'} ||= $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '').'.db';
$opts{'database-file'} = realpath $opts{'database-file'};

if (exists $opts{'change-dir'}) {
	die "no such directory: '$opts{'change-dir'}'\n"
		unless -d $opts{'change-dir'};
}

$opts{'log-facility'} ||= 'daemon';
die "invalid log facility: '$opts{'log-facility'}'\n"
	unless $opts{'log-facility'} =~ /^(auth|authpriv|cron|daemon|kern|local[0-7]|mail|news|syslog|user|uucp)$/;

$opts{'log-level'} ||= 'notice';
die "invalid log level: '$opts{'log-level'}'\n"
	unless $opts{'log-level'} =~ /^(debug|info|notice|warning|error|critical|alert|emergency)$/;

if ($opts{debug}) {
	$opts{'log-level'} = 'debug';
	delete $opts{'log-file'};
	$opts{foreground}  = 1;
}

$opts{'log-file'} = realpath $opts{'log-file'} if exists $opts{'log-file'};

$opts{'expand-period'} = 60 unless exists $opts{'expand-period'};
die "invalid expand period: '$opts{'expand-period'}'\n"
	if $opts{'expand-period'} =~ /[^\d]/;

$opts{'listen'} ||= '127.0.0.1:3638';

$opts{'windows-num'} = 60  unless exists $opts{'windows-num'};
die "invalid windows number: '$opts{'windows-num'}'\n"
	if $opts{'windows-num'} =~ /[^\d]/;

$opts{'store-period'} = 10 unless exists $opts{'store-period'};
die "invalid store period: '$opts{'store-period'}'\n"
	if $opts{'store-period'} =~ /[^\d]/;

$opts{'window-size'} = 10 unless exists $opts{'window-size'};
die "invalid window size: '$opts{'window-size'}'\n"
	if $opts{'window-size'} =~ /[^\d]/;

$opts{'parse-error'} ||= $pclass->parse_error;
die "invalid parse error: '$opts{'parse-error'}'\n"
	unless $opts{'parse-error'} =~ /^(debug|info|none|notice|warning|error|critical|alert|emergency)$/;

$opts{'pid-file'} ||= $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '').'.pid';
$opts{'pid-file'} = realpath $opts{'pid-file'};

if (exists $opts{'regex-from'}) {
	die "options regex and regex-from are mutually exclusive\n"
		if exists $opts{regex};
	local $/;
	open FH, $opts{'regex-from'} or
		die "can't read regex: $!\n";
	$opts{regex} = <FH>;
}

# loading overrides
for ( @{ $opts{'override-from'} } ) {
	local $/;
	open FH, $_ or
		die "can't read override from '$_': $!\n";
	eval "package $pclass; use strict; use warnings qw(all); ".<FH>;
	die "can't apply overrides from '$_': $@\n" if $@;
}

defined (my $uid = $opts{'user'} ? getpwnam($opts{'user'}) : $>) or
	die "unknown user: $opts{'user'}\n";

# grouping log files by zones, order does matter
# due to support '--multiple' option
my %zones;
tie %zones, 'Tie::Hash::Indexed';
for (@ARGV) {
	/^([\w\d\_-]+):(.+)/ && do {
		push @{$zones{$1}}, $2;
		next;
	};
	push @{$zones{
		$opts{'agregate-zone'} ||
		( $opts{basename} ? fileparse($_,qr/\.[^\.]+/) : $_ )
	}}, $_;
}

# parsing timers
my %timers;
my %units = (
	w => [ 'week',   7 * 86_400 ],
	d => [ 'day',    86_400 ],
	h => [ 'hour',   3_600 ],
	m => [ 'minute', 60 ],
	s => [ 'second', 1 ],
);
for (@{ $opts{timer} }) {
	my ($z,$n,$p,$u) = /^(\S+):(\S+):(\d+)(w|d|h|m|s)?$/ or
		die "invalid timer format: $_\n";
	die "no such zone '$z' for timer '$_'\n" unless exists $zones{$z};
	die "zone '$z' already has timer '$n'\n" if exists $timers{$z}{$n};

	$u ||= 's';
	$timers{$z}{$n} = [ $p * $units{$u}[1], $units{$u}[0] ];
}

# listen socket
my $sock = IO::Socket::INET->new(
	(
		$opts{'listen'} =~ /:/ ?
		( LocalAddr => $opts{'listen'} ) :
		( LocalPort => $opts{'listen'} )
	),
	Listen    => SOMAXCONN,
	ReuseAddr => 1,
) or die "can't create listen socket: $!\n";

# set process privileges
setuid $uid or die "can't setuid to $opts{'user'}: $!\n" unless $uid == $>;

# set process title
$0 = $FindBin::RealScript.': '.$pname.($opts{identity} ? ' ['.$opts{identity}.']' : '');

# fork
unless ($opts{foreground}) {
	defined(my $pid = fork) or die "can't fork: $!\n";
	exit if $pid;
}

# protecting against second instance running
Pid::File::Flock->new($opts{'pid-file'}) unless $opts{foreground};

# daemonize
unless ($opts{foreground}) {
	chdir '/' or die "can't chdir: $!\n";
	die "can't create new session: $!\n" if setsid == -1;
	open STDIN,  '</dev/null' or die "can't close stdin\n";
	open STDOUT, '>/dev/null' or die "can't close stdout\n";
	open STDERR, '>/dev/null' or die "can't close stderr\n";
}

# logger
(my $log = Log::Dispatch->new)->add(logger());
$log->notice("starting up");

# catch perl warnings
$SIG{__WARN__} = sub { $log->warning(@_) };

# main POE session
POE::Session->create(
	inline_states => {

		# initializing
		_start => sub {
			$log->debug("initializing POE session");

			# talk POE kernel adjust to the new situation
			$_[KERNEL]->has_forked unless $opts{foreground};

			# signals
			$log->debug("setting up signal handlers");
			$_[KERNEL]->sig(HUP  => 'hangup');
			$_[KERNEL]->sig(INT  => 'interrupt');
			$_[KERNEL]->sig(TERM => 'terminate');
			$_[KERNEL]->sig(USR1 => 'rotate');

			# statistics server
			$log->debug("creating TCP server");
			$_[HEAP]->{server} = POE::Wheel::ListenAccept->new(
				Handle      => $sock,
				AcceptEvent => 'server_accept',
				ErrorEvent  => 'server_error',
			);

			# serializer
			$log->debug("creating serializer");
			$_[HEAP]->{serial} = JSON::XS->new->pretty;

			# creating plugin instance
			my %popts;
			for (split /,/, $opts{options} || '') {
				my ($k,$v) = split /=/;
				$popts{$k} = defined $v ? $v : 1;
			}
			$popts{regex} = $opts{regex} if exists $opts{regex};
			$_[HEAP]->{plugin} = $pclass->new(%popts);

			# setting up zones
			$_[HEAP]->{zones} = \%zones;

			# load previous data
			if (-f $opts{'database-file'}) {
				$_[KERNEL]->call($_[SESSION], 'do_load') or return;
			}

			# create insufficient references
			for (keys %zones) {
				$_[HEAP]->{data}{$_}{public}     ||= {};
				$_[HEAP]->{data}{$_}{private}    ||= {};
				$_[HEAP]->{data}{$_}{windows}    ||= [];
				$_[HEAP]->{data}{$_}{windows}[0] ||= {};

				# call plugin initialization code
				$_[HEAP]->{plugin}->init_zone(
					$_,
					$_[HEAP]->{data}{$_}{public},
					$_[HEAP]->{data}{$_}{private},
					$_[HEAP]->{data}{$_}{windows}[0],
				);
			}

			# expanding zones wildcards
			$_[KERNEL]->call($_[SESSION], 'do_expand');

			# creating named timers
			$_[HEAP]->{timers} = \%timers;
			for my $z ( keys %{ $_[HEAP]->{timers} } ) {
				for my $n ( keys %{ $_[HEAP]->{timers}{$z} } ) {
					$_[KERNEL]->call($_[SESSION], 'set_timer', $z, $n);
				}
			}

			# schedule save task
			if ($opts{'store-period'}) {
				$log->debug("scheduling saving heartbeat at $opts{'store-period'} second(s)");
				$_[KERNEL]->delay( save_heartbeat => $opts{'store-period'} );
			}

			# schedule expanding wildcards
			if ($opts{'expand-period'}) {
				$log->debug("scheduling expanding heartbeat at $opts{'expand-period'} second(s)");
				$_[KERNEL]->delay( expand_heartbeat => $opts{'expand-period'} );
			}

			# schedule windows heartbeat
			if ($opts{'window-size'} && $opts{'windows-num'}) {
				$log->debug("scheduling windows heartbeat at $opts{'window-size'} second(s)");
				$_[KERNEL]->delay( windows_heartbeat => $opts{'window-size'} );
			}
		},

		# expanding task
		expand_heartbeat => sub {
			$log->debug("wildcards expanding heartbeat occurred");

			# expanding zones wildcards
			$_[KERNEL]->call($_[SESSION], 'do_expand');

			# schedule next call
			if ($opts{'expand-period'}) {
				$log->debug("scheduling expanding heartbeat at $opts{'expand-period'} second(s)");
				$_[KERNEL]->delay( expand_heartbeat => $opts{'expand-period'} );
			}
		},

		# setting named timer
		set_timer => sub {
			my $t = $_[HEAP]->{timers}{$_[ARG0]}{$_[ARG1]};
			my $next = DateTime->now(
				time_zone => 'local'
			)->add(
				seconds => $t->[0]
			)->truncate(
				to => $t->[1]
			);
			$log->debug("setting named timer '$_[ARG1]' for zone '$_[ARG0] at '".$next->strftime('%Y-%m-%d %H:%M:%S')."'");
			$_[KERNEL]->alarm_set( named_timer => $next->epoch, $_[ARG0], $_[ARG1] );
		},

		# named timer handler
		named_timer => sub {
			$log->debug("processing named timer '$_[ARG1]' for zone '$_[ARG0]");
			$_[HEAP]->{plugin}->process_timer(
				$_[ARG1],
				$_[HEAP]->{data}{$_[ARG0]}{public},
				$_[HEAP]->{data}{$_[ARG0]}{private},
				$_[HEAP]->{data}{$_[ARG0]}{windows}
			) ? do {
				$log->debug("renewing timer '$_[ARG1]' for zone '$_[ARG0]");
				$_[KERNEL]->call($_[SESSION], 'set_timer', $_[ARG0], $_[ARG1]);
			} : do {
				$log->debug("clearing timer '$_[ARG1]' for zone '$_[ARG0]'");
			};
		},

		# expanding wildcards
		do_expand => sub {
			$log->debug("begin expanding wildcards");

			my $cwd = getcwd;
			if ($opts{'change-dir'}) {
				chdir $opts{'change-dir'} or
					$log->warning("can't change directory to '$opts{'change-dir'}'");
			}

			my %exif;  # existing files
			for my $zone ( keys %{ $_[HEAP]->{zones} } ) {
				my @files;
				push @files, map { realpath $_ } grep { -f } glob $_
					for @{ $_[HEAP]->{zones}{$zone} };
				$log->debug("found ".scalar(@files)." file(s) in zone '$zone'");

				# create missing watchers
				FILE:
				for my $f (@files) {
					$exif{$f}++;

					# searching for already monitored file
					for my $w ( values %{ $_[HEAP]->{watchers} } ) {
						next unless $f eq $w->[0];

						unless ($opts{multiple}) {
							$log->debug("file '$f' already monitored, ignoring for zone '$zone'");
							next FILE;
						}

						# searching for already subscribed zone
						for my $z ( @{ $w->[1] } ) {
							next unless $z eq $zone;
							$log->debug("zone already subscribed for '$f'");
							next FILE;
						}

						# subscribe to existing watcher
						push @{ $w->[1] }, $zone;
						$log->debug("zone subscribed for '$f'");
						next FILE;
					}

					# create new watcher and subscribe zone
					my $w = POE::Wheel::FollowTail->new(
						Filename   => $f,
						Filter     => POE::Filter::Line->new( InputLiteral => "\n" ),
						ErrorEvent => 'watcher_err',
						InputEvent => 'watcher_line',
						ResetEvent => 'watcher_roll',
					);
					$log->debug("created new watcher [".$w->ID."] for '$f'");
					$log->debug("zone subscribed for '$f'");
					$_[HEAP]->{watchers}{$w->ID} = [ $f, [ $zone ], $w ];
				}
			}

			# remove excess watchers
			for my $w (values %{ $_[HEAP]->{watchers} } ) {
				next if $exif{$w->[0]};  # file exists

				delete $_[HEAP]->{watchers}{$w->[2]->ID};
				$log->debug("excess watcher [".$w->[2]->ID."] removed for '".$w->[0]."'");
			}

			if ($opts{'change-dir'}) {
				chdir $cwd or
					$log->warning("can't restore directory to '$cwd'");
			}
		},

		# new log line
		watcher_line => sub {
			my $w = $_[HEAP]->{watchers}{$_[ARG1]};
			my @data = $_[HEAP]->{plugin}->process_line($_[ARG0]) or do {
				return $log->log(
					level   => $opts{'parse-error'},
					message => "can't parse: '$_[ARG0]' from '$w->[0]'"
				) unless $opts{'parse-error'} eq 'none';
			};
			# subscribers loop
			for my $z ( @{ $w->[1] } ) {
				$_[HEAP]->{plugin}->process_data(
					\@data,
					$_[HEAP]->{data}{$z}{public},
					$_[HEAP]->{data}{$z}{private},
					$_[HEAP]->{data}{$z}{windows}[0]
				);
			}
		},

		# log rotating occurred
		watcher_roll => sub {
			my $w = $_[HEAP]->{watchers}{$_[ARG0]};
			# clear tail fragment
			$w->[2][ POE::Wheel::FollowTail::SELF_FILTER ][ POE::Filter::Line::FRAMING_BUFFER ] =~ s/[^\n]+\z//
				if $w->[2][ POE::Wheel::FollowTail::SELF_FILTER ][ POE::Filter::Line::FRAMING_BUFFER ];
			$log->info("rolled over '$w->[0]'");
		},

		# log tailing error
		watcher_err => sub {
			my $w = $_[HEAP]->{watchers}{$_[ARG3]};
			$log->error("$_[ARG0] failed ($_[ARG1] during tail '$w->[0]': $_[ARG2]");
		},

		# windows processing
		windows_heartbeat => sub {
			$log->debug("windows heartbeat occurred");

			# schedule windows processing
			for ( keys %{ $_[HEAP]->{zones} } ) {
				$_[KERNEL]->yield( do_window => $_ );
			}

			# schedule next call
			$log->debug("scheduling windows heartbeat at $opts{'window-size'} second(s)");
			$_[KERNEL]->delay( windows_heartbeat => $opts{'window-size'} );
		},

		do_window => sub {
			# windows ring
			my $wins = $_[HEAP]->{data}{$_[ARG0]}{windows};

			# call plugin handler with last complete window
			$_[HEAP]->{plugin}->process_window(
				$_[HEAP]->{data}{$_[ARG0]}{public},
				$_[HEAP]->{data}{$_[ARG0]}{private},
				$wins
			);

			# slide windows
			unshift @$wins, {};
			$#$wins = min $#$wins, $opts{'windows-num'} - 1;
		},

		# periodically task
		save_heartbeat => sub {
			$log->debug("saving heartbeat occurred");

			# save accumulated data
			$_[KERNEL]->call($_[SESSION], 'do_save');

			# schedule next call
			$log->debug("scheduling saving heartbeat at $opts{'store-period'} second(s)");
			$_[KERNEL]->delay( save_heartbeat => $opts{'store-period'} );
		},

		# loading stored data
		do_load => sub {
			$log->debug("loading stored data");
			open FH, $opts{'database-file'} or do {
				$log->error("can't open database file: $!");
				return $_[KERNEL]->call($_[SESSION], 'shutdown');
			};
			local $/;
			my $d = $_[HEAP]->{serial}->decode(<FH>) or do {
				$log->error("can't read database file: $!");
				return $_[KERNEL]->call($_[SESSION], 'shutdown');
			};

			# assign
			$_[HEAP]->{data} = $d->{zones} || {};
		},

		# store accumulated data
		do_save => sub {
			$log->debug("storing accumulated data");
			open FH, '>', $opts{'database-file'}.'~' or do {
				return $log->warning("can't write database file: $!");
			};
			my $d = { zones => $_[HEAP]->{data} || {} };
			print FH $_[HEAP]->{serial}->encode($d);
			close FH;

			if (-f $opts{'database-file'}) {
				unlink $opts{'database-file'} or do {
					return $log->warning("can't remove old database file: $!");
				};
			};
			rename $opts{'database-file'}.'~', $opts{'database-file'} or do {
				return $log->warning("can't rename new database file: $!");
			};
		},

		# new client accepted
		server_accept => sub {
			my ($port,$addr) = sockaddr_in $_[ARG1];
			$log->debug("client accepted from ".inet_ntoa($addr).":$port");
			my $c = POE::Wheel::ReadWrite->new(
				Handle     => $_[ARG0],
				InputEvent => 'client_input',
				ErrorEvent => 'client_error',
			);
			$_[HEAP]->{clients}{$c->ID} = $c;
		},

		# server error occurred
		server_error => sub {
			$log->error("$_[ARG0] failed ($_[ARG1] during serving: $_[ARG2]");
			$_[KERNEL]->call($_[SESSION], 'shutdown');
		},

		# got client command
		client_input => sub {
			$log->debug("got client command: '$_[ARG0]'");
			my $cln = $_[HEAP]->{clients}{$_[ARG1]} or
				return $log->warning("unknown client #$_[ARG1]");

			for ($_[ARG0]) {

				# zones list (active & inactive zones)
				/^\s*zones\s*$/i and do {
					$cln->put(
						map { 'a:'.$_ }
						keys %{ $_[HEAP]->{zones} },
					);
					$cln->put(
						map { 'i:'.$_ }
						grep { ! exists $_[HEAP]->{zones}{$_} }
						keys %{ $_[HEAP]->{data} },
					);
					last;
				};

				# wildcards list (active zones only)
				/^\s*globs\s+(\S+)\s*$/i and do {
					my $z = $_[HEAP]->{zones}{$1} or do {
						$log->warning("invalid client globs query: '$1'");
						$cln->put('no such active zone');
						last;
					};
					$cln->put( sort @{ $z } );
					last;
				};

				# files list (active zones only)
				/^\s*files\s+(\S+)\s*$/i and do {
					$_[HEAP]->{zones}{$1} or do {
						$log->warning("invalid client files query: '$1'");
						$cln->put('no such active zone');
						last;
					};

					my @f;
					for my $w ( values %{ $_[HEAP]->{watchers} } ) {
						for my $z ( @{ $w->[1] } ) {
							next unless $z eq $1;
							push @f, [ $w->[0], $w->[2]->tell, -s $w->[0] ];
							last;
						}
					}
					for ( sort { $a->[0] cmp $b->[0] } @f ) {
						$cln->put( join ':', $_->[1] eq '0 but true' ? 0 : $_->[1], $_->[2], $_->[0] );
					}
					last;
				};

				# zone dump (active & inactive zones)
				/^\s*dump\s+(\S+)\s*$/i and do {
					$_[HEAP]->{zones}{$1} or $_[HEAP]->{data}{$1} or do {
						$log->warning("invalid client dump query: '$1'");
						$cln->put('no such zone');
						last;
					};
					my $wmax = $#{ $_[HEAP]->{data}{$1}{windows} };
					$cln->put( $_[HEAP]->{plugin}->dump_zone( $1,
						$_[HEAP]->{data}{$1}{public},
						$_[HEAP]->{data}{$1}{private},
						[ @{ $_[HEAP]->{data}{$1}{windows} }[1..$wmax] ],
					));
					last;
				};

				# zone statistics (active & inactive zones)
				/^\s*stats\s+(\S+)\s*$/i and do {
					$_[HEAP]->{zones}{$1} or $_[HEAP]->{data}{$1} or do {
						$log->warning("invalid client stats query: '$1'");
						$cln->put('no such zone');
						last;
					};
					my $wmax = $#{ $_[HEAP]->{data}{$1}{windows} };
					$cln->put( $_[HEAP]->{plugin}->stats_zone( $1,
						$_[HEAP]->{data}{$1}{public},
						$_[HEAP]->{data}{$1}{private},
						[ @{ $_[HEAP]->{data}{$1}{windows} }[1..$wmax] ],
					));
					last;
				};

				# remove zone statistics (inactive zones only)
				/^\s*wipe\s+(\S+)\s*$/i and do {
					if ($1 eq '*') {
						for ( keys %{ $_[HEAP]->{data} } ) {
							next if exists $_[HEAP]->{zones}{$_};
							delete $_[HEAP]->{data}{$_};
						}
					} else {
						$_[HEAP]->{data}{$1} or do {
							$log->warning("invalid client wipe query: '$1'");
							$cln->put('no such inactive zone');
							last;
						};
						$_[HEAP]->{zones}{$1} and do {
							$log->warning("invalid client wipe query: '$1'");
							$cln->put('zone is active');
							last;
						};
						delete $_[HEAP]->{data}{$1};
					}
					$cln->put( 'ok' );
					$_[KERNEL]->call($_[SESSION], 'do_save');
					last;
				};


				# disconnect request
				/^\s*quit\s*$/i and do {
					return delete $_[HEAP]->{clients}->{$_[ARG1]};
				};

				# invalid command
				$log->warning("invalid client command: '$_[ARG0]'");
				$_[HEAP]->{clients}{$_[ARG1]}->put('error');
			}

			# force buffer flush
			$_[HEAP]->{clients}{$_[ARG1]}->flush;
			$log->debug("buffer flushed");
		},

		# client errors (disconnect included)
		client_error => sub {
			$_[ARG1] ?
				$log->error("$_[ARG0] ($_[ARG1] from client: $_[ARG2]") :
				$log->debug("client disconnected");
			# drop client connection
			delete $_[HEAP]->{clients}->{$_[ARG3]};
		},

		# got SIGHUP
		hangup => sub {
			$log->notice("got SIGHUP, re-expanging zones wildcards");

			# expanding zones wildcards
			$_[KERNEL]->call($_[SESSION], 'do_expand');

			# keep signal handled
			$_[KERNEL]->sig_handled;
		},

		# got SIGINT
		interrupt => sub {
			$log->notice("got SIGINT, terminating");

			# keep signal handled
			$_[KERNEL]->sig_handled;

			# shutting down
			$_[KERNEL]->call($_[SESSION], 'shutdown');
		},

		# got SIGTERM
		terminate => sub {
			$log->notice("got SIGTERM, terminating");

			# shutting down
			$_[KERNEL]->call($_[SESSION], 'shutdown');

			# keep signal handled
			$_[KERNEL]->sig_handled;
		},

		# got SIGUSR1
		rotate => sub {
			$log->notice("got SIGUSR1, re-opening log file");

			# drop & create logger
			$log->remove('main');
			$log->add(logger());

			# keep signal handled
			$_[KERNEL]->sig_handled;
		},

		# graceful exit
		shutdown => sub {
			$log->debug("gracefully shutting down");

			# store statistics
			$_[KERNEL]->call($_[SESSION], 'do_save');

			# drop timers
			$log->debug("removing alarms");
			$_[KERNEL]->alarm_remove_all;

			# drop server
			$log->debug("shutting down server");
			delete $_[HEAP]->{server};

			# drop clients
			$log->debug("disconnecting clients");
			delete $_[HEAP]->{clients}{$_} for keys %{ $_[HEAP]->{clients} };

			# drop watchers
			$log->debug("shutting down watchers");
			for (values %{ $_[HEAP]->{watchers} }) {
				delete $_[HEAP]->{watchers}{$_->[2]->ID};
				$log->debug("shutdown watcher [".$_->[2]->ID."] for '$_->[0]'");
			}
		},
	},
);


# go!
POE::Kernel->run;

$log->notice("exit");

# log object create
sub logger {
	return Log::Dispatch::Screen->new(
		name      => 'main',
		callbacks => [ \&pfmt, \&lfmt, \&dfmt ],
		min_level => $opts{'log-level'},
		stderr    => 1
	) if $opts{foreground};

	return Log::Dispatch::File->new(
		name      => 'main',
		callbacks => [ \&pfmt, \&lfmt, \&dfmt ],
		min_level => $opts{'log-level'},
		filename  => $opts{'log-file'},
		mode      => '>>'
	) if $opts{'log-file'};

	return Log::Dispatch::Syslog->new(
		name      => 'main',
		callbacks => [ \&pfmt, ],
		min_level => $opts{'log-level'},
		facility  => $opts{'log-facility'},
		ident     => $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '' )
	);
};

# log formatting routines
sub pfmt {
	my %m = @_;
	sprintf "$$: %s\n", $m{message};
}
sub lfmt {
	my %m = @_;
	sprintf "[%s] %s", $m{level}, $m{message};
}
sub dfmt {
	my %m = @_;
	sprintf "%s %s", strftime("%Y/%m/%d %H:%M:%S",localtime), $m{message};
}

sub usage {
	<<EOM;

Usage: $FindBin::RealScript [ options ] plugin [zone1:]wildcard1 .. [zoneN:]wildcardN

More information available under 'Tail::Stat' man page.

Options:
    -a, --agregate-zone=ZONE     agregating anonymous logs to zone
    -b, --database-file=FILE     persistent database file
        --basename               create anonymous zones from base name of log files
    -c, --change-dir=DIR         change directory before wildards expanding
    -d, --debug                  implies: --foreground --log-level=debug
    -f, --foreground             no detach, logging to stderr
        --log-facility=NAME      set facility for syslog logging
        --log-level=LEVEL        minimum logging level
        --log-file=FILE          logging to file instead syslog
    -e, --expand-period=SECONDS  zones wildcard expand period
    -h, --help                   show this help message
    -i, --identity=STRING        add string to process title, default pid-file,
                                 default database-file and syslog ident
    -l, --listen=[ADDR:]PORT     TCP statistic server listen socket
        --multiple               log includes in all expanded wildcard
    -n, --windows-num=NUM        number of sliding windows (default 60)
    -o, --options=STRING         comma-separated plugin supported options
                                 (like a mount (8) options)
        --override-from=FILE     load overriding methods from file
    -p, --pid-file=FILE          pid file path
        --parse-error=LEVEL      logging level for unparsed lines
    -r, --regex=PATTERN          override plugin regular expression
        --regex-from=FILE        read regular expression from file
    -s, --store-period=SECONDS   data store period (default 60)
        --timer=ZONE:NAME:PERIOD add named timer with fixed period
    -u, --user=LOGIN             change effective process uid to
    -v, --version                print version and exit
    -w, --window-size=SECONDS    size of one sliding window (default 10)

EOM
}

sub version {
	<<EOM;

$FindBin::RealScript version $Tail::Stat::VERSION

Copyright (C) 2010 Oleg A. Mamontov

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.

EOM
}