The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl
use strict;
use DBI;
use Getopt::Long;

# Connection defaults; overridable with --database, --user and --pass.
my ($dbname, $user, $pass) = ("schwartz", "root", "");

# --job filter used by the 'queues' command; empty means "all functions".
my $job = "";

# Thresholds for the 'queues' command, set by --maxage and --maxcount.
my ($max_age, $max_count) = (0, 0);

=head1 NAME

schwartzmon - monitor The Schwartz

=head1 USAGE

Type

    schwartzmon --help

to get full usage.

=cut

# Print the command-line help text and terminate.
#
# Takes no arguments.  The text is raised with die(), so unless a caller
# traps it with eval the help goes to STDERR and the process exits with a
# non-zero status.  This sub never returns normally.
sub usage {
    die <<USAGE;
Usage: schwartzmon <command> [OPTS]

Possible commands:
     queues           View past-due job queue depths.  (default cmd)
     errors           View errors.

Global options:
   --job=<JOBNAME>    Only look at one specific job name.  Else all are considered.
   --user=<user>      Connect to the database as this user
   --pass=<pass>      Connect to the database with this password
   --database=<db>    Connect to this database
   --dsn=<dsn>        Connect to the database using this DSN

Options for 'queues' command:
   --maxage=<n>       Don't complain if age of overdue job queue is <= 'n'
   --maxcount=<n>     Don't complain if depth of overdue job queue is <= 'n'

Options for 'errors' command:
   --follow | -f      Like 'tail -f' for tracking the error log
   --last=n           Show last 'n' errors from log
   --inlast=n         Show errors in last 'n' seconds
   --func=<FUNCNAME>  Only show errors logged for this function name

Verbosity:
   if no alerts, nothing is printed, and exit status is 0.

Exit status:
   0 if no alerts,
   non-zero if there are alerts, in which case the alerts are printed.

USAGE

}

# ---- Command-line parsing and command dispatch ----

my $opt_help = 0;
my ($opt_follow, $opt_last, $opt_inlast, $opt_func, $dsn);

# Any unrecognised or malformed option makes GetOptions return false,
# in which case we show the help text and stop.
usage() unless GetOptions("job=s" => \$job,
                          "maxage=i" => \$max_age,
                          "maxcount=i" => \$max_count,
                          "help"       => \$opt_help,
                          "follow|f"   => \$opt_follow,
                          "last=i"     => \$opt_last,
                          "inlast=i"   => \$opt_inlast,
                          "user=s"     => \$user,
                          "pass=s"     => \$pass,
                          "dsn=s"      => \$dsn,
                          "database=s" => \$dbname,
                          "func=s"     => \$opt_func,
                          );
usage() if $opt_help;

# The first remaining argument selects the command; "queues" is the default.
my $cmd = shift || "queues";

# The alternation has to be grouped.  Written as /^queues|errors$/ it means
# (^queues)|(errors$), which accepts e.g. "queuesfoo" or "myerrors"; such a
# command then matches neither dispatch branch below and the monitor would
# exit 0 without having checked anything.
usage() unless $cmd =~ /\A(?:queues|errors)\z/;

my $dbset = DBSet->new;

# An explicit --dsn wins; otherwise build a MySQL DSN from the database name.
$dsn ||= "DBI:mysql:$dbname";

$dbset->add(DBHandle->new({ dsn => $dsn, user => $user, pass => $pass}));

if ($cmd eq "queues") { queues($dbset); }
if ($cmd eq "errors") { errors($dbset); }
exit 0;

#################

