/* Counter advanced (modulo 0x8000) each time pe_watcher_init() runs. */
static int NextID = 0;
/* Head of the ring that pe_watcher_init() links every watcher into via
   its 'all' member; initialised in boot_pe_watcher(). */
static pe_ring AllWatchers;
/* Base vtbl; boot_pe_watcher() fills it with the "method is missing"
   stubs for start/stop/alarm. */
static struct pe_watcher_vtbl pe_watcher_base_vtbl;
/*
 * Initialise the fields common to every watcher.
 *
 *   ev     - watcher to set up; ev->vtbl must already be assigned and
 *            that vtbl must carry a stash (croaks otherwise)
 *   stash  - optional stash; when it or 'temple' is given the watcher is
 *   temple   wrapped immediately via wrap_watcher() and the result kept
 *            in ev->mysv
 *
 * The first time a given vtbl is seen, the perl module "Event/<name>.pm"
 * is required (with a leading "Event::" removed from the stash name);
 * a load failure croaks.
 */
static void pe_watcher_init(pe_watcher *ev, HV *stash, SV *temple) {
    STRLEN n_a;

    assert(ev);
    assert(ev->vtbl);
    if (!ev->vtbl->stash) {
        croak("sub-class VTBL must have a stash (doesn't!)");
    }

    /* one-time load of the perl side for this watcher type */
    if (!ev->vtbl->did_require) {
        SV *module_path;
        char *class_name = HvNAME(ev->vtbl->stash);
        dTHX;
        /* NOTE(review): compares 7 bytes whatever the length of
           class_name is -- confirm short names cannot occur here */
        if (memEQ(class_name, "Event::", 7)) {
            class_name += 7;
        }
        module_path = sv_2mortal(newSVpvf("Event/%s.pm", class_name));
        perl_require_pv(SvPV(module_path, n_a));
        if (sv_true(ERRSV)) {
            croak("Event: could not load perl support code for Event::%s: %s",
                  class_name, SvPV(ERRSV,n_a));
        }
        ++ev->vtbl->did_require;
    }

    /* if we have a non-default stash then we need to save it! */
    if (stash || temple) {
        ev->mysv = wrap_watcher(ev, stash, temple);
    } else {
        ev->mysv = 0;
    }

    PE_RING_INIT(&ev->all, ev);
    PE_RING_INIT(&ev->events, 0);

    /* no exceptions after this point */
    PE_RING_UNSHIFT(&ev->all, &AllWatchers);

    /* flags: start from zero, then switch on INVOKE1 and REENTRANT */
    WaFLAGS(ev) = 0;
    WaINVOKE1_on(ev);
    WaREENTRANT_on(ev);

    ev->FALLBACK = 0;
    /* NOTE(review): NextID is advanced here but this function never
       stores it on the watcher -- confirm whether that is intended */
    NextID = (NextID+1) & 0x7fff; /* make it look like the kernel :-, */

    /* remaining per-watcher defaults */
    ev->refcnt = 0;
    ev->desc = newSVpvn("??",2);
    ev->running = 0;
    ev->max_cb_tm = 1; /* make default configurable? */
    ev->cbtime = 0;
    ev->prio = PE_QUEUES;
    ev->callback = 0;
    ev->ext_data = 0;
    ev->stats = 0;
}
/*
 * Dequeue and release every event linked on the watcher's 'events' ring.
 * Loops until the ring reports empty; presumably dequeEvent() /
 * pe_event_release() unlink the event from that ring -- verify there.
 */
static void pe_watcher_cancel_events(pe_watcher *wa) {
    while (!PE_RING_EMPTY(&wa->events)) {
        /* event reached through the ring head's 'prev' link */
        pe_event *pending = (pe_event*) wa->events.prev->self;
        dequeEvent(pending);
        pe_event_release(pending);
    }
}
/*
 * Release the resources owned by the common part of a watcher.
 *
 * A second call on the same watcher only emits a warning and returns.
 * Otherwise the watcher is flagged DESTROYED and the references it holds
 * (perl callback, FALLBACK, desc) are dropped, and its stats object, if
 * any, is handed to Estat.dtor.  The watcher structure itself is NOT
 * freed here; that is left to the caller.
 */
