// The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#include "EVAPI.h"
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <errno.h>
#include <ctype.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <sys/uio.h>

#include "ppport.h"


///////////////////////////////////////////////////////////////
// "Compile Time Options" - See Feersum.pm POD for information

// Maximum number of request headers parsed per request; sizes the headers[]
// array in struct feer_req and is handed to the parser as its capacity.
#define MAX_HEADERS 64
// NOTE(review): not referenced in this part of the file; presumably the
// longest header name accepted -- confirm against the POD.
#define MAX_HEADER_NAME_LEN 128
// 2^31-1.  NOTE(review): not referenced in this part of the file; presumably
// the largest request body accepted -- confirm against the POD.
#define MAX_BODY_LEN 2147483647

// Read-buffer sizing used by try_conn_read(): the buffer starts at
// READ_INIT_FACTOR*READ_BUFSZ (+1) bytes and is grown by
// READ_GROW_FACTOR*READ_BUFSZ whenever fewer than READ_BUFSZ bytes are free.
#define READ_BUFSZ 4096
#define READ_INIT_FACTOR 2
#define READ_GROW_FACTOR 8

// NOTE(review): not referenced in this part of the file -- see the POD for
// what this toggles.
#define AUTOCORK_WRITES 1

// Change to "#if 1" to compile in the Flash socket-policy responder found in
// try_conn_read().
#if 0
# define FLASH_SOCKET_POLICY_SUPPORT
#endif

// Policy document sent in reply to "<policy-file-request/>"; may be
// overridden by defining FLASH_SOCKET_POLICY before this point.
#ifndef FLASH_SOCKET_POLICY
# define FLASH_SOCKET_POLICY "<?xml version=\"1.0\"?>\n<!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\">\n<cross-domain-policy>\n<site-control permitted-cross-domain-policies=\"master-only\"/>\n<allow-access-from domain=\"*\" to-ports=\"*\" secure=\"false\"/>\n</cross-domain-policy>\n"
#endif

// Number of iovec slots per struct iomatrix (clamped further down to
// IOV_MAX / UIO_MAXIOV when those are smaller).
// may be lower for your platform (e.g. Solaris is 16).  See POD.
#define FEERSUM_IOMATRIX_SIZE 64

// auto-detected in Makefile.PL by perl versions and ithread usage; override
// that here. See POD for details.
// Change to "#if 1" to force-disable the PV-stealing path in
// add_sv_to_wbuf().
#if 0
# undef FEERSUM_STEAL
#endif

///////////////////////////////////////////////////////////////


// Branch-prediction hints: map to __builtin_expect on GCC-compatible
// compilers and to a plain pass-through everywhere else.
#ifdef __GNUC__
# define likely(x)   __builtin_expect(!!(x), 1)
# define unlikely(x) __builtin_expect(!!(x), 0)
#else
# define likely(x)   (x)
# define unlikely(x) (x)
#endif

// HTTP line terminator (CR LF, written in octal), unless something included
// earlier already defined it.
#ifndef CRLF
#define CRLF "\015\012"
#endif
// Two terminators back to back (string-literal concatenation).
#define CRLFx2 CRLF CRLF

// make darwin, solaris and bsd happy:
// (fall back to IPPROTO_TCP where SOL_TCP is not provided)
#ifndef SOL_TCP
 #define SOL_TCP IPPROTO_TCP
#endif

// Wish-list: %z formats for perl sprintf.  Would make compiling a lot less
// noisy for systems that warn size_t and STRLEN are incompatible with
// %d/%u/%x.
//
// Sz_f is the printf length modifier and Sz_t the matching integer type for
// size_t-sized values, chosen from perl's configuration macros.
#if Size_t_size == LONGSIZE
# define Sz_f "l"
# define Sz_t long
#elif Size_t_size == 8 && defined HAS_QUAD && QUADKIND == QUAD_IS_LONG_LONG
# define Sz_f "ll"
# define Sz_t long long
#else
// hope "int" works.
# define Sz_f ""
# define Sz_t int
#endif

// Complete conversion suffixes (use as "%"Sz_uf etc.) and the cast types that
// go with them: Sz for the unsigned forms, Ssz for the signed form.
#define Sz_uf Sz_f"u"
#define Sz_xf Sz_f"x"
#define Ssz_df Sz_f"d"
#define Sz unsigned Sz_t
#define Ssz Sz_t

// Prefix put in front of every message emitted through trouble().
#define WARN_PREFIX "Feersum: "

// INLINE_UNLESS_DEBUG expands to __inline in non-DEBUG builds (with __inline
// itself defined away if the compiler does not supply it) and to nothing in
// DEBUG builds.
#ifndef DEBUG
 #ifndef __inline
  #define __inline
 #endif
 #define INLINE_UNLESS_DEBUG __inline
#else
 #define INLINE_UNLESS_DEBUG
#endif

// Always-on warning via perl's warn().
// NOTE(review): the expansion already ends in ';', so "trouble(...);" yields
// an extra empty statement; an unbraced "if (x) trouble(...); else ..." would
// not compile.
#define trouble(f_, ...) warn(WARN_PREFIX f_, ##__VA_ARGS__);

// Debug tracing: with DEBUG defined each message is prefixed with file, line
// and pid; otherwise the call compiles away to nothing.
#ifdef DEBUG
#define trace(f_, ...) warn("%s:%-4d [%d] " f_, __FILE__, __LINE__, (int)getpid(), ##__VA_ARGS__)
#else
#define trace(...)
#endif

// More verbose tracing, enabled only when DEBUG >= 2.
#if DEBUG >= 2
#define trace2(f_, ...) trace(f_, ##__VA_ARGS__)
#else
#define trace2(...)
#endif

// Most verbose tracing, enabled only when DEBUG >= 3.
#if DEBUG >= 3
#define trace3(f_, ...) trace(f_, ##__VA_ARGS__)
#else
#define trace3(...)
#endif

#include "picohttpparser-git/picohttpparser.c"
#include "rinq.c"

// Check FEERSUM_IOMATRIX_SIZE against what's actually usable on this
// platform. See Feersum.pm for an explanation
// If the configured size exceeds IOV_MAX (or, failing that, UIO_MAXIOV) it is
// redefined to that limit so a whole iomatrix can go to writev() in one call.
#if defined(IOV_MAX) && FEERSUM_IOMATRIX_SIZE > IOV_MAX
# undef FEERSUM_IOMATRIX_SIZE
# define FEERSUM_IOMATRIX_SIZE IOV_MAX
#elif defined(UIO_MAXIOV) && FEERSUM_IOMATRIX_SIZE > UIO_MAXIOV
# undef FEERSUM_IOMATRIX_SIZE
# define FEERSUM_IOMATRIX_SIZE UIO_MAXIOV
#endif

// One batch of pending output for writev().  Batches are chained on a
// connection's wbuf_rinq.
struct iomatrix {
    unsigned offset; // index of the first iov not yet fully written
    unsigned count;  // number of iov/sv slots filled so far
    struct iovec iov[FEERSUM_IOMATRIX_SIZE];
    // SV kept alive for the matching iov entry (its refcount is dropped once
    // that entry has been written), or NULL for constant data.
    SV *sv[FEERSUM_IOMATRIX_SIZE];
};

// Parsed request line and headers.  try_parse_http() allocates it zeroed and
// passes the method/path/minor_version/headers fields to phr_parse_request()
// as output locations.
struct feer_req {
    SV *buf; // NOTE(review): not touched in this part of the file; presumably
             // the buffer the pointers below refer into -- confirm
    const char* method;
    size_t method_len;
    const char* path;
    size_t path_len;
    int minor_version;
    size_t num_headers; // in: capacity (MAX_HEADERS); out: set by the parser
    struct phr_header headers[MAX_HEADERS];
};

// Progress of the response (write) side of a connection.
enum feer_respond_state {
    RESPOND_NOT_STARTED = 0,
    RESPOND_NORMAL = 1,
    RESPOND_STREAMING = 2,
    RESPOND_SHUTDOWN = 3
};
// Debug helper: stores a "NAME(value)" label for respond state _n into _s.
// _s is left untouched when _n is not one of the enumerators.  The number in
// each label must match the enumerator's value above.
#define RESPOND_STR(_n,_s) do { \
    switch(_n) { \
    case RESPOND_NOT_STARTED: _s = "NOT_STARTED(0)"; break; \
    case RESPOND_NORMAL:      _s = "NORMAL(1)"; break; \
    case RESPOND_STREAMING:   _s = "STREAMING(2)"; break; \
    case RESPOND_SHUTDOWN:    _s = "SHUTDOWN(3)"; break; \
    } \
} while (0)

// Progress of the request (read) side of a connection.
enum feer_receive_state {
    RECEIVE_HEADERS = 0,
    RECEIVE_BODY = 1,
    RECEIVE_STREAMING = 2,
    RECEIVE_SHUTDOWN = 3
};
// Debug helper: stores a "NAME(value)" label for receive state _n into _s.
// _s is left untouched when _n is not one of the enumerators.
#define RECEIVE_STR(_n,_s) do { \
    switch(_n) { \
    case RECEIVE_HEADERS:   _s = "HEADERS(0)"; break; \
    case RECEIVE_BODY:      _s = "BODY(1)"; break; \
    case RECEIVE_STREAMING: _s = "STREAMING(2)"; break; \
    case RECEIVE_SHUTDOWN:  _s = "SHUTDOWN(3)"; break; \
    } \
} while (0)

// Per-connection state.  The struct lives inside the string buffer of the SV
// referenced by `self` (see new_feer_conn), so its lifetime follows that SV's
// reference count.
struct feer_conn {
    SV *self;             // the SV whose PV buffer holds this struct
    int fd;               // socket descriptor; set to -1 once closed
    struct sockaddr *sa;  // peer address as handed to new_feer_conn

    struct ev_io read_ev_io;       // readable watcher -> try_conn_read
    struct ev_io write_ev_io;      // writable watcher -> try_conn_write
    struct ev_timer read_ev_timer; // read timeout -> conn_read_timeout

    SV *rbuf;                 // accumulated request bytes (created lazily)
    struct rinq *wbuf_rinq;   // ring of struct iomatrix awaiting writev()

    SV *poll_write_cb;        // consulted by try_conn_write when wbuf is empty
    SV *ext_guard;

    struct feer_req *req;     // parsed request (allocated by try_parse_http)
    ssize_t expected_cl;      // body bytes expected
    ssize_t received_cl;      // body bytes received so far

    enum feer_respond_state responding;
    enum feer_receive_state receiving;

    int in_callback;
    // One-bit flags.  These are explicitly unsigned: whether a plain `int`
    // bit-field is signed is implementation-defined, and a signed 1-bit field
    // can only hold 0 and -1, so storing 1 would read back as -1.
    unsigned int is_http11:1;                  // finish_wbuf adds the final chunk
    unsigned int poll_write_cb_is_io_handle:1; // pump_io_handle vs. callback
    unsigned int auto_cl:1;
};

typedef struct feer_conn feer_conn_handle; // for typemap

// Declares `c`, the connection stored in the data pointer of the libev
// watcher `w` that must be in scope.
#define dCONN struct feer_conn *c = (struct feer_conn *)w->data
// True when _x is a reference to an array / to a code value.  Both macros
// evaluate _x more than once, so do not pass expressions with side effects.
#define IsArrayRef(_x) (SvROK(_x) && SvTYPE(SvRV(_x)) == SVt_PVAV)
#define IsCodeRef(_x) (SvROK(_x) && SvTYPE(SvRV(_x)) == SVt_PVCV)

// ---- Forward declarations -------------------------------------------------

// Entry points that take a perl interpreter context (pTHX_):
static HV* feersum_env(pTHX_ struct feer_conn *c);
static void feersum_start_response
    (pTHX_ struct feer_conn *c, SV *message, AV *headers, int streaming);
static size_t feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body);
static void feersum_handle_psgi_response(
    pTHX_ struct feer_conn *c, SV *ret, bool can_recurse);
static int feersum_close_handle(pTHX_ struct feer_conn *c, bool is_writer);
static SV* feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard);

// libev watcher/timer start and stop helpers:
static void start_read_watcher(struct feer_conn *c);
static void stop_read_watcher(struct feer_conn *c);
static void restart_read_timer(struct feer_conn *c);
static void stop_read_timer(struct feer_conn *c);
static void start_write_watcher(struct feer_conn *c);
static void stop_write_watcher(struct feer_conn *c);

// libev callbacks and request dispatch:
static void try_conn_write(EV_P_ struct ev_io *w, int revents);
static void try_conn_read(EV_P_ struct ev_io *w, int revents);
static void conn_read_timeout(EV_P_ struct ev_timer *w, int revents);
static bool process_request_headers(struct feer_conn *c, int body_offset);
static void sched_request_callback(struct feer_conn *c);
static void call_died (pTHX_ struct feer_conn *c, const char *cb_type);
static void call_request_callback(struct feer_conn *c);
static void call_poll_callback (struct feer_conn *c, bool is_write);
static void pump_io_handle (struct feer_conn *c, SV *io);

static void conn_write_ready (struct feer_conn *c);
static void respond_with_server_error(struct feer_conn *c, const char *msg, STRLEN msg_len, int code);

// Write-buffer (iomatrix) construction:
static void update_wbuf_placeholder(struct feer_conn *c, SV *sv, struct iovec *iov);
static STRLEN add_sv_to_wbuf (struct feer_conn *c, SV *sv);
static STRLEN add_const_to_wbuf (struct feer_conn *c, const char *str, size_t str_len);
// Appends a bare CRLF to the write buffer.
#define add_crlf_to_wbuf(c) add_const_to_wbuf(c,CRLF,2)
static void finish_wbuf (struct feer_conn *c);
static void add_chunk_sv_to_wbuf (struct feer_conn *c, SV *sv);
static void add_placeholder_to_wbuf (struct feer_conn *c, SV **sv, struct iovec **iov_ref);

// Small utilities:
static void uri_decode_sv (SV *sv);
static bool str_eq(const char *a, int a_len, const char *b, int b_len);
static bool str_case_eq(const char *a, int a_len, const char *b, int b_len);
static SV* fetch_av_normal (pTHX_ AV *av, I32 i);

static const char *http_code_to_msg (int code);
static int prep_socket (int fd, int is_tcp);

// ---- File-scope state -----------------------------------------------------

// Package stashes.  feer_conn_stash is used to bless new connections; the
// reader/writer stashes identify handle objects in sv_2feer_conn_handle().
static HV *feer_stash, *feer_conn_stash;
static HV *feer_conn_reader_stash = NULL, *feer_conn_writer_stash = NULL;
static MGVTBL psgix_io_vtbl;

static SV *request_cb_cv = NULL;
static bool request_cb_is_psgi = 0;
static SV *shutdown_cb_cv = NULL;
static bool shutting_down = 0;   // when set, prepare_cb stops re-arming accept_w
static int active_conns = 0;     // incremented by new_feer_conn
static double read_timeout = 5.0; // repeat interval given to the read timer

static SV *feer_server_name = NULL;
static SV *feer_server_port = NULL;

// Server-wide libev watchers.
// NOTE(review): `ei` is the only one without `static`; unclear from here
// whether the external linkage is intentional.
static ev_io accept_w;
static ev_prepare ep;
static ev_check   ec;
struct ev_idle    ei;

// Connections whose request is ready; drained by
// process_request_ready_rinq().
static struct rinq *request_ready_rinq = NULL;

static AV *psgi_ver;
static SV *psgi_serv10, *psgi_serv11, *crlf_sv;

