The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Twiggy::Server;
use strict;
use warnings;

use Scalar::Util qw(blessed weaken);
use Try::Tiny;
use Carp;

use Socket qw(IPPROTO_TCP TCP_NODELAY);
use Errno qw(EAGAIN EINTR);
use IO::Handle;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Util qw(WSAEWOULDBLOCK);

use HTTP::Status;
use Plack::HTTPParser qw(parse_http_request);
use Plack::Util;

use constant DEBUG => $ENV{TWIGGY_DEBUG};
use constant HAS_AIO => !$ENV{PLACK_NO_SENDFILE} && try {
    require AnyEvent::AIO;
    require IO::AIO;
    1;
};

open my $null_io, '<', \'';

sub new {
    my($class, @args) = @_;

    return bless {
        no_delay => 1,
        timeout => 300,
        read_chunk_size => 4096,
        @args,
    }, $class;
}

sub start_listen {
    my ($self, $app) = @_;
    my @listen = @{$self->{listen} || [ ($self->{host} || '') . ":" . ($self->{port} || 0) ]};
    for my $listen (@listen) {
        push @{$self->{listen_guards}}, $self->_create_tcp_server($listen, $app);
    }
}

sub register_service {
    my($self, $app) = @_;

    $self->start_listen($app);

    $self->{exit_guard} = AE::cv {
        # Make sure that we are not listening on a socket anymore, while
        # other events are being flushed
        delete $self->{listen_guards};
    };
    $self->{exit_guard}->begin;
}

sub _create_tcp_server {
    my ( $self, $listen, $app ) = @_;

    my($host, $port, $is_tcp);
    if ($listen =~ /:\d+$/) {
        ($host, $port) = split /:/, $listen;
        $host = undef if $host eq '';
        $port = undef if $port == 0;
        $is_tcp = 1;
    } else {
        $host = "unix/";
        $port = $listen;
    }

    my($listen_host, $listen_port);

    return tcp_server $host, $port, $self->_accept_handler($app, $is_tcp, \$listen_host, \$listen_port),
        $self->_accept_prepare_handler(\$listen_host, \$listen_port);
}

sub _accept_prepare_handler {
    my($self, $listen_host_r, $listen_port_r) = @_;

    return sub {
        my ( $fh, $host, $port ) = @_;
        DEBUG && warn "Listening on $host:$port\n";
        $$listen_host_r = $host;
        $$listen_port_r = $port;
        $self->{server_ready}->({
            host => $host,
            port => $port,
            server_software => 'Twiggy',
        }) if $self->{server_ready};

        return $self->{backlog} || 0;
    };
}

sub _accept_handler {
    my ( $self, $app, $is_tcp, $listen_host_r, $listen_port_r ) = @_;

    return sub {
        my ( $sock, $peer_host, $peer_port ) = @_;

        DEBUG && warn "$sock Accepted connection from $peer_host:$peer_port\n";
        return unless $sock;
        $self->{exit_guard}->begin;

        if ( $is_tcp && $self->{no_delay} ) {
            setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
                or die "setsockopt(TCP_NODELAY) failed:$!";
        }

        my $headers = "";
        my $try_parse = sub {
            if ( $self->_try_read_headers($sock, $headers) ) {
                my $env = {
                    SERVER_NAME         => $$listen_host_r,
                    SERVER_PORT         => $$listen_port_r,
                    SCRIPT_NAME         => '',
                    REMOTE_ADDR         => $peer_host,
                    'psgi.version'      => [ 1, 0 ],
                    'psgi.errors'       => *STDERR,
                    'psgi.url_scheme'   => 'http',
                    'psgi.nonblocking'  => Plack::Util::TRUE,
                    'psgi.streaming'    => Plack::Util::TRUE,
                    'psgi.run_once'     => Plack::Util::FALSE,
                    'psgi.multithread'  => Plack::Util::FALSE,
                    'psgi.multiprocess' => Plack::Util::FALSE,
                    'psgi.input'        => undef, # will be set by _run_app()
                    'psgix.io'          => $sock,
                    'psgix.input.buffered' => Plack::Util::TRUE,
                };

                my $reqlen = parse_http_request($headers, $env);
                DEBUG && warn "$sock Parsed HTTP headers: request length=$reqlen\n";

                if ( $reqlen < 0 ) {
                    die "bad request";
                } else {
                    return $env;
                }
            }

            return;
        };

        local $@;
        unless ( eval {
            if ( my $env = $try_parse->() ) {
                # the request data is already available, no need to parse more
                $self->_run_app($app, $env, $sock);
            } else {
                # there's not yet enough data to parse the request,
                # set up a watcher
                $self->_create_req_parsing_watcher( $sock, $try_parse, $app );
            };

            1;
        }) {
            my $disconnected = ($@ =~ /^client disconnected/);
            $self->_bad_request($sock, $disconnected);
        }
    };
}

