package Queue::Q::ClaimFIFO::Redis;
use strict;
use warnings;
use Carp qw(croak);
use Scalar::Util qw(blessed);
use Digest::SHA1;
use Redis;
use Redis::ScriptCache;
use Queue::Q::ClaimFIFO;
use parent 'Queue::Q::ClaimFIFO';
use Class::XSAccessor {
getters => [qw(server port queue_name db _redis_conn _script_cache)],
};
use constant CLAIMED_SUFFIX => '_claimed';
use constant STORAGE_SUFFIX => '_storage';
# in: queue_name, itemkey, value
# out: nothing
our $EnqueueScript = qq#
redis.call('lpush', KEYS[1], ARGV[1])
redis.call('hset', KEYS[1] .. "${\STORAGE_SUFFIX()}", ARGV[1], ARGV[2])
#;
our $EnqueueScriptSHA = Digest::SHA1::sha1_hex($EnqueueScript);
# in: queue_name, time
# out: itemkey, value
our $ClaimScript = qq#
local itemkey = redis.call('rpop', KEYS[1])
if not itemkey then
return {nil, nil}
end
local data = redis.call('hget', KEYS[1] .. "${\STORAGE_SUFFIX()}", itemkey)
redis.call('zadd', KEYS[1] .. "${\CLAIMED_SUFFIX()}", ARGV[1], itemkey)
return {itemkey, data}
#;
our $ClaimScriptSHA = Digest::SHA1::sha1_hex($ClaimScript);
# in: queue_name, itemkey
# out: nothing
our $FinishScript = qq#
redis.call('hdel', KEYS[1] .. "${\STORAGE_SUFFIX()}", ARGV[1])
redis.call('zrem', KEYS[1] .. "${\CLAIMED_SUFFIX()}", ARGV[1])
#;
our $FinishScriptSHA = Digest::SHA1::sha1_hex($FinishScript);
sub new {
my ($class, %params) = @_;
for (qw(server port queue_name)) {
croak("Need '$_' parameter")
if not exists $params{$_};
}
my $self = bless({
(map {$_ => $params{$_}} qw(server port queue_name) ),
db => $params{db} || 0,
_redis_conn => undef,
_script_ok => 0, # not yet known if lua script available
} => $class);
$self->{_redis_conn} = Redis->new(
%{$params{redis_options} || {}},
encoding => undef, # force undef for binary data
server => join(":", $self->server, $self->port),
);
$self->{_script_cache}
= Redis::ScriptCache->new(redis_conn => $self->_redis_conn);
$self->_redis_conn->select($self->db) if $self->db;
return $self;
}
sub enqueue_item {
my $self = shift;
croak("Need exactly one item to enqeue")
if not @_ == 1;
my $item = shift;
if (blessed($item) and $item->isa("Queue::Q::ClaimFIFO::Item")) {
croak("Don't pass a Queue::Q::ClaimFIFO::Item object to enqueue_item: "
. "Your data structure will be wrapped in one");
}
$item = Queue::Q::ClaimFIFO::Item->new(item_data => $item);
$self->_script_cache->run_script(
$EnqueueScriptSHA,
[1, $self->queue_name, $item->_key, $item->_serialized_data],
\$EnqueueScript
);
return $item;
}
sub enqueue_items {
my $self = shift;
return if not @_;
my @items;
foreach my $item (@_) {
if (blessed($item) and $item->isa("Queue::Q::ClaimFIFO::Item")) {
croak("Don't pass a Queue::Q::ClaimFIFO::Item object to enqueue_items: "
. "Your data structure will be wrapped in one");
}
push @items, Queue::Q::ClaimFIFO::Item->new(item_data => $item);
}
# FIXME, move loop onto the server or pipeline if possible!
my $qn = $self->queue_name;
for (0..$#items) {
my $key = $items[$_]->_key;
my $data = $items[$_]->_serialized_data;
$self->_script_cache->run_script(
$EnqueueScriptSHA,
[1, $qn, $key, $data],
\$EnqueueScript
);
}
return @items;
}
sub claim_item {
my $self = shift;
my ($key, $serialized_data) = $self->_script_cache->run_script(
$ClaimScriptSHA,
[1, $self->queue_name, time()],
\$ClaimScript
);
return undef if not defined $key;
my $item = Queue::Q::ClaimFIFO::Item->new(
_serialized_data => $serialized_data,
_key => $key,
);
$item->{item_data} = $item->_deserialize_data($serialized_data);
return $item;
}
sub claim_items {
my $self = shift;
my $n = shift || 1;
my @items;
for (1..$n) {
# TODO Lua script for multiple items!
my ($key, $serialized_data) = $self->_script_cache->run_script(
$ClaimScriptSHA,
[1, $self->queue_name, time()],
\$ClaimScript
);
last if not defined $key;
my $item = Queue::Q::ClaimFIFO::Item->new(
_serialized_data => $serialized_data,
_key => $key,
);
$item->{item_data} = $item->_deserialize_data($serialized_data);
push @items, $item;
}
return @items;
}
sub mark_item_as_done {
my ($self, $item) = @_;
my $key = $item->_key;
$self->_script_cache->run_script(
$FinishScriptSHA,
[1, $self->queue_name, $key],
\$FinishScript,
);
}
sub mark_items_as_done {
my ($self) = shift;
foreach (@_) {
# TODO Lua script for multiple items!
my $key = $_->_key;
$self->_script_cache->run_script(
$FinishScriptSHA,
[1, $self->queue_name, $key],
\$FinishScript,
);
}
}
sub flush_queue {
my $self = shift;
$self->_redis_conn->del($self->queue_name);
$self->_redis_conn->del($self->queue_name . CLAIMED_SUFFIX);
$self->_redis_conn->del($self->queue_name . STORAGE_SUFFIX);
}
sub queue_length {
my $self = shift;
my ($len) = $self->_redis_conn->llen($self->queue_name);
return $len;
}
sub claimed_count {
my $self = shift;
my ($len) = $self->_redis_conn->zcard($self->queue_name . CLAIMED_SUFFIX);
return $len;
}
1;