// TODO: make this thread-local if and when there are multiple C threads:
struct ev_loop *feersum_ev_loop = NULL;
static HV *feersum_tmpl_env = NULL;

// Fetch element `i` of `av` in a form that is safe to use directly.
//
// Returns NULL when the slot does not exist or holds an undefined value.
// Magical values are replaced by a mortal copy, and a reference is replaced
// by the value it points to.
INLINE_UNLESS_DEBUG
static SV*
fetch_av_normal (pTHX_ AV *av, I32 i)
{
    SV **slot = av_fetch(av, i, 0);
    SV *val;

    if (slot == NULL)
        return NULL;

    val = *slot;
    if (unlikely(SvMAGICAL(val))) {
        // copying strips the magic; mortal so nobody has to free it
        val = sv_2mortal(newSVsv(val));
    }

    if (unlikely(!SvOK(val)))
        return NULL;

    // array elements are rarely references (for PSGI anyway)
    if (unlikely(SvROK(val)))
        return SvRV(val);

    return val;
}

INLINE_UNLESS_DEBUG
static struct iomatrix *
next_iomatrix (struct feer_conn *c)
{
    bool add_iomatrix = 0;
    struct iomatrix *m;

    if (!c->wbuf_rinq) {
        trace3("next_iomatrix(%d): head\n", c->fd);
        add_iomatrix = 1;
    }
    else {
        // get the tail-end struct
        m = (struct iomatrix *)c->wbuf_rinq->prev->ref;
        trace3("next_iomatrix(%d): tail, count=%d, offset=%d\n",
            c->fd, m->count, m->offset);
        if (m->count >= FEERSUM_IOMATRIX_SIZE) {
            add_iomatrix = 1;
        }
    }

    if (add_iomatrix) {
        trace3("next_iomatrix(%d): malloc\n", c->fd);
        Newx(m,1,struct iomatrix);
        Poison(m,1,struct iomatrix);
        m->offset = m->count = 0;
        rinq_push(&c->wbuf_rinq, m);
    }

    trace3("next_iomatrix(%d): end, count=%d, offset=%d\n",
        c->fd, m->count, m->offset);
    return m;
}

// Append the string value of `sv` to the connection's write buffer.
//
// The iomatrix slot keeps an SV alive for as long as the bytes are queued:
// a fresh copy for magical SVs, a copy (or, with FEERSUM_STEAL, a new SV that
// takes over the string buffer) for PADTMPs, and an extra reference on `sv`
// itself otherwise.  try_conn_write drops that SV once the slot is written.
//
// Returns the number of bytes queued.
INLINE_UNLESS_DEBUG
static STRLEN
add_sv_to_wbuf(struct feer_conn *c, SV *sv)
{
    struct iomatrix *m = next_iomatrix(c);
    int idx = m->count++; // claim the next free slot
    STRLEN cur;
    if (unlikely(SvMAGICAL(sv))) {
        sv = newSVsv(sv); // copy to force it to be normal.
    }
    else if (unlikely(SvPADTMP(sv))) {
        // PADTMPs have their PVs re-used, so we can't simply keep a
        // reference.  TEMPs maybe behave in a similar way and are potentially
        // stealable.  If not stealing, we must make a copy.
#ifdef FEERSUM_STEAL
        // only steal from a plain string temp (exactly these flags set)
        if (SvFLAGS(sv) == (SVs_PADTMP|SVf_POK|SVp_POK)) {
            trace3("STEALING\n");
            SV *theif = newSV(0);
            sv_upgrade(theif, SVt_PV);

            // hand the temp's string buffer over to the new SV
            SvPV_set(theif, SvPVX(sv));
            SvLEN_set(theif, SvLEN(sv));
            SvCUR_set(theif, SvCUR(sv));

            // make the temp null
            (void)SvOK_off(sv);
            SvPV_set(sv, NULL);
            SvLEN_set(sv, 0);
            SvCUR_set(sv, 0);

            SvFLAGS(theif) |= SVf_READONLY|SVf_POK|SVp_POK;

            sv = theif;
        }
        else {
            sv = newSVsv(sv);
        }
#else
        sv = newSVsv(sv);
#endif
    }
    else {
        sv = SvREFCNT_inc(sv);
    }

    // point the iovec at the (possibly stringified) bytes of the kept SV
    m->iov[idx].iov_base = SvPV(sv, cur);
    m->iov[idx].iov_len = cur;
    m->sv[idx] = sv;

    return cur;
}

// Queue `str_len` bytes starting at `str` on the connection's write buffer.
// The bytes are referenced, not copied, and no SV is attached to the slot, so
// the caller must keep them valid until they are written.  Returns str_len.
INLINE_UNLESS_DEBUG
static STRLEN
add_const_to_wbuf(struct feer_conn *c, const char *str, size_t str_len)
{
    struct iomatrix *matrix = next_iomatrix(c);
    const int slot = matrix->count++;
    struct iovec *vec = &matrix->iov[slot];

    matrix->sv[slot] = NULL; // nothing to release after the write
    vec->iov_base = (void*)str;
    vec->iov_len = str_len;

    return str_len;
}

// Reserve a write-buffer slot whose content will be filled in later.
//
// A new string SV is created and attached to the slot; it is returned through
// `sv`, and the slot's iovec through `iov_ref`.  The iovec itself is not set
// here -- call update_wbuf_placeholder() once the SV has its final content.
INLINE_UNLESS_DEBUG
static void
add_placeholder_to_wbuf(struct feer_conn *c, SV **sv, struct iovec **iov_ref)
{
    struct iomatrix *matrix = next_iomatrix(c);
    const int slot = matrix->count++;
    SV *placeholder = newSV(31);

    SvPOK_on(placeholder);
    matrix->sv[slot] = placeholder;

    *sv = placeholder;
    *iov_ref = &matrix->iov[slot];
}

// Queue the terminating chunk ("0" CRLF CRLF) when the connection is flagged
// HTTP/1.1; otherwise nothing needs to be added.
INLINE_UNLESS_DEBUG
static void
finish_wbuf(struct feer_conn *c)
{
    if (c->is_http11) {
        // terminating chunk
        add_const_to_wbuf(c, "0\r\n\r\n", 5);
    }
}

// Point a previously reserved iovec at the current string content of `sv`.
// `c` is not used.
INLINE_UNLESS_DEBUG
static void
update_wbuf_placeholder(struct feer_conn *c, SV *sv, struct iovec *iov)
{
    // SvPV wants a STRLEN lvalue; iov_len has an incompatible type on some
    // systems, so go through a local.
    STRLEN len;
    char *bytes = SvPV(sv,len);

    iov->iov_base = bytes;
    iov->iov_len = len;
}

// Queue `sv` as one chunk: a hexadecimal size line, the data, then CRLF.
//
// The size line is reserved first so the three pieces are queued in wire
// order, and is filled in once the data length is known.
static void
add_chunk_sv_to_wbuf(struct feer_conn *c, SV *sv)
{
    SV *size_line;
    struct iovec *size_iov;
    STRLEN data_len;

    add_placeholder_to_wbuf(c, &size_line, &size_iov);
    data_len = add_sv_to_wbuf(c, sv);
    add_crlf_to_wbuf(c);

    sv_setpvf(size_line, "%"Sz_xf CRLF, (Sz)data_len);
    update_wbuf_placeholder(c, size_line, size_iov);
}

// Map an HTTP status code to a reason phrase.
//
// Codes listed in the table get their specific phrase; any other code gets a
// generic phrase for its hundreds group (1xx-4xx), and everything else
// (including 5xx and out-of-range values) gets "Error".  The returned string
// is a constant and must not be freed.
static const char *
http_code_to_msg (int code) {
    // http://en.wikipedia.org/wiki/List_of_HTTP_status_codes
    static const struct {
        int code;
        const char *msg;
    } known[] = {
        { 100, "Continue" },
        { 101, "Switching Protocols" },
        { 102, "Processing" }, // RFC 2518
        { 200, "OK" },
        { 201, "Created" },
        { 202, "Accepted" },
        { 203, "Non Authoritative Information" },
        { 204, "No Content" },
        { 205, "Reset Content" },
        { 206, "Partial Content" },
        { 207, "Multi-Status" }, // RFC 4918 (WebDav)
        { 300, "Multiple Choices" },
        { 301, "Moved Permanently" },
        { 302, "Found" },
        { 303, "See Other" },
        { 304, "Not Modified" },
        { 305, "Use Proxy" },
        { 307, "Temporary Redirect" },
        { 400, "Bad Request" },
        { 401, "Unauthorized" },
        { 402, "Payment Required" },
        { 403, "Forbidden" },
        { 404, "Not Found" },
        { 405, "Method Not Allowed" },
        { 406, "Not Acceptable" },
        { 407, "Proxy Authentication Required" },
        { 408, "Request Timeout" },
        { 409, "Conflict" },
        { 410, "Gone" },
        { 411, "Length Required" },
        { 412, "Precondition Failed" },
        { 413, "Request Entity Too Large" },
        { 414, "Request URI Too Long" },
        { 415, "Unsupported Media Type" },
        { 416, "Requested Range Not Satisfiable" },
        { 417, "Expectation Failed" },
        { 418, "I'm a teapot" },
        { 421, "Too Many Connections" }, // Microsoft?
        { 422, "Unprocessable Entity" }, // RFC 4918
        { 423, "Locked" }, // RFC 4918
        { 424, "Failed Dependency" }, // RFC 4918
        { 425, "Unordered Collection" }, // RFC 3648
        { 426, "Upgrade Required" }, // RFC 2817
        { 449, "Retry With" }, // Microsoft
        { 450, "Blocked by Parental Controls" }, // Microsoft
        { 500, "Internal Server Error" },
        { 501, "Not Implemented" },
        { 502, "Bad Gateway" },
        { 503, "Service Unavailable" },
        { 504, "Gateway Timeout" },
        { 505, "HTTP Version Not Supported" },
        { 506, "Variant Also Negotiates" }, // RFC 2295
        { 507, "Insufficient Storage" }, // RFC 4918
        { 509, "Bandwidth Limit Exceeded" }, // Apache mod
        { 510, "Not Extended" }, // RFC 2774
        { 530, "User access denied" }, // ??
    };
    size_t i;

    for (i = 0; i < sizeof(known) / sizeof(known[0]); i++) {
        if (known[i].code == code)
            return known[i].msg;
    }

    // default to the Nxx group names in RFC 2616
    switch (code / 100) {
        case 1:  return "Informational";
        case 2:  return "Success";
        case 3:  return "Redirection";
        case 4:  return "Client Error";
        default: return "Error";
    }
}

// Configure a socket for use by the server.
//
// Makes `fd` non-blocking, sets TCP_NODELAY when `is_tcp` is non-zero,
// enables inline delivery of out-of-band data and disables lingering.
//
// Returns 0 on success, or -1 as soon as any step fails (errno is left as set
// by the failing call; earlier steps are not undone).
static int
prep_socket(int fd, int is_tcp)
{
    int flags;

    // make it non-blocking.  Read the current status flags first so that
    // setting O_NONBLOCK does not clear whatever else is set on the
    // descriptor.
    flags = fcntl(fd, F_GETFL, 0);
    if (unlikely(flags < 0))
        return -1;
    if (unlikely(fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0))
        return -1;

    if (likely(is_tcp)) {
        // flush writes immediately
        flags = 1;
        if (unlikely(setsockopt(fd, SOL_TCP, TCP_NODELAY, &flags, sizeof(int))))
            return -1;
    }

    // handle URG data inline
    flags = 1;
    if (unlikely(setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &flags, sizeof(int))))
        return -1;

    // disable lingering
    struct linger linger = { .l_onoff = 0, .l_linger = 0 };
    if (unlikely(setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger))))
        return -1;

    return 0;
}

// Close the connection's socket if it is still open and mark it closed by
// setting c->fd to -1.  The descriptor is switched back to blocking mode
// first; a failing close() is reported through perror() with `where` as the
// prefix.
INLINE_UNLESS_DEBUG static void
safe_close_conn(struct feer_conn *c, const char *where)
{
    if (likely(c->fd >= 0)) {
        // make it blocking
        fcntl(c->fd, F_SETFL, 0);

        if (unlikely(close(c->fd) != 0)) {
            perror(where);
        }

        c->fd = -1;
    }
}

// Create the state for a newly accepted connection.
//
// The struct is not allocated separately: a new SV is created, its string
// buffer is grown to hold a struct feer_conn, and the struct lives in that
// buffer.  The SV is blessed into feer_conn_stash and its IV slot is set to
// the descriptor.
//
//   conn_fd  socket descriptor for the connection
//   sa       peer address; the pointer is stored as-is
//
// Returns the zero-initialised connection with the read watcher and read
// timer initialised but not started.  write_ev_io is only zeroed here.
// Increments active_conns.
static struct feer_conn *
new_feer_conn (EV_P_ int conn_fd, struct sockaddr *sa)
{
    SV *self = newSV(0);
    SvUPGRADE(self, SVt_PVMG); // ensures sv_bless doesn't reallocate
    SvGROW(self, sizeof(struct feer_conn)); // room for the struct in the PV
    SvPOK_only(self);
    SvIOK_on(self);
    SvIV_set(self,conn_fd);

    // the struct is the SV's string buffer
    struct feer_conn *c = (struct feer_conn *)SvPVX(self);
    Zero(c, 1, struct feer_conn);

    c->self = self;
    c->fd = conn_fd;
    c->sa = sa;
    c->responding = RESPOND_NOT_STARTED;
    c->receiving = RECEIVE_HEADERS;

    ev_io_init(&c->read_ev_io, try_conn_read, conn_fd, EV_READ);
    c->read_ev_io.data = (void *)c; // lets the callback recover `c` (dCONN)

    // no interval yet: restart_read_timer() sets the repeat value
    ev_init(&c->read_ev_timer, conn_read_timeout);
    c->read_ev_timer.data = (void *)c;

    trace3("made conn fd=%d self=%p, c=%p, cur=%"Sz_uf", len=%"Sz_uf"\n",
        c->fd, self, c, (Sz)SvCUR(self), (Sz)SvLEN(self));

    // bless through a temporary reference, then drop that reference again
    SV *rv = newRV_inc(c->self);
    sv_bless(rv, feer_conn_stash); // so DESTROY can get called on read errors
    SvREFCNT_dec(rv);

    SvREADONLY_on(self); // turn off later for blessing
    active_conns++;
    return c;
}

// for use in the typemap:
// Convert a Feersum::Connection reference into its struct feer_conn, which is
// stored in the referenced SV's string buffer.  Croaks if `rv` is not of that
// class.
INLINE_UNLESS_DEBUG
static struct feer_conn *
sv_2feer_conn (SV *rv)
{
    SV *conn_sv;

    if (unlikely(!sv_isa(rv,"Feersum::Connection"))) {
        croak("object is not of type Feersum::Connection");
    }

    conn_sv = SvRV(rv);
    return (struct feer_conn *)SvPVX(conn_sv);
}

// Return a new reference to the connection's SV (incrementing its refcount).
// The caller owns the returned reference.
INLINE_UNLESS_DEBUG
static SV*
feer_conn_2sv (struct feer_conn *c)
{
    SV *ref = newRV_inc(c->self);
    return ref;
}

