package Qudo::Parallel::Manager;
use strict;
use warnings;
use Qudo;
use UNIVERSAL::require;
use Parallel::Prefork::SpareWorkers qw(:status);
use Sub::Throttle qw/throttle/;
use IO::Socket;
our $VERSION = '0.06';
sub new {
my ($class, %args) = @_;
my $max_request_par_child = delete $args{max_request_par_child} || 30;
my $max_workers = delete $args{max_workers} || 1;
my $min_spare_workers = delete $args{min_spare_workers} || 1;
my $max_spare_workers = delete $args{max_spare_workers} || $max_workers;
my $auto_load_worker = delete $args{auto_load_worker} || 1;
my $work_delay = $args{work_delay} || 5;
my $admin = delete $args{admin} || 0;
my $admin_host = delete $args{admin_host} || '127.0.0.1';
my $admin_port = delete $args{admin_port} || 90000;
my $debug = delete $args{debug} || 0;
my $qudo = Qudo->new(%args);
$qudo->manager->register_hooks(qw/Qudo::Hook::Scoreboard/);
my $self = bless {
max_workers => $max_workers,
max_request_par_child => $max_request_par_child,
min_spare_workers => $min_spare_workers,
max_spare_workers => $max_spare_workers,
work_delay => $work_delay,
admin => $admin,
admin_host => $admin_host,
admin_port => $admin_port,
debug => $debug,
qudo => $qudo,
}, $class;
if ($auto_load_worker) {
for my $worker (@{$qudo->{manager_abilities}}) {
$self->debug("Setting up the $worker\n");
$worker->use or die $@
}
}
$self;
}
sub debug {
my ($self, $msg) = @_;
warn $msg if $self->{debug};
}
sub run {
my $self = shift;
$self->debug("START WORKING : $$\n");
my $pm = $self->pm;
my $c_pid = $self->start_admin_port;
while ($pm->signal_received ne 'TERM') {
$pm->start and next;
$self->debug("spawn $$\n");
{
my $manager = $self->{qudo}->manager;
for my $dsn ($manager->shuffled_databases) {
my $db = $manager->driver_for($dsn);
$db->reconnect;
}
my $reqs_before_exit = $self->{max_request_par_child};
local $SIG{TERM} = sub { $reqs_before_exit = 0 };
while ($reqs_before_exit > 0) {
if (throttle(0.5, sub { $manager->work_once })) {
$self->debug("WORK $$\n");
--$reqs_before_exit
} else {
sleep $self->{work_delay};
}
}
}
$self->debug("FINISHED $$\n");
$pm->finish;
}
$pm->wait_all_children;
$self->stop_admin_port($c_pid);
}
sub stop_admin_port {
my ($self, $pid) = @_;
return unless $pid;
kill 'TERM', $pid;
}
sub start_admin_port {
my $self = shift;
return unless $self->{admin};
my $pid = fork();
die "fork failed: $!" unless defined $pid;
return $pid if $pid; # main process
my $admin = IO::Socket::INET->new(
Listen => 5,
LocalAddr => $self->{admin_host},
LocalPort => $self->{admin_port},
Proto => 'tcp',
Type => SOCK_STREAM,
ReuseAddr => 1,
) or die "Cannot open server socket: $!";
while (my $remote = $admin->accept) {
my $status = join ' ', $self->pm->scoreboard->get_statuses;
$remote->print($status);
$remote->close;
}
}
sub pm {
my $self = shift;
$self->{pm} ||= do {
my $pm = Parallel::Prefork::SpareWorkers->new({
max_workers => $self->{max_workers},
min_spare_workers => $self->{min_spare_workers},
max_spare_workers => $self->{max_spare_workers},
trap_signals => {
TERM => 'TERM',
HUP => 'TERM',
},
});
{
no strict 'refs'; ## no critic.
*{"Qudo::Parallel::Manager::Registrar::pm"} = sub { $pm }
}
$pm;
};
}
1;
__END__
=head1 NAME
Qudo::Parallel::Manager - auto control forking manager process.
=head1 SYNOPSIS
use Qudo::Parallel::Manager;
my $manager = Qudo::Parallel::Manager->new(
databases => [+{
dsn => 'dbi:SQLite:/tmp/qudo.db',
username => '',
password => '',
}],
work_delay => 3,
max_workers => 5,
min_spare_workers => 1,
max_spare_workers => 5,
max_request_par_chiled => 30,
auto_load_worker => 1,
admin => 1,
debug => 1,
);
$manager->run; # start fork and work.
# other process. get worker scoreborad.
use IO::Socket::INET;
my $sock = IO::Socket::INET->new(
PeerHost => '127.0.0.1',
PeerPort => 90000,
Proto => 'tcp',
) or die 'can not connect admin port.';
# get scoreborad
# ex) _ . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
my $status = $sock->getline;
$sock->close;
=head1 DESCRIPTION
Qudo::Parallel::Manager is auto control forking manager process.
and get worker scoreborad.
=head1 AUTHOR
Atsushi Kobayashi E<lt>nekokak _at_ gmail _dot_ comE<gt>
=head1 SEE ALSO
L<Qudo>
L<Parallel::Prefork::SpareWorkers>
L<IO::Socket::INET>
=head1 LICENSE
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=cut