# returns a closure that tries to parse
# this is not a method because it needs a buffer per socket
sub _try_read_headers {
    my ( $self, $sock, undef ) = @_;

    # FIXME add a timer to manage read timeouts
    local $/ = "\012";

    read_more: for my $headers ( $_[2] ) {
        if ( defined(my $line = <$sock>) ) {
            $headers .= $line;

            if ( $line eq "\015\012" or $line eq "\012" ) {
                # got an empty line, we're done reading the headers
                return 1;
            } else {
                # try to read more lines using buffered IO
                redo read_more;
            }
        } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK ) {
            die $!;
        } elsif (!$!) {
            die "client disconnected";
        }
    }

    DEBUG && warn "$sock did not read to end of req, wait for more data to arrive\n";
    return;
}

sub _create_req_parsing_watcher {
    my ( $self, $sock, $try_parse, $app ) = @_;

    my $headers_io_watcher;

    my $timeout_timer = AE::timer $self->{timeout}, 0, sub {
        DEBUG && warn "$sock Timeout\n";
        undef $headers_io_watcher;
        undef $try_parse;
        undef $sock;
    } if $self->{timeout};

    $headers_io_watcher = AE::io $sock, 0, sub {
        try {
            if ( my $env = $try_parse->() ) {
                undef $headers_io_watcher;
                undef $timeout_timer;
                $self->_run_app($app, $env, $sock);
            }
        } catch {
            undef $headers_io_watcher;
            undef $timeout_timer;
            my $disconnected = /^client disconnected/;
            $self->_bad_request($sock, $disconnected);
        }
    };
}

sub _bad_request {
    my ( $self, $sock, $disconnected ) = @_;

    return unless defined $sock and defined fileno $sock;

    my $response = [
        400,
        [ 'Content-Type' => 'text/plain' ],
        [ ],
    ];

    # if client is already gone, don't try to write to it
    $response = [] if $disconnected;

    $self->_write_psgi_response($sock, $response);

    return;
}

sub _read_chunk {
    my ($self, $sock, $remaining, $cb) = @_;

    my $data = '';
    my $read_chunk_size = $self->{read_chunk_size};

    my $try_read = sub {
        READ_MORE: {
            my $read_size = $remaining > $read_chunk_size ? $read_chunk_size : $remaining;
            my $rlen = read($sock, $data, $read_size, length($data));

            if (defined $rlen and $rlen > 0) {
                $remaining -= $rlen;

                if ($remaining <= 0) {
                    $cb->($data);
                    return 1;
                } else {
                    redo READ_MORE;
                }
            } elsif (defined $rlen) {
                $cb->($data);
                return 1;
            } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
                die $!;
            } elsif (!$!) {
                die "client disconnected";
            }
        }

        return;
    };

    unless ($try_read->()) {
        my $rw; $rw = AE::io($sock, 0, sub {
            try {
                if ($try_read->()) {
                    undef $rw;
                }
            } catch {
                undef $rw;
                $self->_bad_request($sock);
            };
        });
    }
}