// Convert a reader/writer handle object into the connection it points to.
//
// `rv` must be a reference (this always croaks otherwise) to an object
// blessed into exactly the reader or writer stash; the connection pointer is
// the object's unsigned integer value, with 0 meaning the handle was closed.
//
// For a wrong class or a closed handle: croaks when `can_croak` is true,
// otherwise returns NULL.
static feer_conn_handle *
sv_2feer_conn_handle (SV *rv, bool can_croak)
{
    SV *inner;
    UV addr;

    trace3("sv 2 conn_handle\n");
    if (unlikely(!SvROK(rv))) croak("Expected a reference");

    // do not allow subclassing: the stash must match exactly
    inner = SvRV(rv);
    if (unlikely(
        !sv_isobject(rv) ||
        (SvSTASH(inner) != feer_conn_writer_stash &&
         SvSTASH(inner) != feer_conn_reader_stash)
    )) {
        if (can_croak)
            croak("Expected a Feersum::Connection::Writer or ::Reader object");
        return NULL;
    }

    addr = SvUV(inner);
    if (addr != 0)
        return INT2PTR(feer_conn_handle*,addr);

    if (can_croak) croak("Operation not allowed: Handle is closed.");
    return NULL;
}

// Create a reader or writer handle object for connection `c`.
//
// The handle is a blessed reference to an unsigned integer holding the
// connection's address.  The connection's refcount is incremented on behalf
// of the handle.
static SV *
new_feer_conn_handle (pTHX_ struct feer_conn *c, bool is_writer)
{
    HV *stash = is_writer ? feer_conn_writer_stash : feer_conn_reader_stash;
    SV *handle;

    SvREFCNT_inc_void_NN(c->self);
    handle = newRV_noinc(newSVuv(PTR2UV(c)));
    sv_bless(handle, stash);

    return handle;
}

// State-transition helpers.  With DEBUG set, a change is traced (via trace2)
// before being stored, and nothing happens when the state is unchanged.
// Without DEBUG they are plain assignments.
#if DEBUG
# define change_responding_state(c, _to) do { \
    enum feer_respond_state __to = (_to); \
    enum feer_respond_state __from = c->responding; \
    const char *_from_str, *_to_str; \
    if (likely(__from != __to)) { \
        RESPOND_STR(c->responding, _from_str); \
        RESPOND_STR(__to, _to_str); \
        trace2("==> responding state %d: %s to %s\n", \
            c->fd,_from_str,_to_str); \
        c->responding = __to; \
    } \
} while (0)
# define change_receiving_state(c, _to) do { \
    enum feer_receive_state __to = (_to); \
    enum feer_receive_state __from = c->receiving; \
    const char *_from_str, *_to_str; \
    if (likely(__from != __to)) { \
        RECEIVE_STR(c->receiving, _from_str); \
        RECEIVE_STR(__to, _to_str); \
        trace2("==> receiving state %d: %s to %s\n", \
            c->fd,_from_str,_to_str); \
        c->receiving = __to; \
    } \
} while (0)
#else
// NOTE(review): these expansions are not parenthesised and not wrapped in
// do/while; only use them as complete statements with simple arguments.
# define change_responding_state(c, _to) c->responding = _to
# define change_receiving_state(c, _to) c->receiving = _to
#endif

// Start the connection's read watcher unless it is already running.  A
// running watcher holds one reference on the connection.
INLINE_UNLESS_DEBUG static void
start_read_watcher(struct feer_conn *c) {
    if (likely(!ev_is_active(&c->read_ev_io))) {
        trace("start read watcher %d\n",c->fd);
        ev_io_start(feersum_ev_loop, &c->read_ev_io);
        SvREFCNT_inc_void_NN(c->self);
    }
}

// Stop the connection's read watcher if it is running and release the
// reference that was taken when it was started.
INLINE_UNLESS_DEBUG static void
stop_read_watcher(struct feer_conn *c) {
    if (likely(ev_is_active(&c->read_ev_io))) {
        trace("stop read watcher %d\n",c->fd);
        ev_io_stop(feersum_ev_loop, &c->read_ev_io);
        SvREFCNT_dec(c->self);
    }
}

// (Re)arm the read timeout.  When the timer was not running, its repeat
// interval is set from read_timeout and a reference on the connection is
// taken for it; in either case the timer is then restarted.
INLINE_UNLESS_DEBUG static void
restart_read_timer(struct feer_conn *c) {
    const int was_stopped = !ev_is_active(&c->read_ev_timer);

    if (likely(was_stopped)) {
        trace("restart read timer %d\n",c->fd);
        c->read_ev_timer.repeat = read_timeout;
        SvREFCNT_inc_void_NN(c->self);
    }

    ev_timer_again(feersum_ev_loop, &c->read_ev_timer);
}

// Stop the read timeout if it is running and release the reference that was
// taken when it was armed.
INLINE_UNLESS_DEBUG static void
stop_read_timer(struct feer_conn *c) {
    if (likely(ev_is_active(&c->read_ev_timer))) {
        trace("stop read timer %d\n",c->fd);
        ev_timer_stop(feersum_ev_loop, &c->read_ev_timer);
        SvREFCNT_dec(c->self);
    }
}

// Start the connection's write watcher unless it is already running.  A
// running watcher holds one reference on the connection.
INLINE_UNLESS_DEBUG static void
start_write_watcher(struct feer_conn *c) {
    if (likely(!ev_is_active(&c->write_ev_io))) {
        trace("start write watcher %d\n",c->fd);
        ev_io_start(feersum_ev_loop, &c->write_ev_io);
        SvREFCNT_inc_void_NN(c->self);
    }
}

// Stop the connection's write watcher if it is running and release the
// reference that was taken when it was started.
INLINE_UNLESS_DEBUG static void
stop_write_watcher(struct feer_conn *c) {
    if (likely(ev_is_active(&c->write_ev_io))) {
        trace("stop write watcher %d\n",c->fd);
        ev_io_stop(feersum_ev_loop, &c->write_ev_io);
        SvREFCNT_dec(c->self);
    }
}


static void
process_request_ready_rinq (void)
{
    while (request_ready_rinq) {
        struct feer_conn *c =
            (struct feer_conn *)rinq_shift(&request_ready_rinq);
        //trace("rinq shifted c=%p, head=%p\n", c, request_ready_rinq);

        call_request_callback(c);

        if (likely(c->wbuf_rinq)) {
            // this was deferred until after the perl callback
            conn_write_ready(c);
        }
        SvREFCNT_dec(c->self); // for the rinq
    }
}

// libev prepare callback: (re)start the accept watcher unless the server is
// shutting down, then stop this prepare watcher.  On an EV error the loop is
// broken out of instead.
static void
prepare_cb (EV_P_ ev_prepare *w, int revents)
{
    if (likely(!(revents & EV_ERROR))) {
        if (!shutting_down && !ev_is_active(&accept_w)) {
            ev_io_start(EV_A, &accept_w);
        }
        ev_prepare_stop(EV_A, w);
        return;
    }

    trouble("EV error in prepare, revents=0x%08x\n", revents);
    ev_break(EV_A, EVBREAK_ALL);
}

// libev check callback: process any connections queued as request-ready.  On
// an EV error the loop is broken out of instead.
static void
check_cb (EV_P_ ev_check *w, int revents)
{
    if (unlikely(revents & EV_ERROR)) {
        trouble("EV error in check, revents=0x%08x\n", revents);
        ev_break(EV_A, EVBREAK_ALL);
    }
    else {
        trace3("check! head=%p\n", request_ready_rinq);
        if (request_ready_rinq != NULL) {
            process_request_ready_rinq();
        }
    }
}

// libev idle callback: process any connections queued as request-ready, then
// stop this idle watcher.  On an EV error the loop is broken out of and the
// watcher is left as it is.
static void
idle_cb (EV_P_ ev_idle *w, int revents)
{
    if (unlikely(revents & EV_ERROR)) {
        trouble("EV error in idle, revents=0x%08x\n", revents);
        ev_break(EV_A, EVBREAK_ALL);
    }
    else {
        trace3("idle! head=%p\n", request_ready_rinq);
        if (request_ready_rinq != NULL) {
            process_request_ready_rinq();
        }
        ev_idle_stop(EV_A, w);
    }
}

