# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Queue::Q4M::Worker;
use strict;
use DBI;
use POSIX qw(:signal_h);
use Time::HiRes ();
use Class::Accessor::Lite
    rw => [ qw(
        before_loop_cb
        dbh
        delay
        loop_iteration_cb
        max_workers
        min_requests_per_child
        max_requests_per_child
        signal_received
        sql
        _work_once
    ) ]
;

our $VERSION = '0.06';

# $GUARD_CB->($coderef) returns an object that calls $coderef when the object
# is destroyed (normally: when it goes out of scope). Scope::Guard is used if
# it can be loaded and provides guard(); otherwise a minimal built-in guard
# class, Queue::Q4M::Worker::Guard, is installed.
my $GUARD_CB;
BEGIN {
    if ( eval { require Scope::Guard; 1 } && defined &Scope::Guard::guard ) {
        $GUARD_CB = \&Scope::Guard::guard;
    } else {
        # Fallback guard layout: [ $dismissed, $callback ].
        # The callback runs on destruction unless slot 0 has been set true.
        *Queue::Q4M::Worker::Guard::DESTROY = sub {
            my ($dismissed, $callback) = @{ $_[0] };
            if (! $dismissed && $callback) {
                $callback->();
            }
        };
        # Slot 0 starts false (not dismissed) so the callback actually fires.
        $GUARD_CB = sub { bless [ 0, $_[0] ], 'Queue::Q4M::Worker::Guard' };
    }
}

# Constructor: Queue::Q4M::Worker->new(%args)
#
# Defaults: max_workers = 0, max_requests_per_child = 10_000,
# min_requests_per_child = 0. The "work_once" argument is removed from %args
# and stored in the "_work_once" slot; every remaining key in %args is copied
# into the object and overrides the defaults above.
sub new {
    my ($class, %args) = @_;

    my $work_once_cb = delete $args{work_once};

    my %fields = (
        max_workers            => 0,
        max_requests_per_child => 10_000,
        min_requests_per_child => 0,
        _work_once             => $work_once_cb,
        %args,
    );

    return bless \%fields, $class;
}

# Resolve the "sql" attribute into ($statement, @bind_values).
#
# When "sql" is a code reference it is called with the worker object and is
# expected to return the statement followed by any bind values. Any other
# value is used as the statement itself, with no bind values.
sub _get_sql {
    my ($self) = @_;
    my $source = $self->sql;

    my ($stmt, @binds) = ref($source) eq 'CODE'
        ? $source->($self)
        : ($source);

    return ($stmt, @binds);
}


# Invoke the optional "before_loop_cb" hook with the worker object and return
# whatever it returns (the caller keeps that value alive for the duration of
# the work loop). Without a hook, the false hook value itself is returned.
sub _get_before_loop_guard {
    my ($self) = @_;
    my $hook = $self->before_loop_cb();
    return $hook && $hook->($self);
}

# Invoke the optional "loop_iteration_cb" hook with the worker object and
# return whatever it returns (the caller holds that value for one iteration
# of the work loop). Without a hook, the false hook value itself is returned.
sub _get_loop_iteration_guard {
    my ($self) = @_;
    my $hook = $self->loop_iteration_cb();
    return $hook && $hook->($self);
}

# Resolve the "dbh" attribute into a database handle.
#
# A code reference is called (in scalar context) with the worker object and
# its result is used as the handle; any other value is returned unchanged.
sub _get_dbh {
    my ($self) = @_;
    my $source = $self->dbh;

    my $handle = ref($source) eq 'CODE' ? $source->($self) : $source;
    return $handle;
}

# Process one fetched job: forwards the worker object and the remaining
# arguments to the callback stored in "_work_once" and returns its result.
# Without a callback, the false callback value itself is returned.
sub work_once {
    my $self = shift;
    my $handler = $self->_work_once;
    return $handler && $handler->( $self, @_ );
}

# Can we process more jobs? True while the "stop_at" quota is still greater
# than the number of jobs counted in "processed".
sub should_process_more {
    my ($self) = @_;
    return $self->{stop_at} > $self->{processed};
}

# Loop condition for the work loop: keep going while the job quota has not
# been reached and no signal has been recorded in "signal_received".
sub should_loop {
    my ($self) = @_;
    return $self->should_process_more
        && ! $self->signal_received;
}

# Entry point: start working. Dispatches to run_multi() when more than one
# worker is configured via "max_workers", otherwise to run_single().
sub work {
    my ($self) = @_;
    return $self->max_workers > 1
        ? $self->run_multi()
        : $self->run_single();
}

# Run multiple children using Parallel::Prefork (if you want more
# control over how this is done, please subclass).
#
# Parallel::Prefork is loaded on demand. The manager is configured with
# "max_workers" children and traps TERM and HUP, mapping both to TERM.
# run_single() is handed to the manager repeatedly until the manager reports
# that it received TERM; then all children are waited for.
sub run_multi {
    my ($self) = @_;

    require Parallel::Prefork;
    my %prefork_args = (
        max_workers  => $self->max_workers,
        trap_signals => {
            TERM => 'TERM',
            HUP  => 'TERM',
        },
    );
    my $manager = Parallel::Prefork->new(\%prefork_args);

    my $child_body = sub { $self->run_single };
    until ( $manager->signal_received eq 'TERM' ) {
        $manager->start($child_body);
    }

    return $manager->wait_all_children();
}