static void pe_watcher_dtor(pe_watcher *wa) {
    STRLEN n_a;
    assert(WaCANDESTROY(wa));
    if (WaDESTROYED(wa)) {
        /* wa is a pointer: format it with %p (as _watcher_callback does)
           rather than %x, which expects an unsigned int */
        warn("Attempt to destroy watcher 0x%p again (ignored)", (void*) wa);
        return;
    }
    WaDESTROYED_on(wa);
    if (WaDEBUGx(wa) >= 3)
        warn("Watcher '%s' destroyed", SvPV(wa->desc, n_a));
    assert(PE_RING_EMPTY(&wa->events));
    /* callback is only reference-counted when it is a perl callback */
    if (WaPERLCB(wa))
        SvREFCNT_dec(wa->callback);
    if (wa->FALLBACK)
        SvREFCNT_dec(wa->FALLBACK);
    if (wa->desc)
        SvREFCNT_dec(wa->desc);
    if (wa->stats)
        Estat.dtor(wa->stats);
    /* safefree(wa); do it yourself */
}
/********************************** *******************************/
WKEYMETH(_watcher_callback) {
if (nval) {
AV *av;
SV *sv;
SV *old=0;
if (WaPERLCB(ev))
old = (SV*) ev->callback;
if (!SvOK(nval)) {
WaPERLCB_off(ev);
ev->callback = 0;
ev->ext_data = 0;
pe_watcher_stop(ev, 0);
} else if (SvROK(nval) && (SvTYPE(sv=SvRV(nval)) == SVt_PVCV)) {
WaPERLCB_on(ev);
ev->callback = SvREFCNT_inc(nval);
} else if (SvROK(nval) &&
(SvTYPE(av=(AV*)SvRV(nval)) == SVt_PVAV) &&
av_len(av) == 1) {
/* method lookup code adapted from universal.c */
STRLEN n_a;
SV *pkgsv = *av_fetch(av, 0, 0);
HV *pkg = NULL;
SV *namesv = *av_fetch(av, 1, 0);
char *name = SvPV(namesv, n_a);
int ok=0;
if(SvROK(pkgsv)) {
pkgsv = (SV*)SvRV(pkgsv);
if(SvOBJECT(pkgsv))
pkg = SvSTASH(pkgsv);
}
else {
pkg = gv_stashsv(pkgsv, FALSE);
}
if (pkg) {
GV *gv = gv_fetchmethod_autoload(pkg, name, FALSE);
if (gv && isGV(gv))
ok=1;
}
else {
warn("Event: package '%s' doesn't exist (creating)",
SvPV(pkgsv, n_a));
pkg = gv_stashsv(pkgsv, 1);
}
if (!ok) {
warn("Event: callback method %s->%s doesn't exist",
HvNAME(pkg), name);
}
WaPERLCB_on(ev);
ev->callback = SvREFCNT_inc(nval);
} else {
if (SvIV(DebugLevel) >= 2)
sv_dump(sv);
croak("Callback must be a code ref or [$object, $method_name]");
}
if (old)
SvREFCNT_dec(old);
}
{
SV *ret = (WaPERLCB(ev)?
(SV*) ev->callback :
(ev->callback?
sv_2mortal(newSVpvf("<FPTR=0x%p EXT=0x%p>",
ev->callback, ev->ext_data)) :
&PL_sv_undef));
dSP;
XPUSHs(ret);
PUTBACK;
}
}
WKEYMETH(_watcher_cbtime) {
if (!nval) {
dSP;
XPUSHs(sv_2mortal(newSVnv(ev->cbtime)));
PUTBACK;
} else
croak("'e_cbtime' is read-only");
}
WKEYMETH(_watcher_desc) {
if (nval) {
sv_setsv(ev->desc, nval);
}
{
dSP;
XPUSHs(ev->desc);
PUTBACK;
}
}
/*
 * Accessor for the DEBUG flag: a true value switches it on, a false one
 * switches it off; the resulting state is pushed as a boolean.
 */
