The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package ZMQ::Raw::Loop;
$ZMQ::Raw::Loop::VERSION = '0.22';
use strict;
use warnings;
use Carp;

sub CLONE_SKIP { 1 }

my @attributes;

BEGIN
{
	@attributes = qw/
		context
		poller
		timers
		handles
		promises
		events
		terminated

		tevent
	/;

	no strict 'refs';
	foreach my $accessor (@attributes)
	{
		*{$accessor} = sub
		{
			@_ > 1 ? $_[0]->{$accessor} = $_[1] : $_[0]->{$accessor}
		};
	}
}

use ZMQ::Raw;
use ZMQ::Raw::Loop::Event;
use ZMQ::Raw::Loop::Handle;
use ZMQ::Raw::Loop::Promise;
use ZMQ::Raw::Loop::Timer;

=head1 NAME

ZMQ::Raw::Loop - Loop class

=head1 VERSION

version 0.22

=head1 DESCRIPTION

A L<ZMQ::Raw::Loop> represents an event loop.

B<WARNING>: The API of this module is unstable and may change without warning
(any change will be appropriately documented in the changelog).

=head1 METHODS

=head2 new( $context )

Create a new event loop

=head2 run( )

Run the event loop

=head2 run_one( )

Run until a single event occurs

=head2 add( $item )

Add C<$item> to the event loop. C<$item> should be a L<C<ZMQ::Raw::Loop::Event>>,
L<C<ZMQ::Raw::Loop::Handle>>, L<C<ZMQ::Raw::Loop::Timer>> or
L<C<ZMQ::Raw::Loop::Promise>>.

=head2 remove( $item )

Remove C<$item> from the event loop.

=head2 terminate( )

Terminate the event loop

=cut

sub new
{
	my ($this, $context) = @_;

	my $class = ref ($this) || $this;
	my $self =
	{
		context => $context,
		poller => ZMQ::Raw::Poller->new,
		timers => [],
		handles => [],
		events => [],
		promises => [],
	};

	my $obj = bless $self, $class;
	$obj->tevent (ZMQ::Raw::Loop::Event->new ($context,
			on_set => sub
			{
				$obj->terminated (1);
			}
		)
	);
	return $obj;
}



sub run
{
	my ($this) = @_;

	$this->terminated (0);
	$this->tevent->reset();
	$this->add ($this->tevent);

	while (!$this->terminated && $this->poller->size > 1)
	{
		$this->run_one;
	}

	$this->remove ($this->tevent);

	$this->_cancel_timers();
	$this->_cancel_events();
	$this->_cancel_handles();
	$this->_clear_promises();
}



sub run_one
{
	my ($this) = @_;

	if ($this->poller->size)
	{
		my $count = $this->poller->wait (-1);
		if ($count)
		{
			$this->_dispatch_events() || $this->_dispatch_handles() || $this->_dispatch_timers();
			$this->promises ([grep { $_->status == ZMQ::Raw::Loop::Promise->PLANNED } @{$this->promises}]);
		}

		return 1;
	}

	return 0;
}



sub add
{
	my ($this, $item) = @_;

	if (ref ($item) eq 'ZMQ::Raw::Loop::Timer')
	{
		$this->_add_timer ($item);
	}
	elsif (ref ($item) eq 'ZMQ::Raw::Loop::Handle')
	{
		$this->_add_handle ($item);
	}
	elsif (ref ($item) eq 'ZMQ::Raw::Loop::Event')
	{
		$this->_add_event ($item);
	}
	elsif (ref ($item) eq 'ZMQ::Raw::Loop::Promise')
	{
		$this->_add_promise ($item);
	}
	else
	{
		croak "don't know how to add $item";
	}
}



sub _add_timer
{
	my ($this, $timer) = @_;

	$timer->loop ($this);
	$this->poller->add ($timer->timer->socket, ZMQ::Raw->ZMQ_POLLIN);

	if (!$timer->running)
	{
		$timer->reset();
	}

	push @{$this->timers}, $timer;
}



