# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Mojo::IOLoop;
use Mojo::Base -base;

# "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
#             used. Like the death ray."
use Carp 'croak';
use Mojo::IOLoop::Client;
use Mojo::IOLoop::Delay;
use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::Reactor::Poll;
use Mojo::Util qw(md5_sum steady_time);
use Scalar::Util qw(blessed weaken);

# Debug switch taken from the MOJO_IOLOOP_DEBUG environment variable, falls
# back to 0 when it is unset or false
use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;

# Interval in seconds at which "_accepting" is retried (see "_recurring")
has accept_interval => 0.025;

# Optional callbacks used to acquire and release the accept mutex, no default
has [qw(lock unlock)];

# Number of connections to accept before "max_connections" gets forced to 0,
# a value of 0 disables this limit (see "acceptor" and "stream")
has max_accepts     => 0;

# Upper bound for concurrent connections checked in "_accepting", a value of 0
# makes "_stop" stop the loop once all connections are gone
has max_connections => 1000;

# Value handed to every acceptor's "multi_accept" when accepting starts
has multi_accept    => 50;

# Default reactor: class chosen by Mojo::Reactor::Poll->detect, instantiated
# with a "catch" handler that warns with the class of the first argument and
# the message passed as second argument
has reactor         => sub {
  my $class = Mojo::Reactor::Poll->detect;
  warn "-- Reactor initialized ($class)\n" if DEBUG;
  return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
};

# Ignore PIPE signal (process-wide side effect of loading this module)
$SIG{PIPE} = 'IGNORE';

# Initialize singleton reactor early (creates the singleton at load time)
__PACKAGE__->singleton->reactor;

# Look up a registered acceptor by id, or register an acceptor object with
# this loop and return the id generated for it. Operates on the singleton when
# invoked as a class method.
sub acceptor {
  my $loop = _instance(shift);
  my ($target) = @_;

  # Anything that is not a reference is treated as an id to look up
  return $loop->{acceptors}{$target} if !ref $target;

  # Register the object under a fresh id and share our reactor with it, the
  # acceptor only keeps a weak reference to the reactor
  my $new_id = $loop->_id;
  $loop->{acceptors}{$new_id} = $target;
  weaken $target->reactor($loop->reactor)->{reactor};

  # A true "max_accepts" value (re)initializes the accept counter
  my $limit = $loop->max_accepts;
  $loop->{accepts} = $limit if $limit;

  # Leave accepting state so the new acceptor gets picked up
  $loop->_not_accepting;

  return $new_id;
}

# Open an outgoing connection with Mojo::IOLoop::Client. The last argument is
# the callback, everything else is forwarded to the client's "connect". The
# callback is invoked as ($loop, undef, $stream) on success and as
# ($loop, $err, undef) on failure. Returns the connection id.
sub client {
  my $loop     = _instance(shift);
  my $callback = pop;

  # The accept and stop timers have to be active
  $loop->_recurring;

  # Track the pending client under a new connection id, the client only keeps
  # a weak reference to the reactor
  my $id        = $loop->_id;
  my $connector = Mojo::IOLoop::Client->new;
  $loop->{connections}{$id}{client} = $connector;
  weaken $connector->reactor($loop->reactor)->{reactor};

  # The handlers below must not keep the loop alive
  weaken $loop;

  # Success: drop the client, wrap the handle in a stream with the same id
  my $connected = sub {
    delete $loop->{connections}{$id}{client};
    my $stream = Mojo::IOLoop::Stream->new(pop);
    $loop->_stream($stream => $id);
    $loop->$callback(undef, $stream);
  };

  # Failure: forget the connection and report the error
  my $failed = sub {
    $loop->_remove($id);
    $loop->$callback(pop, undef);
  };

  $connector->on(connect => $connected);
  $connector->on(error   => $failed);
  $connector->connect(@_);

  return $id;
}

# Build a Mojo::IOLoop::Delay object bound to this loop through a weakened
# "ioloop" reference. Additional arguments are passed on to "steps", whose
# return value is returned in that case, otherwise the delay itself.
sub delay {
  my $loop  = _instance(shift);
  my $delay = Mojo::IOLoop::Delay->new;
  weaken $delay->ioloop($loop)->{ioloop};
  return $delay unless @_;
  return $delay->steps(@_);
}