WKEYMETH(_watcher_debug) {
    if (nval) {
        if (!sv_true(nval)) {
            WaDEBUG_off(ev);
        } else {
            WaDEBUG_on(ev);
        }
    }
    {
        dSP;
        XPUSHs(boolSV(WaDEBUG(ev)));
        PUTBACK;
    }
}
WKEYMETH(_watcher_priority) {
if (nval) {
ev->prio = SvIV(nval);
}
{
dSP;
XPUSHs(sv_2mortal(newSViv(ev->prio)));
PUTBACK;
}
}
/*
 * Accessor for the REENTRANT flag.  Turning it off croaks while
 * ev->running is greater than 1.  The resulting state is pushed as a
 * boolean.
 */
WKEYMETH(_watcher_reentrant) {
    if (nval) {
        if (!sv_true(nval)) {
            /* refuse to drop the flag while callbacks are nested */
            if (ev->running > 1) {
                croak("'reentrant' cannot be turned off while nested %d times",
                      ev->running);
            }
            WaREENTRANT_off(ev);
        } else {
            WaREENTRANT_on(ev);
        }
    }
    {
        dSP;
        XPUSHs(boolSV(WaREENTRANT(ev)));
        PUTBACK;
    }
}
/*
 * Accessor for the REPEAT flag: set from the truth of the new value when
 * one is given; the resulting state is pushed as a boolean.
 */
WKEYMETH(_watcher_repeat) {
    if (nval) {
        if (!sv_true(nval)) {
            WaREPEAT_off(ev);
        } else {
            WaREPEAT_on(ev);
        }
    }
    {
        dSP;
        XPUSHs(boolSV(WaREPEAT(ev)));
        PUTBACK;
    }
}
/*
 * Accessor for the suspended state.  A true value suspends the watcher,
 * a false one resumes it (through pe_watcher_suspend / pe_watcher_resume
 * rather than by touching the flag directly).  The resulting SUSPEND
 * state is pushed as a boolean.
 */
WKEYMETH(_watcher_suspend) {
    if (nval) {
        if (!sv_true(nval)) {
            pe_watcher_resume(ev);
        } else {
            pe_watcher_suspend(ev);
        }
    }
    {
        dSP;
        XPUSHs(boolSV(WaSUSPEND(ev)));
        PUTBACK;
    }
}
WKEYMETH(_watcher_max_cb_tm) {
if (nval) {
int tm = SvIOK(nval)? SvIV(nval) : 0;
if (tm < 0) {
warn("e_max_cb_tm must be non-negative");
tm=0;
}
ev->max_cb_tm = tm;
}
{
dSP;
XPUSHs(sv_2mortal(newSViv(ev->max_cb_tm)));
PUTBACK;
}
}
/********************************** *******************************/
/*
 * Croak with "<package>::<meth> is missing", taking the package name
 * from the stash of the watcher's vtbl.
 */
static void pe_watcher_nomethod(pe_watcher *ev, char *meth) {
    HV *owner = ev->vtbl->stash;

    assert(owner);
    croak("%s::%s is missing", HvNAME(owner), meth);
}
/* Placeholder 'start' method: reports that "start" is missing. */
static char *pe_watcher_nostart(pe_watcher *ev, int repeat)
{
    pe_watcher_nomethod(ev,"start");
    return 0;
}
/* Placeholder 'stop' method: reports that "stop" is missing. */
static void pe_watcher_nostop(pe_watcher *ev)
{
    pe_watcher_nomethod(ev,"stop");
}
/* Placeholder 'alarm' method: reports that "alarm" is missing. */
static void pe_watcher_alarm(pe_watcher *ev, pe_timeable *tm)
{
    pe_watcher_nomethod(ev,"alarm");
}
/*
 * One-time setup for the watcher layer: initialises the AllWatchers
 * ring, fills the base vtbl with the "method is missing" stubs, and
 * defines the constant subs ACTIVE, SUSPEND, R, W, E and T in the
 * Event::Watcher package (created if necessary).
 */
