The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Gearman::Client;
use version ();
$Gearman::Client::VERSION = version->declare("2.003.001");

use strict;
use warnings;

=head1 NAME

Gearman::Client - Client for gearman distributed job system

=head1 SYNOPSIS

    use Gearman::Client;
    my $client = Gearman::Client->new;
    $client->job_servers(
      '127.0.0.1',
      {
        ca_certs  => ...,
        cert_file  => ...,
        host      => '10.0.0.1',
        key_file   => ...,
        port      => 4733,
        socket_cb => sub {...},
        use_ssl   => 1,
      }
    );

    # running a single task
    my $result_ref = $client->do_task("add", "1+2");
    print "1 + 2 = $$result_ref\n";

    # waiting on a set of tasks in parallel
    my $taskset = $client->new_task_set;
    $taskset->add_task( "add" => "1+2", {
       on_complete => sub { ... }
    });
    $taskset->add_task( "divide" => "5/0", {
       on_fail => sub { print "divide by zero error!\n"; },
    });
    $taskset->wait;


=head1 DESCRIPTION

I<Gearman::Client> is a client class for the Gearman distributed job
system, providing a framework for sending jobs to one or more Gearman
servers.  These jobs are then distributed out to a farm of workers.

Callers instantiate a I<Gearman::Client> object and from it dispatch
single tasks, sets of tasks, or check on the status of tasks.

=head1 USAGE

=head2 Gearman::Client->new(%options)

Creates a new I<Gearman::Client> object, and returns the object.

If I<%options> is provided, initializes the new client object with the
settings in I<%options>, which can contain:

=over 4

=item * job_servers

Calls I<job_servers> (see below) to initialize the list of job
servers.  Value in this case should be an arrayref.

=item * prefix

Calls I<prefix> (see below) to set the prefix / namespace.

=back

=head2 $client->job_servers(@servers)

Initializes the client I<$client> with the list of job servers in I<@servers>.
I<@servers> should contain a list of IP addresses, with optional port
numbers. For example:

    $client->job_servers('127.0.0.1', '192.168.1.100:4730');

If the port number is not provided, C<4730> is used as the default.

=head2 $client-E<gt>do_task($task)

=head2 $client-E<gt>do_task($funcname, $arg, \%options)

Dispatches a task and waits on the results.  May either provide a
L<Gearman::Task> object, or the 3 arguments that the Gearman::Task
constructor takes.

Returns a scalar reference to the result, or undef on failure.

If you provide on_complete and on_fail handlers, they're ignored, as
this function currently overrides them.

=head2 $client-E<gt>dispatch_background($task)

=head2 $client-E<gt>dispatch_background($funcname, $arg, \%options)

Dispatches a task and doesn't wait for the result. Return value
is an opaque scalar that can be used to refer to the task with get_status.

=head2 $taskset = $client-E<gt>new_task_set

Creates and returns a new L<Gearman::Taskset> object.

=head2 $taskset-E<gt>add_task($task)

=head2 $taskset-E<gt>add_task($funcname, $arg, $uniq)

=head2 $taskset-E<gt>add_task($funcname, $arg, \%options)

Adds a task to a taskset.  Three different calling conventions are
available.

=head2 $taskset-E<gt>wait

Waits for a response from the job server for any of the tasks listed
in the taskset. Will call the I<on_*> handlers for each of the tasks
that have been completed, updated, etc.  Doesn't return until
everything has finished running or failing.

=head2 $client-E<gt>prefix($prefix)

Sets the namespace / prefix for the function names.

See L<Gearman::Worker> for more details.


=head1 EXAMPLES

=head2 Summation

This is an example client that sends off a request to sum up a list of
integers.

    use Gearman::Client;
    use Storable qw( freeze );
    my $client = Gearman::Client->new;
    $client->job_servers('127.0.0.1');
    my $tasks = $client->new_task_set;
    my $handle = $tasks->add_task(sum => freeze([ 3, 5 ]), {
        on_complete => sub { print ${ $_[0] }, "\n" }
    });
    $tasks->wait;

