@@ -1,5 +1,8 @@
Revision history for Perl extension Starlet
+0.22
+ - listen to muliple ports passed from Server::Starter (ttakezawa)
+
0.21
- support listening to unix socket (passed by Server::Starter) (kazeburo)
@@ -29,3 +29,4 @@ t/07remote_port.t
t/08chunked_req.t
t/09chunked_zero_length.t
t/10unix_domain_socket.t
+t/11multi-sockets.t
@@ -28,4 +28,4 @@ requires:
perl: 5.8.1
resources:
license: http://dev.perl.org/licenses/
-version: 0.21
+version: 0.22
@@ -48,6 +48,6 @@
"http://dev.perl.org/licenses/"
]
},
- "version" : "0.20",
+ "version" : "0.21",
"x_module_name" : "Starlet"
}
@@ -27,5 +27,5 @@ requires:
perl: 5.008001
resources:
license: http://dev.perl.org/licenses/
-version: 0.20
+version: 0.21
x_module_name: Starlet
@@ -76,6 +76,8 @@ AUTHOR
kazeburo
+ Tomohiro Takezawa
+
LICENSE
This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.
@@ -11,20 +11,29 @@ sub new {
my ($klass, %args) = @_;
# setup before instantiation
- my $listen_sock;
if (defined $ENV{SERVER_STARTER_PORT}) {
- my ($hostport, $fd) = %{Server::Starter::server_ports()};
- if ($hostport =~ /(.*):(\d+)/) {
- $args{host} = $1;
- $args{port} = $2;
- } else {
- $args{port} = $hostport;
+ $args{listens} = [];
+ my $server_ports = Server::Starter::server_ports();
+ for my $hostport (keys %$server_ports) {
+ my $fd = $server_ports->{$hostport};
+ my $listen = {};
+ if ($hostport =~ /(.*):(\d+)/) {
+ $listen->{host} = $1;
+ $listen->{port} = $2;
+ } else {
+ $listen->{port} = $hostport;
+ }
+ $listen->{sock} = IO::Socket::INET->new(
+ Proto => 'tcp',
+ ) or die "failed to create socket:$!";
+ $listen->{sock}->fdopen($fd, 'w')
+ or die "failed to bind to listening socket:$!";
+ unless (@{$args{listens}}) {
+ $args{host} = $listen->{host};
+ $args{port} = $listen->{port};
+ }
+ $args{listens}[$fd] = $listen;
}
- $listen_sock = IO::Socket::INET->new(
- Proto => 'tcp',
- ) or die "failed to create socket:$!";
- $listen_sock->fdopen($fd, 'w')
- or die "failed to bind to listening socket:$!";
}
my $max_workers = 10;
for (qw(max_workers workers)) {
@@ -35,8 +44,6 @@ sub new {
# instantiate and set the variables
my $self = $klass->SUPER::new(%args);
$self->{is_multiprocess} = 1;
- $self->{listen_sock} = $listen_sock
- if $listen_sock;
$self->{max_workers} = $max_workers;
$self;
@@ -27,6 +27,7 @@ sub new {
my($class, %args) = @_;
my $self = bless {
+ listens => $args{listens} || [],
host => $args{host} || 0,
port => $args{port} || 8080,
timeout => $args{timeout} || 300,
@@ -68,21 +69,31 @@ sub run {
sub setup_listener {
my $self = shift;
- $self->{listen_sock} ||= IO::Socket::INET->new(
- Listen => SOMAXCONN,
- LocalPort => $self->{port},
- LocalAddr => $self->{host},
- Proto => 'tcp',
- ReuseAddr => 1,
- ) or die "failed to listen to port $self->{port}:$!";
-
- my $family = Socket::sockaddr_family(getsockname($self->{listen_sock}));
- $self->{_listen_sock_is_tcp} = $family != AF_UNIX;
-
- # set defer accept
- if ($^O eq 'linux' && $self->{_listen_sock_is_tcp}) {
- setsockopt($self->{listen_sock}, IPPROTO_TCP, 9, 1)
- and $self->{_using_defer_accept} = 1;
+ if (scalar(grep {defined $_} @{$self->{listens}}) == 0) {
+ my $sock =
+ IO::Socket::INET->new(
+ Listen => SOMAXCONN,
+ LocalPort => $self->{port},
+ LocalAddr => $self->{host},
+ Proto => 'tcp',
+ ReuseAddr => 1,
+ ) or die "failed to listen to port $self->{port}:$!";
+ $self->{listens}[fileno($sock)] = {
+ host => $self->{host},
+ port => $self->{port},
+ sock => $sock,
+ };
+ }
+
+ for my $listen (grep {defined $_} @{$self->{listens}}) {
+ my $family = Socket::sockaddr_family(getsockname($listen->{sock}));
+ $listen->{_is_tcp} = $family != AF_UNIX;
+
+ # set defer accept
+ if ($^O eq 'linux' && $listen->{_is_tcp}) {
+ setsockopt($listen->{sock}, IPPROTO_TCP, 9, 1)
+ and $listen->{_using_defer_accept} = 1;
+ }
}
$self->{server_ready}->($self);
@@ -105,61 +116,103 @@ sub accept_loop {
local $SIG{PIPE} = 'IGNORE';
+ my $acceptor = $self->_get_acceptor;
+
while (! defined $max_reqs_per_child || $proc_req_count < $max_reqs_per_child) {
- if (my ($conn,$peer) = $self->{listen_sock}->accept) {
- $self->{_is_deferred_accept} = $self->{_using_defer_accept};
- $conn->blocking(0)
- or die "failed to set socket to nonblocking mode:$!";
- my ($peerport, $peerhost, $peeraddr) = (0, undef, undef);
- if ($self->{_listen_sock_is_tcp}) {
- $conn->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
- or die "setsockopt(TCP_NODELAY) failed:$!";
- ($peerport, $peerhost) = unpack_sockaddr_in $peer;
- $peeraddr = inet_ntoa($peerhost);
+ my ($conn, $peer, $listen) = $acceptor->();
+ $self->{_is_deferred_accept} = $listen->{_using_defer_accept};
+ $conn->blocking(0)
+ or die "failed to set socket to nonblocking mode:$!";
+ my ($peerport, $peerhost, $peeraddr) = (0, undef, undef);
+ if ($listen->{_is_tcp}) {
+ $conn->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
+ or die "setsockopt(TCP_NODELAY) failed:$!";
+ ($peerport, $peerhost) = unpack_sockaddr_in $peer;
+ $peeraddr = inet_ntoa($peerhost);
+ }
+ my $req_count = 0;
+ my $pipelined_buf = '';
+
+ while (1) {
+ ++$req_count;
+ ++$proc_req_count;
+ my $env = {
+ SERVER_PORT => $listen->{port} || 0,
+ SERVER_NAME => $listen->{host} || 0,
+ SCRIPT_NAME => '',
+ REMOTE_ADDR => $peeraddr,
+ REMOTE_PORT => $peerport,
+ 'psgi.version' => [ 1, 1 ],
+ 'psgi.errors' => *STDERR,
+ 'psgi.url_scheme' => 'http',
+ 'psgi.run_once' => Plack::Util::FALSE,
+ 'psgi.multithread' => Plack::Util::FALSE,
+ 'psgi.multiprocess' => $self->{is_multiprocess},
+ 'psgi.streaming' => Plack::Util::TRUE,
+ 'psgi.nonblocking' => Plack::Util::FALSE,
+ 'psgix.input.buffered' => Plack::Util::TRUE,
+ 'psgix.io' => $conn,
+ 'psgix.harakiri' => 1,
+ };
+
+ my $may_keepalive = $req_count < $self->{max_keepalive_reqs};
+ if ($may_keepalive && $max_reqs_per_child && $proc_req_count >= $max_reqs_per_child) {
+ $may_keepalive = undef;
}
- my $req_count = 0;
- my $pipelined_buf = '';
+ $may_keepalive = 1 if length $pipelined_buf;
+ my $keepalive;
+ ($keepalive, $pipelined_buf) = $self->handle_connection($env, $conn, $app,
+ $may_keepalive, $req_count != 1, $pipelined_buf);
+
+ if ($env->{'psgix.harakiri.commit'}) {
+ $conn->close;
+ return;
+ }
+ last unless $keepalive;
+ # TODO add special cases for clients with broken keep-alive support, as well as disabling keep-alive for HTTP/1.0 proxies
+ }
+ $conn->close;
+ }
+}
+sub _get_acceptor {
+ my $self = shift;
+ my @listens = grep {defined $_} @{$self->{listens}};
+
+ if (scalar(@listens) == 1) {
+ my $listen = $listens[0];
+ return sub {
while (1) {
- ++$req_count;
- ++$proc_req_count;
- my $env = {
- SERVER_PORT => $self->{port} || 0,
- SERVER_NAME => $self->{host} || 0,
- SCRIPT_NAME => '',
- REMOTE_ADDR => $peeraddr,
- REMOTE_PORT => $peerport,
- 'psgi.version' => [ 1, 1 ],
- 'psgi.errors' => *STDERR,
- 'psgi.url_scheme' => 'http',
- 'psgi.run_once' => Plack::Util::FALSE,
- 'psgi.multithread' => Plack::Util::FALSE,
- 'psgi.multiprocess' => $self->{is_multiprocess},
- 'psgi.streaming' => Plack::Util::TRUE,
- 'psgi.nonblocking' => Plack::Util::FALSE,
- 'psgix.input.buffered' => Plack::Util::TRUE,
- 'psgix.io' => $conn,
- 'psgix.harakiri' => 1,
- };
-
- my $may_keepalive = $req_count < $self->{max_keepalive_reqs};
- if ($may_keepalive && $max_reqs_per_child && $proc_req_count >= $max_reqs_per_child) {
- $may_keepalive = undef;
- }
- $may_keepalive = 1 if length $pipelined_buf;
- my $keepalive;
- ($keepalive, $pipelined_buf) = $self->handle_connection($env, $conn, $app,
- $may_keepalive, $req_count != 1, $pipelined_buf);
-
- if ($env->{'psgix.harakiri.commit'}) {
- $conn->close;
- return;
+ if (my ($conn, $peer) = $listen->{sock}->accept) {
+ return ($conn, $peer, $listen);
}
- last unless $keepalive;
- # TODO add special cases for clients with broken keep-alive support, as well as disabling keep-alive for HTTP/1.0 proxies
}
- $conn->close;
+ };
+ }
+ else {
+ # wait for multiple sockets with select(2)
+ my @fds;
+ my $rin = '';
+ for my $listen (@listens) {
+ $listen->{sock}->blocking(0);
+ my $fd = fileno($listen->{sock});
+ push @fds, $fd;
+ vec($rin, $fd, 1) = 1;
}
+ return sub {
+ while (1) {
+ my $nfound = select(my $rout = $rin, undef, undef, undef);
+ for (my $i = 0; $nfound > 0; ++$i) {
+ my $fd = $fds[$i];
+ next unless vec($rout, $fd, 1);
+ --$nfound;
+ my $listen = $self->{listens}[$fd];
+ if (my ($conn, $peer) = $listen->{sock}->accept) {
+ return ($conn, $peer, $listen)
+ }
+ }
+ }
+ };
}
}
@@ -2,7 +2,7 @@ package Starlet;
use 5.008_001;
-our $VERSION = '0.21';
+our $VERSION = '0.22';
1;
__END__
@@ -83,6 +83,8 @@ miyagawa
kazeburo
+Tomohiro Takezawa
+
=head1 LICENSE
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
@@ -0,0 +1,87 @@
+use strict;
+use warnings;
+
+use Test::More;
+use Plack::Loader;
+use File::Temp;
+use IO::Socket::INET;
+use Net::EmptyPort qw(empty_port);
+use Socket;
+
+my $PORT_NUM = 3;
+my $UDS_NUM = 4;
+my $WORKER_NUM = 2;
+
+my @tcp_socks = map {
+ IO::Socket::INET->new(
+ Listen => Socket::SOMAXCONN(),
+ Proto => 'tcp',
+ LocalPort => empty_port(),
+ LocalAddr => '127.0.0.1',
+ ReuseAddr => 1,
+ ) or die "failed to listen:$!";
+} (1..$PORT_NUM);
+
+my @uds_socks = map {
+ my ($fh, $filename) = File::Temp::tempfile(UNLINK=>0);
+ close($fh);
+ unlink($filename);
+ IO::Socket::UNIX->new(
+ Listen => Socket::SOMAXCONN(),
+ Local => $filename,
+ ) or die "failed to listen to socket $filename:$!";
+} (1..$UDS_NUM);
+
+$ENV{SERVER_STARTER_PORT} = join ';', (
+ map($_->sockport.'='.$_->fileno, @tcp_socks),
+ map($_->hostpath.'='.$_->fileno, @uds_socks),
+);
+
+my $pid = fork;
+if ( $pid == 0 ) {
+ # server
+ my $loader = Plack::Loader->load(
+ 'Starlet',
+ max_workers => $WORKER_NUM,
+ );
+ $loader->run(sub{
+ my $env = shift;
+ [200, ['Content-Type'=>'text/html'], ["HELLO $env->{SERVER_PORT}"]];
+ });
+ exit;
+}
+
+sleep 1;
+
+for my $listen_sock (@tcp_socks, @uds_socks) {
+ my ($client, $port);
+ if ($listen_sock->sockdomain == AF_INET) {
+ $port = $listen_sock->sockport;
+ $client = IO::Socket::INET->new(
+ Proto => 'tcp',
+ PeerAddr => '127.0.0.1',
+ PeerPort => $listen_sock->sockport,
+ timeout => 3,
+ ) or die "failed to connect to socket $port:$!";
+ }
+ elsif ($listen_sock->sockdomain == AF_UNIX) {
+ $port = $listen_sock->hostpath;
+ $client = IO::Socket::UNIX->new(
+ Peer => $port,
+ timeout => 3,
+ ) or die "failed to connect to socket $port:$!";
+ }
+ else {
+ die "unknown socket";
+ }
+
+ $client->syswrite("GET / HTTP/1.0\015\012\015\012");
+ $client->sysread(my $buf, 1024);
+ like $buf, qr/Starlet/;
+ like $buf, qr/HELLO $port/;
+}
+
+done_testing();
+
+kill 'TERM', $pid;
+waitpid($pid,0);