-- Fail(0, jid, worker, group, message, now, [data])
-- -------------------------------------------------
-- Mark the particular job as failed, with the provided group, and a more specific
-- message. By `group`, we mean some phrase that might be one of several categorical
-- modes of failure. The `message` is something more job-specific, like perhaps
-- a traceback.
--
-- This method should __not__ be used to note that a job has been dropped or has
-- failed in a transient way. This method __should__ be used to note that a job has
-- something really wrong with it that must be remedied.
--
-- The motivation behind the `group` is so that similar errors can be grouped together.
-- Optionally, updated data can be provided for the job. Only a job whose state is
-- `running` can be marked as failed; for a job in any other state nothing is
-- changed and `False` is returned. Once a job has been failed, the worker's subsequent
-- requests to heartbeat or complete that job will fail. Failed jobs are kept until
-- they are canceled or completed. __Returns__ the id of the failed job if successful,
-- or `False` on failure.
--
-- Args:
-- 1) jid
-- 2) worker
-- 3) group
-- 4) message
-- 5) the current time
-- 6) [data]
-- This script takes no KEYS; every input arrives through ARGV.
if #KEYS > 0 then error('Fail(): No Keys should be provided') end

local jid     = assert(ARGV[1]          , 'Fail(): Arg "jid" missing')
local worker  = assert(ARGV[2]          , 'Fail(): Arg "worker" missing')
local group   = assert(ARGV[3]          , 'Fail(): Arg "group" missing')
local message = assert(ARGV[4]          , 'Fail(): Arg "message" missing')
local now     = assert(tonumber(ARGV[5]), 'Fail(): Arg "now" missing or malformed: ' .. (ARGV[5] or 'nil'))
local data    = ARGV[6]

-- The bin is midnight of the provided day
-- 24 * 60 * 60 = 86400
local bin = now - (now % 86400)

-- Whole-second timestamp recorded in the history entry and the failure record
local failed_at = math.floor(now)

-- Decode the optional replacement data up front, so that malformed JSON raises
-- before anything has been written.
if data then
    data = cjson.decode(data)
end

local job_key = 'ql:j:' .. jid

-- First things first, we should get the history
local history, queue, state = unpack(redis.call('hmget', job_key, 'history', 'queue', 'state'))

-- Only a job that is currently running can be failed. Any other state returns
-- false without modifying anything.
if state ~= 'running' then
    return false
end

-- Notify subscribers if this job is being tracked
if redis.call('zscore', 'ql:tracked', jid) ~= false then
    redis.call('publish', 'failed', jid)
end

-- Remove this job from the jobs that the worker that was running it has
redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)

-- Now, take the most recent element of the history for which our provided
-- worker is the worker, and update 'failed'. We walk from the newest entry
-- backwards and stop at the first match, so that older attempts by the same
-- worker keep the failure time they already recorded.
history = cjson.decode(history or '[]')
if #history > 0 then
    for i = #history, 1, -1 do
        if history[i]['worker'] == worker then
            history[i]['failed'] = failed_at
            break
        end
    end
else
    -- No history recorded at all: start one with this failure
    history = {
        {
            worker = worker,
            failed = failed_at
        }
    }
end

-- Increment the number of failures for that queue for the
-- given day.
local stats_key = 'ql:s:stats:' .. bin .. ':' .. queue
redis.call('hincrby', stats_key, 'failures', 1)
redis.call('hincrby', stats_key, 'failed'  , 1)

-- Now remove the instance from the schedule, and work queues for the queue it's in
redis.call('zrem', 'ql:q:' .. queue .. '-work', jid)
redis.call('zrem', 'ql:q:' .. queue .. '-locks', jid)
redis.call('zrem', 'ql:q:' .. queue .. '-scheduled', jid)

-- The reason that this appears here is that the above will fail if the job doesn't exist
if data then
    redis.call('hset', job_key, 'data', cjson.encode(data))
end

-- Flip the job to the failed state, clear its worker and expiry, and store the
-- updated history together with the details of this failure.
redis.call('hmset', job_key, 'state', 'failed', 'worker', '',
    'expires', '', 'history', cjson.encode(history), 'failure', cjson.encode({
        ['group']   = group,
        ['message'] = message,
        ['when']    = failed_at,
        ['worker']  = worker
    }))

-- Add this group of failure to the list of failures
redis.call('sadd', 'ql:failures', group)
-- And add this particular instance to the failed groups
redis.call('lpush', 'ql:f:' .. group, jid)

-- Here is where we'd increment stats about the particular stage
-- and possibly the workers

return jid