# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package AnyEvent::Task::Client::Checkout;

use common::sense;

use Scalar::Util;

use Callback::Frame;


# Allow a checkout object to be dereferenced as code ($checkout->(...)):
# the '&{}' handler, _invoked_as_sub, supplies the code ref that gets called.
# fallback => 1 leaves every other operator to Perl's normal overload
# fallback rules.
use overload fallback => 1,
             '&{}' => \&_invoked_as_sub;

our $AUTOLOAD;


# Build a checkout object.
#
# Named arguments:
#   client           - the owning client; stored as a weak reference
#   timeout          - seconds before the checkout times out. When the key is
#                      absent the client's own {timeout} is used if that key
#                      exists, otherwise 30. Key presence (not definedness)
#                      decides, so an explicit undef is stored as-is.
#   log_defer_object - optional; stored only when the key is supplied
#
# Returns the new object, blessed into $class, with an empty request queue.
sub _new {
  my $class  = shift;
  my %params = @_;

  my $checkout = bless {}, $class;

  # Held weakly: the checkout must not keep its client alive.
  $checkout->{client} = $params{client};
  Scalar::Util::weaken($checkout->{client});

  if (exists $params{timeout}) {
    $checkout->{timeout} = $params{timeout};
  } elsif (exists $params{client}->{timeout}) {
    $checkout->{timeout} = $params{client}->{timeout};
  } else {
    $checkout->{timeout} = 30;
  }

  if (exists $params{log_defer_object}) {
    $checkout->{log_defer_object} = $params{log_defer_object};
  }

  $checkout->{pending_requests} = [];

  return $checkout;
}

# Catch-all for method calls that have no definition in this package.
#
# The part of $AUTOLOAD after the last ':' is taken as the method name,
# remembered in {last_name}, and queued together with the call's arguments
# via _queue_request. Dies when the invocant is not a reference.
sub AUTOLOAD {
  my ($self, @call_args) = @_;

  ref($self) or die "$self is not an object";

  (my $method = $AUTOLOAD) =~ s/.*://;

  $self->{last_name} = $method;

  return $self->_queue_request([ $method, @call_args ]);
}

# Handler for the '&{}' overload: returns the code ref that runs when the
# checkout itself is called like a function.
#
# The returned closure clears {last_name} and queues a request whose method
# slot is undef, followed by the arguments it was called with.
sub _invoked_as_sub {
  my ($checkout) = @_;

  my $dispatcher = sub {
    my @call_args = @_;

    $checkout->{last_name} = undef;

    return $checkout->_queue_request([ undef, @call_args ]);
  };

  return $dispatcher;
}

# Append a request to the checkout's queue and try to dispatch it.
#
# $request is an array ref whose last element is the completion callback.
# A callback that is not already a Callback::Frame frame is wrapped in one.
# The frame is named "<client name> -> <method name>" when at least one of
# the two is known, with 'ANONYMOUS CLIENT' / 'NO METHOD' standing in for
# whichever is missing; otherwise it is left unnamed.
#
# Returns nothing.
sub _queue_request {
  my ($self, $request) = @_;

  my $callback = $request->[-1];

  if (!Callback::Frame::is_frame($callback)) {
    my %frame_args = (code => $callback);

    my $client_name = $self->{client}->{name};
    my $method_name = $self->{last_name};

    if (defined $client_name || defined $method_name) {
      $frame_args{name} = join ' -> ',
        (defined $client_name ? $client_name : 'ANONYMOUS CLIENT'),
        (defined $method_name ? $method_name : 'NO METHOD');
    }

    $request->[-1] = frame(%frame_args);
  }

  push @{$self->{pending_requests}}, $request;

  # Make sure a timeout is running, then send straight away if possible.
  $self->_install_timeout_timer;
  $self->_try_to_fill_requests;

  return;
}

# Arm the one-shot timeout timer for this checkout.
#
# Does nothing when {timeout} is undef (timeouts disabled) or when a
# {timeout_timer} entry already exists. Otherwise a timer is stored in
# {timeout_timer} that fires once after {timeout} seconds; when it fires it
# asks the client to remove_pending_checkout this checkout, has the client
# destroy any worker the checkout holds, and then raises a fatal
# "timed out" error through throw_fatal_error.
sub _install_timeout_timer {
  my ($self) = @_;

  return if !defined $self->{timeout};
  return if exists $self->{timeout_timer};

  # Parenthesised call: parses the same whether or not AE's prototyped
  # timer function is already known when this file is compiled.
  $self->{timeout_timer} = AE::timer($self->{timeout}, 0, sub {
    # {client} is a weak reference and may already be gone when the timer
    # fires; guard each use the same way DESTROY does instead of calling a
    # method on undef inside the event loop.
    $self->{client}->remove_pending_checkout($self)
      if $self->{client};

    if (exists $self->{worker}) {
      $self->{client}->destroy_worker($self->{worker})
        if $self->{client};
      delete $self->{worker};
    }

    $self->throw_fatal_error("timed out after $self->{timeout} seconds");
  });
}

