The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Plack::Server::AnyEvent;
use strict;
use warnings;
use 5.008_001;
our $VERSION = '0.04';

use Scalar::Util qw(blessed weaken);
use Try::Tiny;
use Carp;

use Socket qw(IPPROTO_TCP TCP_NODELAY);
use Errno qw(EAGAIN EINTR);
use IO::Handle;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Util qw(WSAEWOULDBLOCK);

use HTTP::Status;

use Plack::HTTPParser qw(parse_http_request);
use Plack::Util;

use Plack::Middleware::ContentLength;
use Plack::Middleware::Chunked;

use constant HAS_AIO => !$ENV{PLACK_NO_SENDFILE} && try {
    require AnyEvent::AIO;
    require IO::AIO;
    1;
};

use Plack::Server::AnyEvent::Writer;

sub new {
    my($class, @args) = @_;

    return bless {
        host => undef,
        port => undef,
        no_delay => 1,
        @args,
    }, $class;
}

sub register_service {
    my($self, $app) = @_;

    $app = Plack::Middleware::ContentLength->wrap($app);
#    $app = Plack::Middleware::Chunked->wrap($app);

    $self->{listen_guard} = $self->_create_tcp_server($app);
}

sub _create_tcp_server {
    my ( $self, $app ) = @_;

    return tcp_server $self->{host}, $self->{port}, $self->_accept_handler($app), sub {
        my ( $fh, $host, $port ) = @_;
        $self->{prepared_host} = $host;
        $self->{prepared_port} = $port;
        warn "Accepting requests at http://$host:$port/\n";
        return 0;
    };
}

sub _accept_handler {
    my ( $self, $app ) = @_;

    return sub {
        my ( $sock, $peer_host, $peer_port ) = @_;

        return unless $sock;

		$self->{exit_guard}->begin;

        if ( $self->{no_delay} ) {
            setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
                or die "setsockopt(TCP_NODELAY) failed:$!";
        }

        my $headers = "";

        my $try_parse = sub {
            if ( $self->_try_read_headers($sock, $headers) ) {
                my $env = {
                    SERVER_PORT         => $self->{prepared_port},
                    SERVER_NAME         => $self->{prepared_host},
                    SCRIPT_NAME         => '',
                    'psgi.version'      => [ 1, 0 ],
                    'psgi.errors'       => *STDERR,
                    'psgi.url_scheme'   => 'http',
                    'psgi.nonblocking'  => Plack::Util::TRUE,
                    'psgi.streaming'    => Plack::Util::TRUE,
                    'psgi.run_once'     => Plack::Util::FALSE,
                    'psgi.multithread'  => Plack::Util::FALSE,
                    'psgi.multiprocess' => Plack::Util::FALSE,
                    'psgi.input'        => $sock,
                    'REMOTE_ADDR'       => $peer_host,
                };

                my $reqlen = parse_http_request($headers, $env);

                if ( $reqlen < 0 ) {
                    die "bad request";
                } else {
                    return $env;
                }
            }

            return;
        };

        local $@;
        unless ( eval {
            if ( my $env = $try_parse->() ) {
                # the request data is already available, no need to parse more
                $self->_run_app($app, $env, $sock);
            } else {
                # there's not yet enough data to parse the request,
                # set up a watcher
                $self->_create_req_parsing_watcher( $sock, $try_parse, $app );
            };

            1;
        }) {
            $self->_bad_request($sock);
        }
    };
}

# returns a closure that tries to parse
# this is not a method because it needs a buffer per socket
sub _try_read_headers {
    my ( $self, $sock, undef ) = @_;

    # FIXME add a timer to manage read timeouts
    local $/ = "\012";

    read_more: for my $headers ( $_[2] ) {
        if ( defined(my $line = <$sock>) ) {
            $headers .= $line;

            if ( $line eq "\015\012" or $line eq "\012" ) {
                # got an empty line, we're done reading the headers
                return 1;
            } else {
                # try to read more lines using buffered IO
                redo read_more;
            }
        } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK ) {
            die $!;
        }
    }

    # did not read to end of req, wait for more data to arrive
    return;
}

sub _create_req_parsing_watcher {
    my ( $self, $sock, $try_parse, $app ) = @_;

    my $headers_io_watcher;
    $headers_io_watcher = AE::io $sock, 0, sub {
        try {
            if ( my $env = $try_parse->() ) {
                undef $headers_io_watcher;
                $self->_run_app($app, $env, $sock);
            }
        } catch {
            undef $headers_io_watcher;
            $self->_bad_request($sock);
        }
    };
}

