The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Gearman::Client::Async::Connection;
use strict;
use warnings;

use Danga::Socket;
use base 'Danga::Socket';
use fields (
            'state',       # one of 3 state constants below
            'waiting',     # hashref of $handle -> [ Task+ ]
            'need_handle', # arrayref of Gearman::Task objects which
                           # have been submitted but need handles.
            'parser',      # parser object
            'hostspec',    # scalar: "host:ip"
            'deadtime',    # unixtime we're marked dead until.
            'task2handle', # hashref of stringified Task -> scalar handle
            'on_ready',    # arrayref of on_ready callbacks to run on connect success
            'on_error',    # arrayref of on_error callbacks to run on connect failure
            't_offline',   # bool: fake being off the net for purposes of connecting, to force timeout
            );

our $T_ON_TIMEOUT;

use constant S_DISCONNECTED => \ "disconnected";
use constant S_CONNECTING   => \ "connecting";
use constant S_READY        => \ "ready";

use Carp qw(croak);
use Gearman::Task;
use Gearman::Util;
use Scalar::Util qw(weaken);

use IO::Handle;
use Socket qw(PF_INET IPPROTO_TCP TCP_NODELAY SOL_SOCKET SOCK_STREAM);

sub DEBUGGING () { 0 }

sub new {
    my Gearman::Client::Async::Connection $self = shift;

    my %opts = @_;

    $self = fields::new( $self ) unless ref $self;

    my $hostspec         = delete( $opts{hostspec} ) or
        croak("hostspec required");

    if (ref $hostspec eq 'GLOB') {
        # In this case we have been passed a globref, hopefully a socket that has already
        # been connected to the Gearman server in some way.
        $self->SUPER::new($hostspec);
        $self->{state}       = S_CONNECTING;
        $self->{parser} = Gearman::ResponseParser::Async->new( $self );
        $self->watch_write(1);
    } elsif (ref $hostspec && $hostspec->can("to_inprocess_server")) {
        # In this case we have been passed an object that looks like a Gearman::Server,
        # which we can just call "to_inprocess_server" on to get a socketpair connecting
        # to it.
        my $sock = $hostspec->to_inprocess_server;
        $self->SUPER::new($sock);
        $self->{state}       = S_CONNECTING;
        $self->{parser} = Gearman::ResponseParser::Async->new( $self );
        $self->watch_write(1);
    }else {
        $self->{state}       = S_DISCONNECTED;
    }

    $self->{hostspec}    = $hostspec;
    $self->{waiting}     = {};
    $self->{need_handle} = [];
    $self->{deadtime}    = 0;
    $self->{on_ready}    = [];
    $self->{on_error}    = [];
    $self->{task2handle} = {};

    croak "Unknown parameters: " . join(", ", keys %opts) if %opts;
    return $self;
}

sub close_when_finished {
    my Gearman::Client::Async::Connection $self = shift;
    # FIXME: implement
}

sub hostspec {
    my Gearman::Client::Async::Connection $self = shift;

    return $self->{hostspec};
}

sub connect {
    my Gearman::Client::Async::Connection $self = shift;

    $self->{state} = S_CONNECTING;

    my ($host, $port) = split /:/, $self->{hostspec};
    $port ||= 7003;

    warn "Connecting to $self->{hostspec}\n" if DEBUGGING;

    socket my $sock, PF_INET, SOCK_STREAM, IPPROTO_TCP;
    IO::Handle::blocking($sock, 0);
    setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    unless ($sock && defined fileno($sock)) {
        warn( "Error creating socket: $!\n" );
        return undef;
    }

    $self->SUPER::new( $sock );
    $self->{parser} = Gearman::ResponseParser::Async->new( $self );

    eval {
        connect $sock, Socket::sockaddr_in($port, Socket::inet_aton($host));
    };
    if ($@) {
        $self->on_connect_error;
        return;
    }

    Danga::Socket->AddTimer(0.25, sub {
        return unless $self->{state} == S_CONNECTING;
        $T_ON_TIMEOUT->() if $T_ON_TIMEOUT;
        $self->on_connect_error;
    });

    # unless we're faking being offline for the test suite, connect and watch
    # for writabilty so we know the connect worked...
    unless ($self->{t_offline}) {
        $self->watch_write(1);
    }
}

