# Schedule::Load::Reporter.pm -- distributed load reporting daemon
# See copyright, etc in below POD section.
######################################################################
package Schedule::Load::Reporter;
require 5.004;
require Exporter;
@ISA = qw(Exporter);
use Socket;
use IO::Socket;
use IO::Select; # IO::Select is ok instead of IO::Poll as we only have at max 2 handles
use POSIX;
use Proc::ProcessTable;
use Unix::Processors;
use Storable qw();
use Schedule::Load qw (:_utils);
use Schedule::Load::FakeReporter;
use Sys::Hostname;
use Time::HiRes qw (gettimeofday);
use IPC::PidStat;
use Config;
use strict;
use vars qw($VERSION $RSCHLIB $Debug %User_Names %Pid_Inherit
@Pid_Time_Base @Pid_Time $Os_Linux
$Distrust_Pctcpu $Divide_Pctcpu_By_Cpu $ProcTimeToSec
$Exister
);
use Carp;
######################################################################
#### Configuration Section
# Other configurable settings.
$Debug = $Schedule::Load::Debug;	# Inherit the parent package's debug level
$VERSION = '3.064';
$RSCHLIB = '/usr/local/lib'; # Edited by Makefile
{
    # Platform quirks, all keyed off the OS Perl was built for
    my $os = $Config{osname};
    $Os_Linux = ($os =~ /linux/i);
    # Only Solaris reports an instantaneous %CPU; elsewhere _fill_dynamic
    # recomputes it from successive utime+stime samples.
    $Distrust_Pctcpu = ($os !~ /solaris/i);
    $Divide_Pctcpu_By_Cpu = 0;	# Older linuxes may require this
    # Scale of Proc::ProcessTable's time fields: 1e-6 s on Linux, 1e-3 s
    # elsewhere (fixed in Proc::ProcessTable 0.40).
    $ProcTimeToSec = $Os_Linux ? 1e-6 : 1e-3;
}
######################################################################
#### Globals
# Layout of the per-reporter data sent to slchoosed (see _send_hash):
#   $self->{const}{name}   = value       # Fixed facts, e.g. cpus, hostname
#   $self->{stored}{name}  = value       # Persistent settings, e.g. reserved
#   $self->{dynamic}{name} = value       # Overall loading totals
#   $self->{dynamic}{proc}{PID}{field} = value   # Per-process info
# Cache of user name based on UID
%User_Names = ();
# Per-PID settings inherited by child processes (fixed_load, cmndcomment, ...)
%Pid_Inherit = ();
######################################################################
#### Creator
sub start {
# Run the load reporter daemon; does not return normally.
# Connects to the first reachable host in {dhost} (tried in list order), then
# loops forever: pinging the chooser every {alive_time} seconds, polling the
# plugins every {stats_interval} seconds, reconnecting when the socket is
# lost, and answering newline-terminated requests read from the socket.
# The process exits with status 0 on a "report_restart" request.
#
# Arguments: class name (or object) followed by key=>value options, which
# override %Schedule::Load::_Default_Params and the defaults below:
#   dhost          - array ref of slchoosed host names (required)
#   port, timeout  - used when connecting (see _open_host)
#   alive_time     - seconds between report_ping liveness checks
#   stats_interval - seconds between plugin poll() calls
#   plugins        - array ref of objects providing poll() and stats()
#   const          - hash ref of constant values reported to the chooser
#   fake, min_pctcpu, stored_filename - see the POD after __END__
# Establish the reporter
@_ >= 1 or croak 'usage: Schedule::Load::Reporter->start ({options})';
my $proto = shift;
my $class = ref($proto) || $proto;
my $self = {
%Schedule::Load::_Default_Params,
#Documented
#Undocumented
timeout=>$Debug?2:30, # Sec before host socket connect times out
alive_time=>$Debug?10:30, # Sec to send alive message (must be sooner than Chooser's ping_dead_time)
stats_interval=>$Debug?2:60, # Sec between polling of interval based plugin statistics
const_changed=>0, # const or stored has changed, update in chooser
plugins => [], # Plugin objects
@_};
bless $self, $class;
# More defaults (can't be above due to needing other elements)
$self->{const}{hostname} ||= hostname();
$self->{const}{slreportd_hostname} ||= hostname();
$self->{const}{slreportd_version} ||= $VERSION;
# NOTE(review): ||= also replaces an explicit stored_filename=>undef, so
# undef does not disable persistence as the POD states -- confirm intent.
$self->{stored_filename} ||= ($RSCHLIB."/rschedule/slreportd_".$self->{const}{hostname}."_store");
(defined $self->{dhost}) or croak 'Require a host parameter';
#foreach (@{$self->{dhost}}) { print "Host $_\n"; }
# The PidStat UDP handle stays in the select set for the daemon's lifetime
my $select = IO::Select->new();
$Exister = new IPC::PidStat();
$select->add($Exister->fh);
$self->pt(); # Create process table
$self->fake_pt(); # Create fake process table
# Load constants
$self->_fill_const;
$self->_fill_stored;
$self->_fill_dynamic;
my $inbuffer = '';
# Wake at least as often as the more frequent of the alive and stats checks
my $poll_interval = $self->{alive_time}; # How often to wake up while loop, at maximum
$poll_interval = $self->{stats_interval} if $poll_interval > $self->{stats_interval};
$poll_interval ||= 1; # as 0 would busy-wait!
my $last_alive_sec = 0;
my $last_stats_sec = 0;
foreach my $plugin (@{$self->{plugins}}) {
# Call twice, as some stats are interval based
$plugin->poll(); # Initialize plugin stats
$plugin->poll(); # Initialize plugin stats
}
# Label lets a failed socket read below abandon the current batch of handles
service_loop:
while (1) {
my ($now_sec, $now_usec) = gettimeofday();
# See if chooser is alive
if ($self->{socket}
&& (($now_sec - $last_alive_sec) >= $self->{alive_time})) {
_alive_check ($self);
$last_alive_sec = $now_sec;
}
# See if stats need polling
if (($now_sec - $last_stats_sec) >= $self->{stats_interval}) {
foreach my $plugin (@{$self->{plugins}}) {
$plugin->poll($now_sec, $now_usec);
}
$last_stats_sec = $now_sec;
}
if (! $self->{socket}) {
# Open the socket to first found host
foreach my $host (@{$self->{dhost}}) {
last if ($self->_open_host($host));
}
# Rebuild the select set: PidStat handle plus the new socket, if any
$select->remove($select->handles);
$select->add($Exister->fh);
$select->add($self->{socket}) if $self->{socket};
# Drop any partial request left over from the old connection
$inbuffer = '';
}
# Wait for someone to become active, or time out after $poll_interval
# seconds so the alive/stats checks above run (in case slchoosed goes down & up).
# The PidStat handle is always in the set, so this sleep is only a safeguard.
sleep($poll_interval) if ($select->count() == 0); # select won't block if no fd's
foreach my $fh ($select->can_read ($poll_interval)) {
print "Servicing input\n" if $Debug;
if ($fh == $Exister->fh) {
_exist_traffic();
}
else {
# Snarf input; only read more when no complete request line is buffered
if ($inbuffer !~ /\n/) {
my $data='';
my $rv = $fh->sysread($data, POSIX::BUFSIZ);
# Read error or EOF
if (!defined $rv || (length $data == 0)) {
# May have disconnected; force an alive check
$last_alive_sec = 0;
next service_loop;
}
$inbuffer .= $data;
}
# Handle every complete newline-terminated request in the buffer
while ($inbuffer =~ s/(.*?)\n//) {
my $line = $1;
chomp $line;
print "REQ $line\n" if $Debug;
# _pthaw (from Schedule::Load) decodes the request; presumably into a
# command name and a params hash ref -- confirm there.
my ($cmd, $params) = _pthaw($line, $Debug);
# Commands
if ($cmd eq "report_get_dynamic") {
$self->_fill_and_send;
} elsif ($cmd eq "report_fwd_set") {
$self->_set_stored($params);
} elsif ($cmd eq "report_fwd_comment") {
$self->_comment($params);
} elsif ($cmd eq "report_fwd_fixed_load") {
$self->_fixed_load($params);
} elsif ($cmd eq "report_restart") {
# Overall fork loop will deal with it.
warn "-Info: report_restart\n" if $Debug;
exit(0);
} else {
warn "%Error: Bad request from server: $line\n" if $Debug;
}
}
}
}
}
}
######################################################################
######################################################################
#### Accessors
sub pt {
    my $self = shift;
    # Lazily create, then reuse, the real process table (tty names cached)
    $self->{pt} ||= Proc::ProcessTable->new('cache_ttys' => 1);
    return $self->{pt};
}
sub fake_pt {
    my $self = shift;
    # Lazily create, then reuse, the fake process table; it is handed this
    # reporter object.
    $self->{fake_pt} ||= Schedule::Load::FakeReporter::ProcessTable->new(reportref => $self);
    return $self->{fake_pt};
}
######################################################################
######################################################################
#### Sending
sub _open_host {
    my ($self, $host) = @_;
    # Try to connect to slchoosed on $host:{port}.  When the connection is
    # up, keep it in {socket} and immediately push const, stored and dynamic
    # data so the chooser learns this reporter is alive.
    # Returns the socket, or undef when the host is unreachable.
    print "Trying host $host $self->{port}\n" if $Debug;
    my $sock = Schedule::Load::Socket->new(
        PeerAddr => $host,
        PeerPort => $self->{port},
        Timeout  => $self->{timeout},
        );
    $self->{socket} = ($sock && $sock->connected()) ? $sock : undef;
    if ($self->{socket}) {
        $self->{stored_read} = 0;     # Reread stored info in case redundant reporters
        $self->{const_changed} = 1;   # Force const+stored to be resent
        $self->{const}{_update} = 0;
        $self->_fill_and_send;
        $self->{const}{_update} = 1;  # So chooser can skip calling start function
    }
    my $status = $self->{socket} ? "up" : "down";
    print " Host $host $self->{port} is ".$status."!\n" if $Debug;
    return $self->{socket};
}
sub _alive_check {
    my $self = shift;
    # Ping the chooser with a "report_ping" line.  If there is no socket,
    # the send fails, or the socket no longer reports a connection, forget
    # the socket so the service loop reconnects.
    # This also keeps at least part of the reporter paged-in.
    my $msg = "report_ping\n";
    my $fh = $self->{socket};
    # Check $fh before calling methods on it, so a missing socket counts as
    # a disconnect instead of dying on a method call against undef.
    # The send may still die if slchoosed goes down; the surrounding fork()
    # loop is expected to restart the reporter in that case.
    if (!$fh || !$fh->send_and_check($msg) || !$fh->connected()) {
        print "Disconnect\n" if $Debug;
        $self->{socket} = undef;
    }
}
######################################################################
######################################################################
######################################################################
#### Send_Hash loading
sub _fill_and_send {
    my $self = shift;
    # Refresh stored and dynamic data, then transmit it.  const and stored
    # are only resent when flagged as changed.
    $self->_fill_stored;
    $self->_fill_dynamic;
    if ($self->{const_changed}) {
        $self->{const_changed} = 0;
        $self->_send_hash($_) foreach (qw(const stored));
    }
    # Dynamic must be last, it triggers sending info back to user
    return $self->_send_hash('dynamic');
}
sub _fill_const {
    my $self = shift;
    # Populate $self->{const} with values that don't change with loading
    # (known at startup).  Values already supplied by the caller are kept.
    $self->{const_changed} = 1;
    my $const = ($self->{const} ||= {});
    # Processor topology
    $const->{cpus}          ||= Unix::Processors->max_online();
    $const->{physical_cpus} ||= Unix::Processors->max_physical();
    $const->{max_clock}     ||= Unix::Processors->max_clock();
    # Platform, as recorded in Perl's build configuration
    foreach my $cfg_key (qw(osname osvers archname)) {
        $const->{$cfg_key} ||= $Config{$cfg_key};
    }
    # Required keys that default to false rather than being probed
    $const->{reservable} = 0 unless defined $const->{reservable};
    # Look for some special processes (assume init makes them)
    my @nicer = grep { $_->fname eq "nicercizerd" } @{$self->pt->table};
    $const->{nicercizerd} = 1 if @nicer;
}
sub _fill_dynamic_pid {
    my ($self, $p, $pctcpu) = @_;
    # Record one process-table entry $p, with its computed %CPU, under
    # $self->{dynamic}{proc}{PID}.
    my $pid = $p->pid;
    my $procref = ($self->{dynamic}{proc}{$pid} ||= {});
    $procref->{pid} = $pid;
    # Copy every raw field.  This peeks inside the private hash; a newer
    # Proc::ProcessTable interface has been requested to avoid the intrusion.
    my @fields = keys %{$p};
    @{$procref}{@fields} = @{$p}{@fields};
    $procref->{pctcpu} = $pctcpu;
    # Fields that need translating
    if ($Os_Linux) {
        # Something funky is going on with linux
        $procref->{nice0} = $procref->{nice} = $p->priority / 1;
    } else {
        $procref->{nice0} = $procref->{nice} - 20;
    }
    $procref->{time} = $p->time * $ProcTimeToSec;
    my $state = $p->state;
    $procref->{state} = ($state eq "onprocessor") ? "cpu".$p->onpro : $state;
    # Resolve the owner name, preferring uid and falling back to euid
    my $uid = $p->uid;
    $uid = $p->euid if !$uid && exists $p->{euid};
    my $uname = $User_Names{$uid};
    if (!defined $uname) {   # Cache user names
        $uname = getpwuid($uid) || $uid;
        $User_Names{$uid} = $uname;
    }
    $procref->{uname} = $uname;
}
sub _fill_dynamic {
    my $self = shift;
    # Rebuild $self->{dynamic}: overall loading totals plus per-process
    # entries under {proc}.
    #
    # Steps:
    #   - zero the totals and merge each plugin's stats();
    #   - read the process table (the real one unless {fake}, plus the fake one);
    #   - compute every process's %CPU;
    #   - record, through _fill_dynamic_pid, processes at or above
    #     {min_pctcpu}, plus (for non-root processes) a fixed_load owner
    #     that has no logged children;
    #   - sum total_pctcpu, total_load, report_load, total_size and total_rss;
    #   - drop %Pid_Inherit entries for local processes that have exited,
    #     and add up the remaining fixed loads.
    $self->{dynamic} = {total_load => 0,
                        fixed_load => 0,
                        report_load => 0,
                        total_pctcpu => 0,
                        total_size => 0,
                        total_rss => 0,
                        };
    # Wall-clock seconds since the previous call (0 on the first call)
    my ($sec, $usec) = gettimeofday();
    @Pid_Time_Base = ($sec,$usec) if !defined $Pid_Time_Base[0];
    my $deltastamp = ($sec-$Pid_Time_Base[0]) + 1e-6*($usec-$Pid_Time_Base[1]);
    @Pid_Time_Base = ($sec,$usec);
    # Fill in plugin statistics
    foreach my $plugin (@{$self->{plugins}}) {
        my $stats = $plugin->stats;
        foreach my $key (keys %{$stats}) {
            $self->{dynamic}{$key} = $stats->{$key};
        }
    }
    # Note the $p refs cannot be cached, they change when a new table call occurs
    my @pidlist;
    if (!$self->{fake}) {
        push @pidlist, @{$self->pt->table};
    }
    push @pidlist, @{$self->fake_pt->table};
    my %pidinfo = ();
    # Find all parental references (should cache this at some point)
    foreach my $p (@pidlist) {
        $pidinfo{$p->pid}{parent} = $p->ppid;
    }
    # Decide which PIDs to log, and push "logit" flags up towards the parents
    foreach my $p (@pidlist) {
        my $pctcpu = $p->pctcpu || 0;
        # Non-finite readings ("inf" on Linux, "nan", Perl's own "Inf"/"NaN",
        # and signed forms such as "-nan") would poison every sum below;
        # treat them as idle.
        $pctcpu = 0 if $pctcpu =~ /^[-+]?(?:inf|nan)/i;
        if ($Distrust_Pctcpu) {
            my $ustime = ($p->utime+$p->stime);
            if (!$ustime
                || !defined $Pid_Time[$p->pid]
                || $p->start != $Pid_Time[$p->pid][0]
                || $deltastamp <= 0) {  # No elapsed time, or the clock stepped back
                # Can't calculate (no earlier sample for this process, or
                # p->start is wrong on linux); assume the reported pctcpu is ok.
                #$pctcpu = $ustime / (1000*($sec-$p->start));
            } else {
                # CPU time used since the previous sample over the wall time
                $pctcpu = 100*(( ($ustime-$Pid_Time[$p->pid][1])
                                 * $ProcTimeToSec)
                               / $deltastamp # Seconds
                               );
                $pctcpu /= $self->{const}{cpus} if $Divide_Pctcpu_By_Cpu;
                printf "PIDCONT %d PCT %s CLOCK %d UTIME %d-%d=%d LOAD %f\n"
                    ,$p->pid, $p->pctcpu||0, $deltastamp
                    ,$ustime, $Pid_Time[$p->pid][1], $ustime-$Pid_Time[$p->pid][1]
                    ,$pctcpu if $Debug;
            }
            $Pid_Time[$p->pid] = [$p->start, $ustime];
        }
        $pidinfo{$p->pid}{pctcpu} = $pctcpu;
        my $logit = ($pctcpu >= $self->{min_pctcpu}
                     && $p->pid != $$);  # Ignore ourself (hopefully not TOO much cpu time!)
        $pidinfo{$p->pid}{logit} = $logit;
        if ($p->uid) {  # not root (speed things up)
            # Walk up the ancestry, marking each ancestor as having a logged child
            my $searchpid = $p->pid;
            while ($searchpid) {
                $pidinfo{$searchpid}{logit_somechild} = 1 if $logit;
                $searchpid = $pidinfo{$searchpid}{parent};
            }
        }
    }
    foreach my $p (@pidlist) {
        my $fixed_load = undef;
        my $cmndcomment = undef;
        my $logit = $pidinfo{$p->pid}{logit};
        if ($p->uid) {  # not root
            # Inherit the nearest fixed_load and comment from self or ancestors
            my $searchpid = $p->pid;
            while ($searchpid) {
                if (defined $Pid_Inherit{$searchpid}) {
                    if ((!defined $fixed_load)
                        && defined $Pid_Inherit{$searchpid}{fixed_load}) {
                        $fixed_load = $Pid_Inherit{$searchpid}{fixed_load};
                        if ($searchpid == $p->pid
                            && !$pidinfo{$searchpid}{logit_somechild}
                            ) {
                            $logit = 1;  # Show this fixed_load process, he has no children to show
                        }
                        printf "Found fixed_load %s\n", $p->pid if $Debug;
                    }
                    if ((!defined $cmndcomment)
                        && defined $Pid_Inherit{$searchpid}{cmndcomment}) {
                        $cmndcomment = $Pid_Inherit{$searchpid}{cmndcomment};
                    }
                }
                $searchpid = $pidinfo{$searchpid}{parent};
            }
        }
        # Load any processes with lots of time, or with fixed_loading
        # that isn't otherwise accounted for
        my $pctcpu = $pidinfo{$p->pid}{pctcpu};
        if ($logit) {
            _fill_dynamic_pid ($self, $p, $pctcpu);
            $self->{dynamic}{proc}{$p->pid}{cmndcomment} = $cmndcomment if $cmndcomment;
        }
        # Count total loading
        $self->{dynamic}{total_pctcpu} += $pctcpu;
        if (($p->pid != $$)) {  # Exclude ourself
            my $load = ($self->{const}{load_pctcpu}
                        ? ($pctcpu/100.0)
                        : (($p->state eq "run" || $p->state eq "onprocessor") ? 1:0));
            $load = 1 if ($load > 0.90 && $load < 1.10);  # 90% of a CPU really is close to full CPU, as slreportd takes some time itself
            if ($load) {
                $self->{dynamic}{total_load} += $load;
                # Processes under a fixed_load are covered by that fixed load
                $self->{dynamic}{report_load} += $load if !defined $fixed_load;
            }
        }
        # Count memory
        $self->{dynamic}{total_size} += _fix_overflow($p->size||0);  # Float, so doesn't overflow
        $self->{dynamic}{total_rss} += _fix_overflow($p->rss||0);    # Float, so doesn't overflow
    }
    # Look for any fixed loads that died
    # Also add up fixed loading across all fixed_loads
    foreach my $pid (keys %Pid_Inherit) {
        if (!defined $pidinfo{$pid}
            && $Pid_Inherit{$pid}{req_hostname} eq hostname()) {  # Not a fake load on a remote host
            delete $Pid_Inherit{$pid};
        } else {
            my $fixed_load = $Pid_Inherit{$pid}{fixed_load};
            if (defined $fixed_load) {
                printf "Added fixed load for %s\n", $pid if $Debug;
                $self->{dynamic}{fixed_load} += $fixed_load;
            }
        }
    }
    $self->{dynamic}{report_load} += $self->{dynamic}{fixed_load};
}
sub _fixed_load {
    my ($self, $params) = @_;
    # Apply a "report_fwd_fixed_load" request: pin a load on process {pid}.
    # A negative load means all CPUs; a zero load removes the PID's entry.
    my $load = $params->{load};
    my $pid = $params->{pid};
    print "Fixed load of $load PID $pid\n" if $Debug;
    $load = $self->{const}{cpus} if $load < 0;   # Allow -1 for all CPUs
    if ($load == 0) {
        delete $Pid_Inherit{$pid};
        return;
    }
    my $entry = ($Pid_Inherit{$pid} ||= {});
    $entry->{fixed_load} = $load;
    $entry->{pid}        = $params->{pid};
    $entry->{uid}        = $params->{uid};
    $entry->{req_pid}    = $params->{req_pid};
    # Compared against this host's name when reaping entries whose process
    # has exited
    $entry->{req_hostname} = $params->{req_hostname} || $params->{host} || hostname();
}
sub _comment {
    my $self = shift;
    my $params = shift;
    # Apply a "report_fwd_comment" request: attach a command comment to
    # process {pid}.  _fill_dynamic shows it on that process and on its
    # non-root descendants.
    my $cmndcomment = $params->{comment};
    my $pid = $params->{pid};
    print "Command Commentary '$cmndcomment' PID $pid\n" if $Debug;
    $Pid_Inherit{$pid}{pid} = $pid;
    $Pid_Inherit{$pid}{uid} = $params->{uid};
    $Pid_Inherit{$pid}{cmndcomment} = $cmndcomment;
    # Record the owning host the same way _fixed_load does.  _fill_dynamic
    # only reaps an entry whose process has vanished when {req_hostname} is
    # this host.  Recording it here lets comment-only entries be reaped too,
    # instead of lingering and being re-attached to a later process that
    # reuses the PID.  A value already set by a fixed load is kept.
    $Pid_Inherit{$pid}{req_hostname} ||= $params->{req_hostname} || $params->{host} || hostname();
}
######################################################################
#### Math
sub _fix_overflow {
    my $value = shift;
    # Undo the 32-bit wraparound that Proc::ProcessTable before version 0.40
    # applied to sizes.  An unsigned value U >= 2**31 came back as U - 2**32,
    # so the wrapped (negative) value must be shifted up by 2**32.
    # Adding 0.1 makes the result a float, so later sums don't overflow.
    my $float = 0.1 + $value;
    $float += 4.0*1024*1024*1024 if $float < 0;
    return $float;
}
######################################################################
#### Sending the hash to slchoosed
sub _send_hash {
    my ($self, $field) = @_;
    # Serialize $self->{$field} as a "report_$field" message and send it to
    # slchoosed.  Does nothing without a socket; forgets the socket if the
    # send fails or the connection has dropped.
    my $sock = $self->{socket} or return;
    my $delivered = $sock->send_and_check(_pfreeze("report_$field", $self->{$field}, $Debug));
    $self->{socket} = undef unless $delivered && $sock->connected();
}
######################################################################
######################################################################
#### Existance
sub _exist_traffic {
    # Handle one UDP reply to an $Exister->pid_request.  When the reply says
    # a process is gone, drop every %Pid_Inherit entry whose req_pid and
    # req_hostname name that process.
    print "UDP PidStat in...\n" if $Debug;
    my ($pid, $exists, $onhost) = $Exister->recv_stat();
    # We only care about known-missing processes
    return if !defined $pid || !defined $exists || $exists;
    print " UDP PidStat PID $onhost:$pid no longer with us. RIP.\n" if $Debug;
    my @dead = grep {
        $_ && $_->{req_pid} == $pid && $_->{req_hostname} eq $onhost
    } values %Pid_Inherit;
    delete $Pid_Inherit{$_->{pid}} foreach @dead;
}
######################################################################
######################################################################
######################################################################
######################################################################
#### Stored configuration
sub _fill_stored {
    my $self = shift;
    # Load the persistent settings into $self->{stored}, at most once until
    # {stored_read} is cleared.  The defaults (reserved=>0) are used when no
    # readable stored_filename exists or its contents can't be loaded.
    # SHOULD: If already cached, check the file date and reread if needed
    # BUT: Currently only this program changes it, so we don't care!
    return if $self->{stored_read};
    my $stored = {reserved=>0};
    my $file = $self->{stored_filename};
    if (defined $file && -r $file) {
        print "Retrieve $self->{stored_filename}\n" if $Debug;
        # Storable::retrieve dies on a truncated or corrupt file (e.g. after
        # a crash in the middle of nstore).  Letting that propagate would
        # kill the daemon again on every restart, so fall back to the
        # defaults instead.
        my $loaded = eval { Storable::retrieve($file) };
        if (ref($loaded) eq 'HASH') {
            $stored = $loaded;
        } else {
            my $why = $@ || "not a hash";
            chomp $why;
            warn "%Warning: Ignoring unreadable $file: $why\n";
        }
    }
    $self->{stored} = $stored;
    $self->{const_changed} = 1;
    $self->{stored_read} = 1;
}
sub _set_stored {
    my ($self, $params) = @_;
    # Apply a "report_fwd_set" request.  Every key of %$params except "host"
    # is copied into $self->{const} when {set_const} is true (runtime only),
    # otherwise into $self->{stored}.  In the stored case the whole
    # {stored} hash is then saved to {stored_filename}, if one is set.
    $self->_fill_stored();   # Make sure up-to-date
    $self->{const_changed} = 1;
    my $section = $params->{set_const} ? 'const' : 'stored';
    foreach my $var (grep { $_ ne "host" } keys %{$params}) {
        my $value = $params->{$var};
        print "_set_const($var = $value)\n" if $Debug;
        $self->{$section}{$var} = $value;
    }
    return if $params->{set_const} || !defined $self->{stored_filename};
    print "Store $self->{stored_filename}\n" if $Debug;
    Storable::nstore($self->{stored}, $self->{stored_filename});
    # NOTE(review): the file is made world-writable and is later read back
    # with Storable::retrieve, so any local user can inject data into it.
    # Consider tighter permissions if all reporters run as one user.
    chmod 0666, $self->{stored_filename};
}
######################################################################
#### Package return
1;
######################################################################
__END__
=pod
=head1 NAME
Schedule::Load::Reporter - Distributed load reporting daemon
=head1 SYNOPSIS
use Schedule::Load::Reporter;
Schedule::Load::Reporter->start(dhost=>['host1', 'host2'],
port=>1234,);
=head1 DESCRIPTION
L<Schedule::Load::Reporter> on startup connects to the requested server
host and port. The server connected to can then poll this host for
information about system configuration and current loading conditions.
=over 4
=item start ([parameter=>value ...]);
Starts the reporter. Does not return.
=back
=head1 PARAMETERS
=over 4
=item dhost
List of daemon hosts that may be running the slchoosed server. The second
host is only used if the first is down, and so on down the list.
=item port
The port number of slchoosed. Defaults to 'slchoosed' looked up via
/etc/services, else 1752.
=item fake
Specifies that load management should not be used, for reporting a "fake"
host's status or for scheduling a non-host resource, such as a license.
=item min_pctcpu
The minimum percentage of the CPU that a job must have to be included in
the list of top processes sent to the client. Defaults to 3. Setting to
0 will consume a lot of bandwidth.
=item stored_filename
The filename to store persistent items in, such as if this host is
reserved. Must be either local-per-machine, or have the hostname in it.
Defaults to /usr/local/lib/rschedule/slreportd_{hostname}_store. Set to
undef to disable persistence (thus if the machine reboots the reservation
is lost.) The path must be B<ABSOLUTE>, as the daemons do a chdir.
=back
=head1 DISTRIBUTION
The latest version is available from CPAN and from L<http://www.veripool.org/>.
Copyright 1998-2011 by Wilson Snyder. This package is free software; you
can redistribute it and/or modify it under the terms of either the GNU
Lesser General Public License Version 3 or the Perl Artistic License Version 2.0.
=head1 AUTHORS
Wilson Snyder <wsnyder@wsnyder.org>
=head1 SEE ALSO
L<Schedule::Load>, L<slreportd>
=cut