The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Monoceros::Server;

use strict;
use warnings;
use base qw/Plack::Handler::Starlet/;
use IO::Socket;
use IO::Select;
use IO::FDPass;
use Parallel::Prefork;
use AnyEvent;
use AnyEvent::Util qw(fh_nonblocking);
use Digest::MD5 qw/md5_hex/;
use Time::HiRes qw/time/;
use Carp ();
use Plack::TempBuffer;
use Plack::Util;
use Plack::HTTPParser qw( parse_http_request );
use POSIX qw(EINTR EAGAIN EWOULDBLOCK :sys_wait_h);
use Socket qw(IPPROTO_TCP TCP_NODELAY);

use constant WRITER => 0;
use constant READER => 1;

use constant S_SOCK => 0;
use constant S_TIME => 1;
use constant S_REQS => 2;
use constant S_IDLE => 3;

use constant MAX_REQUEST_SIZE => 131072;
my $null_io = do { open my $io, "<", \""; $io };

sub new {
    my $class = shift;
    my %args = @_;

    # setup before instantiation
    my $listen_sock;
    if (defined $ENV{SERVER_STARTER_PORT}) {
        my ($hostport, $fd) = %{Server::Starter::server_ports()};
        if ($hostport =~ /(.*):(\d+)/) {
            $args{host} = $1;
            $args{port} = $2;
        } else {
            $args{port} = $hostport;
        }
        $listen_sock = IO::Socket::INET->new(
            Proto => 'tcp',
        ) or die "failed to create socket:$!";
        $listen_sock->fdopen($fd, 'w')
            or die "failed to bind to listening socket:$!";
    }
    my $max_workers = 5;
    for (qw(max_workers workers)) {
        $max_workers = delete $args{$_}
            if defined $args{$_};
    }

    my $self = bless {
        host                 => $args{host} || 0,
        port                 => $args{port} || 8080,
        max_workers          => $max_workers,
        timeout              => $args{timeout} || 300,
        keepalive_timeout    => $args{keepalive_timeout} || 10,
        max_keepalive_reqs   => $args{max_keepalive_reqs} || 100,
        server_software      => $args{server_software} || $class,
        server_ready         => $args{server_ready} || sub {},
        min_reqs_per_child   => (
            defined $args{min_reqs_per_child}
                ? $args{min_reqs_per_child} : undef,
        ),
        max_reqs_per_child   => (
            $args{max_reqs_per_child} || $args{max_requests} || 100,
        ),
        err_respawn_interval => (
            defined $args{err_respawn_interval}
                ? $args{err_respawn_interval} : undef,
        ),
        _using_defer_accept  => 0,
        listen_sock => ( defined $listen_sock ? $listen_sock : undef),
    }, $class;

    $self;
}

sub run {
    my ($self, $app) = @_;
    $self->setup_listener();
    $self->setup_sockpair();
    $self->run_workers($app);
}

sub setup_sockpair {
    my $self = shift;
    my @worker_pipe = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, 0)
        or die "failed to create socketpair: $!";
    $self->{worker_pipe} = \@worker_pipe; 

    my @defer_pipe = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, 0)
        or die "failed to create socketpair: $!";
    $self->{defer_pipe} = \@defer_pipe; 

    my @lstn_pipe = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, 0)
            or die "failed to create socketpair: $!";    
    $self->{lstn_pipe} = \@lstn_pipe;

    1;
}

sub run_workers {
    my ($self,$app) = @_;
    local $SIG{PIPE} = 'IGNORE';    
    my $pid = fork;  
    if ( $pid ) {
        #parent
        $self->connection_manager($pid);
    }
    elsif ( defined $pid ) {
        $self->request_worker($app);
        exit;
    }
    else {
        die "failed fork:$!";
    }
}

sub queued_fdsend {
    my $self = shift;
    my $info = shift;

    $info->[S_IDLE] = 0; #no-idle

    $self->{fdsend_queue} ||= [];
    push @{$self->{fdsend_queue}},  $info;
    $self->{fdsend_worker} ||= AE::io $self->{lstn_pipe}[WRITER], 1, sub {
        do {
            if ( !$self->{fdsend_queue}[0][S_SOCK] ) {
                shift @{$self->{fdsend_queue}};
                next;
            }
            if ( ! IO::FDPass::send(fileno $self->{lstn_pipe}[WRITER], fileno $self->{fdsend_queue}[0][S_SOCK] ) ) {
                return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK;
                undef $self->{fdsend_worker};
                die "unable to pass file handle: $!"; 
            }
            shift @{$self->{fdsend_queue}};
        } while @{$self->{fdsend_queue}};
        undef $self->{fdsend_worker};
    };

    1;
}