// libev write callback: push as much queued output as possible to the socket.
//
// Each struct iomatrix on c->wbuf_rinq is handed to writev(), starting at its
// first unwritten iovec.  Fully written iovecs release their SV; a partially
// written one is advanced in place.  When the buffer is empty the poll/IO
// callback (if any) is asked for more data first.
//
// Exit paths (labels at the end):
//   try_write_again     - more to do later: ensure the write watcher runs
//   try_write_finished  - buffer drained: decide by c->responding
//   try_write_paused    - streaming with nothing to send: stop the watcher
//   try_write_shutdown  - mark SHUTDOWN, stop the watcher, close the socket
//   try_write_cleanup   - drop the reference taken on entry
static void
try_conn_write(EV_P_ struct ev_io *w, int revents)
{
    dCONN;
    int i;
    struct iomatrix *m;

    // keep the connection alive for the duration of this callback
    SvREFCNT_inc_void_NN(c->self);

    // if it's marked writeable EV suggests we simply try write to it.
    // Otherwise it is stopped and we should ditch this connection.
    if (unlikely(revents & EV_ERROR && !(revents & EV_WRITE))) {
        trace("EV error on write, fd=%d revents=0x%08x\n", w->fd, revents);
        change_responding_state(c, RESPOND_SHUTDOWN);
        goto try_write_finished;
    }

    if (unlikely(!c->wbuf_rinq)) {
        if (unlikely(c->responding >= RESPOND_SHUTDOWN))
            goto try_write_finished;

        if (!c->poll_write_cb) {
            // no callback and no data: wait for app to push to us.
            if (c->responding == RESPOND_STREAMING)
                goto try_write_paused;

            trace("tried to write with an empty buffer %d resp=%d\n",w->fd,c->responding);
            change_responding_state(c, RESPOND_SHUTDOWN);
            goto try_write_finished;
        }

        if (c->poll_write_cb_is_io_handle)
            pump_io_handle(c, c->poll_write_cb);
        else
            call_poll_callback(c, 1);

        // callback didn't write anything:
        if (unlikely(!c->wbuf_rinq)) goto try_write_again;
    }

try_write_again_immediately:
    m = (struct iomatrix *)c->wbuf_rinq->ref;
#if DEBUG >= 2
    warn("going to write to %d:\n",c->fd);
    for (i=0; i < m->count; i++) {
        fprintf(stderr,"%.*s",
            (int)m->iov[i].iov_len, (char*)m->iov[i].iov_base);
    }
#endif

    trace("going to write %d off=%d count=%d\n", w->fd, m->offset, m->count);
    errno = 0;
    ssize_t wrote = writev(w->fd, &m->iov[m->offset], m->count - m->offset);
    trace("wrote %"Ssz_df" bytes to %d, errno=%d\n", (Ssz)wrote, w->fd, errno);

    if (unlikely(wrote <= 0)) {
        if (unlikely(wrote == 0))
            goto try_write_again;
        if (likely(errno == EAGAIN || errno == EINTR))
            goto try_write_again;
        perror("Feersum try_conn_write");
        change_responding_state(c, RESPOND_SHUTDOWN);
        goto try_write_finished;
    }

    // account for the bytes written, vector by vector
    for (i = m->offset; i < m->count && wrote > 0; i++) {
        struct iovec *v = &m->iov[i];
        if (unlikely(v->iov_len > wrote)) {
            trace3("offset vector %d  base=%p len=%"Sz_uf"\n",
                w->fd, v->iov_base, (Sz)v->iov_len);
            // iov_base is a void*; do the arithmetic through char* since
            // pointer arithmetic on void* is only a compiler extension.
            v->iov_base = (char *)v->iov_base + wrote;
            v->iov_len  -= wrote;
            // don't consume any more:
            wrote = 0;
        }
        else {
            trace3("consume vector %d base=%p len=%"Sz_uf" sv=%p\n",
                w->fd, v->iov_base, (Sz)v->iov_len, m->sv[i]);
            wrote -= v->iov_len;
            m->offset++;
            if (m->sv[i]) {
                SvREFCNT_dec(m->sv[i]);
                m->sv[i] = NULL;
            }
        }
    }

    if (likely(m->offset >= m->count)) {
        trace2("all done with iomatrix %d state=%d\n",w->fd,c->responding);
        rinq_shift(&c->wbuf_rinq);
        Safefree(m);
        if (!c->wbuf_rinq)
            goto try_write_finished;
        trace2("write again immediately %d state=%d\n",w->fd,c->responding);
        goto try_write_again_immediately;
    }
    // else, fallthrough:
    trace2("write fallthrough %d state=%d\n",w->fd,c->responding);

try_write_again:
    trace("write again %d state=%d\n",w->fd,c->responding);
    start_write_watcher(c);
    goto try_write_cleanup;

try_write_finished:
    // should always be responding, but just in case
    switch(c->responding) {
    case RESPOND_NOT_STARTED:
        // the write watcher shouldn't ever get called before starting to
        // respond. Shut it down if it does.
        trace("unexpected try_write when response not started %d\n",c->fd);
        goto try_write_shutdown;
    case RESPOND_NORMAL:
        goto try_write_shutdown;
    case RESPOND_STREAMING:
        if (c->poll_write_cb) goto try_write_again;
        else goto try_write_paused;
    case RESPOND_SHUTDOWN:
        goto try_write_shutdown;
    default:
        goto try_write_cleanup;
    }

try_write_paused:
    trace3("write PAUSED %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
    stop_write_watcher(c);
    goto try_write_cleanup;

try_write_shutdown:
    trace3("write SHUTDOWN %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
    change_responding_state(c, RESPOND_SHUTDOWN);
    stop_write_watcher(c);
    safe_close_conn(c, "close at write shutdown");

try_write_cleanup:
    SvREFCNT_dec(c->self);
    return;
}

// Run the HTTP request parser over everything buffered so far.
//
// Allocates c->req (zeroed) on first use.  `last_read` is the size of the
// most recent read; the parser is told how many bytes were already present
// before it.  Returns whatever phr_parse_request() returns.
static int
try_parse_http(struct feer_conn *c, size_t last_read)
{
    struct feer_req *parsed;
    const char *data;
    size_t data_len;

    if (likely(c->req == NULL)) {
        Newxz(parsed,1,struct feer_req);
        c->req = parsed;
    }
    else {
        parsed = c->req;
    }

    // GH#12 - incremental parsing sets num_headers to 0 each time; force it
    // back on every invocation
    parsed->num_headers = MAX_HEADERS;

    data = SvPVX(c->rbuf);
    data_len = SvCUR(c->rbuf);

    return phr_parse_request(data, data_len,
        &parsed->method, &parsed->method_len,
        &parsed->path, &parsed->path_len, &parsed->minor_version,
        parsed->headers, &parsed->num_headers,
        (data_len-last_read));
}

// libev read callback: read more request bytes and advance the receive state.
//
// Bytes are appended to c->rbuf (created on first use and grown when less
// than READ_BUFSZ is free).  While receiving headers the buffer is re-parsed
// after every read; while receiving the body the byte count is tracked until
// expected_cl is reached, at which point the request callback is scheduled.
//
// Exit paths (labels at the end):
//   try_read_error             - shut both directions down, stop all watchers
//   try_read_bad               - queue a 400 response, then stop reading
//   dont_read_again            - stop the read watcher and timer
//   try_read_again_reset_timer - restart the timeout, then keep reading
//   try_read_again             - make sure the read watcher is running
//   try_read_cleanup           - drop the reference taken on entry
static void
try_conn_read(EV_P_ ev_io *w, int revents)
{
    dCONN;
    // keep the connection alive for the duration of this callback
    SvREFCNT_inc_void_NN(c->self);

    // if it's marked readable EV suggests we simply try read it. Otherwise it
    // is stopped and we should ditch this connection.
    if (unlikely(revents & EV_ERROR && !(revents & EV_READ))) {
        trace("EV error on read, fd=%d revents=0x%08x\n", w->fd, revents);
        goto try_read_error;
    }

    if (unlikely(c->receiving == RECEIVE_SHUTDOWN))
        goto dont_read_again;

    trace("try read %d\n",w->fd);

    if (likely(!c->rbuf)) { // likely = optimize for small requests
        trace("init rbuf for %d\n",w->fd);
        c->rbuf = newSV(READ_INIT_FACTOR*READ_BUFSZ + 1);
        SvPOK_on(c->rbuf);
    }

    ssize_t space_free = SvLEN(c->rbuf) - SvCUR(c->rbuf);
    if (unlikely(space_free < READ_BUFSZ)) { // unlikely = optimize for small
        size_t new_len = SvLEN(c->rbuf) + READ_GROW_FACTOR*READ_BUFSZ;
        trace("moar memory %d: %"Sz_uf" to %"Sz_uf"\n",
            w->fd, (Sz)SvLEN(c->rbuf), (Sz)new_len);
        SvGROW(c->rbuf, new_len);
        space_free += READ_GROW_FACTOR*READ_BUFSZ;
    }

    // compute the write position only now: SvGROW may have moved the buffer
    char *cur = SvPVX(c->rbuf) + SvCUR(c->rbuf);
    ssize_t got_n = read(w->fd, cur, space_free);

    if (unlikely(got_n <= 0)) {
        if (unlikely(got_n == 0)) {
            // (fd and the number of bytes buffered so far)
            trace("EOF before complete request: %d, buffered=%"Sz_uf"\n",
                w->fd, (Sz)SvCUR(c->rbuf));
            goto try_read_error;
        }
        if (likely(errno == EAGAIN || errno == EINTR))
            goto try_read_again;
        perror("try_conn_read error");
        goto try_read_error;
    }

    trace("read %d %"Ssz_df"\n", w->fd, (Ssz)got_n);
    SvCUR(c->rbuf) += got_n;
    // likely = optimize for small requests
    if (likely(c->receiving == RECEIVE_HEADERS)) {

#ifdef FLASH_SOCKET_POLICY_SUPPORT
        if (unlikely(*SvPVX(c->rbuf) == '<')) {
            if (likely(SvCUR(c->rbuf) >= 22)) { // length of vvv
                if (str_eq(SvPVX(c->rbuf), 22, "<policy-file-request/>", 22)) {
                    add_const_to_wbuf(c, STR_WITH_LEN(FLASH_SOCKET_POLICY));
                    conn_write_ready(c);
                    stop_read_watcher(c);
                    stop_read_timer(c);
                    // TODO: keep-alives: be sure to remove the 22 bytes
                    // out of the rbuf
                    change_receiving_state(c, RECEIVE_SHUTDOWN);
                    change_responding_state(c, RESPOND_SHUTDOWN);
                    goto dont_read_again;
                }
            }
            // "if prefixed with"
            else if (likely(str_eq(SvPVX(c->rbuf), SvCUR(c->rbuf),
                                   "<policy-file-request/>", SvCUR(c->rbuf))))
            {
                goto try_read_again;
            }
        }
#endif

        // -1 and -2 are handled specially; any other result is passed on to
        // process_request_headers() as the body offset.
        int ret = try_parse_http(c, (size_t)got_n);
        if (ret == -1) goto try_read_bad;
        if (ret == -2) goto try_read_again;

        if (process_request_headers(c, ret))
            goto try_read_again_reset_timer;
        else
            goto dont_read_again;
    }
    else if (likely(c->receiving == RECEIVE_BODY)) {
        c->received_cl += got_n;
        if (c->received_cl < c->expected_cl)
            goto try_read_again_reset_timer;
        // body is complete
        sched_request_callback(c);
        goto dont_read_again;
    }
    else {
        trouble("unknown read state %d %d", w->fd, c->receiving);
    }

    // fallthrough:
try_read_error:
    trace("READ ERROR %d, refcnt=%d\n", w->fd, SvREFCNT(c->self));
    change_receiving_state(c, RECEIVE_SHUTDOWN);
    change_responding_state(c, RESPOND_SHUTDOWN);
    stop_read_watcher(c);
    stop_read_timer(c);
    stop_write_watcher(c);
    goto try_read_cleanup;

try_read_bad:
    trace("bad request %d\n", w->fd);
    respond_with_server_error(c, "Malformed request.\n", 0, 400);
    // TODO: when keep-alive, close conn instead of fallthrough here.
    // fallthrough:
dont_read_again:
    trace("done reading %d\n", w->fd);
    change_receiving_state(c, RECEIVE_SHUTDOWN);
    stop_read_watcher(c);
    stop_read_timer(c);
    goto try_read_cleanup;

try_read_again_reset_timer:
    trace("(reset read timer) %d\n", w->fd);
    restart_read_timer(c);
    // fallthrough:
try_read_again:
    trace("read again %d\n", w->fd);
    start_read_watcher(c);

try_read_cleanup:
    SvREFCNT_dec(c->self);
}

// EV timer callback fired when a connection's read timeout expires.
//
// If no response has been started yet a 408 is sent (the message depends on
// whether headers or the body were being read); otherwise the connection is
// torn down.  The read watcher and read timer are stopped on every path.
static void
conn_read_timeout (EV_P_ ev_timer *w, int revents)
{
    dCONN;
    SvREFCNT_inc_void_NN(c->self); // released at the bottom of this function

    const bool timer_fired = (revents & EV_TIMER) != 0;

    if (unlikely(!timer_fired || c->receiving == RECEIVE_SHUTDOWN)) {
        // if there's no EV_TIMER then EV has stopped it on an error
        if (revents & EV_ERROR) {
            trouble("EV error on read timer, fd=%d revents=0x%08x\n",
                c->fd,revents);
        }
    }
    else {
        trace("read timeout %d\n", c->fd);

        if (unlikely(c->responding != RESPOND_NOT_STARTED)) {
            // XXX as of 0.984 this appears to be dead code
            trace("read timeout while writing %d\n",c->fd);
            stop_write_watcher(c);
            stop_read_watcher(c);
            stop_read_timer(c);
            safe_close_conn(c, "close at read timeout");
            change_responding_state(c, RESPOND_SHUTDOWN);
        }
        else {
            const char *msg = (c->receiving == RECEIVE_HEADERS)
                ? "Headers took too long."
                : "Timeout reading body.";
            respond_with_server_error(c, msg, 0, 408);
        }
    }

    // common cleanup for every path
    stop_read_watcher(c);
    stop_read_timer(c);
    SvREFCNT_dec(c->self);
}

// EV I/O callback for a listening socket: accepts every pending connection.
//
// For each accepted fd the socket is prepared via prep_socket(), the peer
// address is copied to the heap and handed to new_feer_conn(), and the read
// watcher and read timer are started.  The loop ends when accept() returns
// -1.  A connection whose socket cannot be prepared, or whose address copy
// cannot be allocated, is closed and skipped.
static void
accept_cb (EV_P_ ev_io *w, int revents)
{
    struct sockaddr_storage sa_buf;
    socklen_t sa_len;

    if (unlikely(shutting_down)) {
        // shouldn't get called, but be defensive
        ev_io_stop(EV_A, w);
        close(w->fd);
        return;
    }

    if (unlikely(revents & EV_ERROR)) {
        trouble("EV error in accept_cb, fd=%d, revents=0x%08x\n",w->fd,revents);
        ev_break(EV_A, EVBREAK_ALL);
        return;
    }

    trace2("accept! revents=0x%08x\n", revents);

    while (1) {
        sa_len = sizeof(struct sockaddr_storage);
        errno = 0;

        int fd = accept(w->fd, (struct sockaddr *)&sa_buf, &sa_len);
        trace("accepted fd=%d, errno=%d\n", fd, errno);
        if (fd == -1) break;

        int is_tcp = 1;
#ifdef AF_UNIX
        if (unlikely(sa_buf.ss_family == AF_UNIX)) is_tcp = 0;
#endif

        assert(sa_len <= sizeof(struct sockaddr_storage));
        if (unlikely(prep_socket(fd, is_tcp))) {
            perror("prep_socket");
            trouble("prep_socket failed for %d\n", fd);
            close(fd);
            continue;
        }

        // The copy is passed to new_feer_conn(); never memcpy() into a
        // failed allocation.
        struct sockaddr *sa = (struct sockaddr *)malloc(sa_len);
        if (unlikely(sa == NULL)) {
            trouble("out of memory copying peer address for %d\n", fd);
            close(fd);
            continue;
        }
        memcpy(sa,&sa_buf,(size_t)sa_len);

        struct feer_conn *c = new_feer_conn(EV_A,fd,sa);
        start_read_watcher(c);
        restart_read_timer(c);
        assert(SvREFCNT(c->self) == 3);
        SvREFCNT_dec(c->self);
    }
}

// Queue a connection on request_ready_rinq and make sure the idle watcher
// `ei` is running.  The queue holds its own reference on the connection.
static void
sched_request_callback (struct feer_conn *c)
{
    trace("sched req callback: %d c=%p, head=%p\n", c->fd, c, request_ready_rinq);

    rinq_push(&request_ready_rinq, c);
    SvREFCNT_inc_void_NN(c->self); // for the rinq

    if (ev_is_active(&ei))
        return; // idle watcher already running

    ev_idle_start(feersum_ev_loop, &ei);
}

// the unlikely/likely annotations here are trying to optimize for GET first
// and POST second.  Other entity-body requests are third in line.
static bool
process_request_headers (struct feer_conn *c, int body_offset)
{
    int err_code;
    const char *err;
    struct feer_req *req = c->req;

    trace("processing headers %d minor_version=%d\n",c->fd,req->minor_version);
    bool body_is_required;
    bool next_req_follows = 0;

    c->is_http11 = (req->minor_version == 1);

    change_receiving_state(c, RECEIVE_BODY);

    if (likely(str_eq("GET", 3, req->method, req->method_len))) {
        // Not supposed to have a body.  Additional bytes are either a
        // mistake, a websocket negotiation or pipelined requests under
        // HTTP/1.1
        next_req_follows = 1;
    }
    else if (likely(str_eq("OPTIONS", 7, req->method, req->method_len))) {
        body_is_required = 1;
        next_req_follows = 1;
    }
    else if (likely(str_eq("POST", 4, req->method, req->method_len))) {
        body_is_required = 1;
    }
    else if (str_eq("PUT", 3, req->method, req->method_len)) {
        body_is_required = 1;
    }
    else if (str_eq("HEAD", 4, req->method, req->method_len) ||
             str_eq("DELETE", 6, req->method, req->method_len))
    {
        next_req_follows = 1;
    }
    else {
        err = "Feersum doesn't support that method yet\n";
        err_code = 405;
        goto got_bad_request;
    }

#if DEBUG >= 2
    if (next_req_follows)
        trace2("next req follows fd=%d, boff=%d\n",c->fd,body_offset);
    if (body_is_required)
        trace2("body is required fd=%d, boff=%d\n",c->fd,body_offset);
#endif

    // a body or follow-on data potentially follows the headers. Let feer_req
    // retain its pointers into rbuf and make a new scalar for more body data.
    STRLEN from_len;
    char *from = SvPV(c->rbuf,from_len);
    from += body_offset;
    int need = from_len - body_offset;
    int new_alloc = (need > READ_INIT_FACTOR*READ_BUFSZ)
        ? need : READ_INIT_FACTOR*READ_BUFSZ-1;
    trace("new rbuf for body %d need=%d alloc=%d\n",c->fd, need, new_alloc);
    SV *new_rbuf = newSVpvn(need ? from : "", need);

    req->buf = c->rbuf;
    c->rbuf = new_rbuf;
    SvCUR_set(req->buf, body_offset);

    if (likely(next_req_follows)) // optimize for GET
        goto got_it_all;

    // determine how much we need to read
    int i;
    UV expected = 0;
    for (i=0; i < req->num_headers; i++) {
        struct phr_header *hdr = &req->headers[i];
        if (!hdr->name) continue;
        // XXX: ignore multiple C-L headers?
        if (unlikely(
             str_case_eq("content-length", 14, hdr->name, hdr->name_len)))
        {
            int g = grok_number(hdr->value, hdr->value_len, &expected);
            if (likely(g == IS_NUMBER_IN_UV)) {
                if (unlikely(expected > MAX_BODY_LEN)) {
                    err_code = 413;
                    err = "Content length exceeds maximum\n";
                    goto got_bad_request;
                }
                else
                    goto got_cl;
            }
            else {
                err_code = 400;
                err = "invalid content-length\n";
                goto got_bad_request;
            }
        }
        // TODO: support "Connection: close" bodies
        // TODO: support "Transfer-Encoding: chunked" bodies
    }

    if (body_is_required) {
        // Go the nginx route...
        err_code = 411;
        err = "Content-Length required\n";
    }
    else {
        // XXX TODO support requests that don't require a body
        err_code = 418;
        err = "Feersum doesn't know how to handle optional-body requests yet\n";
    }

got_bad_request:
    respond_with_server_error(c, err, 0, err_code);
    return 0;

got_cl:
    c->expected_cl = (ssize_t)expected;
    c->received_cl = SvCUR(c->rbuf);
    trace("expecting body %d size=%"Ssz_df" have=%"Ssz_df"\n",
        c->fd, (Ssz)c->expected_cl, (Ssz)c->received_cl);
    SvGROW(c->rbuf, c->expected_cl + 1);

    // don't have enough bytes to schedule immediately?
    // unlikely = optimize for short requests
    if (unlikely(c->expected_cl && c->received_cl < c->expected_cl)) {
        // TODO: schedule the callback immediately and support a non-blocking
        // ->read method.
        // sched_request_callback(c);
        // change_receiving_state(c, RECEIVE_STREAM);
        return 1;
    }
    // fallthrough: have enough bytes
got_it_all:
    sched_request_callback(c);
    return 0;
}

static void
conn_write_ready (struct feer_conn *c)
{
    if (c->in_callback) return; // defer until out of callback

    if (c->write_ev_io.data == NULL) {
        ev_io_init(&c->write_ev_io, try_conn_write, c->fd, EV_WRITE);
        c->write_ev_io.data = (void *)c;
    }

#if AUTOCORK_WRITES
    start_write_watcher(c);
#else
    // attempt a non-blocking write immediately if we're not already
    // waiting for writability
    try_conn_write(feersum_ev_loop, &c->write_ev_io, EV_WRITE);
#endif
}

// Queue a complete plain-text error response and shut down both directions
// of the connection state machine.
//
// msg      - response body
// msg_len  - length of msg; 0 means "use strlen(msg)"
// err_code - HTTP status code; the reason phrase comes from http_code_to_msg()
//
// Does nothing (besides logging) if a response has already been started.
static void
respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len, int err_code)
{
    if (unlikely(c->responding != RESPOND_NOT_STARTED)) {
        trouble("Tried to send server error but already responding!");
        return;
    }

    if (msg_len == 0)
        msg_len = strlen(msg);
    assert(msg_len < INT_MAX);

    const int minor_version = c->is_http11 ? 1 : 0;
    SV *reply = newSVpvf("HTTP/1.%d %d %s" CRLF
                   "Content-Type: text/plain" CRLF
                   "Connection: close" CRLF
                   "Cache-Control: no-cache, no-store" CRLF
                   "Content-Length: %"Ssz_df"" CRLFx2
                   "%.*s",
              minor_version,
              err_code, http_code_to_msg(err_code),
              (Ssz)msg_len,
              (int)msg_len, msg);
    add_sv_to_wbuf(c, sv_2mortal(reply));

    // no more reading; flush what was just queued
    stop_read_watcher(c);
    stop_read_timer(c);
    change_responding_state(c, RESPOND_SHUTDOWN);
    change_receiving_state(c, RECEIVE_SHUTDOWN);
    conn_write_ready(c);
}

/*
 * Byte-wise equality of two length-delimited strings.  Returns 1 when both
 * lengths match and every byte is equal, 0 otherwise.
 */
INLINE_UNLESS_DEBUG bool
str_eq(const char *a, int a_len, const char *b, int b_len)
{
    if (a_len != b_len)
        return 0;
    if (a == b)
        return 1; // same buffer, same length

    int remaining = a_len;
    while (remaining-- > 0) {
        if (*a++ != *b++)
            return 0;
    }
    return 1;
}

/*
 * Case-insensitive comparison of two length-delimited strings.  Assumes that
 * the first string (a) is already lower-cased; only b is folded.
 *
 * Returns 1 when the lengths match and every byte of a equals the lower-cased
 * byte of b, 0 otherwise.
 *
 * Bytes are converted to unsigned char before calling tolower(): passing a
 * negative char value is undefined behaviour, and comparing a signed char
 * against tolower()'s int result would make identical high-bit bytes differ.
 */
INLINE_UNLESS_DEBUG bool
str_case_eq(const char *a, int a_len, const char *b, int b_len)
{
    if (a_len != b_len) return 0;
    if (a == b) return 1;
    int i;
    for (i=0; i<a_len; i++) {
        if ((unsigned char)a[i] != tolower((unsigned char)b[i])) return 0;
    }
    return 1;
}

/*
 * Value of a single hexadecimal digit (either case), or -1 when ch is not a
 * hex digit.
 */
INLINE_UNLESS_DEBUG int
hex_decode(const char ch)
{
    if (likely(ch >= '0' && ch <= '9'))
        return ch - '0';
    if (ch >= 'a' && ch <= 'f')
        return 10 + (ch - 'a');
    if (ch >= 'A' && ch <= 'F')
        return 10 + (ch - 'A');
    return -1;
}

// Percent-decode the string held by sv, in place.
//
// "%XY" sequences with two valid hex digits are replaced by the byte they
// encode; malformed or truncated sequences are copied through unchanged.
// The SV's length is shortened to the decoded length.  SVs that contain no
// '%' are left untouched.
static void
uri_decode_sv (SV *sv)
{
    STRLEN len;
    char *start, *ptr, *end, *decoded;

    ptr = SvPV(sv, len);
    end = ptr + len;

    // quickly scan for % so we can ignore decoding that portion of the string
    while (ptr < end && *ptr != '%')
        ptr++;
    if (ptr == end)
        return; // nothing to decode

    // We are about to write into the buffer, so make sure the SV owns a
    // private, writable string (SvPV alone may hand back a shared buffer).
    // The buffer may move, so re-derive the pointers from the offset.
    STRLEN first_pct = (STRLEN)(ptr - (end - len));
    start = SvPV_force(sv, len);
    end = start + len;
    ptr = start + first_pct;

    // Up until ptr have been "decoded" already by virtue of those chars not
    // being encoded.
    decoded = ptr;

    for (; ptr < end; ptr++) {
        // need both hex digits (ptr[1] and ptr[2]) to lie before end
        if (unlikely(*ptr == '%') && likely(end - ptr >= 3)) {
            int c1 = hex_decode(ptr[1]);
            int c2 = hex_decode(ptr[2]);
            if (likely(c1 != -1 && c2 != -1)) {
                *decoded++ = (char)((c1 << 4) + c2);
                ptr += 2;
                continue;
            }
        }
        *decoded++ = *ptr;
    }

    *decoded = '\0'; // play nice with C

    SvCUR_set(sv, decoded - start);
}

// Build the template environment hash and store it in feersum_tmpl_env.
// feersum_env() copies this template (newHVhv) for each request and then
// overwrites the per-request keys.
static void
feersum_init_tmpl_env(pTHX)
{
    HV *e;
    e = newHV();

    // constants
    hv_stores(e, "psgi.version", newRV((SV*)psgi_ver));
    hv_stores(e, "psgi.url_scheme", newSVpvs("http"));
    hv_stores(e, "psgi.run_once", &PL_sv_no);
    hv_stores(e, "psgi.nonblocking", &PL_sv_yes);
    hv_stores(e, "psgi.multithread", &PL_sv_no);
    hv_stores(e, "psgi.multiprocess", &PL_sv_no);
    hv_stores(e, "psgi.streaming", &PL_sv_yes);
    hv_stores(e, "psgi.errors", newRV((SV*)PL_stderrgv));
    hv_stores(e, "psgix.input.buffered", &PL_sv_yes);
    hv_stores(e, "psgix.output.buffered", &PL_sv_yes);
    hv_stores(e, "psgix.body.scalar_refs", &PL_sv_yes);
    hv_stores(e, "psgix.output.guard", &PL_sv_yes);
    hv_stores(e, "SCRIPT_NAME", newSVpvs(""));

    // placeholders that get defined for every request
    hv_stores(e, "SERVER_PROTOCOL", &PL_sv_undef);
    hv_stores(e, "SERVER_NAME", &PL_sv_undef);
    hv_stores(e, "SERVER_PORT", &PL_sv_undef);
    hv_stores(e, "REQUEST_URI", &PL_sv_undef);
    hv_stores(e, "REQUEST_METHOD", &PL_sv_undef);
    hv_stores(e, "PATH_INFO", &PL_sv_undef);
    // NOTE(review): &PL_sv_placeholder is stored directly for the keys
    // below; presumably this pre-populates the keys without a visible
    // value -- confirm against perl's hash placeholder semantics.
    hv_stores(e, "REMOTE_ADDR", &PL_sv_placeholder);
    hv_stores(e, "REMOTE_PORT", &PL_sv_placeholder);

    // defaults that get changed for some requests
    hv_stores(e, "psgi.input", &PL_sv_undef);
    hv_stores(e, "CONTENT_LENGTH", newSViv(0));
    hv_stores(e, "QUERY_STRING", newSVpvs(""));

    // anticipated headers
    hv_stores(e, "CONTENT_TYPE", &PL_sv_placeholder);
    hv_stores(e, "HTTP_HOST", &PL_sv_placeholder);
    hv_stores(e, "HTTP_USER_AGENT", &PL_sv_placeholder);
    hv_stores(e, "HTTP_ACCEPT", &PL_sv_placeholder);
    hv_stores(e, "HTTP_ACCEPT_LANGUAGE", &PL_sv_placeholder);
    hv_stores(e, "HTTP_ACCEPT_CHARSET", &PL_sv_placeholder);
    hv_stores(e, "HTTP_KEEP_ALIVE", &PL_sv_placeholder);
    hv_stores(e, "HTTP_CONNECTION", &PL_sv_placeholder);
    hv_stores(e, "HTTP_REFERER", &PL_sv_placeholder);
    hv_stores(e, "HTTP_COOKIE", &PL_sv_placeholder);
    hv_stores(e, "HTTP_IF_MODIFIED_SINCE", &PL_sv_placeholder);
    hv_stores(e, "HTTP_IF_NONE_MATCH", &PL_sv_placeholder);
    hv_stores(e, "HTTP_CACHE_CONTROL", &PL_sv_placeholder);

    hv_stores(e, "psgix.io", &PL_sv_placeholder);

    // publish the finished template; feersum_env() checks this pointer
    feersum_tmpl_env = e;
}

// Build the PSGI environment hash for the request held by connection c.
//
// Starts from a copy of the template hash (created on first use) and fills
// in the server name/port, request line fields, remote address, content
// length, psgi.input, psgix.io (PSGI mode only), PATH_INFO / QUERY_STRING
// and one entry per request header.
//
// Header handling:
//   - Content-Length is skipped (CONTENT_LENGTH comes from c->expected_cl).
//   - Content-Type is stored as CONTENT_TYPE.
//   - every other header "Foo-Bar" is stored as HTTP_FOO_BAR; repeated
//     headers are joined with ", ".
//   - a continuation line (header with a NULL name) is appended to the value
//     of the header it continues.
//
// Returns a new HV owned by the caller.
static HV*
feersum_env(pTHX_ struct feer_conn *c)
{
    HV *e;
    int i;
    struct feer_req *r = c->req;

    if (unlikely(!feersum_tmpl_env))
        feersum_init_tmpl_env(aTHX);
    e = newHVhv(feersum_tmpl_env);

    trace("generating header (fd %d) %.*s\n",
        c->fd, (int)r->path_len, r->path);

    SV *path = newSVpvn(r->path, r->path_len);
    hv_stores(e, "SERVER_NAME", newSVsv(feer_server_name));
    hv_stores(e, "SERVER_PORT", newSVsv(feer_server_port));
    hv_stores(e, "REQUEST_URI", path);
    hv_stores(e, "REQUEST_METHOD", newSVpvn(r->method,r->method_len));
    hv_stores(e, "SERVER_PROTOCOL", (r->minor_version == 1) ?
        newSVsv(psgi_serv11) : newSVsv(psgi_serv10));

    // Remote address.  str_addr stays NULL (and addr/port stay undef) for an
    // address family that is not handled below or when inet_ntop() fails.
    SV *addr = &PL_sv_undef;
    SV *port = &PL_sv_undef;
    const char *str_addr = NULL;
    unsigned short s_port = 0;

    if (c->sa->sa_family == AF_INET) {
        struct sockaddr_in *in = (struct sockaddr_in *)c->sa;
        addr = newSV(INET_ADDRSTRLEN);
        str_addr = inet_ntop(AF_INET,&in->sin_addr,SvPVX(addr),INET_ADDRSTRLEN);
        s_port = ntohs(in->sin_port);
    }
#ifdef AF_INET6
    else if (c->sa->sa_family == AF_INET6) {
        struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)c->sa;
        addr = newSV(INET6_ADDRSTRLEN);
        str_addr = inet_ntop(AF_INET6,&in6->sin6_addr,SvPVX(addr),INET6_ADDRSTRLEN);
        s_port = ntohs(in6->sin6_port);
    }
#endif
#ifdef AF_UNIX
    else if (c->sa->sa_family == AF_UNIX) {
        // sizeof on the array (not on a pointer) so exactly "unix\0" is copied
        static const char unix_addr[] = "unix";
        addr = newSV(sizeof(unix_addr));
        memcpy(SvPVX(addr), unix_addr, sizeof(unix_addr));
        str_addr = unix_addr;
        s_port = 0;
    }
#endif

    if (likely(str_addr)) {
        SvCUR_set(addr, strlen(SvPVX(addr)));
        SvPOK_on(addr);
        port = newSViv(s_port);
    }
    hv_stores(e, "REMOTE_ADDR", addr);
    hv_stores(e, "REMOTE_PORT", port);

    // Always set, even when expected_cl is 0: a POST can still have empty
    // content (#23).
    // TODO: make psgi.input a valid, but always empty stream for PSGI mode?
    hv_stores(e, "CONTENT_LENGTH", newSViv(c->expected_cl));
    hv_stores(e, "psgi.input", new_feer_conn_handle(aTHX_ c,0));

    if (request_cb_is_psgi) {
        SV *fake_fh = newSViv(c->fd); // just some random dummy value
        SV *selfref = sv_2mortal(feer_conn_2sv(c));
        sv_magicext(fake_fh, selfref, PERL_MAGIC_ext, &psgix_io_vtbl, NULL, 0);
        hv_stores(e, "psgix.io", fake_fh);
    }

    {
        const char *path_end = r->path + r->path_len;
        const char *qpos = r->path;
        SV *pinfo, *qstr;

        // rather than memchr, for speed; bounds are checked before reading
        while (qpos < path_end && *qpos != '?')
            qpos++;

        if (qpos < path_end) { // stopped on a '?'
            pinfo = newSVpvn(r->path, (qpos - r->path));
            qpos++;
            qstr = newSVpvn(qpos, path_end - qpos);
        }
        else {
            // fresh copy: PATH_INFO is decoded in place below and must not
            // share a buffer with REQUEST_URI
            pinfo = newSVpvn(r->path, r->path_len);
            qstr = NULL; // use template default
        }
        uri_decode_sv(pinfo);
        hv_stores(e, "PATH_INFO", pinfo);
        if (qstr != NULL) // hv template defaults QUERY_STRING to empty
            hv_stores(e, "QUERY_STRING", qstr);
    }

    // Value SV of the most recently stored header; continuation lines are
    // appended to it.  NULL when the previous header was skipped.
    SV *val = NULL;
    char *kbuf;
    size_t kbuflen = 64;
    Newx(kbuf, kbuflen, char);
    kbuf[0]='H'; kbuf[1]='T'; kbuf[2]='T'; kbuf[3]='P'; kbuf[4]='_';

    for (i=0; i<r->num_headers; i++) {
        struct phr_header *hdr = &(r->headers[i]);
        if (unlikely(hdr->name == NULL)) {
            // continuation of the previous header line
            if (val != NULL) {
                trace("... multiline %.*s\n", (int)hdr->value_len, hdr->value);
                sv_catpvn(val, hdr->value, hdr->value_len);
            }
            continue;
        }
        else if (unlikely(str_case_eq(
            STR_WITH_LEN("content-length"), hdr->name, hdr->name_len)))
        {
            // content length shouldn't show up as HTTP_CONTENT_LENGTH but
            // as CONTENT_LENGTH in the env-hash.
            val = NULL;
            continue;
        }
        else if (unlikely(str_case_eq(
            STR_WITH_LEN("content-type"), hdr->name, hdr->name_len)))
        {
            val = newSVpvn(hdr->value, hdr->value_len);
            hv_stores(e, "CONTENT_TYPE", val);
            continue;
        }

        size_t klen = 5+hdr->name_len;
        if (kbuflen < klen) {
            kbuflen = klen;
            kbuf = Renew(kbuf, kbuflen, char);
        }
        char *key = kbuf + 5;
        size_t j;
        for (j=0; j<hdr->name_len; j++) {
            // unsigned char: toupper() is undefined for negative values
            unsigned char n = (unsigned char)hdr->name[j];
            *key++ = (n == '-') ? '_' : (char)toupper(n);
        }

        SV **slot = hv_fetch(e, kbuf, klen, 1);
        trace("adding header to env (fd %d) %.*s: %.*s\n",
            c->fd, (int)klen, kbuf, (int)hdr->value_len, hdr->value);

        assert(slot != NULL); // "fetch is store" flag should ensure this
        val = *slot;
        if (unlikely(SvPOK(val))) {
            trace("... is multivalue\n");
            // extend header with comma
            sv_catpvn(val, ", ", 2);
            sv_catpvn(val, hdr->value, hdr->value_len);
        }
        else {
            // change from undef to a real value
            sv_setpvn(val, hdr->value, hdr->value_len);
        }
    }
    Safefree(kbuf);

    return e;
}