sub event_write {
    my Gearman::Client::Async::Connection $self = shift;

    if ($self->{state} == S_CONNECTING) {
        $self->{state} = S_READY;
        $self->watch_read(1);
        warn "$self->{hostspec} connected and ready.\n" if DEBUGGING;
        $_->() foreach @{$self->{on_ready}};
        $self->destroy_callbacks;
    }

    $self->watch_write(0) if $self->write(undef);
}

sub destroy_callbacks {
    my Gearman::Client::Async::Connection $self = shift;
    $self->{on_ready} = [];
    $self->{on_error} = [];
}

sub event_read {
    my Gearman::Client::Async::Connection $self = shift;

    my $input = $self->read( 128 * 1024 );
    unless (defined $input) {
        $self->mark_dead if $self->stuff_outstanding;
        $self->close( "EOF" );
        return;
    }

    $self->{parser}->parse_data( $input );
}

sub event_err {
    my Gearman::Client::Async::Connection $self = shift;

    my $was_connecting = ($self->{state} == S_CONNECTING);

    if ($was_connecting && $self->{t_offline}) {
        $self->SUPER::close( "error" );
        return;
    }

    $self->mark_dead;
    $self->close( "error" );
    $self->on_connect_error if $was_connecting;
}

sub on_connect_error {
    my Gearman::Client::Async::Connection $self = shift;
    warn "Jobserver, $self->{hostspec} ($self) has failed to connect properly\n" if DEBUGGING;

    $self->mark_dead;
    $self->close( "error" );
    $_->() foreach @{$self->{on_error}};
    $self->destroy_callbacks;
}

sub close {
    my Gearman::Client::Async::Connection $self = shift;
    my $reason = shift;

    if ($self->{state} != S_DISCONNECTED) {
        $self->{state} = S_DISCONNECTED;
        $self->SUPER::close( $reason );
    }

    $self->_requeue_all;
}

sub mark_dead {
    my Gearman::Client::Async::Connection $self = shift;
    $self->{deadtime} = time + 10;
    warn "$self->{hostspec} marked dead for a bit." if DEBUGGING;
}

sub alive {
    my Gearman::Client::Async::Connection $self = shift;
    return $self->{deadtime} <= time;
}

sub add_task {
    my Gearman::Client::Async::Connection $self = shift;
    my Gearman::Task $task = shift;

    Carp::confess("add_task called when in wrong state")
        unless $self->{state} == S_READY;

    warn "writing task $task to $self->{hostspec}\n" if DEBUGGING;

    $self->write( $task->pack_submit_packet );
    push @{$self->{need_handle}}, $task;
    Scalar::Util::weaken($self->{need_handle}->[-1]);
}

sub stuff_outstanding {
    my Gearman::Client::Async::Connection $self = shift;
    return
        @{$self->{need_handle}} ||
        %{$self->{waiting}};
}

sub _requeue_all {
    my Gearman::Client::Async::Connection $self = shift;

    my $need_handle = $self->{need_handle};
    my $waiting     = $self->{waiting};

    $self->{need_handle} = [];
    $self->{waiting}     = {};

    while (@$need_handle) {
        my $task = shift @$need_handle;
        warn "Task $task in need_handle queue during socket error, queueing for redispatch\n" if DEBUGGING;
        $task->fail if $task;
    }

    while (my ($shandle, $tasklist) = each( %$waiting )) {
        foreach my $task (@$tasklist) {
            warn "Task $task ($shandle) in waiting queue during socket error, queueing for redispatch\n" if DEBUGGING;
            $task->fail;
        }
    }
}