sub connection_manager {
    my ($self, $worker_pid) = @_;

    $self->{lstn_pipe}[READER]->close;
    fh_nonblocking $self->{lstn_pipe}[WRITER], 1;
    $self->{worker_pipe}->[WRITER]->close;
    fh_nonblocking $self->{worker_pipe}->[READER], 1;
    $self->{defer_pipe}->[WRITER]->close;
    fh_nonblocking $self->{defer_pipe}->[READER], 1;
    fh_nonblocking $self->{listen_sock}, 1;

    my %manager;
    my %sockets;
    my $term_received = 0;
    my %wait_read;

    my $cv = AE::cv;
    my $close_all = 0;
    my $sig2;$sig2 = AE::signal 'USR1', sub {
        my $t;$t = AE::timer 0, 1, sub {
            return unless $close_all;
            undef $t;
            kill 'TERM', $worker_pid;
            my $t2;$t2 = AE::timer 0, 1, sub {
                my $kid = waitpid($worker_pid, WNOHANG);
                return if $kid >= 0;
                undef $t2;
                $cv->send;
            };
        };
    };
    my $sig;$sig = AE::signal 'TERM', sub {
        $term_received++;
        kill 'USR1', $worker_pid; #stop accept
        my $t;$t = AE::timer 0, 1, sub {
            my $time = time;
            use Data::Dumper;
            return if keys %sockets;
            undef $t;
            $close_all=1;
        };
    };

    $manager{disconnect_keepalive_timeout} = AE::timer 0, 1, sub {
        my $time = time;
        for my $key ( keys %sockets ) {
            if ( !$sockets{$key}->[S_IDLE] && (!$sockets{$key}->[S_SOCK] || !$sockets{$key}->[S_SOCK]->connected()) ) {
                delete $wait_read{$key};
                delete $sockets{$key};
            }
            elsif ( $sockets{$key}->[S_IDLE] && $sockets{$key}->[S_REQS] == 0 
                     && $time - $sockets{$key}->[1] > $self->{timeout} ) { #idle && first req 
                delete $wait_read{$key};
                delete $sockets{$key};
            }
            elsif ( $sockets{$key}->[S_IDLE] && $sockets{$key}->[S_REQS] > 0 &&
                     $time - $sockets{$key}->[1] > $self->{keepalive_timeout} ) { #idle && keepalive
                delete $wait_read{$key};
                delete $sockets{$key};
            }
        }
    };
    
    if ( $self->{_using_defer_accept} ) {
        $manager{defer_listener} = AE::io $self->{defer_pipe}->[READER], 0, sub {
            my @fd;
            D_PIPE_READ: for (1..$self->{max_workers}) {
                my $fd = IO::FDPass::recv($self->{defer_pipe}->[READER]->fileno);
                last D_PIPE_READ if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK;
                next if $fd < 0;
                push @fd, $fd;
            }
            for my $fd ( @fd ) {
                my $fh = IO::Socket::INET->new_from_fd($fd,'r+')
                    or die "unable to convert file descriptor to handle: $!";
                my $peername = $fh->peername;
                next unless $peername;
                my $remote = md5_hex($peername);
                $sockets{$remote} = [$fh,time,1,1];  #fh,time,reqs,idle
                $wait_read{$remote} = AE::io $sockets{$remote}->[S_SOCK], 0, sub {
                    undef $wait_read{$remote};
                    if ( !$sockets{$remote}->[S_SOCK] || !$sockets{$remote}->[S_SOCK]->connected()) {
                        delete $sockets{$remote};
                        return;
                    }
                    $self->queued_fdsend($sockets{$remote});
                };
            }
        };
    }
    else {
        $manager{main_listener} = AE::io $self->{listen_sock}, 0, sub {
            L_SOCK_READ: for (1..$self->{max_workers}) {
                return if $term_received;
                my ($fh,$peer) = $self->{listen_sock}->accept;
                last L_SOCK_READ if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK;
                next unless $fh;
                my $remote = md5_hex($peer);
                $sockets{$remote} = [$fh,time,0,1];  #fh,time,reqs,idle
                fh_nonblocking $fh, 1
                    or die "failed to set socket to nonblocking mode:$!";
                setsockopt($fh, IPPROTO_TCP, TCP_NODELAY, 1)
                    or die "setsockopt(TCP_NODELAY) failed:$!";
                $wait_read{$remote} = AE::io $fh, 0, sub {
                    undef $wait_read{$remote};
                    if ( !$sockets{$remote}->[S_SOCK] || !$sockets{$remote}->[S_SOCK]->connected()) {
                        delete $sockets{$remote};
                        return;
                    }
                    $self->queued_fdsend($sockets{$remote});
                };
            }
        };
    }
 
   my $pipe_buf = '';
    $manager{worker_listener} = AE::io $self->{worker_pipe}->[READER], 0, sub {
        my @keep;
        PIPE_READ: for (1..$self->{max_workers}) {
            my $len = $self->{worker_pipe}->[READER]->sysread($pipe_buf, 10240, length($pipe_buf));
            last PIPE_READ if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK;
            BUF_READ: while ( length $pipe_buf ) {
                my $string = substr $pipe_buf, 0, 37, '';
                my ($method,$remote) = split / /,$string, 2;
                next BUF_READ unless exists $sockets{$remote};
                if ( $method eq 'exit' ) {
                    $sockets{$remote}->[S_IDLE] = 1; #idle
                    delete $sockets{$remote};
                } elsif ( $method eq 'keep') {
                    push @keep, $remote;
                }
                last BUF_READ if length $pipe_buf < 37;
            }
        }

        my $time = time;
        for my $remote ( @keep ) {
            $sockets{$remote}->[S_TIME] = $time; #time
            $sockets{$remote}->[S_REQS]++; #reqs
            $sockets{$remote}->[S_IDLE] = 1; #idle
            $wait_read{$remote} = AE::io $sockets{$remote}->[S_SOCK], 0, sub {
                undef $wait_read{$remote};
                if ( !$sockets{$remote}->[S_SOCK] || !$sockets{$remote}->[S_SOCK]->connected()) {
                    delete $sockets{$remote};
                    return;
                }
                $self->queued_fdsend($sockets{$remote});
            };
        }

    };

    $cv->recv;
}