static void boot_pe_watcher() {
    HV *pkg = gv_stashpv("Event::Watcher", 1);
    struct pe_watcher_vtbl *base = &pe_watcher_base_vtbl;

    PE_RING_INIT(&AllWatchers, 0);

    /* the base type has no stash and no destructor of its own */
    base->stash = 0;
    base->did_require = 0;
    base->dtor = 0;
    base->start = pe_watcher_nostart;
    base->stop = pe_watcher_nostop;
    base->alarm = pe_watcher_alarm;

    /* status constants */
    newCONSTSUB(pkg, "ACTIVE", newSViv(PE_ACTIVE));
    newCONSTSUB(pkg, "SUSPEND", newSViv(PE_SUSPEND));
    /* single-letter constants */
    newCONSTSUB(pkg, "R", newSViv(PE_R));
    newCONSTSUB(pkg, "W", newSViv(PE_W));
    newCONSTSUB(pkg, "E", newSViv(PE_E));
    newCONSTSUB(pkg, "T", newSViv(PE_T));
}
/*
 * Attach a stash and an event vtbl to a watcher vtbl.  The watcher
 * vtbl's new_event slot is copied from the event vtbl; evt is
 * dereferenced without a NULL check.
 */
static void pe_register_vtbl(pe_watcher_vtbl *vt, HV *stash,
                             pe_event_vtbl *evt) {
    vt->stash      = stash;
    vt->event_vtbl = evt;
    vt->new_event  = evt->new_event;
}
/*
 * Create an event for the watcher right away and queue it.  Does
 * nothing for a suspended watcher; croaks when no callback is set.
 */
static void pe_watcher_now(pe_watcher *wa) {
    pe_event *fresh;

    if (WaSUSPEND(wa)) {
        return;
    }
    if (!wa->callback) {
        STRLEN n_a;
        croak("Event: attempt to invoke now() method with callback unset on watcher '%s'", SvPV(wa->desc,n_a));
    }

    WaRUNNOW_on(wa); /* race condition XXX */
    fresh = wa->vtbl->new_event(wa);
    ++fresh->hits;
    queueEvent(fresh);
}
/*******************************************************************
The following methods change the status flags. This is the only
code that should be changing these flags!
*/
/*
 * Cancel a watcher (no-op when it is already cancelled): clear SUSPEND,
 * stop it with its events cancelled, flag it CANCELLED and unlink it
 * from the 'all' ring.  Afterwards either the reference held in
 * wa->mysv is dropped, or -- when there is no mysv and the watcher may
 * be destroyed -- the vtbl destructor is called directly.
 */
static void pe_watcher_cancel(pe_watcher *wa) {
    if (WaCANCELLED(wa)) {
        return;
    }

    WaSUSPEND_off(wa);
    pe_watcher_stop(wa, 1); /* peer */
    WaCANCELLED_on(wa);
    PE_RING_DETACH(&wa->all);

    if (!wa->mysv) {
        if (WaCANDESTROY(wa)) {
            wa->vtbl->dtor(wa);
        }
    } else {
        SvREFCNT_dec(wa->mysv); /* might destroy */
    }
}
/*
 * Suspend a watcher (no-op when already suspended): switch polling off,
 * cancel its queued events, then set the SUSPEND flag.
 */
