# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Gearman::Taskset;
use version;
$Gearman::Taskset::VERSION = qv("1.130.004");

use strict;
use warnings;

use Socket;

=head1 NAME

Gearman::Taskset - a taskset in Gearman, from the point of view of a client

=head1 SYNOPSIS

    use Gearman::Client;
    my $client = Gearman::Client->new;

    # waiting on a set of tasks in parallel
    my $ts = $client->new_task_set;
    $ts->add_task( "add" => "1+2", {...});
    $ts->wait();


=head1 DESCRIPTION

Gearman::Taskset is a Gearman::Client's representation of a queue of tasks in Gearman

=head1 METHODS

=cut

use fields (

    # { handle => [Task, ...] }
    'waiting',

    # Gearman::Client
    'client',

    # arrayref
    'need_handle',

    # default socket (non-merged requests)
    'default_sock',

    # default socket's ip/port
    'default_sockaddr',

    # { hostport => socket }
    'loaned_sock',

    # bool, if taskset has been cancelled mid-processing
    'cancelled',

    # hookname -> coderef
    'hooks',
);

use Carp ();
use Gearman::Util;
use Gearman::ResponseParser::Taskset;

# i thought about weakening taskset's client, but might be too weak.
use Scalar::Util ();
use Time::HiRes  ();

=head2 new($client)

=cut

sub new {
    my $self   = shift;
    my $client = shift;

    # Accept a Gearman::Client object or an object of any class derived
    # from it. Plain strings, unblessed references and unrelated objects
    # are rejected with an exception.
    (Scalar::Util::blessed($client) && $client->isa("Gearman::Client"))
        || Carp::croak(
        "provided client argument is not a Gearman::Client reference");

    # Invoked as a class method: allocate a new fields-based object.
    # Invoked on an existing object: (re)initialise that object in place.
    unless (ref $self) {
        $self = fields::new($self);
    }

    $self->{waiting}     = {};         # { handle => [Task, ...] }
    $self->{need_handle} = [];         # tasks submitted, handle not yet known
    $self->{client}      = $client;    # owning client
    $self->{loaned_sock} = {};         # { hostport => socket }
    $self->{cancelled}   = 0;          # set to 1 by cancel()
    $self->{hooks}       = {};         # hookname -> coderef

    return $self;
} ## end sub new

# Hand every socket held by this taskset back to the owning client.
sub DESTROY {
    my Gearman::Taskset $ts = shift;

    # During global cleanup this may be called out of order, and the client
    # may not exist in the taskset (cancel() also clears it).
    return unless $ts->{client};

    if ($ts->{default_sock}) {
        $ts->{client}
            ->_put_js_sock($ts->{default_sockaddr}, $ts->{default_sock});
    }

    # Walk the keys instead of using each(): each() resumes from wherever
    # a previous, early-terminated each() loop on the same hash stopped,
    # which would silently skip some of the loaned sockets.
    my $loaned = $ts->{loaned_sock} || {};
    for my $hp (keys %{$loaned}) {
        $ts->{client}->_put_js_sock($hp, $loaned->{$hp});
    }
} ## end sub DESTROY

=head2 run_hook($name)

run a hook callback if defined

=cut

sub run_hook {
    my Gearman::Taskset $self = shift;
    my $name = shift;

    # nothing to do without a name or without a callback registered for it
    return unless $name;
    my $hook = $self->{hooks}->{$name};
    return unless $hook;

    # remaining arguments are passed through to the callback; an exception
    # thrown by the callback is reported as a warning, not propagated
    eval { $hook->(@_) };

    warn "Gearman::Taskset hook '$name' threw error: $@\n" if $@;
} ## end sub run_hook

=head2 add_hook($name)

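add a hook callback under C<$name>; when called without a callback, the hook registered under C<$name> is removed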

=cut

sub add_hook {
    my Gearman::Taskset $self = shift;
    my ($name, @cb) = @_;

    # a hook name is mandatory
    $name || return;

    # no callback supplied: unregister whatever is stored under $name
    unless (@cb) {
        return delete $self->{hooks}->{$name};
    }

    # otherwise (re)register the supplied callback
    return $self->{hooks}->{$name} = $cb[0];
} ## end sub add_hook

=head2 client ()

this method is part of the "Taskset" interface, also implemented by
Gearman::Client::Async, where no tasksets make sense, so instead the
Gearman::Client::Async object itself is also its taskset.  (the
client tracks all tasks).  so don't change this, without being aware
of Gearman::Client::Async.  similarly, don't access $ts->{client} without
going via this accessor.

=cut

sub client {

    # read-only accessor: returns the client stored in the taskset
    # (undef once cancel() has been called)
    my Gearman::Taskset $self = $_[0];
    return $self->{client};
}

=head2 cancel()

=cut

# Abort the taskset: mark it cancelled, close all of its sockets, forget
# every pending task and detach from the client.
sub cancel {
    my Gearman::Taskset $ts = shift;

    # wait() checks this flag on every loop iteration
    $ts->{cancelled} = 1;

    if ($ts->{default_sock}) {
        close($ts->{default_sock});
        $ts->{default_sock} = undef;
    }

    # values() always starts from the beginning of the hash, unlike each(),
    # which resumes from the position left by an earlier unfinished
    # each() loop. Entries may be undef when a connection attempt failed
    # (_get_loaned_sock stores whatever the client returned), so skip those
    # instead of dying on a method call on undef.
    for my $sock (values %{ $ts->{loaned_sock} }) {
        next unless $sock;
        $sock->close;
    }

    $ts->{waiting}     = {};
    $ts->{need_handle} = [];

    # with the client cleared, DESTROY will not hand the (now closed)
    # sockets back to it
    $ts->{client} = undef;
} ## end sub cancel

#=head2 _get_loaned_sock($hostport)
#
#=cut

sub _get_loaned_sock {
    my Gearman::Taskset $ts = shift;
    my $hostport = shift;

    # reuse the socket already borrowed for this hostport while it is
    # still connected; otherwise drop the stale entry
    my $cached = $ts->{loaned_sock}{$hostport};
    if ($cached) {
        $cached->connected && return $cached;
        delete $ts->{loaned_sock}{$hostport};
    }

    # borrow a socket from the client and remember it (the result is
    # stored as-is, whatever the client returned)
    my $fresh = $ts->{client}->_get_js_sock($hostport);
    $ts->{loaned_sock}{$hostport} = $fresh;
    return $fresh;
} ## end sub _get_loaned_sock

=head2 wait()

event loop for reading in replies

=cut

# wait(%opts)
#
# Reads replies from the taskset's sockets until no task is waiting any
# more or the taskset is cancelled.
#
# Options:
#   timeout => seconds; once that much time has passed the taskset is
#              cancelled and wait() returns
# Any other option only triggers a warning.
#
# Croaks with "Job server failure: ..." when parsing a socket fails or
# when select() fails for a reason other than an interrupting signal.
sub wait {
    my Gearman::Taskset $ts = shift;
    my %opts = @_;

    # convert the relative timeout into an absolute deadline
    my $timeout;
    if (exists $opts{timeout}) {
        $timeout = delete $opts{timeout};
        $timeout += Time::HiRes::time() if defined $timeout;
    }

    Carp::carp "Unknown options: "
        . join(',', keys %opts)
        . " passed to Taskset->wait."
        if keys %opts;

    my %parser;    # fd -> Gearman::ResponseParser object

    my ($rin, $rout, $eout) = ('', '', '');
    my %watching;

    # the set of watched sockets is computed once, on entry
    for my $sock ($ts->{default_sock}, values %{ $ts->{loaned_sock} }) {
        next unless $sock;
        my $fd = $sock->fileno;
        vec($rin, $fd, 1) = 1;
        $watching{$fd} = $sock;
    } ## end for my $sock ($ts->{default_sock...})

    while (!$ts->{cancelled} && keys %{ $ts->{waiting} }) {

        # without a deadline, poll in half-second slices
        my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
        my $nfound = select($rout = $rin, undef, $eout = $rin, $time_left)
            ;    # TODO drop the eout.

        # Record select()'s failure state right away, before any other
        # call can overwrite $!.
        my $failed      = $nfound < 0;
        my $interrupted = $failed && $!{EINTR};
        my $select_err  = "$!";

        if ($timeout && $time_left <= 0) {
            $ts->cancel;
            return;
        }

        # select() returns -1 on error. That value is true, and $rout is
        # then not a valid result set, so it must not be scanned: doing so
        # would attempt reads on sockets that have no data.
        if ($failed) {
            next if $interrupted;    # a signal arrived; just try again
            Carp::croak("Job server failure: select failed: $select_err");
        }
        next if !$nfound;

        foreach my $fd (keys %watching) {
            next unless vec($rout, $fd, 1);

            # TODO: deal with error vector

            my $sock   = $watching{$fd};
            my $parser = $parser{$fd}
                ||= Gearman::ResponseParser::Taskset->new(
                source  => $sock,
                taskset => $ts
                );
            eval { $parser->parse_sock($sock); };

            if ($@) {

                # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
                # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
                Carp::croak("Job server failure: $@");
            } ## end if ($@)
        } ## end foreach my $fd (keys %watching)

    } ## end while (!$ts->{cancelled} ...)
} ## end sub wait

=head2 add_task(Gearman::Task)

=head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>

C<$opts_hr> see L<Gearman::Task>

=cut

sub add_task {
    my Gearman::Taskset $ts = shift;

    # let the client turn the arguments into a task, then bind the task
    # to this taskset
    my $task = $ts->client()->_get_task_from_args(@_);
    $task->taskset($ts);

    $ts->run_hook('add_task', $ts, $task);

    # without a job server socket the task cannot be submitted
    my $jssock = $task->{jssock};
    unless ($jssock) {
        return $task->fail("undefined jssock");
    }

    # submit the request; a short write is fatal
    my $req = $task->pack_submit_packet($ts->client);
    my $len = length($req);
    my $rv  = $jssock->syswrite($req, $len) || 0;
    $rv == $len
        or Carp::croak("Wrote $rv but expected to write $len");

    # process packets from the job server until no submitted task is
    # still waiting for its handle
    push @{ $ts->{need_handle} }, $task;
    while (@{ $ts->{need_handle} }) {
        my $got
            = $ts->_wait_for_packet($jssock, $ts->{client}->{command_timeout});
        next if $got;

        # no packet: drop the head of the queue and fail the task
        # (this will resubmit it if it failed)
        shift @{ $ts->{need_handle} };
        return $task->fail(
            join(' ',
                "no rv on waiting for packet",
                defined($got) ? $got : $!)
        );
    } ## end while (@{ $ts->{need_handle...}})

    return $task->handle;
} ## end sub add_task

#
# _get_default_sock()
# used in Gearman::Task->taskset only
#
sub _get_default_sock {
    my Gearman::Taskset $ts = shift;

    # once chosen, the default socket is reused
    if (my $cached = $ts->{default_sock}) {
        return $cached;
    }

    # ask the client for a random job server; the callback supplies the
    # socket for a given hostport, preferring one this taskset already holds
    my ($jst, $jss) = $ts->{client}->_get_random_js_sock(
        sub {
            my ($hostport) = @_;
            my $loaned = $ts->{loaned_sock}{$hostport};
            return $loaned
                ? $loaned
                : $ts->{client}->_get_js_sock($hostport);
        }
    );
    $jss or return;

    # record the socket as loaned (unless one is already recorded for that
    # hostport) and remember it, with its address, as the default
    $ts->{loaned_sock}{$jst} ||= $jss;
    $ts->{default_sockaddr} = $jst;
    $ts->{default_sock}     = $jss;

    return $jss;
} ## end sub _get_default_sock

#
#  _get_hashed_sock($hv)
#
# used in Gearman::Task->taskset only
#
# return a socket
# Map the hash value $hv onto the client's job server list and return a
# socket to the selected server. When no socket can be obtained for that
# server, the following servers are tried in order (wrapping around) until
# one yields a socket. Returns undef when none does or when the client has
# no job servers.
sub _get_hashed_sock {
    my Gearman::Taskset $ts = shift;
    my $hv = shift;

    my $cl       = $ts->client;
    my $js_count = $cl->{js_count} || 0;

    my $sock;
    for my $off (0 .. $js_count - 1) {
        my $idx = ($hv + $off) % $js_count;
        $sock = $ts->_get_loaned_sock($cl->{job_servers}[$idx]);

        # Stop at the first server that gives us a socket. Leaving the
        # loop unconditionally would make the fallback to the remaining
        # servers unreachable.
        last if $sock;
    } ## end for my $off (0 .. $js_count...)

    return $sock;
} ## end sub _get_hashed_sock

#
#  _wait_for_packet($sock, $timeout)
#
# returns boolean when given a sock to wait on.
# otherwise, return value is undefined.
sub _wait_for_packet {
    my Gearman::Taskset $ts = shift;

    # $sock: the single socket to read from; $timeout is handed to the reader
    my ($sock, $timeout) = @_;

    # the error slot is filled by the reader but not inspected here
    my $err;
    my $res = Gearman::Util::read_res_packet($sock, \$err, $timeout);

    # no packet read: report 0; otherwise the result of processing it
    return 0 unless $res;
    return $ts->process_packet($res, $sock);
} ## end sub _wait_for_packet

#
# _ip_port($sock)
#
# return hostport || ipport
#
# Returns the hostport under which $sock is registered in loaned_sock, or
# "ip:port" built from the socket's peer address when it is not registered.
# Returns nothing when $sock is false or has no peer name.
sub _ip_port {
    my ($self, $sock) = @_;
    $sock || return;

    my $pn = getpeername($sock);
    $pn || return;

    # Look for a hostport in loaned_sock.
    # The hash is walked via keys() rather than each(): leaving an each()
    # loop early keeps the hash iterator where it stopped, so the next
    # lookup would start behind the match and miss the socket.
    # hopefully it solves client->get_status mismatch
    my $loaned = $self->{loaned_sock} || {};
    for my $hp (keys %{$loaned}) {
        my $s = $loaned->{$hp} || next;
        return $hp if $sock == $s;
    }

    # not a loaned socket: fall back to the numeric peer address
    my ($port, $iaddr) = Socket::sockaddr_in($pn);
    return join ':', Socket::inet_ntoa($iaddr), $port;
} ## end sub _ip_port

#
# _fail_jshandle($shandle)
#
# note the failure of a task given by its jobserver-specific handle
#
sub _fail_jshandle {
    my Gearman::Taskset $ts = shift;
    my ($shandle) = @_;

    unless ($shandle) {
        Carp::croak("_fail_jshandle() called without shandle parameter");
    }

    # tasks currently waiting on this jobserver handle
    my $task_list = $ts->{waiting}{$shandle};
    $task_list
        || Carp::croak("Uhhhh:  got work_fail for unknown handle: $shandle");

    # the oldest waiting task is the one that gets failed
    my $task = shift @{$task_list};
    unless ($task && ref($task) eq "Gearman::Task") {
        Carp::croak(
            "Uhhhh:  task_list is empty on work_fail for handle $shandle\n");
    }

    $task->fail("jshandle fail");

    # forget the handle once nothing is waiting on it any more
    delete $ts->{waiting}{$shandle} unless @{$task_list};
} ## end sub _fail_jshandle

=head2 process_packet($res, $sock)

=cut

# process_packet($res, $sock)
#
# Dispatches one response packet ($res->{type}, $res->{blobref}) received on
# $sock to the task(s) it concerns. Returns 1 for every handled packet type
# and croaks on malformed packets, unknown handles and unknown packet types.
sub process_packet {
    my Gearman::Taskset $ts = shift;
    my ($res, $sock) = @_;

    if ($res->{type} eq "job_created") {

        # the oldest task still waiting for a handle gets this one
        my $task = shift @{ $ts->{need_handle} };
        ($task && ref($task) eq "Gearman::Task")
            or Carp::croak "Um, got an unexpected job_created notification";
        my $shandle = ${ $res->{'blobref'} };
        my $ipport  = $ts->_ip_port($sock);

        # did sock become disconnected in the meantime?
        if (!$ipport) {

            # Fail the task we hold. It has not been added to {waiting}
            # yet, so looking it up by $shandle would either croak about
            # an unknown handle or fail a different task that happens to
            # wait on the same handle.
            $task->fail("jshandle fail");
            return 1;
        } ## end if (!$ipport)

        $task->handle("$ipport//$shandle");

        # background tasks produce no further packets to wait for
        return 1 if $task->{background};
        push @{ $ts->{waiting}{$shandle} ||= [] }, $task;
        return 1;
    } ## end if ($res->{type} eq "job_created")

    if ($res->{type} eq "work_fail") {
        my $shandle = ${ $res->{'blobref'} };
        $ts->_fail_jshandle($shandle);
        return 1;
    }

    # leading "<handle>\0" of a payload; the handle is captured in $1
    my $qr = qr/(.+?)\0/;

    if ($res->{type} eq "work_complete") {

        # strip the handle prefix, leaving only the result in the blob
        ${ $res->{'blobref'} } =~ s/^$qr//
            or Carp::croak "Bogus work_complete from server";
        my $shandle = $1;

        my $task_list = $ts->{waiting}{$shandle}
            or Carp::croak
            "Uhhhh:  got work_complete for unknown handle: $shandle\n";

        my $task = shift @$task_list;
        ($task && ref($task) eq "Gearman::Task")
            or Carp::croak
            "Uhhhh:  task_list is empty on work_complete for handle $shandle\n";

        $task->complete($res->{'blobref'});
        delete $ts->{waiting}{$shandle} unless @$task_list;

        return 1;
    } ## end if ($res->{type} eq "work_complete")

    if ($res->{type} eq "work_exception") {

        # strip the handle prefix, leaving only the exception in the blob
        ${ $res->{'blobref'} } =~ s/^$qr//
            or Carp::croak "Bogus work_exception from server";
        my $shandle = $1;

        my $task_list = $ts->{waiting}{$shandle}
            or Carp::croak
            "Uhhhh:  got work_exception for unknown handle: $shandle\n";

        # the task stays in the waiting list; it is only peeked at here
        my $task = $task_list->[0];
        ($task && ref($task) eq "Gearman::Task")
            or Carp::croak
            "Uhhhh:  task_list is empty on work_exception for handle $shandle\n";

        $task->exception($res->{'blobref'});

        return 1;
    } ## end if ($res->{type} eq "work_exception")

    if ($res->{type} eq "work_status") {

        # payload: handle, numerator, denominator, separated by NUL
        my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });

        my $task_list = $ts->{waiting}{$shandle}
            or Carp::croak
            "Uhhhh:  got work_status for unknown handle: $shandle\n";

        # FIXME: the server is (probably) sending a work_status packet for each
        # interested client, even if the clients are the same, so probably need
        # to fix the server not to do that.  just put this FIXME here for now,
        # though really it's a server issue.
        foreach my $task (@$task_list) {
            $task->status($nu, $de);
        }

        return 1;
    } ## end if ($res->{type} eq "work_status")

    Carp::croak
        "Unknown/unimplemented packet type: $res->{type} [${$res->{blobref}}]";
} ## end sub process_packet

1;