sub request_worker {
    my ($self,$app) = @_;

    if ( $self->{_using_defer_accept} ) {
        $self->{listen_sock}->blocking(0);
    }
    else {
        $self->{listen_sock}->close;
    }

    $self->{worker_pipe}->[READER]->close;
    $self->{defer_pipe}->[READER]->close;
    $self->{lstn_pipe}[WRITER]->close;
    $self->{lstn_pipe}[READER]->blocking(0);

    # use Parallel::Prefork
    my %pm_args = (
        max_workers => $self->{max_workers},
        trap_signals => {
            TERM => 'TERM',
            HUP  => 'TERM',
            USR1 => 'USR1',
        },
    );
    if (defined $self->{err_respawn_interval}) {
        $pm_args{err_respawn_interval} = $self->{err_respawn_interval};
    }

    my $pm = Parallel::Prefork->new(\%pm_args);

    while ($pm->signal_received !~ /^(?:TERM|USR1)$/) {
        $pm->start(sub {
            srand();
            my %sys_fileno;
            my $select = IO::Select->new();
            $sys_fileno{$self->{lstn_pipe}[READER]->fileno} = 1;
            $select->add($self->{lstn_pipe}[READER]);
            if ( $self->{_using_defer_accept} ) {
                $sys_fileno{$self->{listen_sock}->fileno} = 1;
                $select->add($self->{listen_sock});
            }

            my $max_reqs_per_child = $self->_calc_reqs_per_child();
            my $proc_req_count = 0;
            
            $self->{term_received} = 0;
            $self->{stop_accept} = 0;
            local $SIG{TERM} = sub {
                $self->{term_received}++;
                exit 0 if $self->{term_received} > 1;
            };
            local $SIG{USR1} = sub {
                $select->remove($self->{listen_sock});
                $self->{stop_accept}++;
            };
            local $SIG{PIPE} = 'IGNORE';

            my $next_conn;

            while ( $self->{stop_accept} || $proc_req_count < $max_reqs_per_child ) {
                last if $self->{term_received};
                my $conn;
                if ( $next_conn ) {
                    $conn = $next_conn;
                    $next_conn = undef;
                }
                else {
                    my @can_read = $select->can_read(1);
                    if ( !@can_read ) {
                        next;
                    }
                    for (@can_read) {
                        if ( ! exists $sys_fileno{$_->fileno} ) {
                            $select->remove($_);
                        }
                    }
                    $conn = $self->accept_or_recv( grep { exists $sys_fileno{$_->fileno} } @can_read );
                }
                next unless $conn;
                
                ++$proc_req_count;
                my ($peerport,$peerhost) = unpack_sockaddr_in $conn->{peername};
                my $remote = md5_hex($conn->{peername});
                my $env = {
                    SERVER_PORT => $self->{port},
                    SERVER_NAME => $self->{host},
                    SCRIPT_NAME => '',
                    REMOTE_ADDR => inet_ntoa($peerhost),
                    REMOTE_PORT => $peerport,
                    'psgi.version' => [ 1, 1 ],
                    'psgi.errors'  => *STDERR,
                    'psgi.url_scheme' => 'http',
                    'psgi.run_once'     => Plack::Util::FALSE,
                    'psgi.multithread'  => Plack::Util::FALSE,
                    'psgi.multiprocess' => Plack::Util::TRUE,
                    'psgi.streaming'    => Plack::Util::TRUE,
                    'psgi.nonblocking'  => Plack::Util::FALSE,
                    'psgix.input.buffered' => Plack::Util::TRUE,
                    'psgix.io'          => $conn->{fh},
                };
                $self->{_is_deferred_accept} = 1; #ready to read
                my $prebuf;
                if ( exists $conn->{buf} ) {
                    $prebuf = delete $conn->{buf};
                }
                elsif ( $conn->{direct} ) {
                    my $ret = $conn->{fh}->sysread($prebuf, MAX_REQUEST_SIZE);
                    if ( ! defined $ret && ($! == EAGAIN || $! == EWOULDBLOCK) ) {
                        $select->add($conn->{fh});
                        IO::FDPass::send($self->{defer_pipe}->[WRITER]->fileno, $conn->{fn}) or die $!;
                        next;
                    }
                }
                my $may_keepalive = ($self->{term_received} == 0 && $self->{stop_accept} == 0 ) ? 1 : 0;
                                         
                my $is_keepalive = 1; # to use "keepalive_timeout" in handle_connection, 
                                      #  treat every connection as keepalive
                
                my $keepalive = $self->handle_connection($env, $conn->{fh}, $app, 
                                                         $may_keepalive, $is_keepalive, $prebuf);
                if ( !$keepalive ) {
                    $self->{worker_pipe}->[WRITER]->syswrite("exit $remote") unless $conn->{direct};
                    next;
                }

                # read fowrard
                if ( $select->count() <= scalar(keys  %sys_fileno) + $self->{max_workers} 
                         && $proc_req_count < $max_reqs_per_child) {
                    $next_conn = $self->accept_or_recv( 
                        $self->{_using_defer_accept} ? ($self->{lstn_pipe}[READER], $self->{listen_sock}) : ($self->{lstn_pipe}[READER])
                    );
                    if ( ! $next_conn ) {
                        my $ret = $conn->{fh}->sysread(my $buf, MAX_REQUEST_SIZE);
                        # readed next req
                        if ( defined $ret && $ret > 0 ) {
                            $next_conn = $conn;
                            $next_conn->{buf} = $buf;
                        }
                    }
                }
                # wait if !next_conn and ! defined next_buf
                if ( !$next_conn  || ( $next_conn && $next_conn->{fn} != $conn->{fn}) ) {
                    if ( $conn->{direct} ) {
                        $select->add($conn->{fh});
                        IO::FDPass::send($self->{defer_pipe}->[WRITER]->fileno, $conn->{fn});
                    }
                    else {
                        $self->{worker_pipe}->[WRITER]->syswrite("keep $remote");
                    }
                }
            }

            while (1) {
                my @can_read = $select->can_read(1);
                for (@can_read){
                    if ( ! exists $sys_fileno{$_->fileno} ) {
                        $select->remove($_);
                    }
                }
                last if $select->count <= scalar(keys  %sys_fileno);
            }
            
        });
    }
    local $SIG{TERM} = sub {
        $pm->signal_all_children('TERM');
    };
    kill 'USR1', getppid();
    $pm->wait_all_children;
    exit;
}

