The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* Head of the ring of queued events.  queueEvent() inserts each event in
   front of the first queued event whose prio is greater, so the ring is
   kept in ascending prio order starting at NQueue.next. */
static pe_ring NQueue;
/* Priority bound that one_event() hands to pe_empty_queue() before it polls:
   only a queued event whose prio is below this value is dispatched ahead of
   the multiplex step. */
static int StarvePrio = PE_QUEUES - 2;

/* Boot-time setup for the queue: initializes the NQueue ring head (with a
   null 'self') and installs the QUEUES, PRIO_NORMAL and PRIO_HIGH constant
   subs into the "Event" package. */
static void boot_queue() {
    HV *event_pkg;

    event_pkg = gv_stashpv("Event", 1);
    PE_RING_INIT(&NQueue, 0);

    newCONSTSUB(event_pkg, "QUEUES",      newSViv(PE_QUEUES));
    newCONSTSUB(event_pkg, "PRIO_NORMAL", newSViv(PE_PRIO_NORMAL));
    newCONSTSUB(event_pkg, "PRIO_HIGH",   newSViv(PE_PRIO_HIGH));
}

/* Detaches an event's 'que' link from the ring it is on and decrements
   ActiveWatchers, mirroring the increment done by queueEvent(). */
/*inline*/ static void dequeEvent(pe_event *ev) {
    pe_ring *link;

    assert(ev);
    link = &ev->que;
    PE_RING_DETACH(link);
    ActiveWatchers -= 1;
}

/* Debugging aid: walks NQueue starting at NQueue.next and warns the address
   and prio of every queued event.  The walk stops at the first ring link
   whose 'self' is null (boot_queue() initializes the NQueue head that way). */
static void db_show_queue() {
    pe_event *ev;

    for (ev = (pe_event*) NQueue.next->self;
	 ev;
	 ev = (pe_event*) ev->que.next->self)
    {
	/* Print the pointer with %p and a void* argument; the previous %x
	   conversion expects an unsigned int, which does not match a
	   pointer argument (and truncates it where pointers are wider). */
	warn("0x%p : %d\n", (void*) ev, ev->prio);
    }
}

/* Readies an event for dispatch on behalf of its watcher (ev->up).
 *
 *   ev      - the event; if it has no callback yet, the watcher's callback
 *             (and, for non-Perl callbacks, ext_data) is copied into it.
 *   forwhat - short label used only in the debug trace message.
 *
 * Also stops a non-repeating active watcher, switches off an active
 * repeating watcher that has INVOKE1 set, and clears the watcher's RUNNOW
 * flag.  Always returns 1.
 */
static int prepare_event(pe_event *ev, char *forwhat) {
    /* AVOID DIEING IN HERE!! */
    STRLEN desc_len;
    pe_watcher *watcher = ev->up;

    if (!ev->callback) {
	if (!WaPERLCB(watcher)) {
	    /* plain C callback: carry its opaque data along as well */
	    ev->callback = watcher->callback;
	    ev->ext_data = watcher->ext_data;
	    EvPERLCB_off(ev);
	}
	else {
	    /* Perl callback: the event holds its own reference */
	    ev->callback = SvREFCNT_inc(watcher->callback);
	    EvPERLCB_on(ev);
	}
	assert(ev->callback);
    }

    assert(!WaSUSPEND(watcher));
    assert(WaREENTRANT(watcher) || !watcher->running);

    if (WaACTIVE(watcher)) {
	if (!WaREPEAT(watcher))
	    pe_watcher_stop(watcher, 0);
	else if (WaINVOKE1(watcher))
	    pe_watcher_off(watcher);
    }
    else if (!WaRUNNOW(watcher)) {
	warn("Event: event for !ACTIVE watcher '%s'", SvPV(watcher->desc,desc_len));
    }

    WaRUNNOW_off(watcher); /* race condition? XXX */

    if (WaDEBUGx(watcher) >= 3) {
	warn("Event: %s '%s' prio=%d\n", forwhat, SvPV(watcher->desc,desc_len), ev->prio);
    }
    return 1;
}

/* Queues an event for later dispatch, or runs it at once.
 *
 * - An event whose 'que' link is already on a ring is left alone.
 * - A negative prio means "invoke immediately": prio is reset to 0 and the
 *   event is invoked without being queued.
 * - A prio of PE_QUEUES or more is clamped to PE_QUEUES-1.
 * - Otherwise the event is inserted into NQueue in front of the first
 *   queued event with a strictly greater prio, so events of equal prio stay
 *   in arrival order.  ActiveWatchers is incremented for each queued event.
 */