// Begin an HTTP response: queue the status line and headers on the write
// buffer and mark the connection as responding.
//
// message   - status: an integer code, or a string starting with three
//             digits.  A bare code (or a 3-character string) gets the stock
//             reason phrase from http_code_to_msg().
// headers   - flat array of name, value, name, value, ...; must have an even
//             number of elements.  Pairs with an undef name or value are
//             skipped, as is Content-Length when it is generated
//             automatically (every status except 304 and 1xx).
// streaming - non-zero for a streaming response; then the header block is
//             terminated here with "Transfer-Encoding: chunked" (HTTP/1.1)
//             or "Connection: close" (HTTP/1.0).
//
// Croaks if a response was already started, if message is not a usable
// status, or if headers has an odd number of elements.
static void
feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
                        int streaming)
{
    const char *ptr;
    I32 i;

    trace("start_response fd=%d streaming=%d\n", c->fd, streaming);

    if (unlikely(c->responding != RESPOND_NOT_STARTED))
        croak("already responding?!");
    change_responding_state(c, streaming ? RESPOND_STREAMING : RESPOND_NORMAL);

    if (unlikely(!SvOK(message) || !(SvIOK(message) || SvPOK(message)))) {
        croak("Must define an HTTP status code or message");
    }

    // av_len() is the highest index, so the element count is avl+1
    I32 avl = av_len(headers);
    if (unlikely((avl+1) % 2 == 1)) {
        croak("expected even-length array, got %d", (int)(avl+1));
    }

    // int or 3 chars? use a stock message
    UV code = 0;
    if (SvIOK(message))
        code = SvIV(message);
    else if (SvUOK(message))
        code = SvUV(message);
    else if (SvCUR(message) >= 3) {
        // only look at the leading three characters of the string
        const int numtype = grok_number(SvPVX_const(message),3,&code);
        if (unlikely(numtype != IS_NUMBER_IN_UV))
            code = 0;
    }
    trace2("starting response fd=%d code=%"UVuf"\n",c->fd,code);

    if (unlikely(!code))
        croak("first parameter is not a number or doesn't start with digits");

    // for PSGI it's always just an IV so optimize for that
    if (likely(!SvPOK(message) || SvCUR(message) == 3)) {
        ptr = http_code_to_msg(code);
        message = sv_2mortal(newSVpvf("%"UVuf" %s",code,ptr));
    }

    // don't generate or strip Content-Length headers for 304 or 1xx
    c->auto_cl = (code == 304 || (100 <= code && code <= 199)) ? 0 : 1;

    add_const_to_wbuf(c, c->is_http11 ? "HTTP/1.1 " : "HTTP/1.0 ", 9);
    add_sv_to_wbuf(c, message);
    add_crlf_to_wbuf(c);

    for (i=0; i<avl; i+= 2) {
        SV **hdr = av_fetch(headers, i, 0);
        if (unlikely(!hdr || !SvOK(*hdr))) {
            trace("skipping undef header key");
            continue;
        }

        SV **val = av_fetch(headers, i+1, 0);
        if (unlikely(!val || !SvOK(*val))) {
            trace("skipping undef header value");
            continue;
        }

        STRLEN hlen;
        const char *hp = SvPV(*hdr, hlen);
        if (likely(c->auto_cl) &&
            unlikely(str_case_eq("content-length",14,hp,hlen)))
        {
            trace("ignoring content-length header in the response\n");
            continue;
        }

        add_sv_to_wbuf(c, *hdr);
        add_const_to_wbuf(c, ": ", 2);
        add_sv_to_wbuf(c, *val);
        add_crlf_to_wbuf(c);
    }

    if (streaming) {
        if (c->is_http11)
            add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30);
        else
            add_const_to_wbuf(c, "Connection: close" CRLFx2, 21);
    }

    conn_write_ready(c);
}