sub _add_event
{
	my ($this, $event) = @_;

	$event->loop ($this);
	$this->poller->add ($event->read_handle, ZMQ::Raw->ZMQ_POLLIN);

	if ($event->timeout)
	{
		$event->timer (ZMQ::Raw::Timer->new ($this->context,
			after => $event->timeout)
		);
		$this->poller->add ($event->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
	}

	push @{$this->events}, $event;
}



sub _add_promise
{
	my ($this, $promise) = @_;

	push @{$this->promises}, $promise;
}



sub _add_handle
{
	my ($this, $handle) = @_;

	my $events = 0;
	if ($handle->on_readable)
	{
		$events |= ZMQ::Raw->ZMQ_POLLIN;
	}
	if ($handle->on_writable)
	{
		$events |= ZMQ::Raw->ZMQ_POLLOUT;
	}
	if ($handle->timeout)
	{
		$handle->timer (ZMQ::Raw::Timer->new ($this->context,
			after => $handle->timeout)
		);
		$this->poller->add ($handle->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
	}

	$handle->loop ($this);
	$this->poller->add ($handle->handle, $events);

	push @{$this->handles}, $handle;
}



sub remove
{
	my ($this, $item) = @_;

	if (ref ($item) eq 'ZMQ::Raw::Loop::Timer')
	{
		$this->_remove_timer ($item);
	}
	elsif (ref ($item) eq 'ZMQ::Raw::Loop::Handle')
	{
		$this->_remove_handle ($item);
	}
	elsif (ref ($item) eq 'ZMQ::Raw::Loop::Event')
	{
		$this->_remove_event ($item);
	}
	else
	{
		croak "don't know how to remove $item";
	}
}



sub _remove_timer
{
	my ($this, $timer) = @_;

	my @left;
	foreach my $t (@{$this->timers})
	{
		if ($timer == $t)
		{
			my $socket = $timer->timer->socket;
			$socket->recv (ZMQ::Raw->ZMQ_DONTWAIT);
			$this->poller->remove ($socket);
			next;
		}

		push @left, $t;
	}

	$this->timers (\@left);
}



sub _remove_handle
{
	my ($this, $handle) = @_;

	my @left;
	foreach my $h (@{$this->handles})
	{
		if ($h == $handle)
		{
			$this->poller->remove ($handle->handle);

			my $timer = $handle->timer;
			if ($timer)
			{
				$this->poller->remove ($timer->socket);
				$timer->cancel();
			}

			next;
		}

		push @left, $h;
	}

	$this->handles (\@left);
}



sub _remove_event
{
	my ($this, $event) = @_;

	my @left;
	foreach my $e (@{$this->events})
	{
		if ($e == $event)
		{
			$this->poller->remove ($event->read_handle);

			my $timer = $event->timer;
			if ($timer)
			{
				$this->poller->remove ($timer->socket);
				$timer->cancel();
			}

			next;
		}

		push @left, $e;
	}

	$this->events (\@left);
}



sub _dispatch_handles
{
	my ($this) = @_;

	foreach my $handle (@{$this->handles})
	{
		my $events = $this->poller->events ($handle->handle);
		if ($events)
		{
			$this->_remove_handle ($handle);

			if ($events & ZMQ::Raw->ZMQ_POLLIN)
			{
				my $readable = $handle->on_readable;
				&{$readable} ($handle) if $readable;
			}
			elsif ($events & ZMQ::Raw->ZMQ_POLLOUT)
			{
				my $writable = $handle->on_writable;
				&{$writable} ($handle) if $writable;
			}

			return 1;
		}

		if ($handle->timer)
		{
			my $events = $this->poller->events ($handle->timer->socket);
			if ($events)
			{
				$this->_remove_handle ($handle);

				my $timeout = $handle->on_timeout;
				&{$timeout} ($handle) if $timeout;

				return 1;
			}
		}
	}

	return 0;
}



sub _dispatch_events
{
	my ($this) = @_;

	foreach my $event (@{$this->events})
	{
		my $events = $this->poller->events ($event->read_handle);
		if ($events)
		{
			$event->reset();
			$this->_remove_event ($event);

			my $set = $event->on_set;
			&{$set} ($event) if $set;
			return 1;
		}

		if ($event->timer)
		{
			my $events = $this->poller->events ($event->timer->socket);
			if ($events)
			{
				$event->reset();
				$this->_remove_event ($event);

				my $timeout = $event->on_timeout;
				&{$timeout} ($event) if $timeout;

				return 1;
			}
		}
	}

	return 0;
}



sub _dispatch_timers
{
	my ($this) = @_;

	foreach my $timer (@{$this->timers})
	{
		my $socket = $timer->timer->socket;
		my $events = $this->poller->events ($socket);
		if ($events)
		{
			$this->_remove_timer ($timer);

			my $timeout = $timer->on_timeout;
			&{$timeout} ($timer) if ($timeout);

			if ($timer->timer->running())
			{
				$this->_add_timer ($timer);
			}

			return 1;
		}
	}

	return 0;
}



sub _cancel_timers
{
	my ($this) = @_;

AGAIN:
	foreach my $timer (@{$this->timers})
	{
		$timer->cancel();
		goto AGAIN;
	}
}



sub _cancel_events
{
	my ($this) = @_;

	foreach my $event (@{$this->events})
	{
		my $events = $this->poller->events ($event->read_handle);
		$this->poller->remove ($event->read_handle);

		if ($event->timer)
		{
			$event->timer->cancel();
			$this->poller->remove ($event->timer->socket);
		}
	}

	$this->events ([]);
}



sub _cancel_handles
{
	my ($this) = @_;

	foreach my $handle (@{$this->handles})
	{
		$this->poller->remove ($handle->handle);

		if ($handle->timer)
		{
			$handle->timer->cancel();
			$this->poller->remove ($handle->timer->socket);
		}
	}

	$this->handles ([]);
}



sub _clear_promises
{
	my ($this) = @_;

	$this->promises ([]);
}



sub terminate
{
	my ($this) = @_;

	$this->tevent->set;
}

=for Pod::Coverage context handles events poller timers promises terminated tevent

=head1 AUTHOR

Jacques Germishuys <jacquesg@striata.com>

=head1 LICENSE AND COPYRIGHT

Copyright 2017 Jacques Germishuys.

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.

=cut

1; # End of ZMQ::Raw::Loop