See the L<Gearman::Worker> documentation for the worker for the I<sum>
function.

=cut

use base 'Gearman::Objects';

use fields (
    'sock_info',    # hostport -> hashref
    'hooks',        # hookname -> coderef
    'exceptions',
    'backoff_max',

    # maximum time a gearman command should take to get a result (not a job timeout)
    'command_timeout',
);

use Carp;
use Gearman::Task;
use Gearman::Taskset;
use Gearman::JobStatus;
use Time::HiRes;

sub new {
    my ($self, %opts) = @_;
    unless (ref $self) {
        $self = fields::new($self);
    }

    $self->SUPER::new(%opts);

    $self->{hooks}           = {};
    $self->{exceptions}      = 0;
    $self->{backoff_max}     = 90;
    $self->{command_timeout} = 30;

    $self->{exceptions} = delete $opts{exceptions}
        if exists $opts{exceptions};

    $self->{backoff_max} = $opts{backoff_max}
        if defined $opts{backoff_max};

    $self->{command_timeout} = $opts{command_timeout}
        if defined $opts{command_timeout};

    return $self;
} ## end sub new

=head1 METHODS

=head2 new_task_set()

B<return> Gearman::Taskset

=cut

sub new_task_set {
    my $self    = shift;
    my $taskset = Gearman::Taskset->new($self);
    $self->run_hook('new_task_set', $self, $taskset);
    return $taskset;
} ## end sub new_task_set

#
# _job_server_status_command($command, $each_line_sub)
# $command e.g. "status\n".
# $each_line_sub A sub to be called on each line of response;
#                takes $hostport and the $line as args.
#
sub _job_server_status_command {
    my ($self, $command, $each_line_sub) = (shift, shift, shift);

    my $list
        = scalar(@_)
        ? $self->canonicalize_job_servers(@_)
        : $self->job_servers();
    my %js_map = map { $self->_js_str($_) => 1 } $self->job_servers();

    foreach my $js (@{$list}) {
        defined($js_map{ $self->_js_str($js) }) || next;

        my $sock = $self->_get_js_sock($js)
            or next;

        my $rv = $sock->write($command);

        my $err;
        my @lines = Gearman::Util::read_text_status($sock, \$err);
        if ($err) {

            #TODO warn
            next;
        }

        foreach my $l (@lines) {
            $each_line_sub->($js, $l);
        }

        $self->_sock_cache($js, $sock);
    } ## end foreach my $js (@{$list})
} ## end sub _job_server_status_command

=head2 get_job_server_status()

B<return> {job => {capable, queued, running}}

=cut

sub get_job_server_status {
    my $self = shift;

    my $js_status = {};
    $self->_job_server_status_command(
        "status\n",
        sub {
            my ($hostport, $line) = @_;

            unless ($line =~ /^(\S+)\s+(\d+)\s+(\d+)\s+(\d+)$/) {
                return;
            }

            my ($job, $queued, $running, $capable) = ($1, $2, $3, $4);
            $js_status->{$hostport}->{$job} = {
                queued  => $queued,
                running => $running,
                capable => $capable,
            };
        },
        @_
    );
    return $js_status;
} ## end sub get_job_server_status

=head2 get_job_server_jobs()

supported only by L<Gearman::Server>

B<return> {job => {address, listeners, key}}

=cut

sub get_job_server_jobs {
    my $self    = shift;
    my $js_jobs = {};
    $self->_job_server_status_command(
        "jobs\n",
        sub {
            my ($hostport, $line) = @_;

            # Yes, the unique key is sometimes omitted.
            return unless $line =~ /^(\S+)\s+(\S*)\s+(\S+)\s+(\d+)$/;

            my ($job, $key, $address, $listeners) = ($1, $2, $3, $4);
            $js_jobs->{$hostport}->{$job} = {
                key       => $key,
                address   => $address,
                listeners => $listeners,
            };
        },
        @_
    );
    return $js_jobs;
} ## end sub get_job_server_jobs