// Queue an entire response body and finish the response.
//
// body may be undef (treated as an empty string), a plain scalar, a
// reference to a scalar, or a reference to an array of body parts.  When
// c->auto_cl is set, a Content-Length header is filled in afterwards with
// the total number of bytes queued.
//
// Returns the number of body bytes queued.  Croaks unless the connection is
// in RESPOND_NORMAL state, or if body is some other kind of reference.
static size_t
feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body)
{
    if (c->responding != RESPOND_NORMAL)
        croak("can't use write_whole_body when in streaming mode");

    // classify the body; anything that is not an array-ref is a string
    bool body_is_string = 1;
    if (!SvOK(body)) {
        body = sv_2mortal(newSVpvs(""));
    }
    else if (SvROK(body)) {
        SV *target = SvRV(body);
        if (SvOK(target) && !SvROK(target))
            body = target; // scalar-ref: write what it points to
        else if (SvTYPE(target) == SVt_PVAV)
            body_is_string = 0;
        else
            croak("body must be a scalar, scalar reference or array reference");
    }

    SV *cl_sv; // content-length future
    struct iovec *cl_iov;
    if (likely(c->auto_cl))
        add_placeholder_to_wbuf(c, &cl_sv, &cl_iov);
    else
        add_crlf_to_wbuf(c);

    size_t total = 0;
    if (!body_is_string) {
        AV *parts = (AV*)SvRV(body);
        I32 last = av_len(parts);
        int i;
        for (i=0; i<=last; i++) {
            SV *sv = fetch_av_normal(aTHX_ parts, i);
            if (unlikely(!sv)) continue;
            STRLEN cur = add_sv_to_wbuf(c,sv);
            trace("body part i=%d sv=%p cur=%"Sz_uf"\n", i, sv, (Sz)cur);
            total += cur;
        }
    }
    else {
        STRLEN cur = add_sv_to_wbuf(c,body);
        total = cur;
    }

    if (likely(c->auto_cl)) {
        // now that the size is known, fill in the reserved header slot
        sv_setpvf(cl_sv, "Content-Length: %"Sz_uf"" CRLFx2, (Sz)total);
        update_wbuf_placeholder(c, cl_sv, cl_iov);
    }

    change_responding_state(c, RESPOND_SHUTDOWN);
    conn_write_ready(c);
    return total;
}

// Start a delayed/streaming PSGI response by calling the Perl method
// _initiate_streaming_psgi on the connection object, passing `streamer` as
// its argument.  If the call dies, the error is routed through call_died().
static void
feersum_start_psgi_streaming(pTHX_ struct feer_conn *c, SV *streamer)
{
    dSP;
    ENTER;
    SAVETMPS;
    PUSHMARK(SP);
    mXPUSHs(feer_conn_2sv(c)); // invocant, mortalized
    XPUSHs(streamer);
    PUTBACK;
    // G_EVAL: exceptions are trapped and surface in ERRSV
    call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID);
    SPAGAIN;
    if (unlikely(SvTRUE(ERRSV))) {
        call_died(aTHX_ c, "PSGI stream initiator");
    }
    PUTBACK;
    FREETMPS;
    LEAVE;
}

// Turn the return value of a PSGI application into a response.
//
// ret         - value returned by the app
// can_recurse - whether a non-array reference may be treated as a streaming
//               responder (handed to feersum_start_psgi_streaming())
//
// Accepted shapes:
//   - array-ref triplet [status, headers-array-ref, body] where body is an
//     array-ref (written in full) or another reference, which is treated as
//     an IO::Handle-like object and pumped via the poll-write callback
//   - any other reference, when can_recurse is true
// Anything else sets ERRSV and reports the failure through call_died().
static void
feersum_handle_psgi_response(
    pTHX_ struct feer_conn *c, SV *ret, bool can_recurse)
{
    if (unlikely(!SvOK(ret) || !SvROK(ret))) {
        sv_setpvs(ERRSV, "Invalid PSGI response (expected reference)");
        call_died(aTHX_ c, "PSGI request");
        return;
    }

    if (unlikely(!IsArrayRef(ret))) {
        if (likely(can_recurse)) {
            trace("PSGI response non-array, c=%p ret=%p\n", c, ret);
            feersum_start_psgi_streaming(aTHX_ c, ret);
        }
        else {
            sv_setpvs(ERRSV, "PSGI attempt to recurse in a streaming callback");
            call_died(aTHX_ c, "PSGI request");
        }
        return;
    }

    AV *psgi_triplet = (AV*)SvRV(ret);
    if (unlikely(av_len(psgi_triplet)+1 != 3)) {
        sv_setpvs(ERRSV, "Invalid PSGI array response (expected triplet)");
        call_died(aTHX_ c, "PSGI request");
        return;
    }

    trace("PSGI response triplet, c=%p av=%p\n", c, psgi_triplet);
    // The array has three slots, but av_fetch() still returns NULL for a
    // slot that was never assigned, so check before de-referencing.
    SV **msg_p  = av_fetch(psgi_triplet,0,0);
    SV **hdrs_p = av_fetch(psgi_triplet,1,0);
    SV **body_p = av_fetch(psgi_triplet,2,0);
    if (unlikely(!msg_p || !hdrs_p || !body_p)) {
        sv_setpvs(ERRSV, "Invalid PSGI array response (expected triplet)");
        call_died(aTHX_ c, "PSGI request");
        return;
    }
    SV *msg  = *msg_p;
    SV *hdrs = *hdrs_p;
    SV *body = *body_p;

    AV *headers;
    if (IsArrayRef(hdrs))
        headers = (AV*)SvRV(hdrs);
    else {
        sv_setpvs(ERRSV, "PSGI Headers must be an array-ref");
        call_died(aTHX_ c, "PSGI request");
        return;
    }

    if (likely(IsArrayRef(body))) {
        feersum_start_response(aTHX_ c, msg, headers, 0);
        feersum_write_whole_body(aTHX_ c, body);
    }
    else if (likely(SvROK(body))) { // probably an IO::Handle-like object
        feersum_start_response(aTHX_ c, msg, headers, 1);
        c->poll_write_cb = newSVsv(body);
        c->poll_write_cb_is_io_handle = 1;
        conn_write_ready(c);
    }
    else {
        sv_setpvs(ERRSV, "Expected PSGI array-ref or IO::Handle-like body");
        call_died(aTHX_ c, "PSGI request");
        return;
    }
}

// Close one side of a connection handle and drop the handle's reference on
// the connection.
//
// is_writer - true to close the writer side: releases the poll-write
//             callback and, unless already shut down, finishes the write
//             buffer.  False to close the reader side: releases rbuf and
//             shuts down the read half of the socket.
//
// Returns 1 for the writer; for the reader, the result of shutdown().
static int
feersum_close_handle (pTHX_ struct feer_conn *c, bool is_writer)
{
    int rc;

    if (!is_writer) {
        trace("close reader fd=%d, c=%p\n", c->fd, c);
        // TODO: ref-dec poll_read_cb
        if (c->rbuf != NULL) {
            SvREFCNT_dec(c->rbuf);
            c->rbuf = NULL;
        }
        rc = shutdown(c->fd, SHUT_RD);
        change_receiving_state(c, RECEIVE_SHUTDOWN);
    }
    else {
        trace("close writer fd=%d, c=%p, refcnt=%d\n", c->fd, c, SvREFCNT(c->self));
        if (c->poll_write_cb != NULL) {
            SvREFCNT_dec(c->poll_write_cb);
            c->poll_write_cb = NULL;
        }
        if (c->responding < RESPOND_SHUTDOWN) {
            finish_wbuf(c);
            conn_write_ready(c);
            change_responding_state(c, RESPOND_SHUTDOWN);
        }
        rc = 1;
    }

    // disassociate the handle from the conn
    SvREFCNT_dec(c->self);
    return rc;
}

// Get or set the connection's external guard.
//
// With a non-NULL guard the stored guard is replaced: a defined value is
// copied in, an undefined value clears it.  Returns a fresh copy of the
// stored guard, or undef when none is set.
static SV*
feersum_conn_guard(pTHX_ struct feer_conn *c, SV *guard)
{
    if (guard != NULL) {
        // release the previous guard before installing the new one
        if (c->ext_guard != NULL)
            SvREFCNT_dec(c->ext_guard);

        if (SvOK(guard))
            c->ext_guard = newSVsv(guard);
        else
            c->ext_guard = NULL;
    }

    if (c->ext_guard == NULL)
        return &PL_sv_undef;
    return newSVsv(c->ext_guard);
}

// Report an exception raised by a Perl callback.
//
// Calls the Perl sub Feersum::DIED with a copy of ERRSV, then tries to send
// a 500 response on the connection and clears ERRSV.
//
// cb_type - human-readable name of the callback that failed; only used in
//           the debug trace.
static void
call_died (pTHX_ struct feer_conn *c, const char *cb_type)
{
    dSP;
#if DEBUG >= 1
    trace("An error was thrown in the %s callback: %-p\n",cb_type,ERRSV);
#endif
    PUSHMARK(SP);
    mXPUSHs(newSVsv(ERRSV)); // pass a mortal copy of the error
    PUTBACK;
    call_pv("Feersum::DIED", G_DISCARD|G_EVAL|G_VOID|G_KEEPERR);
    SPAGAIN;

    respond_with_server_error(c,"Request handler exception.\n",0,500);
    sv_setsv(ERRSV, &PL_sv_undef); // reset the error for the caller
}

// Invoke the request handler (request_cb_cv) for connection c.
//
// In PSGI mode the handler receives a reference to the environment hash
// built by feersum_env() and its return value is passed to
// feersum_handle_psgi_response(); otherwise it receives the connection
// object and its return value is discarded.  An exception from the handler
// is reported through call_died().
static void
call_request_callback (struct feer_conn *c)
{
    dTHX;
    dSP;
    int flags;
    c->in_callback++; // conn_write_ready() defers while this is non-zero
    SvREFCNT_inc_void_NN(c->self); // released near the end of this function

    trace("request callback c=%p\n", c);

    ENTER;
    SAVETMPS;
    PUSHMARK(SP);

    if (request_cb_is_psgi) {
        HV *env = feersum_env(aTHX_ c);
        mXPUSHs(newRV_noinc((SV*)env));
        flags = G_EVAL|G_SCALAR;
    }
    else {
        mXPUSHs(feer_conn_2sv(c));
        flags = G_DISCARD|G_EVAL|G_VOID;
    }

    PUTBACK;
    int returned = call_sv(request_cb_cv, flags);
    SPAGAIN;

    trace("called request callback, errsv? %d\n", SvTRUE(ERRSV) ? 1 : 0);

    if (unlikely(SvTRUE(ERRSV))) {
        call_died(aTHX_ c, "request");
        returned = 0; // pretend nothing got returned
    }

    // psgi_response is only assigned, and only read, when the PSGI handler
    // returned a value
    SV *psgi_response;
    if (request_cb_is_psgi && likely(returned >= 1)) {
        psgi_response = POPs;
        SvREFCNT_inc_void_NN(psgi_response); // keep it alive past PUTBACK
    }

    trace("leaving request callback\n");
    PUTBACK;

    if (request_cb_is_psgi && likely(returned >= 1)) {
        feersum_handle_psgi_response(aTHX_ c, psgi_response, 1); // can_recurse
        SvREFCNT_dec(psgi_response);
    }

    // NOTE(review): extra reference drop for PSGI requests that carry a body
    // (change tagged "fangyousong"); presumably balances a reference taken
    // elsewhere for such requests -- confirm.
    if (request_cb_is_psgi && c->expected_cl > 0) {
        SvREFCNT_dec(c->self);
    }


    c->in_callback--;
    SvREFCNT_dec(c->self);

    FREETMPS;
    LEAVE;
}

// Invoke the connection's poll callback with a new connection handle as its
// only argument.  Only the write side has a callback here: for is_write ==
// false the callback is NULL and the function returns immediately.  An
// exception from the callback is reported through call_died().
static void
call_poll_callback (struct feer_conn *c, bool is_write)
{
    dTHX;
    dSP;

    SV *cb = (is_write) ? c->poll_write_cb : NULL;

    if (unlikely(cb == NULL)) return;

    c->in_callback++; // conn_write_ready() defers while this is non-zero

    trace("%s poll callback c=%p cbrv=%p\n",
        is_write ? "write" : "read", c, cb);

    ENTER;
    SAVETMPS;
    PUSHMARK(SP);
    mXPUSHs(new_feer_conn_handle(aTHX_ c, is_write));
    PUTBACK;
    call_sv(cb, G_DISCARD|G_EVAL|G_VOID);
    SPAGAIN;

    trace("called %s poll callback, errsv? %d\n",
        is_write ? "write" : "read", SvTRUE(ERRSV) ? 1 : 0);

    if (unlikely(SvTRUE(ERRSV))) {
        call_died(aTHX_ c, is_write ? "write poll" : "read poll");
    }

    trace("leaving %s poll callback\n", is_write ? "write" : "read");
    PUTBACK;
    FREETMPS;
    LEAVE;

    c->in_callback--;
}

