-- retry(0, jid, queue, worker, now, [delay])
-- ------------------------------------------
-- This script accepts jid, queue, worker and delay for
-- retrying a job. This is similar in functionality to
-- `put`, except that this counts against the retries
-- a job has for a stage.
--
-- If the worker is not the worker with a lock on the job,
-- then it returns false. If the job is not actually running,
-- then it returns false. Otherwise, it returns the number
-- of retries remaining. If the allowed retries have been
-- exhausted, then it is automatically failed, and a negative
-- number is returned.
if #KEYS ~= 0 then
error('Retry(): Got ' .. #KEYS .. ', expected 0')
end
local jid = assert(ARGV[1] , 'Retry(): Arg "jid" missing')
local queue = assert(ARGV[2] , 'Retry(): Arg "queue" missing')
local worker = assert(ARGV[3] , 'Retry(): Arg "worker" missing')
local now = assert(tonumber(ARGV[4]) , 'Retry(): Arg "now" missing')
local delay = assert(tonumber(ARGV[5] or 0), 'Retry(): Arg "delay" not a number: ' .. tostring(ARGV[5]))
-- Let's see what the old priority, history and tags were
local oldqueue, state, retries, oldworker, priority = unpack(redis.call('hmget', 'ql:j:' .. jid, 'queue', 'state', 'retries', 'worker', 'priority'))
-- If this isn't the worker that owns
if oldworker ~= worker or (state ~= 'running') then
return false
end
-- Remove it from the locks key of the old queue
redis.call('zrem', 'ql:q:' .. oldqueue .. '-locks', jid)
local remaining = redis.call('hincrby', 'ql:j:' .. jid, 'remaining', -1)
-- Remove this job from the worker that was previously working it
redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)
if remaining < 0 then
-- Now remove the instance from the schedule, and work queues for the queue it's in
local group = 'failed-retries-' .. queue
-- First things first, we should get the history
local history = redis.call('hget', 'ql:j:' .. jid, 'history')
-- Now, take the element of the history for which our provided worker is the worker, and update 'failed'
history = cjson.decode(history or '[]')
history[#history]['failed'] = now
redis.call('hmset', 'ql:j:' .. jid, 'state', 'failed', 'worker', '',
'expires', '', 'history', cjson.encode(history), 'failure', cjson.encode({
['group'] = group,
['message'] = 'Job exhuasted retries in queue "' .. queue .. '"',
['when'] = now,
['worker'] = worker
}))
-- Add this type of failure to the list of failures
redis.call('sadd', 'ql:failures', group)
-- And add this particular instance to the failed types
redis.call('lpush', 'ql:f:' .. group, jid)
else
-- Put it in the queue again with a delay. Like put()
if delay > 0 then
redis.call('zadd', 'ql:q:' .. queue .. '-scheduled', now + delay, jid)
redis.call('hset', 'ql:j:' .. jid, 'state', 'scheduled')
else
redis.call('zadd', 'ql:q:' .. queue .. '-work', priority - (now / 10000000000), jid)
redis.call('hset', 'ql:j:' .. jid, 'state', 'waiting')
end
end
return remaining