static void queueEvent(pe_event *ev) {  /**INVOKE**/
    pe_ring *slot;

    assert(ev->hits);

    if (!PE_RING_EMPTY(&ev->que))
	return; /* clump'd event already queued */
    if (!prepare_event(ev, "queue"))
	return;

    if (ev->prio < 0) {
	/* invoke the event immediately! */
	ev->prio = 0;
	pe_event_invoke(ev);
	return;
    }

    if (ev->prio >= PE_QUEUES)
	ev->prio = PE_QUEUES-1;

    /* queue in reverse direction? XXX */
    for (slot = NQueue.next; slot->self; slot = slot->next) {
	if (((pe_event*) slot->self)->prio > ev->prio)
	    break;
    }
    PE_RING_ADD_BEFORE(&ev->que, slot);
    ++ActiveWatchers;
}

/* Dispatches at most one event from the front of NQueue.
 *
 * The first queued event is dequeued and invoked only if its prio is
 * strictly below maxprio.  Returns 1 if an event was invoked, 0 if the
 * queue was empty or its first event did not qualify.
 */
static int pe_empty_queue(int maxprio) { /**INVOKE**/
    pe_event *front = (pe_event*) NQueue.next->self;

    if (!front || front->prio >= maxprio)
	return 0;

    dequeEvent(front);
    pe_event_invoke(front);
    return 1;
}

/* Wrapper around pe_sys_multiplex(tm).  At debug level 2 or higher it first
   traces the timeout and whether the NQueue and Idle rings are non-empty.
   When Estat is on, the call is bracketed by Estat.enter()/Estat.commit(). */
/*inline*/ static void pe_multiplex(NV tm) {
    if (SvIVX(DebugLevel) >= 2) {
	char *queue_tag = PE_RING_EMPTY(&NQueue) ? "" : "QUEUE";
	char *idle_tag = PE_RING_EMPTY(&Idle) ? "" : "IDLE";

	warn("Event: multiplex %.4fs %s%s\n", tm, queue_tag, idle_tag);
    }

    if (Estat.on) {
	void *stat_ctx = Estat.enter(-1, 0);
	pe_sys_multiplex(tm);
	Estat.commit(stat_ctx, 0);
    }
    else {
	pe_sys_multiplex(tm);
    }
}

/* Runs every hook on the Prepare ring, walking it via the 'prev' links, and
 * lets each one lower the timeout.
 *
 * Each hook (a Perl callback called in scalar context, or a C function taking
 * ext_data) yields a number; the smallest of tm and all yielded values is
 * returned.
 */
static NV pe_map_prepare(NV tm) {
    pe_qcallback *hook;

    for (hook = (pe_qcallback*) Prepare.prev->self;
	 hook;
	 hook = (pe_qcallback*) hook->ring.prev->self)
    {
	NV wanted;

	if (!hook->is_perl) {
	    wanted = (* (NV(*)(void*)) hook->callback)(hook->ext_data);
	}
	else {
	    SV *ret;
	    dSP;
	    PUSHMARK(SP);
	    PUTBACK;
	    perl_call_sv((SV*)hook->callback, G_SCALAR);
	    SPAGAIN;
	    ret = POPs;
	    PUTBACK;
	    wanted = SvNOK(ret) ? SvNVX(ret) : SvNV(ret);
	}

	if (wanted < tm)
	    tm = wanted;
    }
    return tm;
}

static void pe_queue_pending() {
    NV tm = 0;
    if (!PE_RING_EMPTY(&Prepare)) tm = pe_map_prepare(tm);

    pe_multiplex(0);

    pe_timeables_check();
    if (!PE_RING_EMPTY(&Check)) pe_map_check(&Check);

    pe_signal_asynccheck();
    if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck);
}

/* Runs one iteration of the event loop and dispatches at most one callback.
 *
 *   tm - upper bound on the timeout handed to pe_multiplex(); it is lowered
 *        below before use.
 *
 * Returns 1 if a queued event or an idle watcher was invoked, 0 if nothing
 * was invoked (queue had nothing to dispatch and the Idle ring was empty).
 */
