The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
-- Fail(0, jid, worker, group, message, now, [data])
-- -------------------------------------------------
-- Mark the particular job as failed, with the provided group, and a more specific
-- message. By `group`, we mean some phrase that might be one of several categorical
-- modes of failure. The `message` is something more job-specific, like perhaps
-- a traceback.
-- 
-- This method should __not__ be used to note that a job has been dropped or has 
-- failed in a transient way. This method __should__ be used to note that a job has
-- something really wrong with it that must be remedied.
-- 
-- The motivation behind the `group` is so that similar errors can be grouped together.
-- Optionally, updated data can be provided for the job. Only a job that is currently
-- running can be marked as failed; a job in any other state is left untouched. Once a
-- job has been failed, the worker's subsequent requests to heartbeat or complete that
-- job will fail. Failed jobs are kept until they are canceled or completed.
-- __Returns__ the id of the failed job if successful, or `False` on failure.
--
-- Args:
--    1) jid
--    2) worker
--    3) group
--    4) message
--    5) the current time
--    6) [data]

-- Fail: mark a running job as failed and record why.
--
-- Inputs (all through ARGV; KEYS must be empty):
--   ARGV[1] jid      id of the job to fail
--   ARGV[2] worker   name of the worker reporting the failure
--   ARGV[3] group    failure category, used to group similar failures together
--   ARGV[4] message  job-specific detail about the failure
--   ARGV[5] now      current time as a number (presumably Unix seconds, since
--                    the daily stats bin assumes 86400 units per day)
--   ARGV[6] data     optional JSON text that replaces the job's stored data
--
-- Returns the jid on success, or false when the job is not in the 'running'
-- state. Raises if KEYS is non-empty, a required argument is missing or
-- malformed, or `data` is not valid JSON.
if #KEYS > 0 then error('Fail(): No Keys should be provided') end

local jid     = assert(ARGV[1]          , 'Fail(): Arg "jid" missing')
local worker  = assert(ARGV[2]          , 'Fail(): Arg "worker" missing')
local group   = assert(ARGV[3]          , 'Fail(): Arg "group" missing')
local message = assert(ARGV[4]          , 'Fail(): Arg "message" missing')
local now     = assert(tonumber(ARGV[5]), 'Fail(): Arg "now" missing or malformed: ' .. (ARGV[5] or 'nil'))
local data    = ARGV[6]

-- Failure timestamps are stored as whole numbers
local when = math.floor(now)

-- The bin is midnight of the provided day
-- 24 * 60 * 60 = 86400
local bin = now - (now % 86400)

-- Decode up front so that malformed JSON raises before anything is written
if data then
	data = cjson.decode(data)
end

local job_key = 'ql:j:' .. jid

-- First things first, we should get the history
local history, queue, state = unpack(redis.call('hmget', job_key, 'history', 'queue', 'state'))

-- Only a job that is currently running can be failed. Any other state, or a
-- job hash with no state at all, is rejected without touching anything.
if state ~= 'running' then
	return false
end

-- Build every queue-dependent key before the first write, so that a job hash
-- lacking its 'queue' field raises here instead of after partial changes.
local stats_key = 'ql:s:stats:' .. bin .. ':' .. queue
local queue_key = 'ql:q:' .. queue

if redis.call('zscore', 'ql:tracked', jid) ~= false then
	redis.call('publish', 'failed', jid)
end

-- Remove this job from the jobs that the worker that was running it has
redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)

-- Now, take the most recent element of the history for which our provided
-- worker is the worker, and update 'failed'
history = cjson.decode(history or '[]')
if #history > 0 then
	-- Walk backwards (history is presumably appended chronologically) and stop
	-- at the first match, so earlier runs by the same worker keep their own
	-- 'failed' timestamps.
	for i = #history, 1, -1 do
		if history[i]['worker'] == worker then
			history[i]['failed'] = when
			break
		end
	end
else
	history = {
		{
			worker = worker,
			failed = when
		}
	}
end

-- Increment the number of failures for that queue for the
-- given day.
redis.call('hincrby', stats_key, 'failures', 1)
redis.call('hincrby', stats_key, 'failed'  , 1)

-- Now remove the instance from the schedule, and work queues for the queue it's in
redis.call('zrem', queue_key .. '-work', jid)
redis.call('zrem', queue_key .. '-locks', jid)
redis.call('zrem', queue_key .. '-scheduled', jid)

-- The data is only written here, after the state check above has confirmed
-- that the job exists and is running.
-- NOTE(review): the decode/encode round trip goes through a Lua table, and
-- lua-cjson appears to encode an empty table as '{}', so an empty JSON array
-- inside `data` may be stored as an object -- confirm whether that matters.
if data then
	redis.call('hset', job_key, 'data', cjson.encode(data))
end

redis.call('hmset', job_key, 'state', 'failed', 'worker', '',
	'expires', '', 'history', cjson.encode(history), 'failure', cjson.encode({
		['group']   = group,
		['message'] = message,
		['when']    = when,
		['worker']  = worker
	}))

-- Add this group of failure to the list of failures
redis.call('sadd', 'ql:failures', group)
-- And add this particular instance to the failed groups
redis.call('lpush', 'ql:f:' .. group, jid)

-- Here is where we'd increment stats about the particular stage
-- and possibly the workers

return jid