/*
* libetp implementation
*
* Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modifica-
* tion, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
* CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
* CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
* ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Alternatively, the contents of this file may be used under the terms of
* the GNU General Public License ("GPL") version 2 or any later version,
* in which case the provisions of the GPL are applicable instead of
* the above. If you wish to allow the use of your version of this file
* only under the terms of the GPL and not to allow others to use your
* version of this file under the BSD license, indicate your decision
* by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL. If you do not delete the
* provisions above, a recipient may use your version of this file under
* either the BSD or the GPL.
*/
#ifndef ETP_API_DECL
# define ETP_API_DECL static
#endif
#ifndef ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif
#ifndef ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif
#ifndef ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
#define ETP_TICKS ((1000000 + 1023) >> 10)
/* calculate time difference in ~1/ETP_TICKS of a second.
 * the >> 10 approximates dividing usecs by 1024 (matching the ETP_TICKS
 * granularity). NOTE(review): right-shifting a negative usec difference is
 * implementation-defined; arithmetic shift is assumed — confirm target ABI. */
ecb_inline int
etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  return  (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
       + ((tv2->tv_usec - tv1->tv_usec) >> 10);
}
/* global pool state; the trailing comments name the mutex protecting each group */
static unsigned int started, idle, wanted = 4; /* started/idle modified under wrklock; wanted defaults to 4 workers */
static void (*want_poll_cb) (void); /* invoked when the result queue becomes non-empty */
static void (*done_poll_cb) (void); /* invoked when the result queue is drained */
static unsigned int max_poll_time;  /* reslock */
static unsigned int max_poll_reqs;  /* reslock */
static unsigned int nreqs;    /* reqlock */
static unsigned int nready;   /* reqlock */
static unsigned int npending; /* reqlock */
static unsigned int max_idle = 4;      /* maximum number of threads that can idle indefinitely; reqlock */
static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit; reqlock */
static xmutex_t wrklock; /* protects the worker list and started count */
static xmutex_t reslock; /* protects res_queue and poll limits */
static xmutex_t reqlock; /* protects req_queue and the counters above */
static xcond_t  reqwait; /* signalled (with reqlock held) when a request is queued */
/* per-thread worker record, linked into a circular list anchored at wrk_first */
typedef struct etp_worker
{
  struct tmpbuf tmpbuf; /* per-thread scratch buffer; freed in etp_worker_free */
  /* locked by wrklock */
  struct etp_worker *prev, *next; /* circular doubly-linked worker list */
  xthread_t tid; /* thread handle, set by xthread_create in etp_start_thread */
#ifdef ETP_WORKER_COMMON
  ETP_WORKER_COMMON /* optional extra per-worker fields supplied by the host file */
#endif
} etp_worker;
static etp_worker wrk_first; /* sentinel head of the circular worker list; NOT a real etp worker */
#define ETP_WORKER_LOCK(wrk)   X_LOCK   (wrklock) /* all workers share the single wrklock */
#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
/* worker threads management */

/* reset per-worker state between requests; intentionally empty here —
 * presumably a hook point for host files with ETP_WORKER_COMMON state (callers
 * are outside this chunk — TODO confirm) */
static void
etp_worker_clear (etp_worker *wrk)
{
}
/* unlink a finished worker from the list and release its storage;
   the list is protected by wrklock (see the struct comment). */
static void ecb_cold
etp_worker_free (etp_worker *wrk)
{
  etp_worker *p = wrk->prev;
  etp_worker *n = wrk->next;

  /* splice the worker out of the circular list */
  n->prev = p;
  p->next = n;

  /* release the scratch buffer first, then the worker record itself */
  free (wrk->tmpbuf.ptr);
  free (wrk);
}
/* return the number of requests in flight (submitted, not yet finished).
   returns unsigned int; the local was plain int, mismatching both the
   unsigned counter and the return type of the sibling accessors. */