static void pe_watcher_suspend(pe_watcher *ev) {
    STRLEN n_a;

    assert(ev);
    if (WaSUSPEND(ev)) {
        return;
    }
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: suspend '%s'\n", SvPV(ev->desc,n_a));
    }

    /* the flag goes on last, after polling is off and events are gone */
    pe_watcher_off(ev);
    pe_watcher_cancel_events(ev);
    WaSUSPEND_on(ev); /* must happen nowhere else!! */
}
/*
 * Resume a suspended watcher (no-op when it is not suspended): clear the
 * SUSPEND flag and, if the watcher is still ACTIVE, switch polling back
 * on.
 */
static void pe_watcher_resume(pe_watcher *ev) {
    STRLEN n_a;

    assert(ev);
    if (!WaSUSPEND(ev)) {
        return;
    }

    WaSUSPEND_off(ev);
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: resume '%s'%s\n", SvPV(ev->desc,n_a),
             WaACTIVE(ev)?" ACTIVE":"");
    }
    /* ACTIVE is read again here, after the optional warning */
    if (WaACTIVE(ev)) {
        pe_watcher_on(ev, 0);
    }
}
/*
 * Switch polling on through the vtbl's start method.
 *
 * Returns 0 when the watcher is already polling or is suspended, and
 * also when start succeeds (POLLING is then set).  When start returns
 * an excuse string the watcher is stopped with its events cancelled
 * and that string is returned.  Croaks for a cancelled watcher.
 */
static char *pe_watcher_on(pe_watcher *wa, int repeat) {
    STRLEN n_a;
    char *why_not;

    if (WaPOLLING(wa) || WaSUSPEND(wa)) {
        return 0;
    }
    if (WaCANCELLED(wa)) {
        croak("Event: attempt to start cancelled watcher '%s'",
              SvPV(wa->desc,n_a));
    }

    why_not = wa->vtbl->start(wa, repeat);
    if (!why_not) {
        WaPOLLING_on(wa); /* must happen nowhere else!! */
        return why_not;
    }

    if (SvIV(DebugLevel)) {
        warn("Event: can't restart '%s' %s", SvPV(wa->desc, n_a), why_not);
    }
    pe_watcher_stop(wa, 1); /* update flags! */
    return why_not;
}
/*
 * Switch polling off through the vtbl's stop method.  Only acts on a
 * watcher that is polling and not suspended.
 */
static void pe_watcher_off(pe_watcher *wa) {
    if (WaPOLLING(wa) && !WaSUSPEND(wa)) {
        wa->vtbl->stop(wa);
        WaPOLLING_off(wa);
    }
}
/*
 * Make a watcher ACTIVE (no-op when it already is).  Polling is switched
 * on via pe_watcher_on(); if that yields an excuse the function croaks
 * and the watcher is not marked active.  On success the ACTIVE flag is
 * set and ActiveWatchers is incremented.
 */
static void pe_watcher_start(pe_watcher *ev, int repeat) {
    STRLEN n_a;
    char *why_not;

    if (WaACTIVE(ev)) {
        return;
    }
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: active ON '%s'\n", SvPV(ev->desc,n_a));
    }

    why_not = pe_watcher_on(ev, repeat);
    if (why_not) {
        croak("Event: can't start '%s' %s", SvPV(ev->desc,n_a), why_not);
    }

    WaACTIVE_on(ev); /* must happen nowhere else!! */
    ++ActiveWatchers;
}
/*
 * Make a watcher inactive (no-op when it is not ACTIVE): switch polling
 * off, clear the ACTIVE flag, optionally cancel its queued events, and
 * decrement ActiveWatchers.
 *
 *   cancel_events - non-zero to also drop the watcher's queued events
 */
static void pe_watcher_stop(pe_watcher *ev, int cancel_events) {
    STRLEN n_a;

    if (!WaACTIVE(ev)) {
        return;
    }
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: active OFF '%s'\n", SvPV(ev->desc,n_a));
    }

    pe_watcher_off(ev);
    WaACTIVE_off(ev); /* must happen nowhere else!! */
    if (cancel_events) {
        pe_watcher_cancel_events(ev);
    }
    --ActiveWatchers;
}