sub accept_or_recv {
    my $self = shift;
    my @for_read = @_;
    my $conn;
    for my $pipe_or_sock ( @for_read ) {
        if ( $self->{_using_defer_accept} && $pipe_or_sock->fileno == $self->{listen_sock}->fileno ) {
            my ($fh,$peer) = $self->{listen_sock}->accept;
            next unless $fh;
            $fh->blocking(0);
            setsockopt($fh, IPPROTO_TCP, TCP_NODELAY, 1)
                or die "setsockopt(TCP_NODELAY) failed:$!";
            $conn = {
                fh => $fh,
                fn => $fh->fileno,
                peername => $peer,
                direct => 1,
                reqs => 0,
            };
            last;
        }
        elsif ( $pipe_or_sock->fileno == $self->{lstn_pipe}[READER]->fileno ) {
            my $fd = IO::FDPass::recv(fileno $pipe_or_sock);
            if ( $fd >= 0 ) {
                my $fh = IO::Socket::INET->new_from_fd($fd,'r+')
                    or die "unable to convert file descriptor to handle: $!";
                $conn = {
                    fh => $fh,
                    fn => $fh->fileno,
                    peername => $fh->peername,
                    direct => 0,
                    reqs => 0,
                };
                last;
            }
        }
    }
    return unless $conn;
    return unless $conn->{fh};
    return unless $conn->{peername};
    $conn;
}