=head2 get_job_server_clients()

supported only by L<Gearman::Server>

=cut

sub get_job_server_clients {
    my $self = shift;

    my $js_clients = {};
    my $client;
    $self->_job_server_status_command(
        "clients\n",
        sub {
            my ($hostport, $line) = @_;

            if ($line =~ /^(\S+)$/) {
                $client = $1;
                $js_clients->{$hostport}->{$client} ||= {};
            }
            elsif ($client && $line =~ /^\s+(\S+)\s+(\S*)\s+(\S+)$/) {
                my ($job, $key, $address) = ($1, $2, $3);
                $js_clients->{$hostport}->{$client}->{$job} = {
                    key     => $key,
                    address => $address,
                };
            } ## end elsif ($client && $line =~...)
        },
        @_
    );

    return $js_clients;
} ## end sub get_job_server_clients

#
# _get_task_from_args
#
sub _get_task_from_args {
    my $self = shift;
    my $task;
    if (ref $_[0]) {
        $task = shift;
        $task->isa("Gearman::Task")
            || Carp::croak("Argument isn't a Gearman::Task");
    }
    else {
        my $func   = shift;
        my $arg_p  = shift;
        my $opts   = shift;
        my $argref = ref $arg_p ? $arg_p : \$arg_p;
        Carp::croak("Function argument must be scalar or scalarref")
            unless ref $argref eq "SCALAR";

        $task = Gearman::Task->new($func, $argref, $opts);
    } ## end else [ if (ref $_[0]) ]
    return $task;

} ## end sub _get_task_from_args

=head2 do_task($task)

=head2 do_task($funcname, $arg, \%options)

given a (func, arg_p, opts?)

B<return> scalarref of WORK_COMPLETE result

=cut

sub do_task {
    my $self = shift;
    my $task = $self->_get_task_from_args(@_);
    my $ret     = undef;

    $task->{on_complete} = sub {
        $ret = shift;
    };

    my $ts = $self->new_task_set;
    $ts->add_task($task);
    $ts->wait(timeout => $task->timeout);

    return $ret;
} ## end sub do_task

=head2 dispatch_background($func, $arg_p, $opts)

=head2 dispatch_background($task)

dispatches job in background

return the handle from the jobserver, or undef on failure

=cut

sub dispatch_background {
    my $self = shift;
    my $task = $self->_get_task_from_args(@_);

    $task->{background} = 1;

    my $ts = $self->new_task_set;
    return $ts->add_task($task);
} ## end sub dispatch_background

=head2 run_hook($name)

run a hook callback if defined

=cut

sub run_hook {
    my ($self, $hookname) = @_;
    $hookname || return;

    my $hook = $self->{hooks}->{$hookname};
    return unless $hook;

    eval { $hook->(@_) };

    warn "Gearman::Client hook '$hookname' threw error: $@\n" if $@;
} ## end sub run_hook

=head2 add_hook($name, $cb)

add a hook

=cut

sub add_hook {
    my ($self, $hookname) = (shift, shift);
    $hookname || return;

    if (@_) {
        $self->{hooks}->{$hookname} = shift;
    }
    else {
        delete $self->{hooks}->{$hookname};
    }
} ## end sub add_hook

=head2 get_status($handle)

The Gearman Server will assign a scalar job handle when you request a 
background job with dispatch_background. Save this scalar, and use it later in 
order to request the status of this job. 

B<return> L<Gearman::JobStatus> on success

=cut