# Deliver $err to the callback of the request it belongs to.
#
# Marks the checkout with {error_occurred}, picks the target callback
# ({current_cb} if set, otherwise the callback of the first queued request),
# discards the whole request queue, and then dies with $err inside a frame
# built from that callback via frame(existing_frame => ...).
#
# Dies directly, with a message that includes $err, when there is neither a
# current callback nor a queued request to report to.
sub _throw_error {
  my ($self, $err) = @_;

  $self->{error_occurred} = 1;

  # The queue is set to undef below, so a second error on the same checkout
  # finds undef here. Fall back to an empty list rather than dereferencing
  # undef, which is fatal under strict refs and otherwise reads an unrelated
  # package array.
  my $pending = $self->{pending_requests} || [];

  my $current_cb;

  if ($self->{current_cb}) {
    $current_cb = $self->{current_cb};
  } elsif (@$pending) {
    # The callback is always the last element of a request.
    $current_cb = $pending->[0]->[-1];
  } else {
    die "_throw_error called but no callback installed. Error thrown was: $err";
  }

  # No further queued request will be sent after an error.
  $self->{pending_requests} = undef;

  if ($current_cb) {
    frame(existing_frame => $current_cb,
          code => sub {
      die $err;
    })->();
  }
}

# Record $err as this checkout's fatal error in {fatal_error}, then raise it
# through _throw_error.
sub throw_fatal_error {
  my $self  = shift;
  my ($err) = @_;

  $self->{fatal_error} = $err;

  return $self->_throw_error($err);
}

# Send the next queued request to the worker, if there is one to send.
#
# Returns immediately when the checkout has no {worker} yet or the queue is
# empty. Otherwise the first request is taken off the queue, its callback
# (the last element) becomes {current_cb}, and:
#   * if a fatal error was recorded earlier, it is thrown to that callback
#     and nothing is sent;
#   * else a ['do', {}, method, args...] message is written to the worker
#     and a read handler is registered for the reply.
#
# The read handler dispatches on the reply code: 'ok' invokes the callback
# with ($self, $response_value), 'er' raises the value through _throw_error,
# anything else dies. It then clears the timer and handler slots and tries
# to send the next queued request.
sub _try_to_fill_requests {
  my ($self) = @_;

  return unless exists $self->{worker};

  # _throw_error replaces the queue with undef, and the reply handler below
  # calls back into this sub right after an 'er' reply. Treat a missing
  # queue as empty instead of dereferencing undef.
  return unless @{ $self->{pending_requests} || [] };

  my $request = shift @{$self->{pending_requests}};

  # The handler closure below keeps the strong reference to $cb;
  # {current_cb} itself is only a weak one.
  my $cb = pop @{$request};
  $self->{current_cb} = $cb;
  Scalar::Util::weaken($self->{current_cb});

  if ($self->{fatal_error}) {
    $self->_throw_error($self->{fatal_error});
    return;
  }

  my $method_name = $request->[0];

  if (!defined $method_name) {
    # Request came from calling the checkout as a code ref: drop the undef
    # method slot from the wire message and use a label for timing.
    $method_name = '->()';
    shift @$request;
  }

  $self->_install_timeout_timer;

  $self->{worker}->push_write( json => [ 'do', {}, @$request, ], );

  my $timer;

  if ($self->{log_defer_object}) {
    $timer = $self->{log_defer_object}->timer($method_name);
  }

  $self->{cmd_handler} = sub {
    my ($handle, $response) = @_;

    # Release the log timer object as soon as the reply arrives.
    undef $timer;

    my ($response_code, $meta, $response_value) = @$response;

    if ($self->{log_defer_object} && $meta->{ld}) {
      $self->{log_defer_object}->merge($meta->{ld});
    }

    if ($response_code eq 'ok') {
      local $@ = undef;
      $cb->($self, $response_value);
    } elsif ($response_code eq 'er') {
      $self->_throw_error($response_value);
    } else {
      die "Unrecognized response_code: $response_code";
    }

    # NOTE(review): if $cb queued another request on this checkout, that
    # request was already sent from inside the callback; the two deletes
    # below then drop the still-armed timer and the handler slot now owned
    # by the newer request. Confirm this is intended.
    delete $self->{timeout_timer};
    delete $self->{cmd_handler};

    $self->_try_to_fill_requests;
  };

  $self->{worker}->push_read( json => $self->{cmd_handler} );
}

# Destructor: hand back whatever the checkout still holds.
#
# Every step that involves the client re-checks {client} first, because it
# is a weak reference and may already be gone.
#
#   * The client is asked to remove_pending_checkout this checkout.
#   * If a worker is attached, its entry in the client's
#     {workers_to_checkouts} map is removed and the worker is detached.
#     After a fatal error, or after an ordinary error unless the client has
#     {dont_refork_after_error} set, the worker is destroyed and the client
#     is asked to populate_workers. Otherwise a ['dn', {}] message is
#     written to the worker and it is handed back via make_worker_available.
#   * Finally the handler, timer, callback and queue slots are set to undef.
sub DESTROY {
  my ($self) = @_;

  if ($self->{client}) {
    $self->{client}->remove_pending_checkout($self);
  }

  if (exists $self->{worker}) {
    my $worker = $self->{worker};

    if ($self->{client}) {
      delete $self->{client}->{workers_to_checkouts}->{0 + $worker};
    }

    delete $self->{worker};

    my $discard_worker =
         $self->{fatal_error}
      || (   $self->{error_occurred}
          && $self->{client}
          && !$self->{client}->{dont_refork_after_error});

    if ($discard_worker) {
      if ($self->{client}) {
        $self->{client}->destroy_worker($worker);
      }
      if ($self->{client}) {
        $self->{client}->populate_workers;
      }
    } else {
      $worker->push_write( json => [ 'dn', {} ] );

      if ($self->{client}) {
        $self->{client}->make_worker_available($worker);
      }
      if ($self->{client}) {
        $self->{client}->try_to_fill_pending_checkouts;
      }
    }
  }

  # Cleared one at a time, handler first and queue last.
  $self->{$_} = undef
    for qw(cmd_handler timeout_timer current_cb pending_requests);
}


1;