sub handle_connection {
    my($self, $env, $conn, $app, $use_keepalive, $is_keepalive, $prebuf) = @_;
    
    my $buf = '';
    my $res = [ 400, [ 'Content-Type' => 'text/plain' ], [ 'Bad Request' ] ];
    
    while (1) {
        my $rlen;
        if ( defined $prebuf ) {
            $rlen = length $prebuf;
            $buf = $prebuf;
            undef $prebuf;
        }
        else {
            $rlen = $self->read_timeout(
                $conn, \$buf, MAX_REQUEST_SIZE - length($buf), length($buf),
                $is_keepalive ? $self->{keepalive_timeout} : $self->{timeout},
            ) or return;
        }

        my $reqlen = parse_http_request($buf, $env);
        if ($reqlen >= 0) {
            # handle request
            if ($use_keepalive) {
                if (my $c = $env->{HTTP_CONNECTION}) {
                    $use_keepalive = undef
                        unless $c =~ /^\s*keep-alive\s*/i;
                } else {
                    $use_keepalive = undef;
                }
            }
            $buf = substr $buf, $reqlen;
            if (my $cl = $env->{CONTENT_LENGTH}) {
                my $buffer = Plack::TempBuffer->new($cl);
                while ($cl > 0) {
                    my $chunk;
                    if (length $buf) {
                        $chunk = $buf;
                        $buf = '';
                    } else {
                        $self->read_timeout(
                            $conn, \$chunk, $cl, 0, $self->{timeout})
                            or return;
                    }
                    $buffer->print($chunk);
                    $cl -= length $chunk;
                }
                $env->{'psgi.input'} = $buffer->rewind;
            } else {
                $env->{'psgi.input'} = $null_io;
            }
            $res = Plack::Util::run_app $app, $env;
            last;
        }
        if ($reqlen == -2) {
            # request is incomplete, do nothing
        } elsif ($reqlen == -1) {
            # error, close conn
            last;
        }
    }

    if (ref $res eq 'ARRAY') {
        $self->_handle_response($res, $conn, \$use_keepalive);
    } elsif (ref $res eq 'CODE') {
        $res->(sub {
            $self->_handle_response($_[0], $conn, \$use_keepalive);
        });
    } else {
        die "Bad response $res";
    }

    return $use_keepalive;
}

1;