#!/usr/bin/env perl
=head1 NAME
tstatd - Logs real-time accounting daemon
SYNOPSIS
tstatd [ options ] plugin [zone1:]wildcard1 .. [zoneN:]wildcardN
OPTIONS
=over
=item -a I<zone>, --agregate-zone=I<zone>
Agregate data from all anonymous logs (wildcards without explicit
zone specified) into I<zone>. Default behavior is to create new
zone for each anonymous log from its file name.
=item -b I<file>, --database-file=I<file>
Use I<file> as persistent storage to keep accumulated data across
daemon restarts. Default is auto generated from daemon name,
specified identity and '.db' suffix.
=item --basename
Use only base name (excluding directories and suffix) of anonymous log file
for auto-created zones.
=item -c I<dir>, --change-dir=I<dir>
Change current directory to I<dir> before wildcards expanding.
=item -d, --debug
Composition of options: C<--foreground> and C<--log-level=debug>.
=item -f, --foreground
Don't detach daemon from control terminal, logging to C<stderr> instead
log file or syslog.
=item --log-facility=I<name>
Use I<name> as C<facility> for syslog logging (see syslog (3) for list
of available values). Default is 'daemon'.
=item --log-level=I<level>
Set minimal logging level to I<level> (see syslog (3) for list of available
values). Default is 'notice'.
=item --log-file=I<file>
Use logging to I<file> instead of syslog logging (which is default).
=item -e I<num>, --expand-period=I<num>
Do wildcards re-expanding and checking for new and missed logs
every I<num> seconds. Default is '60'.
=item -h, --help
Print brief help message about available options.
=item -i I<string>, --identity=I<string>
Just a string used in title of daemon process, syslog ident (see syslog(3)),
C<--database-file> and C<--pid-file>. Idea behind this options - multiple
C<tstatd> instances running simultaneosly.
=item -l [I<address>:]I<port>, --listen=[I<address>:]I<port>
Specify I<address> and I<port> for TCP listen socket binding.
Default is '127.0.0.1:3638'.
=item --multiple
With this option specified same log file could be included into several
zones (if log name satisifies several wildcards). Default behavior is to
include log file only in first satisified zone.
=item -n I<num>, --windows-num=I<num>
Set number of sliding-windows to I<num>. Default is '60'.
=item -o I<string>, --options=I<string>
Comma-separated plugin supported options (like a mount (8) options).
=item --override-from=I<file>
Load content of I<file> into plugin package namespace.
This is way to easy customize plugin behavior without creating
another plugin.
=item -p I<file>, --pid-file=I<file>
Use I<file> to keep daemon process id. Default is auto generated
from daemon name, specified identity and '.pid' suffix.
=item --parse-error=I<level>
Do logging with I<level> (see syslog (3) for available values) about
all unparsed log lines. Hint: use 'none' for ignoring such lines.
Default is defining by plugin and usually is 'debug'.
=item -r I<pattern>, --regex=I<pattern>
Use I<pattern> instead of plugin default regular expression for
matching log lines.
=item --regex-from=I<file>
Load regular expression from I<file> and use instead of plugin default
regular expression for matching log lines.
=item -s I<num>, --store-period=I<num>
Store accumulated data in a persistent storage every I<num> seconds.
Default is '60'.
=item --timer=I<zone>:I<timer>:I<num>
Create named I<timer> firing every I<num> seconds for I<zone>.
=item -u <user>, --user=I<user>
Change effective privileges of daemon process to I<user>.
=item -v, --version
Print version information of C<tstatd> and exit.
=item -w I<num>, --window-size=<num>
Set size (duration) of sliding window to I<num> seconds.
Default is '10'.
=back
=head1 SEE ALSO
L<Tail::Stat>
=head1 AUTHOR
Oleg A. Mamontov, C<< <oleg@mamontov.net> >>
=head1 COPYRIGHT & LICENSE
This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.
See http://dev.perl.org/licenses/ for more information.
=cut
use strict;
use warnings qw(all);
use Cwd qw(getcwd realpath);
use DateTime;
use File::Basename qw(fileparse);
use FindBin;
use Getopt::Long qw(:config no_auto_abbrev bundling);
use JSON::XS;
use List::Util qw(min);
use Log::Dispatch;
use Log::Dispatch::File;
use Log::Dispatch::Screen;
use Log::Dispatch::Syslog;
use Pid::File::Flock;
use POE qw(Wheel::FollowTail Wheel::ListenAccept Wheel::ReadWrite);
use POSIX qw(setsid setuid strftime);
use Socket;
use Tail::Stat;
use Tie::Hash::Indexed;
# parse command line
my %opts;
GetOptions(\%opts, qw/
agregate-zone|a=s
basename
database-file|b=s
change-dir|c=s
debug|d
foreground|f
log-facility=s
log-file=s
log-level=s
expand-period|e=i
help|h
identity|i=s
listen|l=s
windows-num|n=i
multiple
options|o=s
override-from=s@
parse-error=s
pid-file|p=s
regex|r=s
regex-from=s
timer=s@
store-period|s=i
user|u=s
version|v
window-size|w=i
/) or die usage();
# explicitly requested help
die usage() if $opts{help};
# version requested
print version() and exit if $opts{version};
# no arguments
die usage() if @ARGV < 2;
# try to load requested plugin
my $pname = shift @ARGV;
my $pclass = "Tail::Stat::Plugin::$pname";
eval "require $pclass" or die "can't load plugin '$pname': $@\n";
# parameters defaults & validation
if (exists $opts{'agregate-zone'}) {
die "invalid zone: '$opts{'agregate-zone'}'\n"
if $opts{'agregate-zone'} =~ /[^a-z0-9_-]/;
}
if (exists $opts{identity}) {
die "invalid identity: '$opts{identity}'\n" if $opts{identity} =~ /[^\w]/;
}
$opts{'database-file'} ||= $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '').'.db';
$opts{'database-file'} = realpath $opts{'database-file'};
if (exists $opts{'change-dir'}) {
die "no such directory: '$opts{'change-dir'}'\n"
unless -d $opts{'change-dir'};
}
$opts{'log-facility'} ||= 'daemon';
die "invalid log facility: '$opts{'log-facility'}'\n"
unless $opts{'log-facility'} =~ /^(auth|authpriv|cron|daemon|kern|local[0-7]|mail|news|syslog|user|uucp)$/;
$opts{'log-level'} ||= 'notice';
die "invalid log level: '$opts{'log-level'}'\n"
unless $opts{'log-level'} =~ /^(debug|info|notice|warning|error|critical|alert|emergency)$/;
if ($opts{debug}) {
$opts{'log-level'} = 'debug';
delete $opts{'log-file'};
$opts{foreground} = 1;
}
$opts{'log-file'} = realpath $opts{'log-file'} if exists $opts{'log-file'};
$opts{'expand-period'} = 60 unless exists $opts{'expand-period'};
die "invalid expand period: '$opts{'expand-period'}'\n"
if $opts{'expand-period'} =~ /[^\d]/;
$opts{'listen'} ||= '127.0.0.1:3638';
$opts{'windows-num'} = 60 unless exists $opts{'windows-num'};
die "invalid windows number: '$opts{'windows-num'}'\n"
if $opts{'windows-num'} =~ /[^\d]/;
$opts{'store-period'} = 10 unless exists $opts{'store-period'};
die "invalid store period: '$opts{'store-period'}'\n"
if $opts{'store-period'} =~ /[^\d]/;
$opts{'window-size'} = 10 unless exists $opts{'window-size'};
die "invalid window size: '$opts{'window-size'}'\n"
if $opts{'window-size'} =~ /[^\d]/;
$opts{'parse-error'} ||= $pclass->parse_error;
die "invalid parse error: '$opts{'parse-error'}'\n"
unless $opts{'parse-error'} =~ /^(debug|info|none|notice|warning|error|critical|alert|emergency)$/;
$opts{'pid-file'} ||= $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '').'.pid';
$opts{'pid-file'} = realpath $opts{'pid-file'};
if (exists $opts{'regex-from'}) {
die "options regex and regex-from are mutually exclusive\n"
if exists $opts{regex};
local $/;
open FH, $opts{'regex-from'} or
die "can't read regex: $!\n";
$opts{regex} = <FH>;
}
# loading overrides
for ( @{ $opts{'override-from'} } ) {
local $/;
open FH, $_ or
die "can't read override from '$_': $!\n";
eval "package $pclass; use strict; use warnings qw(all); ".<FH>;
die "can't apply overrides from '$_': $@\n" if $@;
}
defined (my $uid = $opts{'user'} ? getpwnam($opts{'user'}) : $>) or
die "unknown user: $opts{'user'}\n";
# grouping log files by zones, order does matter
# due to support '--multiple' option
my %zones;
tie %zones, 'Tie::Hash::Indexed';
for (@ARGV) {
/^([\w\d\_-]+):(.+)/ && do {
push @{$zones{$1}}, $2;
next;
};
push @{$zones{
$opts{'agregate-zone'} ||
( $opts{basename} ? fileparse($_,qr/\.[^\.]+/) : $_ )
}}, $_;
}
# parsing timers
my %timers;
my %units = (
w => [ 'week', 7 * 86_400 ],
d => [ 'day', 86_400 ],
h => [ 'hour', 3_600 ],
m => [ 'minute', 60 ],
s => [ 'second', 1 ],
);
for (@{ $opts{timer} }) {
my ($z,$n,$p,$u) = /^(\S+):(\S+):(\d+)(w|d|h|m|s)?$/ or
die "invalid timer format: $_\n";
die "no such zone '$z' for timer '$_'\n" unless exists $zones{$z};
die "zone '$z' already has timer '$n'\n" if exists $timers{$z}{$n};
$u ||= 's';
$timers{$z}{$n} = [ $p * $units{$u}[1], $units{$u}[0] ];
}
# listen socket
my $sock = IO::Socket::INET->new(
(
$opts{'listen'} =~ /:/ ?
( LocalAddr => $opts{'listen'} ) :
( LocalPort => $opts{'listen'} )
),
Listen => SOMAXCONN,
ReuseAddr => 1,
) or die "can't create listen socket: $!\n";
# set process privileges
setuid $uid or die "can't setuid to $opts{'user'}: $!\n" unless $uid == $>;
# set process title
$0 = $FindBin::RealScript.': '.$pname.($opts{identity} ? ' ['.$opts{identity}.']' : '');
# fork
unless ($opts{foreground}) {
defined(my $pid = fork) or die "can't fork: $!\n";
exit if $pid;
}
# protecting against second instance running
Pid::File::Flock->new($opts{'pid-file'}) unless $opts{foreground};
# daemonize
unless ($opts{foreground}) {
chdir '/' or die "can't chdir: $!\n";
die "can't create new session: $!\n" if setsid == -1;
open STDIN, '</dev/null' or die "can't close stdin\n";
open STDOUT, '>/dev/null' or die "can't close stdout\n";
open STDERR, '>/dev/null' or die "can't close stderr\n";
}
# logger
(my $log = Log::Dispatch->new)->add(logger());
$log->notice("starting up");
# catch perl warnings
$SIG{__WARN__} = sub { $log->warning(@_) };
# main POE session
POE::Session->create(
inline_states => {
# initializing
_start => sub {
$log->debug("initializing POE session");
# talk POE kernel adjust to the new situation
$_[KERNEL]->has_forked unless $opts{foreground};
# signals
$log->debug("setting up signal handlers");
$_[KERNEL]->sig(HUP => 'hangup');
$_[KERNEL]->sig(INT => 'interrupt');
$_[KERNEL]->sig(TERM => 'terminate');
$_[KERNEL]->sig(USR1 => 'rotate');
# statistics server
$log->debug("creating TCP server");
$_[HEAP]->{server} = POE::Wheel::ListenAccept->new(
Handle => $sock,
AcceptEvent => 'server_accept',
ErrorEvent => 'server_error',
);
# serializer
$log->debug("creating serializer");
$_[HEAP]->{serial} = JSON::XS->new->pretty;
# creating plugin instance
my %popts;
for (split /,/, $opts{options} || '') {
my ($k,$v) = split /=/;
$popts{$k} = defined $v ? $v : 1;
}
$popts{regex} = $opts{regex} if exists $opts{regex};
$_[HEAP]->{plugin} = $pclass->new(%popts);
# setting up zones
$_[HEAP]->{zones} = \%zones;
# load previous data
if (-f $opts{'database-file'}) {
$_[KERNEL]->call($_[SESSION], 'do_load') or return;
}
# create insufficient references
for (keys %zones) {
$_[HEAP]->{data}{$_}{public} ||= {};
$_[HEAP]->{data}{$_}{private} ||= {};
$_[HEAP]->{data}{$_}{windows} ||= [];
$_[HEAP]->{data}{$_}{windows}[0] ||= {};
# call plugin initialization code
$_[HEAP]->{plugin}->init_zone(
$_,
$_[HEAP]->{data}{$_}{public},
$_[HEAP]->{data}{$_}{private},
$_[HEAP]->{data}{$_}{windows}[0],
);
}
# expanding zones wildcards
$_[KERNEL]->call($_[SESSION], 'do_expand');
# creating named timers
$_[HEAP]->{timers} = \%timers;
for my $z ( keys %{ $_[HEAP]->{timers} } ) {
for my $n ( keys %{ $_[HEAP]->{timers}{$z} } ) {
$_[KERNEL]->call($_[SESSION], 'set_timer', $z, $n);
}
}
# schedule save task
if ($opts{'store-period'}) {
$log->debug("scheduling saving heartbeat at $opts{'store-period'} second(s)");
$_[KERNEL]->delay( save_heartbeat => $opts{'store-period'} );
}
# schedule expanding wildcards
if ($opts{'expand-period'}) {
$log->debug("scheduling expanding heartbeat at $opts{'expand-period'} second(s)");
$_[KERNEL]->delay( expand_heartbeat => $opts{'expand-period'} );
}
# schedule windows heartbeat
if ($opts{'window-size'} && $opts{'windows-num'}) {
$log->debug("scheduling windows heartbeat at $opts{'window-size'} second(s)");
$_[KERNEL]->delay( windows_heartbeat => $opts{'window-size'} );
}
},
# expanding task
expand_heartbeat => sub {
$log->debug("wildcards expanding heartbeat occurred");
# expanding zones wildcards
$_[KERNEL]->call($_[SESSION], 'do_expand');
# schedule next call
if ($opts{'expand-period'}) {
$log->debug("scheduling expanding heartbeat at $opts{'expand-period'} second(s)");
$_[KERNEL]->delay( expand_heartbeat => $opts{'expand-period'} );
}
},
# setting named timer
set_timer => sub {
my $t = $_[HEAP]->{timers}{$_[ARG0]}{$_[ARG1]};
my $next = DateTime->now(
time_zone => 'local'
)->add(
seconds => $t->[0]
)->truncate(
to => $t->[1]
);
$log->debug("setting named timer '$_[ARG1]' for zone '$_[ARG0] at '".$next->strftime('%Y-%m-%d %H:%M:%S')."'");
$_[KERNEL]->alarm_set( named_timer => $next->epoch, $_[ARG0], $_[ARG1] );
},
# named timer handler
named_timer => sub {
$log->debug("processing named timer '$_[ARG1]' for zone '$_[ARG0]");
$_[HEAP]->{plugin}->process_timer(
$_[ARG1],
$_[HEAP]->{data}{$_[ARG0]}{public},
$_[HEAP]->{data}{$_[ARG0]}{private},
$_[HEAP]->{data}{$_[ARG0]}{windows}
) ? do {
$log->debug("renewing timer '$_[ARG1]' for zone '$_[ARG0]");
$_[KERNEL]->call($_[SESSION], 'set_timer', $_[ARG0], $_[ARG1]);
} : do {
$log->debug("clearing timer '$_[ARG1]' for zone '$_[ARG0]'");
};
},
# expanding wildcards
do_expand => sub {
$log->debug("begin expanding wildcards");
my $cwd = getcwd;
if ($opts{'change-dir'}) {
chdir $opts{'change-dir'} or
$log->warning("can't change directory to '$opts{'change-dir'}'");
}
my %exif; # existing files
for my $zone ( keys %{ $_[HEAP]->{zones} } ) {
my @files;
push @files, map { realpath $_ } grep { -f } glob $_
for @{ $_[HEAP]->{zones}{$zone} };
$log->debug("found ".scalar(@files)." file(s) in zone '$zone'");
# create missing watchers
FILE:
for my $f (@files) {
$exif{$f}++;
# searching for already monitored file
for my $w ( values %{ $_[HEAP]->{watchers} } ) {
next unless $f eq $w->[0];
unless ($opts{multiple}) {
$log->debug("file '$f' already monitored, ignoring for zone '$zone'");
next FILE;
}
# searching for already subscribed zone
for my $z ( @{ $w->[1] } ) {
next unless $z eq $zone;
$log->debug("zone already subscribed for '$f'");
next FILE;
}
# subscribe to existing watcher
push @{ $w->[1] }, $zone;
$log->debug("zone subscribed for '$f'");
next FILE;
}
# create new watcher and subscribe zone
my $w = POE::Wheel::FollowTail->new(
Filename => $f,
Filter => POE::Filter::Line->new( InputLiteral => "\n" ),
ErrorEvent => 'watcher_err',
InputEvent => 'watcher_line',
ResetEvent => 'watcher_roll',
);
$log->debug("created new watcher [".$w->ID."] for '$f'");
$log->debug("zone subscribed for '$f'");
$_[HEAP]->{watchers}{$w->ID} = [ $f, [ $zone ], $w ];
}
}
# remove excess watchers
for my $w (values %{ $_[HEAP]->{watchers} } ) {
next if $exif{$w->[0]}; # file exists
delete $_[HEAP]->{watchers}{$w->[2]->ID};
$log->debug("excess watcher [".$w->[2]->ID."] removed for '".$w->[0]."'");
}
if ($opts{'change-dir'}) {
chdir $cwd or
$log->warning("can't restore directory to '$cwd'");
}
},
# new log line
watcher_line => sub {
my $w = $_[HEAP]->{watchers}{$_[ARG1]};
my @data = $_[HEAP]->{plugin}->process_line($_[ARG0]) or do {
return $log->log(
level => $opts{'parse-error'},
message => "can't parse: '$_[ARG0]' from '$w->[0]'"
) unless $opts{'parse-error'} eq 'none';
};
# subscribers loop
for my $z ( @{ $w->[1] } ) {
$_[HEAP]->{plugin}->process_data(
\@data,
$_[HEAP]->{data}{$z}{public},
$_[HEAP]->{data}{$z}{private},
$_[HEAP]->{data}{$z}{windows}[0]
);
}
},
# log rotating occurred
watcher_roll => sub {
my $w = $_[HEAP]->{watchers}{$_[ARG0]};
# clear tail fragment
$w->[2][ POE::Wheel::FollowTail::SELF_FILTER ][ POE::Filter::Line::FRAMING_BUFFER ] =~ s/[^\n]+\z//
if $w->[2][ POE::Wheel::FollowTail::SELF_FILTER ][ POE::Filter::Line::FRAMING_BUFFER ];
$log->info("rolled over '$w->[0]'");
},
# log tailing error
watcher_err => sub {
my $w = $_[HEAP]->{watchers}{$_[ARG3]};
$log->error("$_[ARG0] failed ($_[ARG1] during tail '$w->[0]': $_[ARG2]");
},
# windows processing
windows_heartbeat => sub {
$log->debug("windows heartbeat occurred");
# schedule windows processing
for ( keys %{ $_[HEAP]->{zones} } ) {
$_[KERNEL]->yield( do_window => $_ );
}
# schedule next call
$log->debug("scheduling windows heartbeat at $opts{'window-size'} second(s)");
$_[KERNEL]->delay( windows_heartbeat => $opts{'window-size'} );
},
do_window => sub {
# windows ring
my $wins = $_[HEAP]->{data}{$_[ARG0]}{windows};
# call plugin handler with last complete window
$_[HEAP]->{plugin}->process_window(
$_[HEAP]->{data}{$_[ARG0]}{public},
$_[HEAP]->{data}{$_[ARG0]}{private},
$wins
);
# slide windows
unshift @$wins, {};
$#$wins = min $#$wins, $opts{'windows-num'} - 1;
},
# periodically task
save_heartbeat => sub {
$log->debug("saving heartbeat occurred");
# save accumulated data
$_[KERNEL]->call($_[SESSION], 'do_save');
# schedule next call
$log->debug("scheduling saving heartbeat at $opts{'store-period'} second(s)");
$_[KERNEL]->delay( save_heartbeat => $opts{'store-period'} );
},
# loading stored data
do_load => sub {
$log->debug("loading stored data");
open FH, $opts{'database-file'} or do {
$log->error("can't open database file: $!");
return $_[KERNEL]->call($_[SESSION], 'shutdown');
};
local $/;
my $d = $_[HEAP]->{serial}->decode(<FH>) or do {
$log->error("can't read database file: $!");
return $_[KERNEL]->call($_[SESSION], 'shutdown');
};
# assign
$_[HEAP]->{data} = $d->{zones} || {};
},
# store accumulated data
do_save => sub {
$log->debug("storing accumulated data");
open FH, '>', $opts{'database-file'}.'~' or do {
return $log->warning("can't write database file: $!");
};
my $d = { zones => $_[HEAP]->{data} || {} };
print FH $_[HEAP]->{serial}->encode($d);
close FH;
if (-f $opts{'database-file'}) {
unlink $opts{'database-file'} or do {
return $log->warning("can't remove old database file: $!");
};
};
rename $opts{'database-file'}.'~', $opts{'database-file'} or do {
return $log->warning("can't rename new database file: $!");
};
},
# new client accepted
server_accept => sub {
my ($port,$addr) = sockaddr_in $_[ARG1];
$log->debug("client accepted from ".inet_ntoa($addr).":$port");
my $c = POE::Wheel::ReadWrite->new(
Handle => $_[ARG0],
InputEvent => 'client_input',
ErrorEvent => 'client_error',
);
$_[HEAP]->{clients}{$c->ID} = $c;
},
# server error occurred
server_error => sub {
$log->error("$_[ARG0] failed ($_[ARG1] during serving: $_[ARG2]");
$_[KERNEL]->call($_[SESSION], 'shutdown');
},
# got client command
client_input => sub {
$log->debug("got client command: '$_[ARG0]'");
my $cln = $_[HEAP]->{clients}{$_[ARG1]} or
return $log->warning("unknown client #$_[ARG1]");
for ($_[ARG0]) {
# zones list (active & inactive zones)
/^\s*zones\s*$/i and do {
$cln->put(
map { 'a:'.$_ }
keys %{ $_[HEAP]->{zones} },
);
$cln->put(
map { 'i:'.$_ }
grep { ! exists $_[HEAP]->{zones}{$_} }
keys %{ $_[HEAP]->{data} },
);
last;
};
# wildcards list (active zones only)
/^\s*globs\s+(\S+)\s*$/i and do {
my $z = $_[HEAP]->{zones}{$1} or do {
$log->warning("invalid client globs query: '$1'");
$cln->put('no such active zone');
last;
};
$cln->put( sort @{ $z } );
last;
};
# files list (active zones only)
/^\s*files\s+(\S+)\s*$/i and do {
$_[HEAP]->{zones}{$1} or do {
$log->warning("invalid client files query: '$1'");
$cln->put('no such active zone');
last;
};
my @f;
for my $w ( values %{ $_[HEAP]->{watchers} } ) {
for my $z ( @{ $w->[1] } ) {
next unless $z eq $1;
push @f, [ $w->[0], $w->[2]->tell, -s $w->[0] ];
last;
}
}
for ( sort { $a->[0] cmp $b->[0] } @f ) {
$cln->put( join ':', $_->[1] eq '0 but true' ? 0 : $_->[1], $_->[2], $_->[0] );
}
last;
};
# zone dump (active & inactive zones)
/^\s*dump\s+(\S+)\s*$/i and do {
$_[HEAP]->{zones}{$1} or $_[HEAP]->{data}{$1} or do {
$log->warning("invalid client dump query: '$1'");
$cln->put('no such zone');
last;
};
my $wmax = $#{ $_[HEAP]->{data}{$1}{windows} };
$cln->put( $_[HEAP]->{plugin}->dump_zone( $1,
$_[HEAP]->{data}{$1}{public},
$_[HEAP]->{data}{$1}{private},
[ @{ $_[HEAP]->{data}{$1}{windows} }[1..$wmax] ],
));
last;
};
# zone statistics (active & inactive zones)
/^\s*stats\s+(\S+)\s*$/i and do {
$_[HEAP]->{zones}{$1} or $_[HEAP]->{data}{$1} or do {
$log->warning("invalid client stats query: '$1'");
$cln->put('no such zone');
last;
};
my $wmax = $#{ $_[HEAP]->{data}{$1}{windows} };
$cln->put( $_[HEAP]->{plugin}->stats_zone( $1,
$_[HEAP]->{data}{$1}{public},
$_[HEAP]->{data}{$1}{private},
[ @{ $_[HEAP]->{data}{$1}{windows} }[1..$wmax] ],
));
last;
};
# remove zone statistics (inactive zones only)
/^\s*wipe\s+(\S+)\s*$/i and do {
if ($1 eq '*') {
for ( keys %{ $_[HEAP]->{data} } ) {
next if exists $_[HEAP]->{zones}{$_};
delete $_[HEAP]->{data}{$_};
}
} else {
$_[HEAP]->{data}{$1} or do {
$log->warning("invalid client wipe query: '$1'");
$cln->put('no such inactive zone');
last;
};
$_[HEAP]->{zones}{$1} and do {
$log->warning("invalid client wipe query: '$1'");
$cln->put('zone is active');
last;
};
delete $_[HEAP]->{data}{$1};
}
$cln->put( 'ok' );
$_[KERNEL]->call($_[SESSION], 'do_save');
last;
};
# disconnect request
/^\s*quit\s*$/i and do {
return delete $_[HEAP]->{clients}->{$_[ARG1]};
};
# invalid command
$log->warning("invalid client command: '$_[ARG0]'");
$_[HEAP]->{clients}{$_[ARG1]}->put('error');
}
# force buffer flush
$_[HEAP]->{clients}{$_[ARG1]}->flush;
$log->debug("buffer flushed");
},
# client errors (disconnect included)
client_error => sub {
$_[ARG1] ?
$log->error("$_[ARG0] ($_[ARG1] from client: $_[ARG2]") :
$log->debug("client disconnected");
# drop client connection
delete $_[HEAP]->{clients}->{$_[ARG3]};
},
# got SIGHUP
hangup => sub {
$log->notice("got SIGHUP, re-expanging zones wildcards");
# expanding zones wildcards
$_[KERNEL]->call($_[SESSION], 'do_expand');
# keep signal handled
$_[KERNEL]->sig_handled;
},
# got SIGINT
interrupt => sub {
$log->notice("got SIGINT, terminating");
# keep signal handled
$_[KERNEL]->sig_handled;
# shutting down
$_[KERNEL]->call($_[SESSION], 'shutdown');
},
# got SIGTERM
terminate => sub {
$log->notice("got SIGTERM, terminating");
# shutting down
$_[KERNEL]->call($_[SESSION], 'shutdown');
# keep signal handled
$_[KERNEL]->sig_handled;
},
# got SIGUSR1
rotate => sub {
$log->notice("got SIGUSR1, re-opening log file");
# drop & create logger
$log->remove('main');
$log->add(logger());
# keep signal handled
$_[KERNEL]->sig_handled;
},
# graceful exit
shutdown => sub {
$log->debug("gracefully shutting down");
# store statistics
$_[KERNEL]->call($_[SESSION], 'do_save');
# drop timers
$log->debug("removing alarms");
$_[KERNEL]->alarm_remove_all;
# drop server
$log->debug("shutting down server");
delete $_[HEAP]->{server};
# drop clients
$log->debug("disconnecting clients");
delete $_[HEAP]->{clients}{$_} for keys %{ $_[HEAP]->{clients} };
# drop watchers
$log->debug("shutting down watchers");
for (values %{ $_[HEAP]->{watchers} }) {
delete $_[HEAP]->{watchers}{$_->[2]->ID};
$log->debug("shutdown watcher [".$_->[2]->ID."] for '$_->[0]'");
}
},
},
);
# go!
POE::Kernel->run;
$log->notice("exit");
# log object create
sub logger {
return Log::Dispatch::Screen->new(
name => 'main',
callbacks => [ \&pfmt, \&lfmt, \&dfmt ],
min_level => $opts{'log-level'},
stderr => 1
) if $opts{foreground};
return Log::Dispatch::File->new(
name => 'main',
callbacks => [ \&pfmt, \&lfmt, \&dfmt ],
min_level => $opts{'log-level'},
filename => $opts{'log-file'},
mode => '>>'
) if $opts{'log-file'};
return Log::Dispatch::Syslog->new(
name => 'main',
callbacks => [ \&pfmt, ],
min_level => $opts{'log-level'},
facility => $opts{'log-facility'},
ident => $FindBin::RealScript.($opts{identity} ? '-'.$opts{identity} : '' )
);
};
# log formatting routines
sub pfmt {
my %m = @_;
sprintf "$$: %s\n", $m{message};
}
sub lfmt {
my %m = @_;
sprintf "[%s] %s", $m{level}, $m{message};
}
sub dfmt {
my %m = @_;
sprintf "%s %s", strftime("%Y/%m/%d %H:%M:%S",localtime), $m{message};
}
sub usage {
<<EOM;
Usage: $FindBin::RealScript [ options ] plugin [zone1:]wildcard1 .. [zoneN:]wildcardN
More information available under 'Tail::Stat' man page.
Options:
-a, --agregate-zone=ZONE agregating anonymous logs to zone
-b, --database-file=FILE persistent database file
--basename create anonymous zones from base name of log files
-c, --change-dir=DIR change directory before wildards expanding
-d, --debug implies: --foreground --log-level=debug
-f, --foreground no detach, logging to stderr
--log-facility=NAME set facility for syslog logging
--log-level=LEVEL minimum logging level
--log-file=FILE logging to file instead syslog
-e, --expand-period=SECONDS zones wildcard expand period
-h, --help show this help message
-i, --identity=STRING add string to process title, default pid-file,
default database-file and syslog ident
-l, --listen=[ADDR:]PORT TCP statistic server listen socket
--multiple log includes in all expanded wildcard
-n, --windows-num=NUM number of sliding windows (default 60)
-o, --options=STRING comma-separated plugin supported options
(like a mount (8) options)
--override-from=FILE load overriding methods from file
-p, --pid-file=FILE pid file path
--parse-error=LEVEL logging level for unparsed lines
-r, --regex=PATTERN override plugin regular expression
--regex-from=FILE read regular expression from file
-s, --store-period=SECONDS data store period (default 60)
--timer=ZONE:NAME:PERIOD add named timer with fixed period
-u, --user=LOGIN change effective process uid to
-v, --version print version and exit
-w, --window-size=SECONDS size of one sliding window (default 10)
EOM
}
sub version {
<<EOM;
$FindBin::RealScript version $Tail::Stat::VERSION
Copyright (C) 2010 Oleg A. Mamontov
This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.
See http://dev.perl.org/licenses/ for more information.
EOM
}