The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#include <zmq.h>

#include <assert.h>
#include <stdlib.h>

#include "mutex.h"
#include "timers.h"


struct zmq_raw_timers
{
	zmq_raw_mutex *mutex;
	void *timers;
	void *thread;
	int running;

	zmq_raw_timer **last;
	int run_count;

	zmq_pollitem_t wakeup_item;
	void *wakeup_context;
	void *wakeup_send;
	void *wakeup_recv;
};

struct zmq_raw_timer
{
	int id;
	int running;
	int after;
	int interval;
	void *context;
	void *send;
	void *recv;
	void *recv_sv;
	zmq_raw_timers *timers;
};


static void timer_handler (int timer_id, void *arg);
static void timer_thread (void *arg);


zmq_raw_timers *zmq_raw_timers_create()
{
	int rc;
	zmq_raw_timers *timers;

	if ((timers = calloc (1, sizeof (zmq_raw_timers))) == NULL ||
		(timers->timers = zmq_timers_new()) == NULL ||
		(timers->wakeup_context = zmq_ctx_new()) == NULL ||
		(timers->wakeup_send = zmq_socket (timers->wakeup_context, ZMQ_PAIR)) == NULL ||
		(timers->wakeup_recv = zmq_socket (timers->wakeup_context, ZMQ_PAIR)) == NULL)
		goto on_error;

	if ((rc = zmq_bind (timers->wakeup_recv, "inproc://_wakeup")) < 0 ||
		(rc = zmq_connect (timers->wakeup_send, "inproc://_wakeup")) < 0)
		goto on_error;

	goto done;

on_error:
	zmq_close (timers->wakeup_recv);
	zmq_close (timers->wakeup_send);
	zmq_ctx_term (timers->wakeup_context);
	free (timers);
	return NULL;

done:
	timers->wakeup_item.events = ZMQ_POLLIN;
	timers->wakeup_item.socket = timers->wakeup_recv;
	timers->mutex = zmq_raw_mutex_create();
	return timers;
}

void zmq_raw_timers_destroy (zmq_raw_timers *timers)
{
	assert (timers);

	zmq_raw_mutex_lock (timers->mutex);
	timers->running = 0;
	zmq_send_const (timers->wakeup_send, "", 1, ZMQ_DONTWAIT);
	zmq_raw_mutex_unlock (timers->mutex);

	if (timers->thread)
		zmq_threadclose (timers->thread);
	zmq_raw_mutex_destroy (timers->mutex);

	zmq_close (timers->wakeup_send);
	zmq_close (timers->wakeup_recv);
	zmq_ctx_term (timers->wakeup_context);
	zmq_timers_destroy (&timers->timers);

	free (timers);
}

static zmq_raw_timer *zmq_raw_timer_create (void *context, int after, int interval)
{
	int rc;
	char endpoint[64];
	static const int v = 1;
	static int id = 0;
	zmq_raw_timer *timer;

	assert (context);

	sprintf (endpoint, "inproc://_timer-%d", ++id);

	if ((timer = calloc (1, sizeof (zmq_raw_timer))) == NULL ||
		(timer->send = zmq_socket (context, ZMQ_PAIR)) == NULL ||
		(timer->recv = zmq_socket (context, ZMQ_PAIR)) == NULL)
		goto on_error;

	timer->after = after;
	timer->interval = interval;

	if ((rc = zmq_bind (timer->recv, endpoint)) < 0 ||
		(rc = zmq_setsockopt (timer->recv, ZMQ_CONFLATE, &v, sizeof (v))) < 0 ||
		(rc = zmq_connect (timer->send, endpoint)) < 0)
		goto on_error;

	goto done;

on_error:
	zmq_close (timer->send);
	zmq_close (timer->recv);
	free (timer);
	return NULL;

done:
	return timer;
}

static void zmq_raw_timer_destroy (zmq_raw_timer *timer)
{
	assert (timer);
	assert (timer->send);

	zmq_close (timer->send);

	if (timer->recv && !timer->recv_sv)
		zmq_close (timer->recv);

	free (timer);
}

static void zmq_raw_timers__start (zmq_raw_timer *timer)
{
	zmq_raw_timers *timers;
	assert (timer);
	assert (timer->timers);

	timers = timer->timers;

	timer->id = zmq_timers_add (timers->timers, timer->after, timer_handler, timer);
	timer->running = 1;

	if (!timers->running)
	{
		/* start the timer thread */
		timers->running = 1;
		timers->thread = zmq_threadstart (timer_thread, timers);
	}
	else
	{
		/* wakeup the timer thread */
		zmq_send_const (timers->wakeup_send, "", 1, ZMQ_DONTWAIT);
	}
}