# Ask the reactor whether it is running, usable as class or instance method
sub is_running {
  my $loop = _instance(shift);
  return $loop->reactor->is_running;
}
# Schedule a callback for the next reactor tick, usable as class or instance
# method. The callback is wrapped so that it is invoked with the event loop as
# its only argument, matching "timer" and "recurring", instead of being called
# with whatever the reactor passes to its own callbacks. The wrapper only holds
# a weak reference to the loop. Returns whatever the reactor's "next_tick"
# returns.
sub next_tick {
  my ($self, $cb) = (_instance(shift), @_);
  weaken $self;
  return $self->reactor->next_tick(sub { $self->$cb });
}
# Let the reactor run a single tick, usable as class or instance method
sub one_tick {
  my $loop = _instance(shift);
  return $loop->reactor->one_tick;
}

# Create a recurring timer, arguments are the interval and the callback, which
# gets invoked with the loop. Returns the value produced by "_timer".
sub recurring {
  my ($invocant, @args) = @_;
  return $invocant->_timer(recurring => @args);
}

# Remove whatever is registered under the given id. Connections that already
# have a stream are closed gracefully through the stream, everything else is
# handed to "_remove".
sub remove {
  my $loop = _instance(shift);
  my ($id) = @_;

  my $conn   = $loop->{connections}{$id};
  my $stream = $conn ? $conn->{stream} : undef;
  return $stream->close_gracefully if $stream;

  return $loop->_remove($id);
}

# Remove all acceptors and connections, reset the reactor and stop the loop
sub reset {
  my $loop = _instance(shift);

  # Collect all ids first, then drop them one by one
  my @ids = (keys %{$loop->{acceptors}}, keys %{$loop->{connections}});
  $loop->_remove($_) for @ids;

  $loop->reactor->reset;

  # Clean up the internal timers, then stop the reactor
  $loop->$_ for qw(_stop stop);
}

# Start listening with Mojo::IOLoop::Server. The last argument is the callback,
# everything else is forwarded to the server's "listen". For every accepted
# handle the callback is invoked as ($loop, $stream, $id). Returns the acceptor
# id.
sub server {
  my $loop     = _instance(shift);
  my $callback = pop;

  my $listener = Mojo::IOLoop::Server->new;

  # The accept handler must not keep the loop alive
  weaken $loop;
  my $accepted = sub {
    my $stream = Mojo::IOLoop::Stream->new(pop);
    my $id     = $loop->stream($stream);
    $loop->$callback($stream, $id);
  };
  $listener->on(accept => $accepted);
  $listener->listen(@_);

  return $loop->acceptor($listener);
}

# Return the shared loop object, it is constructed on the first call only and
# kept in a state variable afterwards
sub singleton {
  my $class = shift;
  state $shared = $class->SUPER::new;
  return $shared;
}

# Start the reactor, usable as class or instance method. Croaks with
# "Mojo::IOLoop already running" if the loop is running already.
sub start {
  my $invocant = shift;
  if ($invocant->is_running) { croak 'Mojo::IOLoop already running' }
  return _instance($invocant)->reactor->start;
}

# Stop the reactor, usable as class or instance method
sub stop {
  my $loop = _instance(shift);
  return $loop->reactor->stop;
}

# Look up the stream of a connection by id, or turn a stream object into a
# connection managed by this loop and return its new id.
sub stream {
  my $loop = _instance(shift);
  my ($target) = @_;

  # Anything that is not a reference is treated as a connection id
  if (!ref $target) {
    my $conn = $loop->{connections}{$target} || {};
    return $conn->{stream};
  }

  # Release accept mutex
  $loop->_not_accepting;

  # Enforce connection limit: every new stream costs one or two accepts
  # (randomized to improve load balancing), once the counter is used up no
  # further connections are allowed
  if (defined $loop->{accepts}) {
    $loop->{accepts} -= int(rand 2) + 1;
    $loop->max_connections(0) if $loop->{accepts} <= 0;
  }

  return $loop->_stream($target, $loop->_id);
}

# Create a one-shot timer, arguments are the delay and the callback, which
# gets invoked with the loop. Returns the value produced by "_timer".
sub timer {
  my ($invocant, @args) = @_;
  return $invocant->_timer(timer => @args);
}