// Pull the next chunk from an IO::Handle-like response body and queue it.
//
// Calls ->getline on c->poll_write_cb with $/ temporarily set to \4096.  A
// defined result is added to the write buffer (as a chunk under HTTP/1.1).
// An undefined result means the body is finished: ->close is called on the
// handle, the callback is released, the write buffer is finished and the
// responding state becomes RESPOND_SHUTDOWN.
//
// Note: the `io` argument is only checked for NULL; the handle that is
// actually used is c->poll_write_cb.
static void
pump_io_handle (struct feer_conn *c, SV *io)
{
    dTHX;
    dSP;

    if (unlikely(io == NULL)) return;

    c->in_callback++; // conn_write_ready() defers while this is non-zero

    trace("pump io handle %d\n", c->fd);

    ENTER;
    SAVETMPS;

    // Emulate `local $/ = \4096;`
    SV *old_rs = PL_rs;
    PL_rs = sv_2mortal(newRV_noinc(newSViv(4096)));
    sv_setsv(get_sv("/", GV_ADD), PL_rs);

    PUSHMARK(SP);
    XPUSHs(c->poll_write_cb);
    PUTBACK;
    int returned = call_method("getline", G_SCALAR|G_EVAL);
    SPAGAIN;

    trace("called getline on io handle fd=%d errsv=%d returned=%d\n",
        c->fd, SvTRUE(ERRSV) ? 1 : 0, returned);

    if (unlikely(SvTRUE(ERRSV))) {
        call_died(aTHX_ c, "getline on io handle");
        goto done_pump_io;
    }

    SV *ret = NULL;
    if (returned > 0)
        ret = POPs;
    // work on a plain copy when the returned value carries magic
    if (ret && SvMAGICAL(ret))
        ret = sv_2mortal(newSVsv(ret));

    if (unlikely(!ret || !SvOK(ret))) {
        // returned undef, so call the close method as a courtesy
        PUSHMARK(SP);
        XPUSHs(c->poll_write_cb);
        PUTBACK;
        call_method("close", G_VOID|G_DISCARD|G_EVAL);
        SPAGAIN;

        if (unlikely(SvTRUE(ERRSV))) {
            trouble("Couldn't close body IO handle: %-p",ERRSV);
        }

        SvREFCNT_dec(c->poll_write_cb);
        c->poll_write_cb = NULL;
        finish_wbuf(c);
        change_responding_state(c, RESPOND_SHUTDOWN);

        goto done_pump_io;
    }

    if (c->is_http11)
        add_chunk_sv_to_wbuf(c, ret);
    else
        add_sv_to_wbuf(c, ret);

done_pump_io:
    trace("leaving pump io handle %d\n", c->fd);

    PUTBACK;
    FREETMPS;
    LEAVE;

    // undo the `$/` override from above
    PL_rs = old_rs;
    sv_setsv(get_sv("/", GV_ADD), old_rs);

    c->in_callback--;
}

// "get" magic callback (installed as psgix_io_vtbl.svt_get in BOOT) that
// builds the raw IO handle for a connection the first time `sv` is read.
//
//   sv - the magical scalar being read.
//   mg - its magic record; mg->mg_obj is converted to the owning connection.
//
// Always returns 0.  A failure inside Feersum::Connection::_raw is reported
// through call_died().
static int
psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
{
    dSP;

    struct feer_conn *c = sv_2feer_conn(mg->mg_obj);
    trace("invoking psgix.io magic for fd=%d\n", c->fd);

    // strip the ext magic from sv before doing the work
    sv_unmagic(sv, PERL_MAGIC_ext);

    ENTER;
    SAVETMPS;

    // Feersum::Connection::_raw($sv, $fd), with exceptions trapped
    PUSHMARK(SP);
    XPUSHs(sv);
    mXPUSHs(newSViv(c->fd));
    PUTBACK;

    call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
    SPAGAIN;

    if (unlikely(SvTRUE(ERRSV))) {
        call_died(aTHX_ c, "psgix.io magic");
    }
    else {
        // sv is expected to hold a reference to a glob at this point
        // (presumably set up by _raw -- TODO confirm).  A new reference to
        // the connection object is stored in the glob's scalar slot.
        SV *io_glob   = SvRV(sv);
        GvSV(io_glob) = newRV_inc(c->self);

        // Put whatever remainder data into the socket buffer.
        // Optimizes for the websocket case.
        //
        // TODO: For keepalive support the opposite operation is required;
        // pull the data out of the socket buffer and back into feersum.
        if (likely(c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf))) {
            STRLEN rbuf_len;
            const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
            IO *io = GvIOp(io_glob);
            assert(io != NULL);
            // NOTE(review): the return value of PerlIO_unread is not checked
            PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
            sv_setpvs(c->rbuf, "");
        }

        stop_read_watcher(c);
        stop_read_timer(c);
        // don't stop write watcher in case there's outstanding data.
    }

    PUTBACK;
    FREETMPS;
    LEAVE;
    return 0;
}

MODULE = Feersum		PACKAGE = Feersum

PROTOTYPES: ENABLE

void
set_server_name_and_port(SV *self, SV *name, SV *port)
    PPCODE:
{
    // Replace the stored server name and port with read-only copies of the
    // supplied values, releasing whatever was stored before.
    SV *replacement;

    // SvREFCNT_dec() ignores a NULL argument, so no guard is needed.
    SvREFCNT_dec(feer_server_name);
    replacement = newSVsv(name);
    SvREADONLY_on(replacement);
    feer_server_name = replacement;

    SvREFCNT_dec(feer_server_port);
    replacement = newSVsv(port);
    SvREADONLY_on(replacement);
    feer_server_port = replacement;
}

void
accept_on_fd(SV *self, int fd)
    PPCODE:
{
    // Prepare the event-loop watchers for accepting connections on `fd`.
    // The prepare and check watchers are started here; the idle watcher and
    // the accept I/O watcher are only initialized, not started, in this XSUB.
    trace("going to accept on %d\n",fd);
    feersum_ev_loop = EV_DEFAULT;

    // ignore SIGPIPE process-wide
    signal(SIGPIPE, SIG_IGN);

    ev_prepare_init(&ep, prepare_cb);
    ev_prepare_start(feersum_ev_loop, &ep);

    ev_check_init(&ec, check_cb);
    ev_check_start(feersum_ev_loop, &ec);

    ev_idle_init(&ei, idle_cb);

    ev_io_init(&accept_w, accept_cb, fd, EV_READ);
}

void
unlisten (SV *self)
    PPCODE:
{
    // Stop the prepare, check, idle and accept watchers on the Feersum
    // event loop.  The listening descriptor itself is not closed here.
    trace("stopping accept\n");
    ev_prepare_stop(feersum_ev_loop, &ep);
    ev_check_stop(feersum_ev_loop, &ec);
    ev_idle_stop(feersum_ev_loop, &ei);
    ev_io_stop(feersum_ev_loop, &accept_w);
}

void
request_handler(SV *self, SV *cb)
    PROTOTYPE: $&
    ALIAS:
        psgi_request_handler = 1
    PPCODE:
{
    // Install `cb` as the request handler.  Called through the
    // psgi_request_handler alias (ix == 1) the handler is flagged as PSGI;
    // called as request_handler (ix == 0) it is a native Feersum handler.
    const int is_reference = SvOK(cb) && SvROK(cb);

    if (unlikely(!is_reference))
        croak("can't supply an undef handler");

    // release any previous handler; SvREFCNT_dec() ignores NULL
    SvREFCNT_dec(request_cb_cv);

    // copy so 5.8.7 overload magic sticks.
    request_cb_cv = newSVsv(cb);
    request_cb_is_psgi = ix;

    trace("assigned %s request handler %p\n",
        request_cb_is_psgi?"PSGI":"Feersum", request_cb_cv);
}

void
graceful_shutdown (SV *self, SV *cb)
    PROTOTYPE: $&
    PPCODE:
{
    // Begin a graceful shutdown: stop accepting new connections, close the
    // listening descriptor, and remember `cb` as the shutdown callback.
    // Croaks if `cb` is not a code reference or a shutdown is already in
    // progress.  With no active connections the callback runs right away;
    // otherwise it stays stored in shutdown_cb_cv.
    if (!IsCodeRef(cb))
        croak("must supply a code reference");
    if (unlikely(shutting_down))
        croak("already shutting down");
    shutdown_cb_cv = newSVsv(cb);
    trace("shutting down, handler=%p, active=%d\n", SvRV(cb), active_conns);

    shutting_down = 1;
    ev_io_stop(feersum_ev_loop, &accept_w);
    close(accept_w.fd);

    if (active_conns <= 0) {
        trace("shutdown is immediate\n");
        // fresh stack pointer for the nested call (shadows the XSUB's SP)
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        // no arguments, result discarded, exceptions trapped (G_EVAL) with
        // G_KEEPERR also set
        call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
        PUTBACK;
        trace3("called shutdown handler\n");
        // the callback is one-shot: release it and clear the slot
        SvREFCNT_dec(shutdown_cb_cv);
        shutdown_cb_cv = NULL;
        FREETMPS;
        LEAVE;
    }
}

double
read_timeout (SV *self, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Combined getter/setter for the global read timeout.
    //
    // With one extra argument the timeout is updated first; the argument
    // must be numerically greater than zero or the call croaks.  The XSUB
    // always returns the timeout in effect when it finishes, so the setter
    // form returns the new value.  (RETVAL used to be left uninitialized on
    // the setter path, returning an indeterminate number.)
    if (items == 2) {
        SV *duration = ST(1);
        NV new_read_timeout = SvNV(duration);
        // written as !(x > 0) so that NaN is rejected as well
        if (!(new_read_timeout > 0.0)) {
            croak("must set a positive (non-zero) value for the timeout");
        }
        read_timeout = (double) new_read_timeout;
    }

    RETVAL = read_timeout;
}
    OUTPUT:
        RETVAL

void
DESTROY (SV *self)
    PPCODE:
{
    // Release the server's request handler when the server object goes away.
    trace3("DESTROY server\n");
    if (request_cb_cv) {
        SvREFCNT_dec(request_cb_cv);
        // Clear the global so nothing can touch the released SV later;
        // request_handler() would otherwise decrement it a second time.
        request_cb_cv = NULL;
    }
}

MODULE = Feersum	PACKAGE = Feersum::Connection::Handle

PROTOTYPES: ENABLE

int
fileno (feer_conn_handle *hdl)
    CODE:
        // Return the file descriptor of the connection behind this handle.
        // (`c` is not declared in this XSUB; presumably the typemap for
        // feer_conn_handle provides it -- TODO confirm.)
        RETVAL = c->fd;
    OUTPUT:
        RETVAL

void
DESTROY (SV *self)
    ALIAS:
        Feersum::Connection::Reader::DESTROY = 1
        Feersum::Connection::Writer::DESTROY = 2
    PPCODE:
{
    // Destructor shared by the handle classes.  The lookup is done with a
    // second argument of 0 and may yield NULL, which is logged as an already
    // closed handle.  Only the Writer alias (ix == 2) closes the handle.
    feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0);

    if (hdl != NULL) {
        struct feer_conn *c = (struct feer_conn *)hdl;
        const int is_writer = (ix == 2);

        trace3("DESTROY handle fd=%d, class=%s\n", c->fd,
            HvNAME(SvSTASH(SvRV(self))));

        // only close the writer on destruction
        if (is_writer)
            feersum_close_handle(aTHX_ c, 1);
    }
    else {
        trace3("DESTROY handle (closed) class=%s\n",
            HvNAME(SvSTASH(SvRV(self))));
    }
}

SV*
read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
    PROTOTYPE: $$$;$
    PPCODE:
{
    // Append up to `len` bytes of buffered request body (c->rbuf) to `buf`.
    //
    //   buf    - destination scalar; forced to an empty string if it is not
    //            already a string.  Must not be read-only.
    //   len    - maximum number of bytes wanted.  Values larger than what is
    //            buffered are clamped (this includes a negative Perl value,
    //            which arrives here as a huge unsigned number).
    //   offset - optional 4th argument, honoured only when it is an integer.
    //            Negative offsets are resolved against c->received_cl.
    //
    // Returns the number of bytes appended; 0 for len == 0 or when no data
    // is available and the receive side has shut down; undef with errno set
    // to EAGAIN when no data is available yet.
    STRLEN buf_len = 0, src_len = 0;
    ssize_t offset;
    char *src_ptr = NULL;
    int offset_past_data;

    // optimizes for the "read everything" case.

    if (unlikely(items == 4) && SvOK(ST(3)) && SvIOK(ST(3)))
        offset = SvIV(ST(3));
    else
        offset = 0;

    trace("read fd=%d : request    len=%"Sz_uf" off=%"Ssz_df"\n",
        c->fd, (Sz)len, (Ssz)offset);

    if (unlikely(c->receiving <= RECEIVE_HEADERS))
        // XXX as of 0.984 this is dead code
        croak("can't call read() until the body begins to arrive");

    if (!SvOK(buf) || !SvPOK(buf)) {
        // force to a PV and ensure buffer space
        sv_setpvn(buf,"",0);
        SvGROW(buf, len+1);
    }

    if (unlikely(SvREADONLY(buf)))
        croak("buffer must not be read-only");

    if (unlikely(len == 0))
        XSRETURN_IV(0); // assumes undef buffer got allocated to empty-string

    // only the current length of the destination is needed
    (void)SvPV(buf, buf_len);
    if (likely(c->rbuf))
        src_ptr = SvPV(c->rbuf, src_len);

    if (unlikely(offset < 0))
        offset = (-offset >= c->received_cl) ? 0 : c->received_cl + offset;

    // An offset beyond the buffered bytes leaves nothing to copy.  It has to
    // be detected before clamping: `src_len - offset` would wrap around and
    // the copy below would run far past the end of rbuf.
    offset_past_data = ((STRLEN)offset > src_len);

    // Clamp by comparing against the remaining byte count rather than
    // computing `len + offset`, which can itself wrap for very large `len`.
    if (likely(!offset_past_data) && unlikely(len > src_len - (STRLEN)offset))
        len = src_len - (STRLEN)offset;

    trace("read fd=%d : normalized len=%"Sz_uf" off=%"Ssz_df" src_len=%"Sz_uf"\n",
        c->fd, (Sz)len, (Ssz)offset, (Sz)src_len);

    if (unlikely(!c->rbuf || src_len == 0 || offset_past_data
                 || offset >= c->received_cl)) {
        trace2("rbuf empty during read %d\n", c->fd);
        if (c->receiving == RECEIVE_SHUTDOWN) {
            XSRETURN_IV(0);
        }
        else {
            errno = EAGAIN;
            XSRETURN_UNDEF;
        }
    }

    if (likely(len == src_len && offset == 0)) {
        // whole buffer requested: hand rbuf over and forget about it
        trace2("appending entire rbuf fd=%d\n", c->fd);
        sv_2mortal(c->rbuf); // allow pv to be stolen
        if (likely(buf_len == 0)) {
            sv_setsv(buf, c->rbuf);
        }
        else {
            sv_catsv(buf, c->rbuf);
        }
        c->rbuf = NULL;
    }
    else {
        src_ptr += offset;
        trace2("appending partial rbuf fd=%d len=%"Sz_uf" off=%"Ssz_df" ptr=%p\n",
            c->fd, (Sz)len, (Ssz)offset, src_ptr);
        SvGROW(buf, SvCUR(buf) + len);
        sv_catpvn(buf, src_ptr, len);
        if (likely(items == 3)) {
            // there wasn't an offset param, throw away beginning
            sv_chop(c->rbuf, SvPVX(c->rbuf) + len);
        }
    }

    XSRETURN_IV(len);
}

