The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "internal.h"

#include <errno.h>
#include <pthread.h>

/* TODO add condvar support to libuv */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_once_t once = PTHREAD_ONCE_INIT;
static pthread_t threads[4];
static ngx_queue_t exit_message;
static ngx_queue_t wq = { &wq, &wq };


static void* worker(void* arg) {
  struct uv__work* w;
  ngx_queue_t* q;

  (void) arg;

  for (;;) {
    if (pthread_mutex_lock(&mutex))
      abort();

    while (ngx_queue_empty(&wq))
      if (pthread_cond_wait(&cond, &mutex))
        abort();

    q = ngx_queue_head(&wq);

    if (q == &exit_message)
      pthread_cond_signal(&cond);
    else
      ngx_queue_remove(q);

    if (pthread_mutex_unlock(&mutex))
      abort();

    if (q == &exit_message)
      break;

    w = ngx_queue_data(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    ngx_queue_insert_tail(&w->loop->wq, &w->wq);
    uv_mutex_unlock(&w->loop->wq_mutex);
    uv_async_send(&w->loop->wq_async);
  }

  return NULL;
}


static void post(ngx_queue_t* q) {
  pthread_mutex_lock(&mutex);
  ngx_queue_insert_tail(&wq, q);
  pthread_cond_signal(&cond);
  pthread_mutex_unlock(&mutex);
}


static void init_once(void) {
  unsigned int i;

  ngx_queue_init(&wq);

  for (i = 0; i < ARRAY_SIZE(threads); i++)
    if (pthread_create(threads + i, NULL, worker, NULL))
      abort();
}


__attribute__((destructor))
static void cleanup(void) {
  unsigned int i;
  int err;

  post(&exit_message);

  for (i = 0; i < ARRAY_SIZE(threads); i++) {
    err = pthread_join(threads[i], NULL);

    if (err == 0 || err == ESRCH)
      continue;

    abort();
  }
}


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w)) {
  pthread_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq);
}


void uv__work_done(uv_async_t* handle, int status) {
  struct uv__work* w;
  uv_loop_t* loop;
  ngx_queue_t* q;
  ngx_queue_t wq;

  loop = container_of(handle, uv_loop_t, wq_async);
  ngx_queue_init(&wq);

  uv_mutex_lock(&loop->wq_mutex);
  if (!ngx_queue_empty(&loop->wq)) {
    q = ngx_queue_head(&loop->wq);
    ngx_queue_split(&loop->wq, q, &wq);
  }
  uv_mutex_unlock(&loop->wq_mutex);

  while (!ngx_queue_empty(&wq)) {
    q = ngx_queue_head(&wq);
    ngx_queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    w->done(w);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  if (req->work_cb)
    req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  uv__req_unregister(req->loop, req);

  if (req->after_work_cb)
    req->after_work_cb(req);
}


int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
  return 0;
}