static int one_event(NV tm) {  /**INVOKE**/
    /*if (SvIVX(DebugLevel) >= 4)
      warn("Event: ActiveWatchers=%d\n", ActiveWatchers); /**/

    pe_signal_asynccheck();
    if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck);

    /* First chance: dispatch a queued event only if its prio is below
       StarvePrio; anything else waits until after the poll below. */
    if (pe_empty_queue(StarvePrio)) return 1;

    /* Work is already waiting (queued events or idle watchers), so the poll
       must not wait; otherwise wait no longer than timeTillTimer() reports. */
    if (!PE_RING_EMPTY(&NQueue) || !PE_RING_EMPTY(&Idle)) {
	tm = 0;
    }
    else {
	NV t1 = timeTillTimer();
	if (t1 < tm) tm = t1;
    }
    /* Prepare hooks may lower the timeout further (pe_map_prepare returns
       the minimum). */
    if (!PE_RING_EMPTY(&Prepare)) tm = pe_map_prepare(tm);

    pe_multiplex(tm);

    pe_timeables_check();
    if (!PE_RING_EMPTY(&Check)) pe_map_check(&Check);

    /* The async check is repeated only when a non-zero timeout was used
       (presumably because signals may have arrived while waiting). */
    if (tm) {
	pe_signal_asynccheck();
	if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck);
    }

    /* Second chance: any queued event qualifies, since queueEvent() clamps
       prio to at most PE_QUEUES-1. */
    if (pe_empty_queue(PE_QUEUES)) return 1;

    /* Nothing was dispatched from the queue: fall back to the idle watchers,
       taking them from the Idle.prev end of the ring. */
    while (1) {
	pe_watcher *wa;
	pe_event *ev;
	pe_ring *lk;

	if (PE_RING_EMPTY(&Idle)) return 0;

	lk = Idle.prev;
	PE_RING_DETACH(lk);
	wa = (pe_watcher*) lk->self;

	/* idle is not an event so CLUMP is never an option but we still need
	   to create an event to pass info to the callback */
	ev = pe_event_allocate(wa);
	if (!prepare_event(ev, "idle")) continue;
	/* can't queueEvent because we have already passed the point where
	   the queue is emptied */
	pe_event_invoke(ev);
	return 1;
    }
}

static void pe_reentry() {
    pe_watcher *wa;
    struct pe_cbframe *frp;

    ENTER;  /* for SAVE*() macro (see below) */

    if (CurCBFrame < 0)
	return;

    frp = CBFrame + CurCBFrame;
    wa = frp->ev->up;
    assert(wa->running == frp->run_id);
    if (Estat.on)
	Estat.suspend(frp->stats);  /* reversed by pe_event_postCB? */
    if (WaREPEAT(wa)) {
	if (WaREENTRANT(wa)) {
	    if (WaACTIVE(wa) && WaINVOKE1(wa))
		pe_watcher_on(wa, 1);
	} else {
	    if (!WaSUSPEND(wa)) {
		/* temporarily suspend non-reentrant watcher until
		   callback is finished! */
		pe_watcher_suspend(wa);
		SAVEDESTRUCTOR(_resume_watcher, wa);
	    }
	}
    }
}

/* Guarded wrapper around one_event(): runs pe_check_recovery(), then
   pe_reentry() (which opens a scope with ENTER), then one loop iteration
   with maxtm as the timeout bound, and finally closes the scope with LEAVE.
   Returns whatever one_event() returned. */
static int safe_one_event(NV maxtm) {
    int dispatched;

    pe_check_recovery();
    pe_reentry();

    dispatched = one_event(maxtm);

    LEAVE; /* reentry */
    return dispatched;
}

/* Stores 'why' in $Event::Result and decrements ExitLevel; warns if
   ExitLevel drops below zero as a result. */
static void pe_unloop(SV *why) {
    SV *result_sv;

    result_sv = perl_get_sv("Event::Result", 0);
    assert(result_sv);
    sv_setsv(result_sv, why);

    ExitLevel -= 1;
    if (ExitLevel < 0)
	warn("Event::unloop() to %d", ExitLevel);
}

/* Stores 'why' in $Event::TopResult and resets ExitLevel to 0. */
static void pe_unloop_all(SV *why) {
    SV *top_result_sv;

    top_result_sv = perl_get_sv("Event::TopResult", 0);
    assert(top_result_sv);

    sv_setsv(top_result_sv, why);
    ExitLevel = 0;
}