zmq_raw_timer *zmq_raw_timers_start (zmq_raw_timers *timers, void *context, int after, int interval)
{
	int rc;
	zmq_raw_timer *timer;

	assert (timers);
	assert (context);

	zmq_raw_mutex_lock (timers->mutex);

	timer = zmq_raw_timer_create (context, after, interval);
	if (timer == NULL)
	{
		zmq_raw_mutex_unlock (timers->mutex);
		return NULL;
	}

	timer->timers = timers;
	zmq_raw_timers__start (timer);

	zmq_raw_mutex_unlock (timers->mutex);

	return timer;
}

void zmq_raw_timers_reset (zmq_raw_timer *timer)
{
	assert (timer);

	zmq_raw_mutex_lock (timer->timers->mutex);
	if (timer->running)
		zmq_timers_reset (timer->timers->timers, timer->id);
	else
		zmq_raw_timers__start (timer);

	while (zmq_recv (timer->recv, NULL, 0, ZMQ_DONTWAIT) == 0);
	zmq_raw_mutex_unlock (timer->timers->mutex);
}

static void zmq_raw_timers__stop (zmq_raw_timer *timer)
{
	assert (timer);

	if (timer->running)
	{
		timer->running = 0;
		zmq_timers_cancel (timer->timers->timers, timer->id);
	}
}

static void zmq_raw_timers__expire (zmq_raw_timer *timer)
{
	if (timer->running)
	{
		zmq_raw_timers__stop (timer);

		timer->timers->last = NULL;
		timer->timers->run_count = 0;

		timer_handler (timer->id, timer);

		free (timer->timers->last);
	}
}

void zmq_raw_timers_expire (zmq_raw_timer *timer)
{
	assert (timer);

	zmq_raw_mutex_lock (timer->timers->mutex);
	zmq_raw_timers__expire (timer);
	zmq_raw_mutex_unlock (timer->timers->mutex);
}

void zmq_raw_timers_stop (zmq_raw_timer *timer)
{
	assert (timer);

	zmq_raw_mutex_lock (timer->timers->mutex);
	zmq_raw_timers__stop (timer);
	zmq_raw_mutex_unlock (timer->timers->mutex);
}

void zmq_raw_timers_remove (zmq_raw_timer *timer)
{
	assert (timer);

	zmq_raw_timers_stop (timer);
	zmq_raw_timer_destroy (timer);
}

int zmq_raw_timer_id (zmq_raw_timer *timer)
{
	assert (timer);
	return timer->id;
}

void *zmq_raw_timer_get_recv (zmq_raw_timer *timer)
{
	assert (timer);
	return timer->recv;
}

int zmq_raw_timer_is_running (zmq_raw_timer *timer)
{
	assert (timer);
	return timer->running;
}

void zmq_raw_timer_set_sv (zmq_raw_timer *timer, void *sv)
{
	assert (timer);
	assert (sv);
	assert (timer->recv_sv == NULL);

	timer->recv_sv = sv;
}

void *zmq_raw_timer_get_sv (zmq_raw_timer *timer)
{
	return timer->recv_sv;
}


void timer_thread (void *arg)
{
	int count = 0, running = 1;
	long timeout;
	zmq_raw_timers *timers = (zmq_raw_timers *)arg;

	while (running)
	{
		zmq_raw_mutex_lock (timers->mutex);

		/* clear any 'pending' wakeup signals */
		while (zmq_recv (timers->wakeup_recv, NULL, 0, ZMQ_DONTWAIT) == 0);

		timers->last = NULL;
		timers->run_count = 0;
		zmq_timers_execute (timers->timers);

		while (--timers->run_count >= 0)
		{
			int index = timers->run_count;
			zmq_raw_timer *timer = timers->last[index];

			if (timer->interval == 0)
				zmq_raw_timers__stop (timer);
			else
				zmq_timers_set_interval (timers->timers, timer->id,
					(size_t)timer->interval);
		}

		if (timers->last)
			free (timers->last);

		running = timers->running;
		timeout = zmq_timers_timeout (timers->timers);
		zmq_raw_mutex_unlock (timers->mutex);

		/* sleep for 'timeout'. this may be interrupted
		 * by adding a new timer*/
		if (running)
			zmq_poll (&timers->wakeup_item, 1, timeout);
	}
}

void timer_handler (int timer_id, void *arg)
{
	assert (arg);

	/* this is guaranteed to execute with the timers mutex locked */
	zmq_raw_timer *timer = (zmq_raw_timer *)arg;

	assert (timer->id == timer_id);

	zmq_send_const (timer->send, "", 1, ZMQ_DONTWAIT);

	zmq_raw_timers *timers = timer->timers;
	int index = timers->run_count++;

	if (index == 0)
		timers->last = calloc (1, sizeof (zmq_raw_timer *));
	else
		timers->last = realloc (timers->last, timers->run_count*sizeof (zmq_raw_timer *));

	assert (timers->last);
	timers->last[index] = timer;
}