sub _bad_request {
    my ( $self, $sock ) = @_;

    $self->_write_psgi_response(
        $sock,
        [
            400,
            [ 'Content-Type' => 'text/plain' ],
            [ ],
        ],
    );

    return;
}

sub _run_app {
    my($self, $app, $env, $sock) = @_;

    my $res = Plack::Util::run_app $app, $env;

    if ( ref $res eq 'ARRAY' ) {
        $self->_write_psgi_response($sock, $res);
    } elsif ( blessed($res) and $res->isa("AnyEvent::CondVar") ) {
        $res->cb(sub { $self->_write_psgi_response($sock, shift->recv) });
    } elsif ( ref $res eq 'CODE' ) {
        $res->(
            sub {
                my $res = shift;

                if ( @$res < 2 ) {
                    croak "Insufficient arguments";
                } elsif ( @$res == 2 ) {
                    my ( $status, $headers ) = @$res;

                    $self->_flush($sock);

                    my $writer = Plack::Server::AnyEvent::Writer->new($sock, $self->{exit_guard});

                    my $buf = $self->_format_headers($status, $headers);
                    $writer->write($$buf);

                    return $writer;
                } else {
                    my ( $status, $headers, $body, $post ) = @$res;
                    my $cv = $self->_write_psgi_response($sock, [ $status, $headers, $body ]);
                    $cv->cb(sub { $post->() }) if $post;
                }
            },
            $sock,
        );
    } else {
        croak("Unknown response type: $res");
    }
}

sub _write_psgi_response {
    my ( $self, $sock, $res ) = @_;

    if ( ref $res eq 'ARRAY' ) {
		if ( scalar @$res == 0 ) {
			# no response
			$self->{exit_guard}->end;
			return;
		}

        my ( $status, $headers, $body ) = @$res;

		my $cv = AE::cv;

		$self->_write_headers( $sock, $status, $headers )->cb(sub {
			local $@;
			if ( eval { $_[0]->recv; 1 } ) {
				$self->_write_body($sock, $body)->cb(sub {
					$self->{exit_guard}->end;
					local $@;
					eval { $cv->send($_[0]->recv); 1 } or $cv->croak($@);
				});
			}
		});

		return $cv;
    } else {
        no warnings 'uninitialized';
        warn "Unknown response type: $res";
        return $self->_write_psgi_response($sock, [ 204, [], [] ]);
    }
}

sub _write_headers {
    my ( $self, $sock, $status, $headers ) = @_;

    $self->_write_buf( $sock, $self->_format_headers($status, $headers) );
}

sub _format_headers {
    my ( $self, $status, $headers ) = @_;

    my $hdr = sprintf "HTTP/1.0 %d %s\015\012", $status, HTTP::Status::status_message($status);

    my $i = 0;

    my @delim = ("\015\012", ": ");

    foreach my $str ( @$headers ) {
        $hdr .= $str . $delim[++$i % 2];
    }

    $hdr .= "\015\012";

    return \$hdr;
}

# this flushes just the output buffer, not the input buffer (unlike
# $handle->flush)
sub _flush {
	my ( $self, $sock ) = @_;

    local $| = 1;
    print $sock '';
}

# helper routine, similar to push write, but respects buffering, and refcounts
# itself
sub _write_buf {
    my($self, $socket, $data) = @_;

    no warnings 'uninitialized';

    # try writing immediately
    if ( (my $written = syswrite($socket, $$data)) < length($$data) ) {
        my $done = defined(wantarray) && AE::cv;

        # either the write failed or was incomplete

        if ( !defined($written) and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
            # a real write error occured, like EPIPE
            $done->croak($!) if $done;
            return $done;
        }

        # the write was either incomplete or a non fatal error occured, so we
        # need to set up an IO watcher to wait until we can properly write

        my $length = length($$data);

        my $write_watcher;
        $write_watcher = AE::io $socket, 1, sub {
            write_more: {
                my $out = syswrite($socket, $$data, $length - $written, $written);

                if ( defined($out) ) {
                    $written += $out;

                    if ( $written == $length ) {
                        undef $write_watcher;
                        $done->send(1) if $done;
                    } else {
                        redo write_more;
                    }
                } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
                    $done->croak($!) if $done;
                    undef $write_watcher;
                }
            }
        };

        return $done;
    } elsif ( defined wantarray ) {
        my $done = AE::cv;
        $done->send(1);
        return $done;
    }
}