sub process_packet {
    my Gearman::Client::Async::Connection $self = shift;
    my $res = shift;

    warn "Got packet '$res->{type}' from $self->{hostspec}\n" if DEBUGGING;

    if ($res->{type} eq "job_created") {

        die "Um, got an unexpected job_created notification" unless @{ $self->{need_handle} };
        my Gearman::Task $task = shift @{ $self->{need_handle} } or
            return 1;


        my $shandle = ${ $res->{'blobref'} };
        if ($task) {
            $self->{task2handle}{"$task"} = $shandle;
            push @{ $self->{waiting}->{$shandle} ||= [] }, $task;
        }
        return 1;
    }

    if ($res->{type} eq "work_fail") {
        my $shandle = ${ $res->{'blobref'} };
        $self->_fail_jshandle($shandle);
        return 1;
    }

    if ($res->{type} eq "work_complete") {
        ${ $res->{'blobref'} } =~ s/^(.+?)\0//
            or die "Bogus work_complete from server";
        my $shandle = $1;

        my $task_list = $self->{waiting}{$shandle} or
            return;

        my Gearman::Task $task = shift @$task_list or
            return;

        $task->complete($res->{'blobref'});

        unless (@$task_list) {
            delete $self->{waiting}{$shandle};
            delete $self->{task2handle}{"$task"};
        }

        warn "Jobs: " . scalar( keys( %{$self->{waiting}} ) ) . "\n" if DEBUGGING;

        return 1;
    }

    if ($res->{type} eq "work_status") {
        my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });

        my $task_list = $self->{waiting}{$shandle} or
            return;

        foreach my Gearman::Task $task (@$task_list) {
            $task->status($nu, $de);
        }

        return 1;
    }

    die "Unknown/unimplemented packet type: $res->{type}";

}

sub give_up_on {
    my Gearman::Client::Async::Connection $self = shift;
    my $task = shift;

    my $shandle = $self->{task2handle}{"$task"} or return;
    my $task_list = $self->{waiting}{$shandle} or return;
    @$task_list = grep { $_ != $task } @$task_list;
    unless (@$task_list) {
        delete $self->{waiting}{$shandle};
    }

}

# note the failure of a task given by its jobserver-specific handle
sub _fail_jshandle {
    my Gearman::Client::Async::Connection $self = shift;
    my $shandle = shift;

    my $task_list = $self->{waiting}->{$shandle} or
        return;

    my Gearman::Task $task = shift @$task_list or
        return;

    # cleanup
    unless (@$task_list) {
        delete $self->{task2handle}{"$task"};
        delete $self->{waiting}{$shandle};
    }

    $task->fail;
}

sub get_in_ready_state {
    my ($self, $on_ready, $on_error) = @_;

    if ($self->{state} == S_READY) {
        $on_ready->();
        return;
    }

    push @{$self->{on_ready}}, $on_ready if $on_ready;
    push @{$self->{on_error}}, $on_error if $on_error;

    $self->connect if $self->{state} == S_DISCONNECTED;
}

sub t_set_offline {
    my ($self, $val) = @_;
    $val = 1 unless defined $val;
    $self->{t_offline} = $val;
}

package Gearman::ResponseParser::Async;

use strict;
use warnings;
use Scalar::Util qw(weaken);

use Gearman::ResponseParser;
use base 'Gearman::ResponseParser';

sub new {
    my $class = shift;

    my $self = $class->SUPER::new;

    $self->{_conn} = shift;
    weaken($self->{_conn});

    return $self;
}

sub on_packet {
    my $self = shift;
    my $packet = shift;

    return unless $self->{_conn};
    $self->{_conn}->process_packet( $packet );
}

sub on_error {
    my $self = shift;

    return unless $self->{_conn};
    $self->{_conn}->mark_unsafe;
    $self->{_conn}->close;
}

1;