# Callback of the recurring "accept" timer (installed by "_recurring"). Tries
# to switch the loop into accepting state: requires at least one acceptor, a
# connection count below "max_connections" and, if a "lock" callback is set, a
# true return value from it. On success the accept timer is removed, all
# acceptors are started and the "accepting" flag is set.
sub _accepting {
  my $self = shift;

  # Check if we have acceptors (without any, the accept timer is removed)
  my $acceptors = $self->{acceptors} ||= {};
  return $self->_remove(delete $self->{accept}) unless keys %$acceptors;

  # Check connection limit (the timer stays active, so this is retried later)
  my $i   = keys %{$self->{connections}};
  my $max = $self->max_connections;
  return unless $i < $max;

  # Acquire accept mutex (the callback gets a true value only if there are no
  # connections at all, a false return value means try again on the next tick)
  if (my $cb = $self->lock) { return unless $cb->(!$i) }
  $self->_remove(delete $self->{accept});

  # Check if multi-accept is desirable (only one connection at a time if
  # "max_connections" is smaller than "multi_accept")
  my $multi = $self->multi_accept;
  $_->multi_accept($max < $multi ? 1 : $multi)->start for values %$acceptors;
  $self->{accepting}++;
}

# Generate an id that is not in use by any connection or acceptor of this loop
sub _id {
  my $loop = shift;

  # Keep drawing candidates until one is free
  while (1) {
    my $candidate = md5_sum('c' . steady_time . rand 999);
    next if $loop->{connections}{$candidate};
    next if $loop->{acceptors}{$candidate};
    return $candidate;
  }
}

# Resolve the invocant to a loop object: references are returned as they are,
# anything else is treated as a class name and mapped to its singleton
sub _instance {
  my ($invocant) = @_;
  return $invocant if ref $invocant;
  return $invocant->singleton;
}

# Leave accepting state: makes sure the internal timers exist, clears the
# "accepting" flag and, if an "unlock" callback is set, invokes it without
# arguments and stops all acceptors. Does nothing beyond restarting the timers
# if the loop was not accepting.
sub _not_accepting {
  my $self = shift;

  # Make sure timers are running
  $self->_recurring;

  # Release accept mutex
  return unless delete $self->{accepting};
  return unless my $cb = $self->unlock;
  $cb->();

  # NOTE(review): this line is only reached when an "unlock" callback is set,
  # so without one the acceptors keep running - confirm that is intended
  $_->stop for values %{$self->{acceptors} || {}};
}

# Make sure both internal recurring timers exist: "accept" fires every
# "accept_interval" seconds and runs "_accepting", "stop" fires every second
# and runs "_stop". Existing timers are left untouched.
sub _recurring {
  my $loop = shift;

  unless ($loop->{accept}) {
    my $interval = $loop->accept_interval;
    $loop->{accept} = $loop->recurring($interval => \&_accepting);
  }

  unless ($loop->{stop}) {
    $loop->{stop} = $loop->recurring(1 => \&_stop);
  }

  return $loop->{stop};
}

# Remove the thing registered under an id. The id is tried as a reactor timer
# first, then as an acceptor (which also leaves accepting state), and finally
# as a connection.
sub _remove {
  my ($loop, $id) = @_;

  # Timer
  my $reactor = $loop->reactor or return;
  return if $reactor->remove($id);

  # Acceptor
  return $loop->_not_accepting if delete $loop->{acceptors}{$id};

  # Connection
  return delete $loop->{connections}{$id};
}

# Callback of the recurring "stop" timer (also used by "reset"). Does nothing
# while connections exist. Otherwise stops the loop if "max_connections" is 0
# and, once there are no acceptors left either, removes both internal timers.
sub _stop {
  my $loop = shift;

  # Connections are still open
  my $open = keys %{$loop->{connections}};
  return if $open;

  # Graceful shutdown was requested
  if ($loop->max_connections == 0) { $loop->stop }

  # Acceptors still need the timers
  my $listening = keys %{$loop->{acceptors}};
  return if $listening;

  # Nothing left to manage, get rid of the timers
  for my $timer (qw(accept stop)) {
    next unless $loop->{$timer};
    $loop->_remove(delete $loop->{$timer});
  }
}