sub _write_body {
    my ( $self, $sock, $body ) = @_;

    if ( ref $body eq 'ARRAY' ) {
        my $buf = join "", @$body;
        return $self->_write_buf($sock, \$buf);
    } elsif ( Plack::Util::is_real_fh($body) ) {
        # real handles use nonblocking IO
        # either AIO or using watchers, with sendfile or with copying IO
        $self->_write_real_fh($sock, $body);
    } elsif ( blessed($body) and $body->can("string_ref") ) {
        # optimize IO::String to not use its incredibly slow getline
        if ( my $pos = $body->tell ) {
            my $str = substr ${ $body->string_ref }, $pos;
            return $self->_write_buf($sock, \$str);
        } else {
            return $self->_write_buf($sock, $body->string_ref);
        }
    } else {
        # like Plack::Util::foreach, but nonblocking on the output
        # handle

        my $handle = AnyEvent::Handle->new( fh => $sock );

        my $ret = AE::cv;

        $handle->on_error(sub {
            my $err = $_[2];
            $handle->destroy;
            $ret->send($err);
        });

        $handle->on_drain(sub {
            local $/ = \4096;
            if ( defined( my $buf = $body->getline ) ) {
                $handle->push_write($buf);
            } elsif ( $! ) {
                $ret->croak($!);
                $handle->destroy;
            } else {
                $body->close;
                $handle->on_drain(sub {
                    shutdown $handle->fh, 1;
                    $handle->destroy;
                    $ret->send(1);
                });
            }
        });

        return $ret;
    }
}

# when the body handle is a real filehandle we use this routine, which is more
# careful not to block when reading the response too

# FIXME support only reading $length bytes from $body, instead of until EOF
# FIXME use len = 0 param to sendfile
# FIXME use Sys::Sendfile in nonblocking mode if AIO is not available
# FIXME test sendfile on non file backed handles
# FIXME this is actually pretty broken on linux
sub _write_real_fh {
    my ( $self, $sock, $body ) = @_;

    if ( HAS_AIO and -s $body ) {
        my $cv = AE::cv;
        my $offset = 0;
        my $length = -s $body;
        $sock->blocking(1);
        my $sendfile; $sendfile = sub {
            IO::AIO::aio_sendfile( $sock, $body, $offset, $length - $offset, sub {
                my $ret = shift;
                $offset += $ret if $ret > 0;
                if ($offset >= $length || ($ret == -1 && ! ($! == EAGAIN || $! == EINTR))) {
                    if ( $ret == -1 ) {
                        $cv->croak($!);
                    } else {
                        $cv->send(1);
                    }

                    undef $sendfile;
                    undef $sock;
                } else {
                    $sendfile->();
                }
            });
        };
        $sendfile->();
        return $cv;
    } else {
        # $body is a real filehandle, so set up a watcher for it
        # this is basically sendfile in userspace
        my $sock_handle = AnyEvent::Handle->new( fh => $sock );
        my $body_handle = AnyEvent::Handle->new( fh => $body );

        my $cv = AE::cv;

        my $err = sub {
            $cv->croak($_[2]);

            for ( $sock_handle, $body_handle ) {
                $_->destroy;
            }
        };

        $sock_handle->on_error($err);
        $body_handle->on_error($err);

        $body_handle->on_eof(sub {
            $body_handle->destroy;
            $sock_handle->on_drain(sub {
                shutdown $sock_handle->fh, 1;
                $sock_handle->destroy;
                $cv->send(1);
            });
        });

        $sock_handle->on_drain(sub {
            $body_handle->push_read(sub {
                $sock_handle->push_write($_[0]{rbuf});
                $_[0]{rbuf} = '';
            });
        });

        return $cv;
    }
}

sub run {
    my $self = shift;
    $self->register_service(@_);

    my $exit = $self->{exit_guard} = AnyEvent->condvar;
	$exit->begin;

	my $w; $w = AE::signal QUIT => sub { $exit->end; undef $w };

	$exit->recv;
}

# ex: set sw=4 et:

1;
__END__

=head1 NAME

Plack::Server::AnyEvent - DEPRECATED. Use Twiggy

=head1 WARNING

This software is deprecated. Use L<Twiggy> instead.

=head1 SYNOPSIS

  my $server = Plack::Server::AnyEvent->new(
      host => $host,
      port => $port,
  );
  $server->run($app);

=head1 DESCRIPTION

Plack::Server::AnyEvent is a Plack server implementation using
AnyEvent. This server runs in a non-blocking event loop and suitable
for event-driven web applications like streaming API servers.

=head1 LICENSE

This module is licensed under the same terms as Perl itself.

=head1 AUTHOR

Tokuhiro Matsuno

Yuval Kogman

Tatsuhiko Miyagawa

=head1 SEE ALSO

L<Twiggy>

=cut