sub _run_app {
    my($self, $app, $env, $sock) = @_;

    unless ($env->{'psgi.input'}) {
        if ($env->{CONTENT_LENGTH}) {
            $self->_read_chunk($sock, $env->{CONTENT_LENGTH}, sub {
                my ($data) = @_;
                open my $input, '<', \$data;
                $env->{'psgi.input'} = $input;
                $self->_run_app($app, $env, $sock);
            });
            return;
        } else {
            $env->{'psgi.input'} = $null_io;
        }
    }

    my $res = Plack::Util::run_app $app, $env;

    if ( ref $res eq 'ARRAY' ) {
        $self->_write_psgi_response($sock, $res);
    } elsif ( blessed($res) and $res->isa("AnyEvent::CondVar") ) {
        Carp::carp("Returning AnyEvent condvar is deprecated and will be removed in the next release of Twiggy. Use the streaming callback interface intstead.");
        $res->cb(sub { $self->_write_psgi_response($sock, shift->recv) });
    } elsif ( ref $res eq 'CODE' ) {
        my $created_writer;

        $res->(
            sub {
                my $res = shift;

                if ( @$res < 2 ) {
                    croak "Insufficient arguments";
                } elsif ( @$res == 2 ) {
                    my ( $status, $headers ) = @$res;

                    $self->_flush($sock);

                    my $writer = Twiggy::Writer->new($sock, $self->{exit_guard});
                    $created_writer = 1;

                    my $buf = $self->_format_headers($status, $headers);
                    $writer->write($$buf);

                    return $writer;
                } else {
                    my ( $status, $headers, $body, $post ) = @$res;
                    my $cv = $self->_write_psgi_response($sock, [ $status, $headers, $body ]);
                    $cv->cb(sub { $post->() }) if $post;
                }
            },
            $sock,
        );

        if($created_writer) {
            $self->{exit_guard}->end; # normally _write_psgi_response calls this, but it doesn't get called when we use a writer!
        }
    } else {
        croak("Unknown response type: $res");
    }
}

sub _write_psgi_response {
    my ( $self, $sock, $res ) = @_;

    if ( ref $res eq 'ARRAY' ) {
        if ( scalar @$res == 0 ) {
            # no response
            $self->{exit_guard}->end;
            return;
        }

        my ( $status, $headers, $body ) = @$res;

        my $cv = AE::cv;

        $self->_write_headers( $sock, $status, $headers )->cb(sub {
            local $@;
            if ( eval { $_[0]->recv; 1 } ) {
                $self->_write_body($sock, $body)->cb(sub {
                    shutdown $sock, 1;
                    close $sock;
                    $self->{exit_guard}->end;
                    local $@;
                    eval { $cv->send($_[0]->recv); 1 } or $cv->croak($@);
                });
            } else {
                $self->{exit_guard}->end;
                eval { $cv->send($_[0]->recv); 1 } or $cv->croak($@);
            }
        });

        return $cv;
    } else {
        no warnings 'uninitialized';
        warn "Unknown response type: $res";
        return $self->_write_psgi_response($sock, [ 204, [], [] ]);
    }
}

sub _write_headers {
    my ( $self, $sock, $status, $headers ) = @_;

    $self->_write_buf( $sock, $self->_format_headers($status, $headers) );
}

sub _format_headers {
    my ( $self, $status, $headers ) = @_;

    my $hdr = sprintf "HTTP/1.0 %d %s\015\012", $status, HTTP::Status::status_message($status);

    my $i = 0;

    my @delim = ("\015\012", ": ");

    foreach my $str ( @$headers ) {
        $hdr .= $str . $delim[++$i % 2];
    }

    $hdr .= "\015\012";

    return \$hdr;
}

# this flushes just the output buffer, not the input buffer (unlike
# $handle->flush)
sub _flush {
	my ( $self, $sock ) = @_;

    local $| = 1;
    print $sock '';
}

# helper routine, similar to push write, but respects buffering, and refcounts
# itself
sub _write_buf {
    my($self, $socket, $data) = @_;

    no warnings 'uninitialized';

    # try writing immediately
    if ( (my $written = syswrite($socket, $$data)) < length($$data) ) {
        my $done = defined(wantarray) && AE::cv;

        # either the write failed or was incomplete

        if ( !defined($written) and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
            # a real write error occured, like EPIPE
            $done->croak($!) if $done;
            return $done;
        }

        # the write was either incomplete or a non fatal error occured, so we
        # need to set up an IO watcher to wait until we can properly write

        my $length = length($$data);

        my $write_watcher;
        $write_watcher = AE::io $socket, 1, sub {
            write_more: {
                my $out = syswrite($socket, $$data, $length - $written, $written);

                if ( defined($out) ) {
                    $written += $out;

                    if ( $written == $length ) {
                        undef $write_watcher;
                        $done->send(1) if $done;
                    } else {
                        redo write_more;
                    }
                } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
                    $done->croak($!) if $done;
                    undef $write_watcher;
                }
            }
        };

        return $done;
    } elsif ( defined wantarray ) {
        my $done = AE::cv;
        $done->send(1);
        return $done;
    }
}