ETP_API_DECL unsigned int
etp_nreqs (void)
{
  unsigned int retval;

  /* take the lock only on targets where a word read can tear */
  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  retval = nreqs;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return retval;
}
/* return how many requests are queued but not yet taken by a worker */
ETP_API_DECL unsigned int
etp_nready (void)
{
  unsigned int n;

  /* lock only where unsynchronised word reads are unsafe */
  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  n = nready;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return n;
}
/* return how many finished requests are waiting to be collected by etp_poll */
ETP_API_DECL unsigned int
etp_npending (void)
{
  unsigned int n;

  /* lock only where unsynchronised word reads are unsafe */
  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  n = npending;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return n;
}
/* return the current number of worker threads */
ETP_API_DECL unsigned int
etp_nthreads (void)
{
  unsigned int retval;

  /* NOTE(review): "started" is modified under wrklock (etp_start_thread /
   * etp_end_thread) but read here under reqlock — confirm this asymmetry is
   * intentional; it matches the other accessors' pattern, not the writer's. */
  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  retval = started;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return retval;
}
/*
 * a somewhat faster data structure might be nice, but
 * with 8 priorities this actually needs <20 insns
 * per shift, the most expensive operation.
 */
typedef struct {
  ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend: one singly-linked FIFO per priority */
  int size; /* total number of requests across all priority lists */
} etp_reqq;
static etp_reqq req_queue; /* requests awaiting a worker; locked by reqlock */
static etp_reqq res_queue; /* finished requests awaiting etp_poll; locked by reslock */
/* put a request queue into its empty initial state */
static void ecb_noinline ecb_cold
reqq_init (etp_reqq *q)
{
  int i = ETP_NUM_PRI;

  /* clear every priority bucket's head and tail */
  while (i--)
    {
      q->qs[i] = 0;
      q->qe[i] = 0;
    }

  q->size = 0;
}
/* append req to the FIFO for its priority.
   returns the queue size BEFORE the push — 0 tells the caller the queue
   was empty, which is what triggers want_poll_cb in etp_submit. */
static int ecb_noinline
reqq_push (etp_reqq *q, ETP_REQ *req)
{
  int pri = req->pri;

  req->next = 0;

  if (!q->qe[pri])
    {
      /* empty bucket: the request becomes both head and tail */
      q->qs[pri] = req;
      q->qe[pri] = req;
    }
  else
    {
      /* non-empty bucket: chain after the current tail */
      q->qe[pri]->next = req;
      q->qe[pri] = req;
    }

  return q->size++;
}
/* pop the highest-priority request, or 0 if the queue is empty.
   aborts if size disagrees with the per-priority lists (corruption). */
static ETP_REQ * ecb_noinline
reqq_shift (etp_reqq *q)
{
  int pri;

  if (!q->size)
    return 0;

  --q->size;

  /* scan buckets from highest to lowest priority */
  for (pri = ETP_NUM_PRI; pri--; )
    {
      /* use the generic ETP_REQ type, not the concrete eio_req, so this
         file stays independent of any one host of the thread pool */
      ETP_REQ *req = q->qs[pri];

      if (req)
        {
          if (!(q->qs[pri] = (ETP_REQ *)req->next))
            q->qe[pri] = 0;

          return req;
        }
    }

  abort ();
}
/* initialise the pool; must run before any other etp_* call.
   want_poll is called when the result queue becomes non-empty,
   done_poll when it has been drained. always returns 0. */
ETP_API_DECL int ecb_cold
etp_init (void (*want_poll)(void), void (*done_poll)(void))
{
  /* create the synchronisation primitives first */
  X_MUTEX_CREATE (wrklock);
  X_MUTEX_CREATE (reslock);
  X_MUTEX_CREATE (reqlock);
  X_COND_CREATE  (reqwait);

  reqq_init (&req_queue);
  reqq_init (&res_queue);

  /* empty circular worker list: the sentinel points at itself */
  wrk_first.next = &wrk_first;
  wrk_first.prev = &wrk_first;

  /* reset all pool counters */
  started  = 0;
  idle     = 0;
  nreqs    = 0;
  nready   = 0;
  npending = 0;

  want_poll_cb = want_poll;
  done_poll_cb = done_poll;

  return 0;
}
/* not yet in etp.c */
X_THREAD_PROC (etp_proc);
/* start one additional worker thread and link it into the worker list.
   silently does nothing if thread creation (or allocation) fails — the
   pool keeps running with fewer threads. */
