package Gearman::Client::Async::Connection;
use strict;
use warnings;
use Danga::Socket;
use base 'Danga::Socket';
use fields (
'state', # one of 3 state constants below
'waiting', # hashref of $handle -> [ Task+ ]
'need_handle', # arrayref of Gearman::Task objects which
# have been submitted but need handles.
'parser', # parser object
'hostspec', # scalar: "host:ip"
'deadtime', # unixtime we're marked dead until.
'task2handle', # hashref of stringified Task -> scalar handle
'on_ready', # arrayref of on_ready callbacks to run on connect success
'on_error', # arrayref of on_error callbacks to run on connect failure
't_offline', # bool: fake being off the net for purposes of connecting, to force timeout
);
our $T_ON_TIMEOUT;
use constant S_DISCONNECTED => \ "disconnected";
use constant S_CONNECTING => \ "connecting";
use constant S_READY => \ "ready";
use Carp qw(croak);
use Gearman::Task;
use Gearman::Util;
use Scalar::Util qw(weaken);
use IO::Handle;
use Socket qw(PF_INET IPPROTO_TCP TCP_NODELAY SOL_SOCKET SOCK_STREAM);
sub DEBUGGING () { 0 }
sub new {
my Gearman::Client::Async::Connection $self = shift;
my %opts = @_;
$self = fields::new( $self ) unless ref $self;
my $hostspec = delete( $opts{hostspec} ) or
croak("hostspec required");
if (ref $hostspec eq 'GLOB') {
# In this case we have been passed a globref, hopefully a socket that has already
# been connected to the Gearman server in some way.
$self->SUPER::new($hostspec);
$self->{state} = S_CONNECTING;
$self->{parser} = Gearman::ResponseParser::Async->new( $self );
$self->watch_write(1);
} elsif (ref $hostspec && $hostspec->can("to_inprocess_server")) {
# In this case we have been passed an object that looks like a Gearman::Server,
# which we can just call "to_inprocess_server" on to get a socketpair connecting
# to it.
my $sock = $hostspec->to_inprocess_server;
$self->SUPER::new($sock);
$self->{state} = S_CONNECTING;
$self->{parser} = Gearman::ResponseParser::Async->new( $self );
$self->watch_write(1);
}else {
$self->{state} = S_DISCONNECTED;
}
$self->{hostspec} = $hostspec;
$self->{waiting} = {};
$self->{need_handle} = [];
$self->{deadtime} = 0;
$self->{on_ready} = [];
$self->{on_error} = [];
$self->{task2handle} = {};
croak "Unknown parameters: " . join(", ", keys %opts) if %opts;
return $self;
}
sub close_when_finished {
my Gearman::Client::Async::Connection $self = shift;
# FIXME: implement
}
sub hostspec {
my Gearman::Client::Async::Connection $self = shift;
return $self->{hostspec};
}
sub connect {
my Gearman::Client::Async::Connection $self = shift;
$self->{state} = S_CONNECTING;
my ($host, $port) = split /:/, $self->{hostspec};
$port ||= 7003;
warn "Connecting to $self->{hostspec}\n" if DEBUGGING;
socket my $sock, PF_INET, SOCK_STREAM, IPPROTO_TCP;
IO::Handle::blocking($sock, 0);
setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
unless ($sock && defined fileno($sock)) {
warn( "Error creating socket: $!\n" );
return undef;
}
$self->SUPER::new( $sock );
$self->{parser} = Gearman::ResponseParser::Async->new( $self );
eval {
connect $sock, Socket::sockaddr_in($port, Socket::inet_aton($host));
};
if ($@) {
$self->on_connect_error;
return;
}
Danga::Socket->AddTimer(0.25, sub {
return unless $self->{state} == S_CONNECTING;
$T_ON_TIMEOUT->() if $T_ON_TIMEOUT;
$self->on_connect_error;
});
# unless we're faking being offline for the test suite, connect and watch
# for writabilty so we know the connect worked...
unless ($self->{t_offline}) {
$self->watch_write(1);
}
}
sub event_write {
my Gearman::Client::Async::Connection $self = shift;
if ($self->{state} == S_CONNECTING) {
$self->{state} = S_READY;
$self->watch_read(1);
warn "$self->{hostspec} connected and ready.\n" if DEBUGGING;
$_->() foreach @{$self->{on_ready}};
$self->destroy_callbacks;
}
$self->watch_write(0) if $self->write(undef);
}
sub destroy_callbacks {
my Gearman::Client::Async::Connection $self = shift;
$self->{on_ready} = [];
$self->{on_error} = [];
}
sub event_read {
my Gearman::Client::Async::Connection $self = shift;
my $input = $self->read( 128 * 1024 );
unless (defined $input) {
$self->mark_dead if $self->stuff_outstanding;
$self->close( "EOF" );
return;
}
$self->{parser}->parse_data( $input );
}
sub event_err {
my Gearman::Client::Async::Connection $self = shift;
my $was_connecting = ($self->{state} == S_CONNECTING);
if ($was_connecting && $self->{t_offline}) {
$self->SUPER::close( "error" );
return;
}
$self->mark_dead;
$self->close( "error" );
$self->on_connect_error if $was_connecting;
}
sub on_connect_error {
my Gearman::Client::Async::Connection $self = shift;
warn "Jobserver, $self->{hostspec} ($self) has failed to connect properly\n" if DEBUGGING;
$self->mark_dead;
$self->close( "error" );
$_->() foreach @{$self->{on_error}};
$self->destroy_callbacks;
}
sub close {
my Gearman::Client::Async::Connection $self = shift;
my $reason = shift;
if ($self->{state} != S_DISCONNECTED) {
$self->{state} = S_DISCONNECTED;
$self->SUPER::close( $reason );
}
$self->_requeue_all;
}
sub mark_dead {
my Gearman::Client::Async::Connection $self = shift;
$self->{deadtime} = time + 10;
warn "$self->{hostspec} marked dead for a bit." if DEBUGGING;
}
sub alive {
my Gearman::Client::Async::Connection $self = shift;
return $self->{deadtime} <= time;
}
sub add_task {
my Gearman::Client::Async::Connection $self = shift;
my Gearman::Task $task = shift;
Carp::confess("add_task called when in wrong state")
unless $self->{state} == S_READY;
warn "writing task $task to $self->{hostspec}\n" if DEBUGGING;
$self->write( $task->pack_submit_packet );
push @{$self->{need_handle}}, $task;
Scalar::Util::weaken($self->{need_handle}->[-1]);
}
sub stuff_outstanding {
my Gearman::Client::Async::Connection $self = shift;
return
@{$self->{need_handle}} ||
%{$self->{waiting}};
}
sub _requeue_all {
my Gearman::Client::Async::Connection $self = shift;
my $need_handle = $self->{need_handle};
my $waiting = $self->{waiting};
$self->{need_handle} = [];
$self->{waiting} = {};
while (@$need_handle) {
my $task = shift @$need_handle;
warn "Task $task in need_handle queue during socket error, queueing for redispatch\n" if DEBUGGING;
$task->fail if $task;
}
while (my ($shandle, $tasklist) = each( %$waiting )) {
foreach my $task (@$tasklist) {
warn "Task $task ($shandle) in waiting queue during socket error, queueing for redispatch\n" if DEBUGGING;
$task->fail;
}
}
}
sub process_packet {
my Gearman::Client::Async::Connection $self = shift;
my $res = shift;
warn "Got packet '$res->{type}' from $self->{hostspec}\n" if DEBUGGING;
if ($res->{type} eq "job_created") {
die "Um, got an unexpected job_created notification" unless @{ $self->{need_handle} };
my Gearman::Task $task = shift @{ $self->{need_handle} } or
return 1;
my $shandle = ${ $res->{'blobref'} };
if ($task) {
$self->{task2handle}{"$task"} = $shandle;
push @{ $self->{waiting}->{$shandle} ||= [] }, $task;
}
return 1;
}
if ($res->{type} eq "work_fail") {
my $shandle = ${ $res->{'blobref'} };
$self->_fail_jshandle($shandle);
return 1;
}
if ($res->{type} eq "work_complete") {
${ $res->{'blobref'} } =~ s/^(.+?)\0//
or die "Bogus work_complete from server";
my $shandle = $1;
my $task_list = $self->{waiting}{$shandle} or
return;
my Gearman::Task $task = shift @$task_list or
return;
$task->complete($res->{'blobref'});
unless (@$task_list) {
delete $self->{waiting}{$shandle};
delete $self->{task2handle}{"$task"};
}
warn "Jobs: " . scalar( keys( %{$self->{waiting}} ) ) . "\n" if DEBUGGING;
return 1;
}
if ($res->{type} eq "work_status") {
my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
my $task_list = $self->{waiting}{$shandle} or
return;
foreach my Gearman::Task $task (@$task_list) {
$task->status($nu, $de);
}
return 1;
}
die "Unknown/unimplemented packet type: $res->{type}";
}
sub give_up_on {
my Gearman::Client::Async::Connection $self = shift;
my $task = shift;
my $shandle = $self->{task2handle}{"$task"} or return;
my $task_list = $self->{waiting}{$shandle} or return;
@$task_list = grep { $_ != $task } @$task_list;
unless (@$task_list) {
delete $self->{waiting}{$shandle};
}
}
# note the failure of a task given by its jobserver-specific handle
sub _fail_jshandle {
my Gearman::Client::Async::Connection $self = shift;
my $shandle = shift;
my $task_list = $self->{waiting}->{$shandle} or
return;
my Gearman::Task $task = shift @$task_list or
return;
# cleanup
unless (@$task_list) {
delete $self->{task2handle}{"$task"};
delete $self->{waiting}{$shandle};
}
$task->fail;
}
sub get_in_ready_state {
my ($self, $on_ready, $on_error) = @_;
if ($self->{state} == S_READY) {
$on_ready->();
return;
}
push @{$self->{on_ready}}, $on_ready if $on_ready;
push @{$self->{on_error}}, $on_error if $on_error;
$self->connect if $self->{state} == S_DISCONNECTED;
}
sub t_set_offline {
my ($self, $val) = @_;
$val = 1 unless defined $val;
$self->{t_offline} = $val;
}
package Gearman::ResponseParser::Async;
use strict;
use warnings;
use Scalar::Util qw(weaken);
use Gearman::ResponseParser;
use base 'Gearman::ResponseParser';
sub new {
my $class = shift;
my $self = $class->SUPER::new;
$self->{_conn} = shift;
weaken($self->{_conn});
return $self;
}
sub on_packet {
my $self = shift;
my $packet = shift;
return unless $self->{_conn};
$self->{_conn}->process_packet( $packet );
}
sub on_error {
my $self = shift;
return unless $self->{_conn};
$self->{_conn}->mark_unsafe;
$self->{_conn}->close;
}
1;