# Attach a stream to this loop under the given connection id: shares the
# reactor with the stream, removes the connection again when the stream closes
# and starts the stream. Returns the id.
sub _stream {
  my ($loop, $stream, $id) = @_;

  # The accept and stop timers have to be active
  $loop->_recurring;

  # Register the stream, it only keeps a weak reference to the reactor
  $loop->{connections}{$id}{stream} = $stream;
  weaken $stream->reactor($loop->reactor)->{reactor};

  # The close handler must not keep the loop alive and has to cope with the
  # loop being gone already
  weaken $loop;
  my $closed = sub {
    return unless $loop;
    $loop->_remove($id);
  };
  $stream->on(close => $closed);
  $stream->start;

  return $id;
}

# Shared implementation of "timer" and "recurring": calls the reactor method of
# the given name with the delay and a wrapper that invokes the callback with
# the loop as its only argument. The wrapper only holds a weak reference to the
# loop. Returns whatever the reactor method returns.
sub _timer {
  my $loop = _instance(shift);
  my ($kind, $delay, $callback) = @_;

  weaken $loop;
  my $wrapped = sub { $loop->$callback };

  return $loop->reactor->$kind($delay => $wrapped);
}

1;

=encoding utf8

=head1 NAME

Mojo::IOLoop - Minimalistic event loop

=head1 SYNOPSIS

  use Mojo::IOLoop;

  # Listen on port 3000
  Mojo::IOLoop->server({port => 3000} => sub {
    my ($loop, $stream) = @_;

    $stream->on(read => sub {
      my ($stream, $bytes) = @_;

      # Process input chunk
      say $bytes;

      # Write response
      $stream->write('HTTP/1.1 200 OK');
    });
  });

  # Connect to port 3000
  my $id = Mojo::IOLoop->client({port => 3000} => sub {
    my ($loop, $err, $stream) = @_;

    $stream->on(read => sub {
      my ($stream, $bytes) = @_;

      # Process input
      say "Input: $bytes";
    });

    # Write request
    $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
  });

  # Add a timer
  Mojo::IOLoop->timer(5 => sub {
    my $loop = shift;
    $loop->remove($id);
  });

  # Start event loop if necessary
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head1 DESCRIPTION

L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>,
it has been reduced to the absolute minimal feature set required to build
solid and scalable non-blocking TCP clients and servers.

Depending on operating system, the default per-process and system-wide file
descriptor limits are often very low and need to be tuned for better
scalability. The C<LIBEV_FLAGS> environment variable should also be used to
select the best possible L<EV> backend, which usually defaults to the not very
scalable C<select>.

  LIBEV_FLAGS=1   # select
  LIBEV_FLAGS=2   # poll
  LIBEV_FLAGS=4   # epoll (Linux)
  LIBEV_FLAGS=8   # kqueue (*BSD, OS X)

The event loop will be resilient to time jumps if a monotonic clock is
available through L<Time::HiRes>. A TLS certificate and key are also built
right in, to make writing test servers as easy as possible. Also note that for
convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop>
is loaded.

For better scalability (epoll, kqueue) and to provide IPv6, SOCKS5 as well as
TLS support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.20+),
L<IO::Socket::Socks> (0.64+) and L<IO::Socket::SSL> (1.84+) will be used
automatically if they are installed. Individual features can also be disabled
with the C<MOJO_NO_IPV6>, C<MOJO_NO_SOCKS> and C<MOJO_NO_TLS> environment
variables.

See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more.

=head1 ATTRIBUTES

L<Mojo::IOLoop> implements the following attributes.

=head2 accept_interval

  my $interval = $loop->accept_interval;
  $loop        = $loop->accept_interval(0.5);

Interval in seconds for trying to reacquire the accept mutex, defaults to
C<0.025>. Note that changing this value can affect performance and idle CPU
usage.

=head2 lock

  my $cb = $loop->lock;
  $loop  = $loop->lock(sub {...});

A callback for acquiring the accept mutex, used to sync multiple server
processes. The callback should return true or false. Note that exceptions in
this callback are not captured.

  $loop->lock(sub {
    my $blocking = shift;

    # Got the accept mutex, start accepting new connections
    return 1;
  });

