package Queue::Q::ReliableFIFO::Lua;
use Redis;
use File::Slurp;
use Digest::SHA1;
use Carp qw(croak);
use Class::XSAccessor {
getters => [qw(redis_conn script_dir)]
};
my %scripts;
sub new {
my $class = shift;
my $self = bless { @_ }, $class;
$self->redis_conn || croak("need a redis connection");
$self->{script_dir} ||= $ENV{LUA_SCRIPT_DIR};
$self->{call} ||= {};
$self->register;
return $self;
}
sub register {
my $self = shift;
my $name = shift;
if ($self->script_dir) {
$name ||= '*';
for my $file (glob("$self->{script_dir}/$name.lua")) {
my $script = read_file($file);
my $sha1 = Digest::SHA1::sha1_hex($script);
my ($found) = @{$self->redis_conn->script_exists($sha1)};
if (!$found) {
print "registering $file\n";
my $rv = $self->redis_conn->script_load($script);
croak("returned sha1 is different from ours!")
if ($rv ne $sha1);
}
(my $call = $file) =~ s/\.lua$//;
$call =~ s/^.*\///;
$self->{call}{$call} = $sha1;
}
}
else {
croak("script $name not found") if $name && !exists $scripts{$name};
my @names = $name ? ($name) : (keys %script);
for my $scr_name (@names) {
my $script = $scripts{$scr_name};
my $sha1 = Digest::SHA1::sha1_hex($script);
my ($found) = @{$self->redis_conn->script_exists($sha1)};
if (!$found) {
my $rv = $self->redis_conn->script_load($script);
croak("returned sha1 is different from ours!")
if ($rv ne $sha1);
}
$self->{call}{$scr_name} = $sha1;
}
}
}
sub call {
my $self = shift;
my $name = shift;
$self->register($name) if not exists $self->{call}{$name};
my $sha1 = $self->{call}{$name};
croak("Unknown script $name") if ! $sha1;
return $self->redis_conn->evalsha($sha1, @_);
}
%scripts = (
requeue_busy => q{
-- requeue_busy (depending requeue limit items will be requeued or fail)
-- # KEYS[1] from queue name (busy queue)
-- # KEYS[2] dest queue name (main queue)
-- # KEYS[3] failed queue name (failed queue)
-- # ARGV[1] timestamp
-- # ARGV[2] item
-- # ARGV[3] requeue limit
-- # ARGV[4] place to requeue in dest-queue:
-- 0: at producer side, 1: consumer side
-- Note: failed items will always go to the tail of the failed queue
-- # ARGV[5] OPTIONAL error message
--
--redis.log(redis.LOG_WARNING, "requeue_tail")
if #KEYS ~= 3 then error('requeue_busy requires 3 keys') end
-- redis.log(redis.LOG_NOTICE, "nr keys: " .. #KEYS)
local from = assert(KEYS[1], 'busy queue name missing')
local dest = assert(KEYS[2], 'dest queue name missing')
local failed= assert(KEYS[3], 'failed queue name missing')
local ts = assert(tonumber(ARGV[1]), 'timestamp missing')
local item = assert(ARGV[2], 'item missing')
local limit = assert(tonumber(ARGV[3]), 'requeue limit missing')
local place = tonumber(ARGV[4])
assert(place == 0 or place == 1, 'requeue place should be 0 or 1')
local n = redis.call('lrem', from, 1, item)
if n > 0 then
local i= cjson.decode(item)
if i.rc == nil then
i.rc=1
else
i.rc=i.rc+1
end
if i.rc <= limit then
-- only adjust timestamps in case of requeuing
-- (not if busy item is place back in the front of the queue)
if place == 0 then
if i.t_created == nil then
i.t_created = i.t
end
i.t = ts
end
local v=cjson.encode(i)
if place == 0 then
redis.call('lpush', dest, v)
else
redis.call('rpush', dest, v)
end
else
-- reset requeue counter and increase fail counter
i.rc = nil
if i.fc == nil then
i.fc = 1
else
i.fc = i.fc + 1
end
if #ARGV == 5 then
i.error = ARGV[5]
else
i.error = nil
end
local v=cjson.encode(i)
redis.call('lpush', failed, v)
end
end
return n
},
requeue_failed => q{
-- requeue_failed: requeue a given number of failed items
-- # KEYS[1] from queue name (failed queue)
-- # KEYS[2] dest queue name (main queue)
-- # ARGV[1] timestamp
-- # ARGV[2] number of items to requeue. Value "0" means "all items"
--
if #KEYS ~= 2 then error('requeue_busy requires 2 key') end
-- redis.log(redis.LOG_NOTICE, "nr keys: " .. #KEYS)
local from = assert(KEYS[1], 'failed queue name missing')
local dest = assert(KEYS[2], 'dest queue name missing')
local ts = assert(tonumber(ARGV[1]), 'timestamp missing')
local num = assert(tonumber(ARGV[2]), 'number of items missing')
local n = 0;
if num == 0 then
num = redis.call('llen', from)
end
for i = 1, num do
local item = redis.call('rpop', from);
if item == nil then break end
local i = cjson.decode(item)
if i.t_created == nil then
i.t_created = i.t
end
i.t = ts
local v = cjson.encode(i)
redis.call('lpush', dest, v)
n = n + 1
end
return n
},
requeue_failed_item => q{
-- Requeue_busy_items
-- # KEYS[1] from queue name (failed queue)
-- # KEYS[2] dest queue name (main queue)
-- # ARGV[1] timestamp
-- # ARGV[2] item
--
-- redis.log(redis.LOG_WARNING, "requeue_tail")
if #KEYS ~= 2 then error('requeue_busy requires 2 key') end
-- redis.log(redis.LOG_NOTICE, "nr keys: " .. #KEYS)
local from = assert(KEYS[1], 'failed queue name missing')
local dest = assert(KEYS[2], 'dest queue name missing')
local ts = assert(tonumber(ARGV[1]), 'timestamp missing')
local item = assert(ARGV[2], 'item missing')
local n = redis.call('lrem', from, 1, item)
if n > 0 then
local i = cjson.decode(item)
if i.t_created == nil then
i.t_created = i.t
end
i.t = ts
local v = cjson.encode(i)
redis.call('lpush', dest, v)
end
return n
});
1;
__END__
=head1 NAME
Queue::Q::ReliableFIFO::Lua - Load lua scripts into Redis
=head1 SYNOPSIS
use Queue::Q::ReliableFIFO::Lua;
my $lua = Queue::Q::ReliableFIFO::Lua->new(
script_dir => /some/path
redis_conn => $redis_conn);
$lua->call('myscript', $n, @keys, @argv);
=head1 DESCRIPTION
This module offers two ways of loading/running lua scripts.
One way
is with separate lua scripts, which live at a location as indicated
by the script_dir parameter (passed to the constructor) or as
indicated by the LUA_SCRIPT_DIR environment variable.
The other way is by putting the source code of the lua scripts in
this module, in the %scripts hash.
Which way is actually used depends on whether or not passing info
about a path to lua scripts. If a lua script location is known, those
script will be used, otherwise the %scripts code is used.
During development it is more conveniant to use the separate lua files
of course. But for deploying it is less error prone if the lua code
is inside the perl module. So that is why this is done this way.
The scripts are loaded when the constructor is called.