sub get_status {
    my ($self, $handle) = @_;
    $handle || return;

    my ($js_str, $shandle) = split(m!//!, $handle);

    #TODO simple check for $js_str in job_server doesn't work if
    # $js_str is not contained in job_servers
    # job_servers = ["localhost:4730"]
    # handle = 127.0.0.1:4730//H:...
    #
    # hopefully commit 58e2aa5 solves this TODO

    my $js = $self->_js($js_str);
    $js || return;

    my $sock = $self->_get_js_sock($js);
    $sock || return;

    my $req = Gearman::Util::pack_req_command("get_status", $shandle);
    my $len = length($req);
    my $rv  = $sock->write($req, $len);
    my $err;
    my $res = Gearman::Util::read_res_packet($sock, \$err);

    if ($res && $res->{type} eq "error") {
        Carp::croak
            "Error packet from server after get_status: ${$res->{blobref}}\n";
    }

    return undef unless $res && $res->{type} eq "status_res";

    my @args = split(/\0/, ${ $res->{blobref} });

    #FIXME returns on '', 0
    $args[0] || return;

    shift @args;
    $self->_sock_cache($js_str, $sock);

    return Gearman::JobStatus->new(@args);
} ## end sub get_status

#
# _option_request($sock, $option)
#
sub _option_request {
    my ($self, $sock, $option) = @_;

    my $req = Gearman::Util::pack_req_command("option_req", $option);
    my $len = length($req);
    my $rv  = $sock->write($req, $len);

    my $err;
    my $res = Gearman::Util::read_res_packet($sock, \$err,
        $self->{command_timeout});

    return unless $res;

    return 0 if $res->{type} eq "error";
    return 1 if $res->{type} eq "option_res";

    warn "Got unknown response to option request: $res->{type}\n";
    return;
} ## end sub _option_request

#
# _get_js_sock($js)
#
# returns a socket from the cache. it should be returned to the
# cache with _sock_cache($js, $sock).
# The hostport isn't verified. the caller
# should verify that $js is in the set of jobservers.
sub _get_js_sock {
    my ($self, $js) = @_;
    if (my $sock = $self->_sock_cache($js, undef, 1)) {
        return $sock if $sock->connected;
    }

    my $sockinfo = $self->{sock_info}{ $self->_js_str($js) } ||= {};
    my $disabled_until = $sockinfo->{disabled_until};
    return if defined $disabled_until && $disabled_until > Time::HiRes::time();

    my $sock = $self->socket($js, 1);
    unless ($sock) {
        my $count       = ++$sockinfo->{failed_connects};
        my $disable_for = $count**2;
        my $max         = $self->{backoff_max};
        $disable_for = $disable_for > $max ? $max : $disable_for;
        $sockinfo->{disabled_until} = $disable_for + Time::HiRes::time();
        return;
    } ## end unless ($sock)

    $self->sock_nodelay($sock);
    $sock->autoflush(1);

    # If exceptions support is to be requested, and the request fails, disable
    # exceptions for this client.
    if ($self->{exceptions} && !$self->_option_request($sock, 'exceptions')) {
        warn "Exceptions support denied by server, disabling.\n";
        $self->{exceptions} = 0;
    }

    delete $sockinfo->{failed_connects};    # Success, mark the socket as such.
    delete $sockinfo->{disabled_until};

    return $sock;
} ## end sub _get_js_sock

sub _get_random_js_sock {
    my ($self, $getter) = @_;

    $self->{js_count} || return;

    $getter ||= sub {
        my $js = shift;
        return $self->_get_js_sock($js);
    };

    my $ridx = int(rand($self->{js_count}));
    for (my $try = 0; $try < $self->{js_count}; $try++) {
        my $aidx = ($ridx + $try) % $self->{js_count};
        my $js   = $self->{job_servers}[$aidx];
        my $sock = $getter->($js) or next;
        return ($js, $sock);
    } ## end for (my $try = 0; $try ...)
    return ();
} ## end sub _get_random_js_sock

1;
__END__


=head1 COPYRIGHT

Copyright 2006-2007 Six Apart, Ltd.

License granted to use/distribute under the same terms as Perl itself.

=head1 WARRANTY

This is free software. This comes with no warranty whatsoever.

=head1 AUTHORS

 Brad Fitzpatrick (<brad at danga dot com>)
 Jonathan Steinert (<hachi at cpan dot org>)
 Alexei Pastuchov (<palik at cpan dot org>) co-maintainer

=head1 REPOSITORY

L<https://github.com/p-alik/perl-Gearman.git>