The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package App::derived;

use strict;
use warnings;
use 5.008005;
use File::Temp qw/tempfile/;
use File::Copy;
use Proclet;
use JSON ();
use Log::Minimal;

our $VERSION = '0.06';

my $_JSON = JSON->new()
    ->utf8(1)
    ->shrink(1)
    ->space_before(0)
    ->space_after(0)
    ->indent(0);

sub new {
    my $class = shift;
    my %opt = ref $_[0] ? %{$_[0]} : @_;
    my %args = (
        proclet  => Proclet->new(enable_log_worker => ($ENV{LM_DEBUG} ? 1 : 0)),
        interval => 10,
        host     => 0,
        port     => 12306,
        timeout  => 10,
        services => {},
        %opt
    );
    bless \%args, $class;
}

sub add_service {
    my $self = shift;
    my ($key, $cmd) = @_;
    my ($tmpfh,$tmpfile) = tempfile(UNLINK=>0, EXLOCK=>0);
    print $tmpfh $_JSON->encode({
        status=>"INIT",
        persec => '0E0',
        latest => '0E0',
    });
    close $tmpfh;
    $self->{services}->{$key} = {
        cmd => ['/bin/bash', '-c', $cmd],
        file => $tmpfile,
        prev => undef,
    };
    debugf("add service: %s", $key);
    $self->{proclet}->service(
        code => sub {
            $0 = "$0 worker $key";
            $self->worker($key);
            exit;
        },
        tag => $key.'_worker',
    );
}

sub add_plugin {
    my $self = shift;
    my ( $plugin, $args) = @_;

    my %args = (
        %$args,
        _services => $self->{services},
        _proclet => $self->{proclet},
    );    
    my $instance = $plugin->new(\%args);
    $instance->init();
}

sub run {
    my $self = shift;
    $self->{proclet}->run;
}

sub DESTROY {
    my $self = shift;
    for my $key ( keys %{$self->{services}} ) {
        unlink $self->{services}->{$key}->{file};
    }
}

sub worker {
    my ($self, $service_key) = @_;
    srand();
    my $service = $self->{services}->{$service_key};
    my $n = time;
    $n = $n - ( $n % $self->{interval}) + $self->{interval} + int(rand($self->{interval}));; #next + random
    my $stop = 1;
    local $SIG{TERM} = sub { $stop = 0 };

    while ( $stop ) {
        while ( $stop ) {
            last if time >= $n;
            select undef, undef, undef, 0.1 ## no critic;
        }
        $n = $n + $self->{interval};
        local $Log::Minimal::AUTODUMP = 1;
        debugf("exec command for %s => %s", $service_key, $service);
        my ($result, $exit_code) = cap_cmd($service->{cmd});
        debugf("command [%s]: exit_code:%s result:%s", $service_key, $exit_code, $result);
        if ( ! defined $result ) {
            atomic_write($service->{file}, {
                status => "ERROR",
                persec => undef,
                latest => undef,
                raw => undef,
                exit_code => $exit_code,
                last_update => time,
            });
            next;
        }
    
        my $orig = $result;
        $result =~ s!^[^0-9]+!!;
        {
            no warnings;
            $result = int($result);
        }
        if ( ! defined $service->{prev} ) {
            $service->{prev} = $result;
            atomic_write( $service->{file}, {
                status => "CALCURATE",
                persec => "E0E",
                latest => $result,
                raw => $orig,
                exit_code => $exit_code,
                last_update => time,
            });
            next;
        }
        my $derive = ($result - $service->{prev}) / $self->{interval};
        atomic_write( $service->{file}, {
            status => "OK",
            persec => $derive,
            latest => $result,
            raw => $orig,
            exit_code => $exit_code,
            last_update => time,
        });
        $service->{prev} = $result;
    }
}

sub cap_cmd {
    my ($cmdref) = @_;
    pipe my $logrh, my $logwh
        or die "Died: failed to create pipe:$!";
    my $pid = fork;
    if ( ! defined $pid ) {
        die "Died: fork failed: $!";
    } 

    elsif ( $pid == 0 ) {
        #child
        close $logrh;
        open STDOUT, '>&', $logwh
            or die "Died: failed to redirect STDOUT";
        close $logwh;
        exec @$cmdref;
        die "Died: exec failed: $!";
    }
    close $logwh;
    my $result;
    while(<$logrh>){
        chomp;chomp;
        $result .= $_;
    }
    close $logrh;
    while (wait == -1) {}
    my $exit_code = $?;
    $exit_code = $exit_code >> 8;
    if ( $exit_code != 0 ) {
        warnf("Error: command exited with code: $exit_code");
    }
    return ($result, $exit_code);
}

sub atomic_write {
    my ($writefile, $body) = @_;
    my ($tmpfh,$tmpfile) = tempfile(UNLINK=>0);
    print $tmpfh $_JSON->encode($body);
    close($tmpfh);
    move( $tmpfile, $writefile);
}


1;
__END__

=encoding utf8

=head1 NAME

App::derived - run command periodically and calculate rate and check from network

=head1 SYNOPSIS

  $ cat CmdsFile
  slowqueries: mysql -NB -e 'show global status like "Slow_queries%"'
  $ derived -MMemcahced,port=11211,host=127.0.0.1 CmdsFile

  $ telnet localhost port
  get slowqueris
  VALUE slowqueris 0 3
  0.2  # slow queries/sec

=head1 DESCRIPTION

derived runs commands periodically and capture integer value. And calculate per-second rate. 
You can retrieve these values from integrated memcached-protocol server or pluggable workers.

You can monitoring the variation of metrics through this daemon.

See detail for perldoc "derived"

=head1 AUTHOR

Masahiro Nagano E<lt>kazeburo@gmail.comE<gt>

=head1 SEE ALSO

<derived>

=head1 LICENSE

Copyright (C) Masahiro Nagano

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut