The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package AnyEvent::Task::Client;

use common::sense;

use Scalar::Util;

use AnyEvent;
use AnyEvent::Util;
use AnyEvent::Handle;
use AnyEvent::Socket;

use AnyEvent::Task::Client::Checkout;


sub new {
  my ($class, %arg) = @_;
  my $self = {};
  bless $self, $class;

  $self->{connect} = $arg{connect} || die "need connect";
  $self->{name} = $arg{name};

  $self->{min_workers} = defined $arg{min_workers} ? $arg{min_workers} : 2;
  $self->{max_workers} = defined $arg{max_workers} ? $arg{max_workers} : 20;
  $self->{min_workers} = $self->{max_workers} if $self->{min_workers} > $self->{max_workers};
  $self->{timeout} = $arg{timeout} if exists $arg{timeout};
  $self->{max_checkouts} = $arg{max_checkouts} if exists $arg{max_checkouts};
  $self->{dont_refork_after_error} = 1 if $arg{dont_refork_after_error};

  $self->{total_workers} = 0;
  $self->{connecting_workers} = {};
  $self->{available_workers} = {};
  $self->{occupied_workers} = {};
  $self->{workers_to_checkouts} = {}; # used to map errors detected on worker connection to checkout callbacks
  $self->{worker_checkout_counts} = {}; # used for max_checkouts "memory leak protection"

  $self->{pending_checkouts} = [];

  $self->populate_workers;

  return $self;
}



sub populate_workers {
  my ($self) = @_;

  Scalar::Util::weaken($self);

  return if $self->{total_workers} >= $self->{max_workers};

  my $workers_to_create = $self->{min_workers} - $self->{total_workers};
  if ($workers_to_create <= 0) {
    $workers_to_create = 0;
    $workers_to_create = 1 unless keys %{$self->{available_workers}} || keys %{$self->{connecting_workers}};
  }

  for (1 .. $workers_to_create) {
    $self->{total_workers}++;

    my $host = $self->{connect}->[0];
    my $service = $self->{connect}->[1];

    my $worker_guard;
    $self->{connecting_workers}->{0 + $worker_guard} = $worker_guard = tcp_connect $host, $service, sub {
      my $fh = shift;

      delete $self->{connecting_workers}->{0 + $worker_guard};

      if (!$fh) {
        $self->{total_workers}--;
        $self->install_populate_workers_timer;
        return;
      }

      delete $self->{populate_workers_timer};

      my $worker; $worker = new AnyEvent::Handle
                              fh => $fh,
                              on_read => sub { }, ## So we always have a read watcher and can instantly detect worker deaths
                              on_error => sub {
                                my ($worker, $fatal, $message) = @_;

                                my $checkout = $self->{workers_to_checkouts}->{0 + $worker};
                                $checkout->throw_fatal_error('worker connection suddenly died') if $checkout;

                                $self->destroy_worker($worker);
                                $self->populate_workers;
                              };

      $self->{worker_checkout_counts}->{0 + $worker} = 0;

      $self->make_worker_available($worker);

      $self->try_to_fill_pending_checkouts;
    };
  }

}


sub install_populate_workers_timer {
  my ($self) = @_;

  return if exists $self->{populate_workers_timer};

  $self->{populate_workers_timer} = AE::timer 0.2, 1, sub {
    $self->populate_workers;
  };
}


sub try_to_fill_pending_checkouts {
  my ($self) = @_;

  return unless @{$self->{pending_checkouts}};

  if (keys %{$self->{available_workers}}) {
    my @available_workers = values %{$self->{available_workers}};
    my $worker = shift @available_workers;
    $self->make_worker_occupied($worker);

    my $checkout = shift @{$self->{pending_checkouts}};
    $checkout->{worker} = $worker;

    $self->{workers_to_checkouts}->{0 + $worker} = $checkout;
    Scalar::Util::weaken($self->{workers_to_checkouts}->{0 + $worker});

    $checkout->_try_to_fill_requests;
    return $self->try_to_fill_pending_checkouts;
  }

  $self->populate_workers;
}



sub make_worker_occupied {
  my ($self, $worker) = @_;

  delete $self->{available_workers}->{0 + $worker};
  $self->{occupied_workers}->{0 + $worker} = $worker;

  $self->{worker_checkout_counts}->{0 + $worker}++;
}


sub make_worker_available {
  my ($self, $worker) = @_;

  if (exists $self->{max_checkouts}) {
    if ($self->{worker_checkout_counts}->{0 + $worker} >= $self->{max_checkouts}) {
      $self->destroy_worker($worker);
      return;
    }
  }

  ## Cancel any push_read callbacks installed while worker was occupied
  $worker->{_queue} = [];

  delete $self->{occupied_workers}->{0 + $worker};
  $self->{available_workers}->{0 + $worker} = $worker;
}


sub destroy_worker {
  my ($self, $worker) = @_;

  $worker->destroy;

  $self->{total_workers}--;
  delete $self->{available_workers}->{0 + $worker};
  delete $self->{occupied_workers}->{0 + $worker};
  delete $self->{worker_checkout_counts}->{0 + $worker};
}


sub checkout {
  my ($self, @args) = @_;

  my $checkout = AnyEvent::Task::Client::Checkout->_new( client => $self, @args, );

  push @{$self->{pending_checkouts}}, $checkout;

  $self->try_to_fill_pending_checkouts;

  return $checkout;
}

sub remove_pending_checkout {
  my ($self, $checkout) = @_;

  my @out;

  $self->{pending_checkouts} = [ grep { $_ != $checkout } @{$self->{pending_checkouts}} ];
}

1;