The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Mojo::IOLoop::Stream::HTTPClient;
use Mojo::Base 'Mojo::IOLoop::Stream';

use Mojo::Transaction::WebSocket;
use Mojo::Util 'term_escape';
use Mojo::WebSocket 'challenge';
use Scalar::Util 'weaken';

use constant DEBUG => $ENV{MOJO_CLIENT_DEBUG} || 0;

has request_timeout => 0;

sub new {
  my $self = shift->SUPER::new(@_);
  $self->on(read => sub { shift->_read_content(shift) });
  $self->on(close => sub { $_[0]->{closing}++ || $_[0]->_finish(1) });
  return $self;
}

sub process {
  my ($self, $tx) = @_;

  $self->{tx} = $tx;
  my $handle = $self->handle;
  unless ($handle->isa('IO::Socket::UNIX')) {
    $tx->local_address($handle->sockhost)->local_port($handle->sockport);
    $tx->remote_address($handle->peerhost)->remote_port($handle->peerport);
  }

  weaken $self;
  $tx->on(resume => sub { $self->_write_content });
  if (my $timeout = $self->request_timeout) {
    $self->{req_timeout} = $self->reactor->timer(
      $timeout => sub { $self->_error('Request timeout') });
  }
  $self->_write_content;
}

sub _error {
  my ($self, $err) = @_;
  $self->{tx}->res->error({message => $err}) if $self->{tx};
  $self->_finish(1);
}

sub _finish {
  my ($self, $close) = @_;

  # Remove request timeout and finish transaction
  $self->reactor->remove($self->{req_timeout}) if $self->{req_timeout};
  return ++$self->{closing} && $self->close unless my $tx = delete $self->{tx};

  # Premature connection close
  my $res = $tx->res->finish;
  if ($close && !$res->code && !$res->error) {
    $res->error({message => 'Premature connection close'});
  }

  # Upgrade connection to WebSocket
  if (my $ws = $self->_upgrade($tx)) {
    $self->emit(upgrade => $ws);
    return $ws->client_read($ws->handshake->res->content->leftovers);
  }

  ++$self->{closing} && $self->close_gracefully
    if $tx->error || !$tx->keep_alive;
  $res->error({message => $res->message, code => $res->code}) if $res->is_error;
  $tx->closed;
}

sub _read_content {
  my ($self, $chunk) = @_;

  # Corrupted connection
  return $self->close unless my $tx = $self->{tx};

  warn term_escape "-- Client <<< Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
  $tx->client_read($chunk);
  $self->_finish if $tx->is_finished;
}

sub _upgrade {
  my ($self, $tx) = @_;
  my $code = $tx->res->code // 0;
  return undef unless $tx->req->is_handshake && $code == 101;
  my $ws = Mojo::Transaction::WebSocket->new(handshake => $tx, masked => 1);
  return challenge($ws) ? $ws->established(1) : undef;
}

sub _url { shift->req->url->to_abs }

sub _write_content {
  my $self = shift;

  # Protect from resume event recursion
  return if !(my $tx = $self->{tx}) || $self->{write_lock};
  local $self->{write_lock} = 1;
  my $chunk = $tx->client_write;
  warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n" if DEBUG;
  return unless length $chunk;
  $self->write($chunk => sub { $_[0]->_write_content });
}

1;

=encoding utf8

=head1 NAME

Mojo::IOLoop::Stream::HTTPClient - Non-blocking I/O HTTP client stream

=head1 SYNOPSIS

  use Mojo::IOLoop::Client;
  use Mojo::IOLoop::Stream::HTTPClient;
  use Mojo::Transaction::HTTP;
  
  # Create transaction
  my $tx = Mojo::Transaction::HTTP->new;
  $tx->req->method('GET')
  $tx->url->parse('https://mojolicious.org');
  $tx->on(
    finish => sub {
      my $tx = shift;
      say $tx->res->code;
    }
  );
  
  # Create socket connection
  my $client = Mojo::IOLoop::Client->new;
  $client->on(
    connect => sub {
      my $stream = Mojo::IOLoop::Stream::HTTPClient->new(pop);
      $stream->start;
      $stream->process($tx);
    }
  );
  $client->connect(address => 'mojolicious.org', port => 80);

  # Start reactor if necessary
  $stream->reactor->start unless $stream->reactor->is_running;

=head1 DESCRIPTION

L<Mojo::IOLoop::Stream::HTTPClient> is a container for I/O streams used by
L<Mojo::IOLoop> to support the HTTP protocol client-side.

=head1 EVENTS

L<Mojo::IOLoop::Stream::HTTPClient> inherits all events from
L<Mojo::IOLoop::Stream> and can emit the following new ones.

=head2 upgrade

  $stream->on(upgrade => sub {
    my ($stream, $ws) = @_;
    ...
  });

Emitted when the connection should be upgraded to the WebSocket protocol.

=head1 ATTRIBUTES

L<Mojo::IOLoop::Stream::HTTPClient> inherits all attributes from
L<Mojo::IOLoop::Stream> and implements the following ones.

=head2 request_timeout

  my $timeout = $stream->request_timeout;
  $stream     = $stream->request_timeout(5);

Maximum amount of time in seconds sending the request and receiving a whole
response may take before getting canceled, defaults to C<0>. Setting the value
to C<0> will allow to wait indefinitely.

=head1 METHODS

L<Mojo::IOLoop::Stream::HTTPClient> inherits all methods from
L<Mojo::IOLoop::Stream> and implements the following new ones.

=head2 new

  my $stream = Mojo::IOLoop::Stream::HTTPClient->new($handle);

Construct a new L<Mojo::IOLoop::Stream::HTTPClient> object.

=head2 process

  $stream->process(Mojo::Transaction::HTTP->new);

Process a L<Mojo::Transaction::HTTP> object with the current connection.

=head1 DEBUGGING

You can set the C<MOJO_CLIENT_DEBUG> environment variable to get some advanced
diagnostics information printed to C<STDERR>.

  MOJO_CLIENT_DEBUG=1

=head1 SEE ALSO

L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicious.org>.

=cut