=head2 max_accepts

  my $max = $loop->max_accepts;
  $loop   = $loop->max_accepts(1000);

The maximum number of connections this event loop is allowed to accept before
shutting down gracefully without interrupting existing connections, defaults
to C<0>. Setting the value to C<0> will allow this event loop to accept new
connections indefinitely. Note that up to half of this value can be subtracted
randomly to improve load balancing between multiple server processes.

=head2 max_connections

  my $max = $loop->max_connections;
  $loop   = $loop->max_connections(1000);

The maximum number of concurrent connections this event loop is allowed to
handle before stopping to accept new incoming connections, defaults to
C<1000>. Setting the value to C<0> will make this event loop stop accepting
new connections and allow it to shut down gracefully without interrupting
existing connections.

=head2 multi_accept

  my $multi = $loop->multi_accept;
  $loop     = $loop->multi_accept(100);

Number of connections to accept at once, defaults to C<50>.

=head2 reactor

  my $reactor = $loop->reactor;
  $loop       = $loop->reactor(Mojo::Reactor->new);

Low-level event reactor, usually a L<Mojo::Reactor::Poll> or
L<Mojo::Reactor::EV> object with a default subscriber to the event
L<Mojo::Reactor/"error">.

  # Watch if handle becomes readable or writable
  $loop->reactor->io($handle => sub {
    my ($reactor, $writable) = @_;
    say $writable ? 'Handle is writable' : 'Handle is readable';
  });

  # Change to watching only if handle becomes writable
  $loop->reactor->watch($handle, 0, 1);

=head2 unlock

  my $cb = $loop->unlock;
  $loop  = $loop->unlock(sub {...});

A callback for releasing the accept mutex, used to sync multiple server
processes. Note that exceptions in this callback are not captured.

=head1 METHODS

L<Mojo::IOLoop> inherits all methods from L<Mojo::Base> and implements the
following new ones.

=head2 acceptor

  my $server = Mojo::IOLoop->acceptor($id);
  my $server = $loop->acceptor($id);
  my $id     = $loop->acceptor(Mojo::IOLoop::Server->new);

Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.

=head2 client

  my $id
    = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
  my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
  my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});

Open TCP connection with L<Mojo::IOLoop::Client>, takes the same arguments as
L<Mojo::IOLoop::Client/"connect">.

  # Connect to localhost on port 3000
  Mojo::IOLoop->client({port => 3000} => sub {
    my ($loop, $err, $stream) = @_;
    ...
  });

=head2 delay

  my $delay = Mojo::IOLoop->delay;
  my $delay = $loop->delay;
  my $delay = $loop->delay(sub {...});
  my $delay = $loop->delay(sub {...}, sub {...});

Build L<Mojo::IOLoop::Delay> object to manage callbacks and control the flow
of events for this event loop, which can help you avoid deep nested closures
and memory leaks that often result from continuation-passing style. Callbacks
will be passed along to L<Mojo::IOLoop::Delay/"steps">.

  # Synchronize multiple events
  my $delay = Mojo::IOLoop->delay(sub { say 'BOOM!' });
  for my $i (1 .. 10) {
    my $end = $delay->begin;
    Mojo::IOLoop->timer($i => sub {
      say 10 - $i;
      $end->();
    });
  }
  $delay->wait;

  # Sequentialize multiple events
  Mojo::IOLoop->delay(

    # First step (simple timer)
    sub {
      my $delay = shift;
      Mojo::IOLoop->timer(2 => $delay->begin);
      say 'Second step in 2 seconds.';
    },

    # Second step (concurrent timers)
    sub {
      my $delay = shift;
      Mojo::IOLoop->timer(1 => $delay->begin);
      Mojo::IOLoop->timer(3 => $delay->begin);
      say 'Third step in 3 seconds.';
    },

    # Third step (the end)
    sub { say 'And done after 5 seconds total.' }
  )->wait;

  # Handle exceptions in all steps
  Mojo::IOLoop->delay(
    sub {
      my $delay = shift;
      die 'Intentional error';
    },
    sub {
      my ($delay, @args) = @_;
      say 'Never actually reached.';
    }
  )->catch(sub {
    my ($delay, $err) = @_;
    say "Something went wrong: $err";
  })->wait;