# Report overdue job queues for every database in the set, then exit.
#
# Arguments:
#   $dbs - object whose foreach() method calls a callback once per database
#          object (this script passes a DBSet holding DBHandle objects).
#
# For each funcmap row (restricted to $job when --job was given) the jobs
# whose run_after is <= the current time are counted, and the lag of the
# oldest one is computed in seconds.  A function is reported when the lag
# exceeds $max_age or the count exceeds $max_count.
#
# Never returns: exits with status 1 if anything was reported, else 0.
sub queues {
    my $dbs = shift;
    my $some_alert = 0;
    $dbs->foreach(sub {
        my $db = shift;
        # Skip a database we cannot get a handle for.  This is 'return'
        # rather than 'next' because we are inside a callback sub, not
        # lexically inside a loop.
        my $dbh = $db->dbh or return;

        my $funcmap = $dbh->selectall_hashref("SELECT funcid, funcname FROM funcmap", "funcid");

        # Walk the functions in name order so the report is stable.
        foreach my $funcid (sort { $funcmap->{$a}{funcname} cmp $funcmap->{$b}{funcname} } keys %$funcmap) {
            my $funcname = $funcmap->{$funcid}{funcname};
            next if $job && $funcname ne $job;

            my $now = time();
            my $inf = $dbh->selectrow_hashref("SELECT COUNT(*) as 'ct', MIN(run_after) 'oldest' FROM job WHERE funcid=? AND run_after <= $now",
                                              undef, $funcid);
            # With no overdue jobs MIN() yields no usable value, so the lag is 0.
            my $behind = $inf->{ct} ? ($now - $inf->{oldest}) : 0;

            # okay by default, then we apply rules:
            my $okay = 1;
            $okay = 0 if $behind    > $max_age;
            $okay = 0 if $inf->{ct} > $max_count;
            next if $okay;
            $some_alert = 1;

            print "$funcname\n";
            print "  outstanding: $inf->{ct}\n";
            print "  behind_secs: $behind\n";
        }
    });
    exit($some_alert ? 1 : 0);
}

# Print rows from the 'error' table of every database in the set.
#
# Arguments:
#   $dbs - object whose foreach() method calls a callback once per database
#          object (this script passes a DBSet holding DBHandle objects).
#
# Behaviour is driven by the command-line options:
#   --follow   hand over to follow_errors(), which loops forever
#   --last=n   show the newest n errors (default 100 when neither --last
#              nor --inlast is given)
#   --inlast=n show errors from the last n seconds (capped at 50000 rows
#              per database); ignored when --last is also given
#   --func     restrict to one function name
#
# Rows from all databases are merged and printed oldest first.
sub errors {
    my $dbs = shift;

    if ($opt_follow) {
        follow_errors($dbs);
    }

    # A negative --last would produce an invalid LIMIT clause, and the
    # trimming step below could never reach a negative target length.
    usage() if defined $opt_last && $opt_last < 0;

    $opt_last = 100 unless $opt_last || $opt_inlast;

    my @rows;
    $dbs->foreach(sub {
        my $db = shift;
        # 'return', not 'next': this is a callback sub, not a loop body.
        my $dbh = $db->dbh
            or return;

        my $extra_where = '';
        if ($opt_func) {
            # An unknown function name maps to funcid 0.
            my $funcid = $db->funcid_of_func($opt_func) || 0;
            $extra_where = "AND funcid=$funcid";
        }

        my $sql;
        if ($opt_last) {
            $sql = "SELECT error_time, jobid, message FROM error WHERE 1=1 $extra_where " .
                "ORDER BY error_time DESC LIMIT $opt_last";
        } elsif ($opt_inlast) {
            my $since = time() - $opt_inlast;
            $sql = "SELECT error_time, jobid, message FROM error WHERE error_time >= $since $extra_where " .
                "ORDER BY error_time LIMIT 50000";
        }

        my $sth = $dbh->prepare($sql);
        $sth->execute;
        # Use a lexical row variable so the global $_ is left alone.
        while (my $row = $sth->fetchrow_hashref) {
            push @rows, $row;
        }
    });

    # Merge across databases, oldest first.
    @rows = sort { $a->{error_time} <=> $b->{error_time} } @rows;

    # Each database contributed up to --last rows; keep only the newest
    # --last overall by dropping the surplus from the old end.
    if ($opt_last && @rows > $opt_last) {
        splice(@rows, 0, @rows - $opt_last);
    }

    foreach my $r (@rows) {
        print_error($r);
    }

}