sub _write_body {
    my ( $self, $sock, $body ) = @_;

    if ( ref $body eq 'ARRAY' ) {
        my $buf = join "", @$body;
        return $self->_write_buf($sock, \$buf);
    } elsif ( Plack::Util::is_real_fh($body) ) {
        # real handles use nonblocking IO
        # either AIO or using watchers, with sendfile or with copying IO
        return $self->_write_real_fh($sock, $body);
    } elsif ( blessed($body) and $body->can("string_ref") ) {
        # optimize IO::String to not use its incredibly slow getline
        if ( my $pos = $body->tell ) {
            my $str = substr ${ $body->string_ref }, $pos;
            return $self->_write_buf($sock, \$str);
        } else {
            return $self->_write_buf($sock, $body->string_ref);
        }
    } else {
        return $self->_write_fh($sock, $body);
    }
}

# like Plack::Util::foreach, but nonblocking on the output
# handle
sub _write_fh {
    my ( $self, $sock, $body ) = @_;

    my $handle = AnyEvent::Handle->new( fh => $sock );
    my $ret = AE::cv;

    $handle->on_error(sub {
        my $err = $_[2];
        $handle->destroy;
        $ret->send($err);
    });

    no warnings 'recursion';
    $handle->on_drain( $self->_drain($body, $ret) );

    return $ret;
}

sub _drain {
    my ($self, $body, $ret) = @_;
    return sub {
        my $handle = shift;
        local $/ = \ $self->{read_chunk_size};
        if ( defined( my $buf = $body->getline ) ) {
            if (length($buf)) {
                $handle->push_write($buf);
            }
            else {
                # if on_drain is called and we don't do any
                # push_write, anyevent::handle thinks we are done.
                # this fails for the deflater mw, since one 4096 chunk
                # of getline might not generate a deflated packet yet,
                # which gets us an empty string here.
                return $self->_drain($body, $ret)->($handle);
            }
        } elsif ( $! ) {
            $ret->croak($!);
            $handle->destroy;
        } else {
            $body->close;
            $handle->on_drain(sub {
                shutdown $handle->fh, 1;
                $handle->destroy;
                $ret->send(1);
            });
        }

    }
}

# when the body handle is a real filehandle we use this routine, which is more
# careful not to block when reading the response too

# FIXME support only reading $length bytes from $body, instead of until EOF
# FIXME use len = 0 param to sendfile
# FIXME use Sys::Sendfile in nonblocking mode if AIO is not available
# FIXME test sendfile on non file backed handles
sub _write_real_fh {
    my ( $self, $sock, $body ) = @_;

    if ( HAS_AIO and -s $body ) {
        my $cv = AE::cv;
        my $offset = 0;
        my $length = -s $body;
        $sock->blocking(1);
        my $sendfile; $sendfile = sub {
            IO::AIO::aio_sendfile( $sock, $body, $offset, $length - $offset, sub {
                my $ret = shift;
                $offset += $ret if $ret > 0;
                if ($offset >= $length || ($ret == -1 && ! ($! == EAGAIN || $! == EINTR))) {
                    if ( $ret == -1 ) {
                        $cv->croak($!);
                    } else {
                        $cv->send(1);
                    }

                    undef $sendfile;
                    undef $sock;
                } else {
                    $sendfile->();
                }
            });
        };
        $sendfile->();
        return $cv;
    } else {
        return $self->_write_fh($sock, $body);
    }
}

sub run {
    my $self = shift;
    $self->register_service(@_);

    my $w; $w = AE::signal QUIT => sub { $self->{exit_guard}->end; undef $w };
    $self->{exit_guard}->recv;
}

package Twiggy::Writer;
use AnyEvent::Handle;

sub new {
    my ( $class, $socket, $exit ) = @_;

    $exit->begin if $exit;

    bless { handle => AnyEvent::Handle->new( fh => $socket ), exit_guard => $exit }, $class;
}

sub write { $_[0]{handle}->push_write($_[1]) }

sub close {
    my $self = shift;

    my $exit_guard = delete $self->{exit_guard};
    $exit_guard->end if $exit_guard;

    my $handle = delete $self->{handle};
    if ($handle) {
        $handle->on_drain;
        $handle->on_error;

        $handle->on_drain(sub {
            shutdown $_[0]->fh, 1;
            $_[0]->destroy;
            undef $handle;
        });
    }
}

sub DESTROY { $_[0]->close }

package Twiggy::Server;

1;
__END__