=head2 is_running

  my $bool = Mojo::IOLoop->is_running;
  my $bool = $loop->is_running;

Check if event loop is running.

  exit unless Mojo::IOLoop->is_running;

=head2 next_tick

  my $undef = Mojo::IOLoop->next_tick(sub {...});
  my $undef = $loop->next_tick(sub {...});

Invoke callback as soon as possible, but not before returning, always returns
C<undef>.

  # Perform operation on next reactor tick
  Mojo::IOLoop->next_tick(sub {
    my $loop = shift;
    ...
  });

=head2 one_tick

  Mojo::IOLoop->one_tick;
  $loop->one_tick;

Run event loop until an event occurs. Note that this method can recurse back
into the reactor, so you need to be careful.

  # Don't block longer than 0.5 seconds
  my $id = Mojo::IOLoop->timer(0.5 => sub {});
  Mojo::IOLoop->one_tick;
  Mojo::IOLoop->remove($id);

=head2 recurring

  my $id = Mojo::IOLoop->recurring(3 => sub {...});
  my $id = $loop->recurring(0 => sub {...});
  my $id = $loop->recurring(0.25 => sub {...});

Create a new recurring timer, invoking the callback repeatedly after a given
amount of time in seconds.

  # Perform operation every 5 seconds
  Mojo::IOLoop->recurring(5 => sub {
    my $loop = shift;
    ...
  });

=head2 remove

  Mojo::IOLoop->remove($id);
  $loop->remove($id);

Remove anything with an id, connections will be dropped gracefully by allowing
them to finish writing all data in their write buffers.

=head2 reset

  Mojo::IOLoop->reset;
  $loop->reset;

Remove everything and stop the event loop.

=head2 server

  my $id = Mojo::IOLoop->server(port => 3000, sub {...});
  my $id = $loop->server(port => 3000, sub {...});
  my $id = $loop->server({port => 3000} => sub {...});

Accept TCP connections with L<Mojo::IOLoop::Server>, takes the same arguments
as L<Mojo::IOLoop::Server/"listen">.

  # Listen on port 3000
  Mojo::IOLoop->server({port => 3000} => sub {
    my ($loop, $stream, $id) = @_;
    ...
  });

  # Listen on random port
  my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub {
    my ($loop, $stream, $id) = @_;
    ...
  });
  my $port = Mojo::IOLoop->acceptor($id)->handle->sockport;

=head2 singleton

  my $loop = Mojo::IOLoop->singleton;

The global L<Mojo::IOLoop> singleton, used to access a single shared event
loop object from everywhere inside the process.

  # Many methods also allow you to take shortcuts
  Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
  Mojo::IOLoop->start;

  # Restart active timer
  my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
  Mojo::IOLoop->singleton->reactor->again($id);

=head2 start

  Mojo::IOLoop->start;
  $loop->start;

Start the event loop, this will block until L</"stop"> is called. Note that
some reactors stop automatically if there are no events being watched anymore.

  # Start event loop only if it is not running already
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 stop

  Mojo::IOLoop->stop;
  $loop->stop;

Stop the event loop, this will not interrupt any existing connections and the
event loop can be restarted by running L</"start"> again.

=head2 stream

  my $stream = Mojo::IOLoop->stream($id);
  my $stream = $loop->stream($id);
  my $id     = $loop->stream(Mojo::IOLoop::Stream->new);

Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.

  # Increase inactivity timeout for connection to 300 seconds
  Mojo::IOLoop->stream($id)->timeout(300);

=head2 timer

  my $id = Mojo::IOLoop->timer(3 => sub {...});
  my $id = $loop->timer(0 => sub {...});
  my $id = $loop->timer(0.25 => sub {...});

Create a new timer, invoking the callback after a given amount of time in
seconds.

  # Perform operation in 5 seconds
  Mojo::IOLoop->timer(5 => sub {
    my $loop = shift;
    ...
  });

=head1 DEBUGGING

You can set the C<MOJO_IOLOOP_DEBUG> environment variable to get some advanced
diagnostics information printed to C<STDERR>.

  MOJO_IOLOOP_DEBUG=1

=head1 SEE ALSO

L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.

=cut