package Mojo::IOLoop;
use Mojo::Base 'Mojo::EventEmitter';
# "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
# used. Like the death ray."
use Carp 'croak';
use Mojo::IOLoop::Client;
use Mojo::IOLoop::Delay;
use Mojo::IOLoop::Server;
use Mojo::IOLoop::Stream;
use Mojo::Reactor::Poll;
use Mojo::Util qw(md5_sum steady_time);
use Scalar::Util qw(blessed weaken);
use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
# Number of accepted connections after which a graceful shutdown is triggered
# by the accept handler in "server", 0 disables the limit
has max_accepts => 0;

# Upper bound on concurrently tracked connections (enforced by "_limit")
has max_connections => 1000;

# Accept in batches of 50 only if more than 50 connections are allowed at all
has multi_accept => sub {
  my $self = shift;
  return $self->max_connections > 50 ? 50 : 1;
};

# Reactor class is chosen by Mojo::Reactor::Poll->detect, and anything passed
# to the "catch" callback is turned into a warning prefixed with the class of
# the emitting object
has reactor => sub {
  my $backend = Mojo::Reactor::Poll->detect;
  warn "-- Reactor initialized ($backend)\n" if DEBUG;
  my $report = sub {
    my ($emitter, $err) = @_;
    warn "@{[blessed $emitter]}: $err";
  };
  return $backend->new->catch($report);
};
# Ignore PIPE signal (process-wide side effect of loading this module)
$SIG{PIPE} = 'IGNORE';
# Initialize singleton reactor early (creates the singleton loop at load time
# and calls its "reactor" accessor once)
__PACKAGE__->singleton->reactor;
sub acceptor {

  # Look up an acceptor by id, or register an acceptor object and return the
  # id it was registered under
  my $loop  = _instance(shift);
  my $thing = shift;

  # Anything that is not a reference is treated as an id
  if (!ref $thing) { return $loop->{acceptors}{$thing} }

  # Register under a fresh id, storing whatever the "multi_accept" setter
  # returns (presumably the acceptor itself)
  my $id = $loop->_id;
  $loop->{acceptors}{$id} = $thing->multi_accept($loop->multi_accept);

  # Hand over the reactor, the "reactor" slot is weakened right away
  weaken $thing->reactor($loop->reactor)->{reactor};

  # Stop and restart accepting so the new acceptor gets picked up
  $loop->_not_accepting->_maybe_accepting;

  return $id;
}
sub client {

  # Open a connection, the last argument is the callback and everything in
  # between is passed through to the client's "connect" method
  my $loop = _instance(shift);
  my $cb   = pop;

  # Track the pending client under a fresh connection id
  my $id     = $loop->_id;
  my $client = Mojo::IOLoop::Client->new;
  $loop->{connections}{$id}{client} = $client;
  weaken $client->reactor($loop->reactor)->{reactor};

  # The event handlers below must not keep the loop alive
  weaken $loop;

  # Success, replace the client with a stream and invoke the callback without
  # an error
  my $on_connect = sub {
    delete $loop->{connections}{$id}{client};
    my $stream = Mojo::IOLoop::Stream->new(pop);
    $loop->_stream($stream => $id);
    $loop->$cb(undef, $stream);
  };

  # Failure, forget the connection and invoke the callback with the error
  # (last argument of the event) and no stream
  my $on_error = sub {
    my $err = pop;
    $loop->_remove($id);
    $loop->$cb($err, undef);
  };

  $client->on(connect => $on_connect);
  $client->on(error   => $on_error);
  $client->connect(@_);

  return $id;
}
sub delay {

  # Build a delay bound to this loop, the "ioloop" slot is weakened right away
  my $delay = Mojo::IOLoop::Delay->new;
  weaken $delay->ioloop(_instance(shift))->{ioloop};

  # Without steps the bare delay is returned, otherwise the result of "steps"
  return $delay unless @_;
  return $delay->steps(@_);
}
sub is_running {

  # Delegate to the reactor of the loop (or of the singleton)
  my $loop = _instance(shift);
  return $loop->reactor->is_running;
}
sub next_tick {

  # Schedule the callback with the reactor's "next_tick", the callback is
  # invoked as a method on the loop
  my $loop = _instance(shift);
  my $cb   = shift;

  # The wrapper must not keep the loop alive
  weaken $loop;
  my $wrapper = sub { $loop->$cb };

  return $loop->reactor->next_tick($wrapper);
}
sub one_tick {

  # Delegate to the reactor of the loop (or of the singleton)
  my $loop = _instance(shift);
  return $loop->reactor->one_tick;
}
sub recurring {

  # Thin wrapper around "_timer" selecting the reactor's "recurring" method
  my ($invocant, @args) = @_;
  return $invocant->_timer('recurring', @args);
}
sub remove {

  # Remove whatever is registered under the given id
  my $loop = _instance(shift);
  my $id   = shift;

  # Connections that already have a stream are closed gracefully
  my $conn   = $loop->{connections}{$id};
  my $stream = $conn ? $conn->{stream} : undef;
  return $stream->close_gracefully if $stream;

  # Everything else is removed right away
  return $loop->_remove($id);
}
sub reset {

  # Forget all acceptors and connections as well as the accepting and
  # graceful shutdown state, then reset and stop the reactor
  my $loop = _instance(shift);
  delete $loop->{$_} for qw(accepting acceptors connections stop);
  $loop->reactor->reset;
  return $loop->stop;
}
sub server {

  # Start listening, the last argument is the callback and everything in
  # between is passed through to the server's "listen" method
  my $loop = _instance(shift);
  my $cb   = pop;

  my $server = Mojo::IOLoop::Server->new;

  # The accept handler must not keep the loop alive
  weaken $loop;
  my $on_accept = sub {

    # Enforce connection limit (randomize to improve load balancing), the
    # countdown starts somewhere between half of "max_accepts" and the full
    # value the first time a connection is accepted
    my $max = $loop->max_accepts;
    if ($max) {
      $loop->{accepts} = $max - int(rand($max / 2))
        unless defined $loop->{accepts};
      $loop->{accepts} -= 1;
      $loop->stop_gracefully if $loop->{accepts} <= 0;
    }

    # Wrap the accepted handle (last argument of the event) in a stream and
    # pass the stream and its new id to the callback
    my $stream = Mojo::IOLoop::Stream->new(pop);
    $loop->$cb($stream, $loop->stream($stream));

    # Stop accepting if connection limit has been reached
    $loop->_not_accepting if $loop->_limit;
  };
  $server->on(accept => $on_accept);

  $server->listen(@_);
  return $loop->acceptor($server);
}
sub singleton {

  # The shared loop is built on the first call only and reused afterwards
  my $class = shift;
  state $shared = $class->SUPER::new;
  return $shared;
}
sub start {

  # Refuse to start a loop that is already running, otherwise hand control to
  # the reactor
  my $invocant = shift;
  if ($invocant->is_running) { croak 'Mojo::IOLoop already running' }

  my $loop = _instance($invocant);
  return $loop->reactor->start;
}
sub stop {

  # Delegate to the reactor of the loop (or of the singleton)
  my $loop = _instance(shift);
  return $loop->reactor->stop;
}
sub stop_gracefully {

  # Stop accepting new connections first
  my $loop = _instance(shift)->_not_accepting;

  # A shutdown that is already in progress is not started twice
  return $loop->{stop} if $loop->{stop};

  # Announce the shutdown and check once per second if the loop can be
  # stopped, the id of that recurring timer doubles as the "stop" flag
  return $loop->{stop} = $loop->emit('finish')->recurring(1 => \&_stop);
}
sub stream {

  # Look up a stream by connection id, or register a stream object and return
  # the id it was registered under
  my $loop  = _instance(shift);
  my $thing = shift;

  # References are registered as new connections
  if (ref $thing) { return $loop->_stream($thing, $loop->_id) }

  # Everything else is treated as an id, unknown ids yield undef
  my $conn = $loop->{connections}{$thing};
  return $conn ? $conn->{stream} : undef;
}
sub timer {

  # Thin wrapper around "_timer" selecting the reactor's "timer" method
  my ($invocant, @args) = @_;
  return $invocant->_timer('timer', @args);
}
sub _id {

  # Generate an id that is used by neither a connection nor an acceptor
  my $loop = shift;

  while (1) {

    # MD5 checksum of a fixed prefix, the steady time and a random number
    my $candidate = md5_sum('c' . steady_time() . rand(999));

    # Try again on the (unlikely) collision with an existing id
    next if $loop->{connections}{$candidate};
    next if $loop->{acceptors}{$candidate};

    return $candidate;
  }
}
sub _instance {

  # Objects are used as they are, anything else (a class name) is resolved to
  # its singleton
  my $invocant = shift;
  return $invocant if ref $invocant;
  return $invocant->singleton;
}
sub _limit {

  # True if no more connections should be accepted
  my $loop = shift;

  # A graceful shutdown in progress always counts as limit reached
  if ($loop->{stop}) { return 1 }

  # Otherwise compare the number of tracked connections with the maximum
  my $active = keys %{$loop->{connections}};
  return $active >= $loop->max_connections;
}
sub _maybe_accepting {

  # Start all acceptors, unless they are already running or the limit has
  # been reached
  my $loop = shift;
  return if $loop->{accepting};
  return if $loop->_limit;

  my $acceptors = $loop->{acceptors} || {};
  for my $acceptor (values %$acceptors) { $acceptor->start }

  return $loop->{accepting} = 1;
}
sub _not_accepting {

  # Stop all acceptors if they are currently running, always returns the loop
  # to allow chaining
  my $loop = shift;

  my $was_accepting = delete $loop->{accepting};
  if ($was_accepting) {
    my $acceptors = $loop->{acceptors} || {};
    $_->stop for values %$acceptors;
  }

  return $loop;
}
sub _remove {
# Remove whatever is registered under the given id right away, the reactor is
# tried first, then the acceptors and finally the connections
my ($self, $id) = @_;
# Timer (nothing else to do if there is no reactor or if the reactor reports
# that it removed something for this id)
return unless my $reactor = $self->reactor;
return if $reactor->remove($id);
# Acceptor (stop and restart accepting so the remaining acceptors are
# re-evaluated)
return $self->_not_accepting->_maybe_accepting
if delete $self->{acceptors}{$id};
# Connection (a freed slot may allow accepting again)
return unless delete $self->{connections}{$id};
$self->_maybe_accepting;
# Debug trace with the id, process id and number of remaining connections
warn "-- $id <<< $$ (@{[scalar keys %{$self->{connections}}]})\n" if DEBUG;
}
sub _stop {

  # Callback of the recurring timer set up by "stop_gracefully", finishes the
  # shutdown once the last connection is gone
  my $loop = shift;

  # Still connections left, try again on the next invocation
  my $open = keys %{$loop->{connections}};
  return if $open;

  # Clear the "stop" flag, remove the timer whose id it held and stop
  my $timer_id = delete $loop->{stop};
  $loop->_remove($timer_id);
  return $loop->stop;
}
sub _stream {

  # Register a stream under the given id, start it and return the id
  my $loop = shift;
  my ($stream, $id) = @_;

  # Connect stream with reactor
  $loop->{connections}{$id}{stream} = $stream;
  warn "-- $id >>> $$ (@{[scalar keys %{$loop->{connections}}]})\n" if DEBUG;
  weaken $stream->reactor($loop->reactor)->{reactor};

  # The close handler must not keep the loop alive, and has to cope with the
  # loop being gone by the time the stream is closed
  weaken $loop;
  my $cleanup = sub { $loop and $loop->_remove($id) };
  $stream->on(close => $cleanup);

  $stream->start;
  return $id;
}
sub _timer {

  # Shared implementation of "timer" and "recurring", the second argument is
  # the name of the reactor method to call
  my $loop = _instance(shift);
  my ($method, $after, $cb) = @_;

  # The wrapper must not keep the loop alive, the callback is invoked as a
  # method on the loop
  weaken $loop;
  my $wrapper = sub { $loop->$cb };

  return $loop->reactor->$method($after => $wrapper);
}
1;
=encoding utf8
=head1 NAME
Mojo::IOLoop - Minimalistic event loop
=head1 SYNOPSIS
use Mojo::IOLoop;
# Listen on port 3000
Mojo::IOLoop->server({port => 3000} => sub {
my ($loop, $stream) = @_;
$stream->on(read => sub {
my ($stream, $bytes) = @_;
# Process input chunk
say $bytes;
# Write response
$stream->write('HTTP/1.1 200 OK');
});
});
# Connect to port 3000
my $id = Mojo::IOLoop->client({port => 3000} => sub {
my ($loop, $err, $stream) = @_;
$stream->on(read => sub {
my ($stream, $bytes) = @_;
# Process input
say "Input: $bytes";
});
# Write request
$stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
});
# Add a timer
Mojo::IOLoop->timer(5 => sub {
my $loop = shift;
$loop->remove($id);
});
# Start event loop if necessary
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head1 DESCRIPTION
L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>. It
has been reduced to the absolute minimal feature set required to build solid
and scalable non-blocking TCP clients and servers.
Depending on operating system, the default per-process and system-wide file
descriptor limits are often very low and need to be tuned for better
scalability. The C<LIBEV_FLAGS> environment variable should also be used to
select the best possible L<EV> backend, which usually defaults to the not very
scalable C<select>.
LIBEV_FLAGS=1 # select
LIBEV_FLAGS=2 # poll
LIBEV_FLAGS=4 # epoll (Linux)
LIBEV_FLAGS=8 # kqueue (*BSD, OS X)
The event loop will be resilient to time jumps if a monotonic clock is
available through L<Time::HiRes>. A TLS certificate and key are also built
right in, to make writing test servers as easy as possible. Also note that for
convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop> is
loaded.
For better scalability (epoll, kqueue) and to provide non-blocking name
resolution, SOCKS5 as well as TLS support, the optional modules L<EV> (4.0+),
L<Net::DNS::Native> (0.15+), L<IO::Socket::Socks> (0.64+) and
L<IO::Socket::SSL> (1.94+) will be used automatically if possible. Individual
features can also be disabled with the C<MOJO_NO_NDN>, C<MOJO_NO_SOCKS> and
C<MOJO_NO_TLS> environment variables.
See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more.
=head1 EVENTS
L<Mojo::IOLoop> inherits all events from L<Mojo::EventEmitter> and can emit the
following new ones.
=head2 finish
$loop->on(finish => sub {
my $loop = shift;
...
});
Emitted when the event loop wants to shut down gracefully and is just waiting
for all existing connections to be closed.
=head1 ATTRIBUTES
L<Mojo::IOLoop> implements the following attributes.
=head2 max_accepts
my $max = $loop->max_accepts;
$loop = $loop->max_accepts(1000);
The maximum number of connections this event loop is allowed to accept before
shutting down gracefully without interrupting existing connections, defaults to
C<0>. Setting the value to C<0> will allow this event loop to accept new
connections indefinitely. Note that up to half of this value can be subtracted
randomly to improve load balancing between multiple server processes.
=head2 max_connections
my $max = $loop->max_connections;
$loop = $loop->max_connections(1000);
The maximum number of concurrent connections this event loop is allowed to
handle before stopping to accept new incoming connections, defaults to C<1000>.
=head2 multi_accept
my $multi = $loop->multi_accept;
$loop = $loop->multi_accept(100);
Number of connections to accept at once, defaults to C<50> if the value of
L</"max_connections"> is greater than C<50>, and to C<1> otherwise.
=head2 reactor
my $reactor = $loop->reactor;
$loop = $loop->reactor(Mojo::Reactor->new);
Low-level event reactor, usually a L<Mojo::Reactor::Poll> or
L<Mojo::Reactor::EV> object with a default subscriber to the event
L<Mojo::Reactor/"error">.
# Watch if handle becomes readable or writable
$loop->reactor->io($handle => sub {
my ($reactor, $writable) = @_;
say $writable ? 'Handle is writable' : 'Handle is readable';
});
# Change to watching only if handle becomes writable
$loop->reactor->watch($handle, 0, 1);
# Remove handle again
$loop->reactor->remove($handle);
=head1 METHODS
L<Mojo::IOLoop> inherits all methods from L<Mojo::EventEmitter> and implements
the following new ones.
=head2 acceptor
my $server = Mojo::IOLoop->acceptor($id);
my $server = $loop->acceptor($id);
my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.
=head2 client
my $id
= Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});
Open TCP connection with L<Mojo::IOLoop::Client>, takes the same arguments as
L<Mojo::IOLoop::Client/"connect">.
# Connect to 127.0.0.1 on port 3000
Mojo::IOLoop->client({port => 3000} => sub {
my ($loop, $err, $stream) = @_;
...
});
=head2 delay
my $delay = Mojo::IOLoop->delay;
my $delay = $loop->delay;
my $delay = $loop->delay(sub {...});
my $delay = $loop->delay(sub {...}, sub {...});
Build L<Mojo::IOLoop::Delay> object to manage callbacks and control the flow of
events for this event loop, which can help you avoid deep nested closures that
often result from continuation-passing style. Callbacks will be passed along to
L<Mojo::IOLoop::Delay/"steps">.
# Synchronize multiple events
my $delay = Mojo::IOLoop->delay(sub { say 'BOOM!' });
for my $i (1 .. 10) {
my $end = $delay->begin;
Mojo::IOLoop->timer($i => sub {
say 10 - $i;
$end->();
});
}
$delay->wait;
# Sequentialize multiple events
Mojo::IOLoop->delay(
# First step (simple timer)
sub {
my $delay = shift;
Mojo::IOLoop->timer(2 => $delay->begin);
say 'Second step in 2 seconds.';
},
# Second step (concurrent timers)
sub {
my $delay = shift;
Mojo::IOLoop->timer(1 => $delay->begin);
Mojo::IOLoop->timer(3 => $delay->begin);
say 'Third step in 3 seconds.';
},
# Third step (the end)
sub { say 'And done after 5 seconds total.' }
)->wait;
# Handle exceptions in all steps
Mojo::IOLoop->delay(
sub {
my $delay = shift;
die 'Intentional error';
},
sub {
my ($delay, @args) = @_;
say 'Never actually reached.';
}
)->catch(sub {
my ($delay, $err) = @_;
say "Something went wrong: $err";
})->wait;
=head2 is_running
my $bool = Mojo::IOLoop->is_running;
my $bool = $loop->is_running;
Check if event loop is running.
exit unless Mojo::IOLoop->is_running;
=head2 next_tick
my $undef = Mojo::IOLoop->next_tick(sub {...});
my $undef = $loop->next_tick(sub {...});
Invoke callback as soon as possible, but not before returning or other
callbacks that have been registered with this method, always returns C<undef>.
# Perform operation on next reactor tick
Mojo::IOLoop->next_tick(sub {
my $loop = shift;
...
});
=head2 one_tick
Mojo::IOLoop->one_tick;
$loop->one_tick;
Run event loop until an event occurs. Note that this method can recurse back
into the reactor, so you need to be careful.
# Don't block longer than 0.5 seconds
my $id = Mojo::IOLoop->timer(0.5 => sub {});
Mojo::IOLoop->one_tick;
Mojo::IOLoop->remove($id);
=head2 recurring
my $id = Mojo::IOLoop->recurring(3 => sub {...});
my $id = $loop->recurring(0 => sub {...});
my $id = $loop->recurring(0.25 => sub {...});
Create a new recurring timer, invoking the callback repeatedly after a given
amount of time in seconds.
# Perform operation every 5 seconds
Mojo::IOLoop->recurring(5 => sub {
my $loop = shift;
...
});
=head2 remove
Mojo::IOLoop->remove($id);
$loop->remove($id);
Remove anything with an id, connections will be dropped gracefully by allowing
them to finish writing all data in their write buffers.
=head2 reset
Mojo::IOLoop->reset;
$loop->reset;
Remove everything and stop the event loop.
=head2 server
my $id = Mojo::IOLoop->server(port => 3000, sub {...});
my $id = $loop->server(port => 3000, sub {...});
my $id = $loop->server({port => 3000} => sub {...});
Accept TCP connections with L<Mojo::IOLoop::Server>, takes the same arguments
as L<Mojo::IOLoop::Server/"listen">.
# Listen on port 3000
Mojo::IOLoop->server({port => 3000} => sub {
my ($loop, $stream, $id) = @_;
...
});
# Listen on random port
my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub {
my ($loop, $stream, $id) = @_;
...
});
my $port = Mojo::IOLoop->acceptor($id)->port;
=head2 singleton
my $loop = Mojo::IOLoop->singleton;
The global L<Mojo::IOLoop> singleton, used to access a single shared event loop
object from everywhere inside the process.
# Many methods also allow you to take shortcuts
Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
Mojo::IOLoop->start;
# Restart active timer
my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
Mojo::IOLoop->singleton->reactor->again($id);
# Turn file descriptor into handle and watch if it becomes readable
my $handle = IO::Handle->new_from_fd($fd, 'r');
Mojo::IOLoop->singleton->reactor->io($handle => sub {
my ($reactor, $writable) = @_;
say $writable ? 'Handle is writable' : 'Handle is readable';
})->watch($handle, 1, 0);
=head2 start
Mojo::IOLoop->start;
$loop->start;
Start the event loop. This will block until L</"stop"> is called, and throws
an exception if the event loop is already running. Note that some reactors stop
automatically if there are no events being watched anymore.
# Start event loop only if it is not running already
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head2 stop
Mojo::IOLoop->stop;
$loop->stop;
Stop the event loop. This will not interrupt any existing connections and the
event loop can be restarted by running L</"start"> again.
=head2 stop_gracefully
Mojo::IOLoop->stop_gracefully;
$loop->stop_gracefully;
Stop accepting new connections and wait for all existing connections to be
closed before stopping the event loop.
=head2 stream
my $stream = Mojo::IOLoop->stream($id);
my $stream = $loop->stream($id);
my $id = $loop->stream(Mojo::IOLoop::Stream->new);
Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
# Increase inactivity timeout for connection to 300 seconds
Mojo::IOLoop->stream($id)->timeout(300);
=head2 timer
my $id = Mojo::IOLoop->timer(3 => sub {...});
my $id = $loop->timer(0 => sub {...});
my $id = $loop->timer(0.25 => sub {...});
Create a new timer, invoking the callback after a given amount of time in
seconds.
# Perform operation in 5 seconds
Mojo::IOLoop->timer(5 => sub {
my $loop = shift;
...
});
=head1 DEBUGGING
You can set the C<MOJO_IOLOOP_DEBUG> environment variable to get some advanced
diagnostics information printed to C<STDERR>.
MOJO_IOLOOP_DEBUG=1
=head1 SEE ALSO
L<Mojolicious>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
=cut