static void ecb_cold
etp_start_thread (void)
{
  etp_worker *wrk = calloc (1, sizeof (etp_worker));

  assert (("unable to allocate worker thread data", wrk));

  /* under NDEBUG the assert above compiles away — fail softly instead of
     dereferencing a null pointer (resolves the old TODO) */
  if (!wrk)
    return;

  X_LOCK (wrklock);

  if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
    {
      /* insert right after the sentinel */
      wrk->prev = &wrk_first;
      wrk->next = wrk_first.next;
      wrk_first.next->prev = wrk;
      wrk_first.next = wrk;
      ++started;
    }
  else
    free (wrk); /* thread creation failed: drop the unused worker record */

  X_UNLOCK (wrklock);
}
/* start another worker if demand warrants it and the limit permits */
static void
etp_maybe_start_thread (void)
{
  /* never exceed the configured target thread count */
  if (ecb_expect_true (etp_nthreads () >= wanted))
    return;

  /* only start a thread when outstanding requests outnumber threads plus
     already-finished results */
  /* todo: maybe use idle here, but might be less exact */
  if (ecb_expect_true ((int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs () >= 0))
    return;

  etp_start_thread ();
}
/* ask one worker thread to terminate by queueing a quit request at the
   highest priority so it is picked up quickly */
static void ecb_cold
etp_end_thread (void)
{
  /* use the generic ETP_REQ type (was concrete eio_req); freed by the worker */
  ETP_REQ *req = calloc (1, sizeof (ETP_REQ));

  /* the request is dereferenced immediately below — an unchecked allocation
     failure here was a guaranteed null dereference; mirror etp_start_thread */
  assert (("unable to allocate quit request", req));

  req->type = ETP_TYPE_QUIT;
  req->pri  = ETP_PRI_MAX - ETP_PRI_MIN;

  X_LOCK (reqlock);
  reqq_push (&req_queue, req);
  X_COND_SIGNAL (reqwait);
  X_UNLOCK (reqlock);

  X_LOCK (wrklock);
  --started;
  X_UNLOCK (wrklock);
}
/* drain the result queue, finishing completed requests via ETP_FINISH.
   returns 0 when the queue is empty, -1 with errno = EAGAIN when the
   configured request/time limit stopped the drain early, or the non-zero
   result of ETP_FINISH. */
ETP_API_DECL int
etp_poll (void)
{
  unsigned int maxreqs;
  unsigned int maxtime;
  struct timeval tv_start, tv_now;

  /* snapshot the limits so they stay stable for this whole poll run */
  X_LOCK (reslock);
  maxreqs = max_poll_reqs;
  maxtime = max_poll_time;
  X_UNLOCK (reslock);

  if (maxtime)
    gettimeofday (&tv_start, 0);

  for (;;)
    {
      ETP_REQ *req;

      etp_maybe_start_thread ();

      X_LOCK (reslock);
      req = reqq_shift (&res_queue);

      if (req)
        {
          --npending;

          /* queue drained: tell the user no further polling is needed */
          if (!res_queue.size && done_poll_cb)
            done_poll_cb ();
        }

      X_UNLOCK (reslock);

      if (!req)
        return 0;

      X_LOCK (reqlock);
      --nreqs;
      X_UNLOCK (reqlock);

      /* a group with members still outstanding is not finished yet */
      if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
        {
          req->int1 = 1; /* mark request as delayed */
          continue;
        }
      else
        {
          /* a non-zero ETP_FINISH result aborts the poll and is passed up */
          int res = ETP_FINISH (req);
          if (ecb_expect_false (res))
            return res;
        }

      if (ecb_expect_false (maxreqs && !--maxreqs))
        break;

      if (maxtime)
        {
          gettimeofday (&tv_now, 0);

          if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
            break;
        }
    }

  errno = EAGAIN;
  return -1;
}
/* forward declaration: etp_cancel and etp_grp_cancel are mutually recursive */
ETP_API_DECL void
etp_grp_cancel (ETP_REQ *grp);

/* mark a request cancelled and recursively cancel any group members */
ETP_API_DECL void
etp_cancel (ETP_REQ *req)
{
  req->cancelled = 1;

  etp_grp_cancel (req);
}
/* cancel every member of a group; note the grp parameter is deliberately
   reused as the iteration cursor over the member list */
ETP_API_DECL void
etp_grp_cancel (ETP_REQ *grp)
{
  for (grp = grp->grp_first; grp; grp = grp->grp_next)
    etp_cancel (grp);
}
/* queue a request for execution. the priority is rebased to 0 and clamped
   into [0, ETP_PRI_MAX - ETP_PRI_MIN]; group requests bypass the workers
   and go straight to the result queue. */
ETP_API_DECL void
etp_submit (ETP_REQ *req)
{
  req->pri -= ETP_PRI_MIN;

  /* clamp the rebased priority into the valid bucket range */
  if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
  if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;

  if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
    {
      /* I hope this is worth it :/ */
      X_LOCK (reqlock);
      ++nreqs;
      X_UNLOCK (reqlock);

      X_LOCK (reslock);

      ++npending;

      /* reqq_push returns 0 when the result queue was empty before,
         which is the moment to wake the poller */
      if (!reqq_push (&res_queue, req) && want_poll_cb)
        want_poll_cb ();

      X_UNLOCK (reslock);
    }
  else
    {
      X_LOCK (reqlock);
      ++nreqs;
      ++nready;
      reqq_push (&req_queue, req);
      X_COND_SIGNAL (reqwait); /* wake one waiting worker, if any */
      X_UNLOCK (reqlock);

      /* grow the pool if demand now exceeds the thread count */
      etp_maybe_start_thread ();
    }
}
/* limit how long a single etp_poll run may take, in (fractional) seconds,
   stored in ETP_TICKS units; 0 disables the time limit */
ETP_API_DECL void ecb_cold
etp_set_max_poll_time (double nseconds)
{
  if (WORDACCESS_UNSAFE) X_LOCK   (reslock);
  max_poll_time = nseconds * ETP_TICKS;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
}
/* limit how many requests a single etp_poll run may finish; 0 disables */
ETP_API_DECL void ecb_cold
etp_set_max_poll_reqs (unsigned int maxreqs)
{
  if (WORDACCESS_UNSAFE) X_LOCK   (reslock);
  max_poll_reqs = maxreqs;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
}
/* set the maximum number of threads allowed to idle indefinitely
   (consumers of max_idle are in the worker loop, outside this chunk) */
ETP_API_DECL void ecb_cold
etp_set_max_idle (unsigned int nthreads)
{
  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  max_idle = nthreads;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}
/* set how many seconds a surplus idle thread waits before exiting
   (consumers of idle_timeout are in the worker loop, outside this chunk) */
ETP_API_DECL void ecb_cold
etp_set_idle_timeout (unsigned int seconds)
{
  if (WORDACCESS_UNSAFE) X_LOCK   (reqlock);
  idle_timeout = seconds;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}
/* raise the desired worker count to at least nthreads; never lowers it.
   threads are started lazily by etp_maybe_start_thread, not here. */
ETP_API_DECL void ecb_cold
etp_set_min_parallel (unsigned int nthreads)
{
  if (nthreads > wanted)
    wanted = nthreads;
}
/* cap the desired worker count at nthreads and actively shut down surplus
   threads by queueing quit requests */
ETP_API_DECL void ecb_cold
etp_set_max_parallel (unsigned int nthreads)
{
  if (nthreads < wanted)
    wanted = nthreads;

  while (started > wanted)
    etp_end_thread ();
}