# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Gearman::Server::Job;
use version;
$Gearman::Server::Job::VERSION = qv("v1.130.1");

use strict;
use warnings;

use Gearman::Server::Client;
use Scalar::Util;
use Sys::Hostname;

# Instance slots of a job object.  Declared through the 'fields' pragma so
# that fields::new() (see new() below) can build the object from this list.
use fields (
            'func',       # function name handed to new(); together with 'uniq' it keys the server's unique-job lookup
            'uniq',       # unique key given to new(); a key of "-" is replaced by the argument string
            'argref',     # scalar reference to the job's argument data (new() dereferences it as $$argref)
            'listeners',  # arrayref of interested Clients (entries are weakened by add_listener)
            'worker',     # read/written through worker(); presumably the worker handling the job -- confirm with callers
            'handle',     # job handle obtained from $server->new_job_handle in new()
            'status',  # [1, 100]  (an arrayref of [numerator, denominator], see status())
            'require_listener',  # initialised to 1 by new(); read/written through require_listener()
            'server',  # Gearman::Server that owns us
            );

# Constructor: build a job for $func on $server and put it on the queue.
#
# Arguments (after the class name, or an already-built object to fill in):
#   $server  - owning server object; it must provide job_of_unique(),
#              set_unique_job(), new_job_handle() and enqueue_job()
#   $func    - function name the job belongs to
#   $uniq    - optional unique key.  undef or "" disables the duplicate
#              lookup; "-" means "use the argument string as the key"
#   $argref  - scalar reference to the job's argument string
#   $highpri - handed through unchanged to $server->enqueue_job()
#
# Returns the job the server already knows under ($func, $uniq) when there
# is one -- in that case nothing new is registered or queued -- and the
# freshly created job otherwise.
sub new {
    my Gearman::Server::Job $self = shift;
    my ($server, $func, $uniq, $argref, $highpri) = @_;

    $self = fields::new($self) unless ref $self;

    # Check definedness before length: length(undef) only stopped warning
    # in perl 5.12, and a job without a unique key is a normal request.
    if (defined $uniq && length $uniq) {
        # a unique value of "-" means "use my args as my unique key"
        $uniq = $$argref if $uniq eq "-";

        # merge with a job that is already registered under this key
        my $existing = $server->job_of_unique($func, $uniq);
        return $existing if $existing;

        # nobody owns the key yet, so claim it for this job
        $server->set_unique_job($func, $uniq => $self);
    }

    $self->{'server'}           = $server;
    $self->{'func'}             = $func;
    $self->{'uniq'}             = $uniq;
    $self->{'argref'}           = $argref;
    $self->{'require_listener'} = 1;
    $self->{'listeners'}        = [];
    $self->{'handle'}           = $server->new_job_handle;

    $server->enqueue_job($self, $highpri);
    return $self;
}

# Register client $li as interested in this job.  The stored entry is
# weakened, so the listener list does not keep the client object alive.
sub add_listener {
    my Gearman::Server::Job $self = shift;
    my Gearman::Server::Client $li = shift;

    # push reports the new element count, which locates the slot just added
    my $count = push @{ $self->{listeners} }, $li;
    Scalar::Util::weaken($self->{listeners}[ $count - 1 ]);
}

# Write one message to every usable listener.
# The message is the first argument after the invocant and is passed to
# each client's write() directly from @_.  List entries that are false
# (for example weakened entries whose client is gone) and clients whose
# 'closed' flag is set are skipped.
sub relay_to_listeners {
    my Gearman::Server::Job $self = shift;

    for my Gearman::Server::Client $client (@{ $self->{listeners} }) {
        if ($client && !$client->{closed}) {
            $client->write($_[0]);
        }
    }
}

# Like relay_to_listeners, but restricted to clients that have an option on.
# After the invocant, the first argument is the message and the second is
# the option name; a client receives the message only when it is present,
# not closed, and its option($option) call returns true.
sub relay_to_option_listeners {
    my Gearman::Server::Job $self = shift;
    my $option = $_[1];

    for my Gearman::Server::Client $client (@{ $self->{listeners} }) {
        # checks short-circuit in this order, so option() is never called
        # on a missing or closed client
        my $wanted = $client && !$client->{closed} && $client->option($option);
        $client->write($_[0]) if $wanted;
    }

}

# Forget every listener by installing a brand-new empty list; an array
# reference obtained earlier is left untouched.  Returns the new arrayref.
sub clear_listeners {
    my Gearman::Server::Job $self = shift;
    return $self->{listeners} = [];
}

# Return the current listener entries as a list (in scalar context: how
# many entries there are).
sub listeners {
    my Gearman::Server::Job $self = shift;
    my $entries = $self->{listeners};
    return @$entries;
}

# Read-only accessor for the job's unique key as stored by new().
sub uniq {
    my Gearman::Server::Job $self = $_[0];
    return $self->{uniq};
}

# Tell the owning server that this job is done, then, when the global
# $Gearmand::graceful_shutdown flag is set, call Gearmand::shutdown_if_calm().
sub note_finished {
    my Gearman::Server::Job $self = shift;
    my ($success) = @_;    # accepted from callers but not consulted here

    $self->{server}->note_job_finished($self);

    Gearmand::shutdown_if_calm() if $Gearmand::graceful_shutdown;
}

# accessors:
# Combined getter/setter for the 'worker' slot.  Called with an argument
# (even undef) it stores that value first; either way it returns the slot.
sub worker {
    my Gearman::Server::Job $self = shift;
    $self->{worker} = shift if @_;
    return $self->{worker};
}
# Combined getter/setter for the 'require_listener' flag (new() starts it
# at 1).  Called with an argument it stores that value first; either way it
# returns the current flag.
sub require_listener {
    my Gearman::Server::Job $self = shift;
    $self->{require_listener} = shift if @_;
    return $self->{require_listener};
}

# takes arrayref of [numerator,denominator]
# Combined getter/setter for the 'status' slot.  Called with an argument it
# stores that value as given (no copy, no validation) before returning it;
# called without one it returns whatever was stored last.
sub status {
    my Gearman::Server::Job $self = shift;
    $self->{status} = shift if @_;
    return $self->{status};
}

# Read-only accessor for the job handle that new() obtained from the server.
sub handle {
    my Gearman::Server::Job $self = $_[0];
    return $self->{handle};
}

# Read-only accessor for the function name the job was created for.
sub func {
    my Gearman::Server::Job $self = $_[0];
    return $self->{func};
}

# Read-only accessor for the argument reference given to new(); the
# reference itself is returned, not a copy of the data behind it.
sub argref {
    my Gearman::Server::Job $self = $_[0];
    return $self->{argref};
}

1;