package Gearman::SlotManager;
# ABSTRACT: Managing Worker's lifecycle with Slots
our $VERSION = '0.3'; # VERSION
use namespace::autoclean;
use Devel::GlobalDestruction;
use Log::Log4perl qw(:easy);
#Log::Log4perl->easy_init($DEBUG);
Log::Log4perl->easy_init($ERROR);
use Data::Dumper;
use Any::Moose;

use AnyEvent;
use AnyEvent::HTTPD;
use Scalar::Util qw(weaken);

use Gearman::Slot;
has slotmap=>(is=>'rw', isa=>'HashRef', default=>sub{ return {}; });
has config=>(is=>'rw', isa=>'HashRef',required=>1);
has idle_watcher=>(is=>'rw');
has httpd=>(is=>'rw');
has port=>(is=>'rw',default=>sub{return 55995;});
sub BUILD{
    my $self = shift;

    my $conf = $self->config;
    my %global = %{$conf->{'global'}};
    my %baseconf = (
        job_servers=>[''],
        min=>1,
        max=>1,
        workleft=>0,
    );
    %global = (%baseconf,%global);
    
    my %confs = %{$conf->{slots}};
    foreach my $worker (keys %confs){
        my %conf = %{$confs{$worker}};

        %conf = (%global,%conf);
        DEBUG Dumper(\%conf);

        my @slots;
        foreach (0 .. $conf{max}-1){
            my $slot = Gearman::Slot->new(
                job_servers=>$conf{job_servers},
                libs=>$conf{libs},
                workleft=>$conf{workleft},
                worker_package=>$worker,
                worker_channel=>$worker.'__'.$_,
                sbbaseurl=>'http://localhost:'.$self->port,
            );
            push( @slots, $slot);
        }
        $self->slotmap->{$worker} = {conf=>\%conf, slots=>\@slots};
    }

    my $httpd = AnyEvent::HTTPD->new(port=>$self->port);
    $httpd->reg_cb (
        '/busy'=>sub{
            my ($httpd,$req) = @_;
            DEBUG "SB busy ".$req->parm('channel');
            my ($key,$idx) = split(/__/,$req->parm('channel'));
            DEBUG "SB busy $key $idx";
            $self->slots($key)->[$idx]->is_busy(1);
            $req->respond({content=>['text/plain','ok']});
        },
        '/idle'=>sub{
            my ($httpd,$req) = @_;
            DEBUG "SB idle ".$req->parm('channel');
            my ($key,$idx) = split(/__/,$req->parm('channel'));
            DEBUG "SB idle $key $idx";
            $self->slots($key)->[$idx]->is_busy(0);
            $req->respond({content=>['text/plain','ok']});
        },
    );
    $self->httpd($httpd);
    weaken($self);
}

sub slots{
    my $self = shift;
    my $key = shift;
    return $self->slotmap->{$key}->{slots};
}

sub conf{
    my $self = shift;
    my $key = shift;
    return $self->slotmap->{$key}->{conf};
}

sub start{
    DEBUG __PACKAGE__." start";
    my $self = shift;
    foreach my $key (keys %{$self->slotmap}){
        my $slots = $self->slots($key);
        my $conf = $self->conf($key);
        my $min = $conf->{min};
        foreach my $i ( 0 .. $min-1 ){
            $slots->[$i]->start();
        }
    }
    my $iw = AE::timer 0,5, sub{$self->on_idle;};
    $self->idle_watcher($iw);
    #weaken($self);
}

sub on_idle{
    my $self = shift;
    DEBUG "ON_IDLE";
    foreach my $key (keys %{$self->slotmap}){
        my @slots = @{$self->slots($key)};
        my %conf = %{$self->conf($key)};
        my $idle = 0;
        my $running = 0;
        foreach my $s ( @slots ){
            $idle += $s->is_idle;
            $running += $s->is_running;
        }
        DEBUG "[$key] idle: $idle, running: $running";
        if( !$idle ){
            if( $running < $conf{max} ){
                DEBUG "expand $key";
                my @stopped = grep{$_->is_stopped}@slots;
                shift(@stopped)->start;
            }
        }
        else{
            if( $running > $conf{min} ){
                DEBUG "reduce $key";
                my @running = grep{$_->is_running}@slots;
                pop(@running)->stop;
            }
        }
    }

}

sub stop{
    DEBUG __PACKAGE__." stop";
    my $self = shift;
    $self->idle_watcher(undef);
    foreach my $key (keys %{$self->slotmap}){
        my $slots = $self->slots($key);
        foreach my $s ( @{$slots} ){
            $s->stop() unless $s->is_stopped;
        }
    }
}

sub DEMOLISH{
    return if in_global_destruction;
    DEBUG __PACKAGE__.' DEMOLISHED';
    
}

__PACKAGE__->meta->make_immutable;
1;


=pod

=head1 NAME

Gearman::SlotManager - Managing Worker's lifecycle with Slots

=head1 VERSION

version 0.3

=head1 SYNOPSIS

Will be updated soon.

See testManager.pl in Gearman::SlotManager directory.

=head1 AUTHOR

HyeonSeung Kim <sng2nara@hanmail.net>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2012 by HyeonSeung Kim.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut


__END__