# Fetch-and-process loop for a single worker process.
#
# Picks a random job quota in [min_requests_per_child, max_requests_per_child)
# and stores it in $self->{stop_at}; $self->{processed} is reset to 0 so every
# run starts counting from scratch. Then, while should_loop() is true:
#   1. obtain a database handle and the fetch statement,
#   2. execute the statement,
#   3. if a row comes back: count it, issue "SELECT queue_end()" and pass the
#      row to work_once().
#
# While the loop is talking to the database, SIGINT/SIGQUIT/SIGTERM cancel
# the current statement, disconnect the handle and record the signal name via
# signal_received(), which ends the loop. While work_once() runs, those
# signals have their default disposition. Default dispositions are restored
# before returning.
sub run_single {
    my $self = shift;

    my $min_requests = $self->min_requests_per_child;
    my $max_requests = $self->max_requests_per_child;

    # min_requests can't be negative (or missing)
    if (! defined $min_requests || $min_requests < 0) {
        $min_requests = 0;
    }

    # max_requests must be > min_requests
    # arbitrarily choose min + 5_000
    if (! defined $max_requests || $max_requests <= $min_requests) {
        $max_requests = $min_requests + 5000;
    }

    # Random quota that honours the configured minimum. With the default
    # minimum of 0 this is the same as int(rand($max_requests)).
    $self->{stop_at}   = $min_requests + int(rand($max_requests - $min_requests));
    $self->{processed} = 0;

    my $dbh;
    my $sth;
    my @signals = ( SIGINT, SIGQUIT, SIGTERM );
    my $sigset  = POSIX::SigSet->new( @signals );
    my $cancel_q4m = POSIX::SigAction->new(sub {
        my $signame = shift;
        # best effort: either handle may be undefined or already gone
        eval { $sth->cancel };
        eval { $dbh->disconnect };
        $self->signal_received( $signame );
    }, $sigset, POSIX::SA_NOCLDSTOP());
    my $default_sig = POSIX::SigAction->new('DEFAULT');

    # XXX use SigSet to properly interrupt the process
    my $install_sig = sub {
        POSIX::sigaction( $_, $cancel_q4m ) for @signals;
    };
    my $restore_default_sig = sub {
        POSIX::sigaction( $_, $default_sig ) for @signals;
    };

    $install_sig->();

    # Run arbitrary code before loop. Optionally return a guard object,
    # which is kept alive until this method returns.
    my $before_loop = $self->_get_before_loop_guard();
    while ($self->should_loop) {
        # This is entirely optional. If you want do something that only
        # has an effect during this particular iteration of the loop,
        # you can create a guard here.
        my $guard = $self->_get_loop_iteration_guard();

        # This may seem like a waste, but sometimes you have multiple queues
        # to fetch from, and you want multiplex between each database, so
        # we fetch the database per-iteration
        $dbh = $self->_get_dbh();

        my ($stmt, @binds) = $self->_get_sql();
        $sth = $dbh->prepare($stmt);

        my $rv = $sth->execute( @binds );
        # Nothing fetched. An undefined $rv (failed execute, e.g. one that
        # was cancelled by the signal handler) also compares equal to 0 and
        # ends up here; the loop condition then decides whether to go on.
        if ( $rv == 0 ) {
            $sth->finish;
            next;
        }

        if ( my $h = $sth->fetchrow_hashref ) {
            $self->{processed}++;
            $dbh->do("SELECT queue_end()");

            # while the consumer is working, we need to reset the
            # signal handlers that we previously set. The guard object
            # re-installs them when this block is left.
            my $gobj = $GUARD_CB->($install_sig);
            $restore_default_sig->();

            $self->work_once( $h );
        }
        if (my $delay = $self->delay) {
            Time::HiRes::sleep(rand($delay));
        }
    }
    $restore_default_sig->();
}

1;

__END__

=head1 NAME

Queue::Q4M::Worker - Worker Object Receiving Items From Q4M

=head1 SYNOPSIS

    use Queue::Q4M::Worker;

    my $worker = Queue::Q4M::Worker->new(
        sql => "SELECT * FROM my_queue WHERE queue_wait(...)",
        max_workers => 10, # use Parallel::Prefork
        work_once => sub {
            my ($worker, $row) = @_;
            # $row is a HASH
        }
    );

    $worker->work;

=head1 DESCRIPTION

Queue::Q4M::Worker abstracts a worker subscribing to a Q4M queue.

=head1 CAVEATS

This is a proof of concept release. Please report bugs, and send pull
requests if you like the idea.

=head1 AUTHOR

Daisuke Maki C<< <daisuke@endeworks.jp> >>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2011 by Daisuke Maki

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.0 or,
at your option, any later version of Perl 5 you may have available.

=cut