STRLEN
write (feer_conn_handle *hdl, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Queue one piece of streaming response body on the write buffer and
    // return its length in bytes.  The body may be a plain scalar or a
    // reference to a string; an undefined or missing body returns 0 without
    // queueing anything.  Croaks outside of streaming mode.
    SV *payload;

    if (unlikely(c->responding != RESPOND_STREAMING))
        croak("can only call write in streaming mode");

    if (items == 2)
        payload = ST(1);
    else
        payload = &PL_sv_undef;

    if (unlikely(!payload || !SvOK(payload)))
        XSRETURN_IV(0);

    trace("write fd=%d c=%p, body=%p\n", c->fd, c, payload);

    if (SvROK(payload)) {
        // a reference is accepted only when it points at a defined string
        SV *target = SvRV(payload);
        if (!(SvOK(target) && SvPOK(target)))
            croak("body must be a scalar, scalar ref or undef");
        payload = target;
    }

    // the string value is discarded; only its length is kept as the result
    (void)SvPV(payload, RETVAL);

    if (c->is_http11)
        add_chunk_sv_to_wbuf(c, payload);
    else
        add_sv_to_wbuf(c, payload);

    conn_write_ready(c);
}
    OUTPUT:
        RETVAL

void
write_array (feer_conn_handle *hdl, AV *abody)
    PROTOTYPE: $$
    PPCODE:
{
    // Queue every element of `abody` on the write buffer, in order, then
    // signal that the connection has data to write.  Elements for which
    // fetch_av_normal() yields NULL are skipped.  Croaks outside of
    // streaming mode.
    I32 last_index;
    int idx;
    int use_chunks;

    if (unlikely(c->responding != RESPOND_STREAMING))
        croak("can only call write in streaming mode");

    trace("write_array fd=%d c=%p, abody=%p\n", c->fd, c, abody);

    last_index = av_len(abody);
    // decided once, before any element is fetched
    use_chunks = c->is_http11 ? 1 : 0;

    for (idx = 0; idx <= last_index; idx++) {
        SV *element = fetch_av_normal(aTHX_ abody, idx);
        if (unlikely(!element))
            continue;
        if (use_chunks)
            add_chunk_sv_to_wbuf(c, element);
        else
            add_sv_to_wbuf(c, element);
    }

    conn_write_ready(c);
}

int
seek (feer_conn_handle *hdl, ssize_t offset, ...)
    PROTOTYPE: $$;$
    CODE:
{
    // Discard data from the front of the read buffer.  Returns 1 when the
    // seek was honoured and 0 when it was not (no buffer, or an unsupported
    // offset/whence combination).
    //
    //   offset == 0                 : always succeeds, nothing changes
    //   offset  > 0, SEEK_CUR/_SET  : drop `offset` bytes (clamped to the
    //                                 buffer length)
    //   offset  < 0, SEEK_END       : keep only the last -offset bytes;
    //                                 fails if that reaches before the start
    int whence = SEEK_CUR;
    int seek_ok = 0;

    if (items == 3 && SvOK(ST(2)) && SvIOK(ST(2)))
        whence = SvIV(ST(2));

    trace("seek fd=%d offset=%"Ssz_df" whence=%d\n", c->fd, offset, whence);

    // without a buffer the handle is effectively "closed": seek_ok stays 0
    if (likely(c->rbuf != NULL)) {
        if (offset == 0) {
            // stay put for any whence
            seek_ok = 1;
        }
        else if (offset > 0) {
            if (whence == SEEK_CUR || whence == SEEK_SET) {
                STRLEN avail;
                const char *start = SvPV_const(c->rbuf, avail);
                if (offset > avail)
                    offset = avail;
                sv_chop(c->rbuf, start + offset);
                seek_ok = 1;
            }
        }
        else if (whence == SEEK_END) {
            STRLEN avail;
            const char *start = SvPV_const(c->rbuf, avail);
            // can't be > avail since offset is negative in this branch
            offset += avail;
            if (offset > 0)
                sv_chop(c->rbuf, start + offset);
            // exactly 0 is a no-op but OK; below 0 is past the beginning
            seek_ok = (offset >= 0);
        }
    }

    RETVAL = seek_ok;
}
    OUTPUT:
        RETVAL

int
close (feer_conn_handle *hdl)
    PROTOTYPE: $
    ALIAS:
        Feersum::Connection::Reader::close = 1
        Feersum::Connection::Writer::close = 2
    CODE:
{
    // Close one side of the connection handle and return whatever
    // feersum_close_handle() reports.  Must be reached through the Reader
    // (ix == 1) or Writer (ix == 2) alias; the second argument passed down
    // is true only for the Writer.
    assert(ix);
    RETVAL = feersum_close_handle(aTHX_ c, (ix == 2));
    // Zero the UV stored in hdl_sv (presumably the connection pointer, so
    // the Perl object no longer refers to the connection -- TODO confirm).
    SvUVX(hdl_sv) = 0;
}
    OUTPUT:
        RETVAL

void
_poll_cb (feer_conn_handle *hdl, SV *cb)
    PROTOTYPE: $$
    ALIAS:
        Feersum::Connection::Reader::poll_cb = 1
        Feersum::Connection::Writer::poll_cb = 2
    PPCODE:
{
    // Set or clear the write-poll callback of a streaming handle.
    //
    // Only the Writer alias (ix == 2) is supported: calling _poll_cb
    // directly or through the Reader alias croaks.  Any previous callback is
    // released first.  An undefined `cb` just clears the callback; anything
    // else must be a code reference, which is copied, stored, and followed
    // by a conn_write_ready() call.  Returns nothing.
    if (unlikely(ix < 1 || ix > 2))
        croak("can't call _poll_cb directly");
    else if (unlikely(ix == 1))
        croak("poll_cb for reading not yet supported"); // TODO poll_read_cb

    if (c->poll_write_cb != NULL) {
        SvREFCNT_dec(c->poll_write_cb);
        c->poll_write_cb = NULL;
    }

    if (!SvOK(cb)) {
        trace("unset poll_cb ix=%d\n", ix);
        // A bare `return` in PPCODE skips the stack reset and hands the
        // XSUB's own arguments back to the caller as return values.
        XSRETURN_EMPTY;
    }
    else if (unlikely(!IsCodeRef(cb)))
        croak("must supply a code reference to poll_cb");

    c->poll_write_cb = newSVsv(cb);
    conn_write_ready(c);
}

SV*
response_guard (feer_conn_handle *hdl, ...)
    PROTOTYPE: $;$
    CODE:
    {
        // Hand the optional second argument to feersum_conn_guard() for the
        // connection behind this handle; NULL is passed when the call did
        // not have exactly two arguments.
        SV *guard_arg = NULL;
        if (items == 2)
            guard_arg = ST(1);
        RETVAL = feersum_conn_guard(aTHX_ c, guard_arg);
    }
    OUTPUT:
        RETVAL

MODULE = Feersum	PACKAGE = Feersum::Connection

PROTOTYPES: ENABLE

SV *
start_streaming (struct feer_conn *c, SV *message, AV *headers)
    PROTOTYPE: $$\@
    CODE:
        // Start a response with the given status message and headers (last
        // argument 1; the send_response XSUB passes 0 there), then return a
        // new connection handle for writing the body.
        feersum_start_response(aTHX_ c, message, headers, 1);
        RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
    OUTPUT:
        RETVAL

size_t
send_response (struct feer_conn *c, SV* message, AV *headers, SV *body)
    PROTOTYPE: $$\@$
    CODE:
        // Send a complete response in one call and return the value reported
        // by feersum_write_whole_body().  Croaks on an undefined body.
        // NOTE(review): the response is started before the body is checked,
        // so the croak below happens after feersum_start_response() has run.
        feersum_start_response(aTHX_ c, message, headers, 0);
        if (unlikely(!SvOK(body)))
            croak("can't send_response with an undef body");
        RETVAL = feersum_write_whole_body(aTHX_ c, body);
    OUTPUT:
        RETVAL

SV*
_continue_streaming_psgi (struct feer_conn *c, SV *psgi_response)
    PROTOTYPE: $\@
    CODE:
{
    // Continue a delayed PSGI response.
    //
    //   3-element array ref : a complete response; it is handed to
    //                         feersum_handle_psgi_response() and undef is
    //                         returned.
    //   2-element array ref : [message, headers]; the response is started
    //                         in streaming mode and a new connection handle
    //                         is returned.
    //
    // Anything else croaks, as does a 2-element form whose headers are not
    // an array reference.
    AV *av = NULL;
    int len = 0;

    if (IsArrayRef(psgi_response)) {
        av = (AV*)SvRV(psgi_response);
        len = av_len(av) + 1;
    }

    if (len == 3) {
        // 0 is "don't recurse" (i.e. don't allow another code-ref)
        feersum_handle_psgi_response(aTHX_ c, psgi_response, 0);
        RETVAL = &PL_sv_undef;
    }
    else if (len == 2) {
        // av_fetch() returns NULL for a slot that was never assigned (e.g.
        // an array extended by setting $#array), so the slots are checked
        // before being dereferenced.
        SV **message_slot = av_fetch(av, 0, 0);
        SV **headers_slot = av_fetch(av, 1, 0);
        // a missing message is treated as undef, as reading it from Perl
        // would give
        SV *message = message_slot ? *message_slot : &PL_sv_undef;
        SV *headers = headers_slot ? *headers_slot : NULL;
        if (unlikely(!headers || !IsArrayRef(headers)))
            croak("PSGI headers must be an array ref");
        feersum_start_response(aTHX_ c, message, (AV*)SvRV(headers), 1);
        RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
    }
    else {
        croak("PSGI response starter expects a 2 or 3 element array-ref");
    }
}
    OUTPUT:
        RETVAL

void
force_http10 (struct feer_conn *c)
    PROTOTYPE: $
    ALIAS:
        force_http11 = 1
    PPCODE:
        // ix is 0 when called as force_http10 and 1 when called through the
        // force_http11 alias, so it doubles as the new is_http11 flag.
        c->is_http11 = ix;

SV *
env (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        // Return a reference to the value produced by feersum_env() for this
        // connection.  newRV_noinc is used, so the reference takes over the
        // count already held on that value.
        RETVAL = newRV_noinc((SV*)feersum_env(aTHX_ c));
    OUTPUT:
        RETVAL

int
fileno (struct feer_conn *c)
    CODE:
        // Return the connection's file descriptor.
        RETVAL = c->fd;
    OUTPUT:
        RETVAL

SV*
response_guard (struct feer_conn *c, ...)
    PROTOTYPE: $;$
    CODE:
    {
        // Hand the optional second argument to feersum_conn_guard() for this
        // connection; NULL is passed when the call did not have exactly two
        // arguments.
        SV *guard_arg = NULL;
        if (items == 2)
            guard_arg = ST(1);
        RETVAL = feersum_conn_guard(aTHX_ c, guard_arg);
    }
    OUTPUT:
        RETVAL

void
DESTROY (struct feer_conn *c)
    PPCODE:
{
    // Connection destructor: release everything the connection owns, close
    // it, and decrement the active connection count.  If a graceful shutdown
    // is in progress and this was the last connection, the remaining loop
    // watchers are stopped and the shutdown callback is invoked.
    int i;
    trace("DESTROY connection fd=%d c=%p\n", c->fd, c);

    if (likely(c->rbuf)) SvREFCNT_dec(c->rbuf);

    // drain the write queue, releasing every SV still held by each entry
    if (c->wbuf_rinq) {
        struct iomatrix *m;
        while ((m = (struct iomatrix *)rinq_shift(&c->wbuf_rinq)) != NULL) {
            for (i=0; i < m->count; i++) {
                if (m->sv[i]) SvREFCNT_dec(m->sv[i]);
            }
            Safefree(m);
        }
    }

    if (likely(c->req)) {
        if (c->req->buf) SvREFCNT_dec(c->req->buf);
        Safefree(c->req);
    }

    // c->sa is released with plain free(), unlike the Safefree() calls above
    if (likely(c->sa)) free(c->sa);

    safe_close_conn(c, "close at destruction");

    if (c->poll_write_cb) SvREFCNT_dec(c->poll_write_cb);

    if (c->ext_guard) SvREFCNT_dec(c->ext_guard);

    active_conns--;

    if (unlikely(shutting_down && active_conns <= 0)) {
        ev_idle_stop(feersum_ev_loop, &ei);
        ev_prepare_stop(feersum_ev_loop, &ep);
        ev_check_stop(feersum_ev_loop, &ec);

        trace3("... was last conn, going to try shutdown\n");
        if (shutdown_cb_cv) {
            // no arguments, result discarded, exceptions trapped (G_EVAL)
            // with G_KEEPERR also set
            PUSHMARK(SP);
            call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
            PUTBACK;
            trace3("... ok, called that handler\n");
            // one-shot: release the callback and clear the slot
            SvREFCNT_dec(shutdown_cb_cv);
            shutdown_cb_cv = NULL;
        }
    }
}

MODULE = Feersum	PACKAGE = Feersum

BOOT:
    {
        // One-time module initialization: cache package stashes, import the
        // EV API, and build the shared constant values.

        // A second argument of 1 creates the stash when it is missing; the
        // Writer/Reader lookups pass 0 and do not create it.
        // NOTE(review): the 0-flag lookups presumably rely on those packages
        // already being defined when this runs -- confirm.
        feer_stash = gv_stashpv("Feersum", 1);
        feer_conn_stash = gv_stashpv("Feersum::Connection", 1);
        feer_conn_writer_stash = gv_stashpv("Feersum::Connection::Writer",0);
        feer_conn_reader_stash = gv_stashpv("Feersum::Connection::Reader",0);
        I_EV_API("Feersum");

        // read-only array holding [1, 1]
        psgi_ver = newAV();
        av_extend(psgi_ver, 2);
        av_push(psgi_ver, newSViv(1));
        av_push(psgi_ver, newSViv(1));
        SvREADONLY_on((SV*)psgi_ver);

        // read-only protocol strings
        psgi_serv10 = newSVpvs("HTTP/1.0");
        SvREADONLY_on(psgi_serv10);
        psgi_serv11 = newSVpvs("HTTP/1.1");
        SvREADONLY_on(psgi_serv11);

        // magic vtable with only the "get" hook set
        Zero(&psgix_io_vtbl, 1, MGVTBL);
        psgix_io_vtbl.svt_get = psgix_io_svt_get;

        trace3("Feersum booted, iomatrix %lu "
                "(IOV_MAX=%u, FEERSUM_IOMATRIX_SIZE=%u), "
            "feer_req %lu, "
            "feer_conn %lu\n",
            (long unsigned int)sizeof(struct iomatrix),
                (unsigned int)IOV_MAX,
                (unsigned int)FEERSUM_IOMATRIX_SIZE,
            (long unsigned int)sizeof(struct feer_req),
            (long unsigned int)sizeof(struct feer_conn)
        );
    }