# Poll the 'error' table of every database once a second and print new rows
# as they appear, in the manner of 'tail -f'.
#
# Arguments:
#   $dbs - object whose foreach() method calls a callback once per database
#          object (this script passes a DBSet holding DBHandle objects).
#
# Per-database state is kept in the hash returned by $db->notes:
#   lastmax - highest error_time printed so far (starts at the current time)
#   seen    - signature => error_time of rows already printed
#
# Never returns.
sub follow_errors {
    my $dbs = shift;

    # Flush after every print so that output shows up immediately even when
    # STDOUT is a pipe or a file rather than a terminal.
    local $| = 1;

    while (1) {
        $dbs->foreach(sub {
            my $db = shift;
            # 'return', not 'next': this is a callback sub, not a loop body.
            my $dbh = $db->dbh
                or return;
            my $notes = $db->notes;

            my $lastmax = $notes->{lastmax} || time();
            my $seen    = $notes->{seen}    ||= {};

            my $extra_where = '';
            if ($opt_func) {
                # An unknown function name maps to funcid 0.
                my $funcid = $db->funcid_of_func($opt_func) || 0;
                $extra_where = "AND funcid=$funcid";
            }

            # The query uses >= so that rows sharing the newest timestamp are
            # not missed; the %$seen check below filters the repeats.
            my $sth = $dbh->prepare("SELECT error_time, jobid, message FROM error WHERE error_time >= ? $extra_where ORDER BY error_time");
            $sth->execute($lastmax);
            my @errors;
            # Use a lexical row variable so the global $_ is left alone.
            while (my $row = $sth->fetchrow_hashref) {
                push @errors, $row;
            }

            my $newmax = $lastmax;
            foreach my $r (@errors) {
                # Signature built from every column name and value of the row.
                my $sig = join(",", map { $_, $r->{$_} } sort keys %$r);
                next if $seen->{$sig};
                $seen->{$sig} = $r->{error_time};
                print_error($r);
                $newmax = $r->{error_time} if $r->{error_time} > $newmax;
            }

            $notes->{lastmax} = $newmax;

            # Rows older than the new high-water mark cannot be returned by
            # the next query, so their signatures no longer need tracking.
            foreach my $sig (keys %$seen) {
                my $time = $seen->{$sig};
                delete $seen->{$sig} if $time < $newmax;
            }

        });
        sleep 1;
    }
}

# Print one error row to the selected output handle as
#   "<local time> [<jobid>]: <message>\n"
#
# Arguments:
#   $r - hashref with the keys error_time (passed to localtime), jobid
#        and message.  Trailing whitespace is stripped from the message.
sub print_error {
    my $r = shift;
    my $msg = $r->{message};
    $msg =~ s/\s+$//;
    # print, not printf: the line contains data from the database, and any
    # '%' in it would otherwise be interpreted as a format directive.
    print scalar(localtime($r->{error_time})) . " [$r->{jobid}]: $msg\n";
}


package DBSet;

# A minimal ordered collection of database objects, stored as a blessed
# array reference.

# Constructor.  Returns a new, empty set.
# Works as a class method, as an instance method (the new object gets the
# class of the invocant) and, for compatibility, as a plain function call.
sub new {
    my $class = shift;
    # Two-argument bless so that subclasses are blessed into their own
    # package instead of always into DBSet.
    return bless [], (ref($class) || $class || __PACKAGE__);
}

# Append one database object to the set.
# Returns the new number of elements (the result of push).
sub add {
    my ($self, $db) = @_;
    return push @$self, $db;
}

# Call $cb once for each element, in insertion order, passing the element
# as the only argument.
sub foreach {
    my ($self, $cb) = @_;
    foreach my $db (@$self) {
        $cb->($db);
    }
    return;
}

package DBHandle;

# One database: its connection settings plus a lazily created DBI handle
# and a scratch area ("notes") for per-database state.

# Constructor.  Blesses the given hashref itself (no copy is made); the
# other methods read its dsn, user and pass keys.
sub new {
    my $class = shift;
    my $dbinf = shift;
    bless $dbinf, $class;
    return $dbinf;
}

# Returns the per-object notes hashref, creating it on first use.
sub notes {
    my ($self) = @_;
    $self->{notes} = {} unless $self->{notes};
    return $self->{notes};
}

# Returns the DBI handle, connecting on first use.  A failed connect leaves
# nothing cached, so the next call tries again.
sub dbh {
    my ($self) = @_;
    unless ($self->{_dbh}) {
        $self->{_dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass});
    }
    return $self->{_dbh};
}

# Looks up the funcid for a function name in the funcmap table.
# The answer (including "not found") is cached in the notes hash, so each
# name is queried at most once per object.
sub funcid_of_func {
    my ($self, $func) = @_;
    my $cache = $self->notes;
    my $key   = "funcid_of_$func";
    return $cache->{$key} if exists $cache->{$key};

    my $funcid = $self->dbh->selectrow_array(
        "SELECT funcid FROM funcmap WHERE funcname=?", undef, $func);
    $cache->{$key} = $funcid;
    return $funcid;
}


=head1 COPYRIGHT, LICENSE & WARRANTY

This software is Copyright 2007, 2008 Six Apart Ltd, cpan@sixapart.com. All
rights reserved.

TheSchwartz is free software; you may redistribute it and/or modify it
under the same terms as Perl itself.

TheSchwartz comes with no warranty of any kind.

=cut