-- This script takes the name of the queue, and then checks
-- for any expired locks, spawns any recurring jobs that are
-- due, inserts any scheduled items that are now valid, and
-- lastly returns up to `count` work items that can be handed
-- over to the provided worker.
--
-- Keys:
-- 1) queue name
-- Args:
-- 1) worker name
-- 2) the number of items to return
-- 3) the current time
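-- Returns:
-- A table of JSON blobs, one for each popped job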
if #KEYS ~= 1 then
if #KEYS < 1 then
error('Pop(): Expected 1 KEYS argument')
else
error('Pop(): Got ' .. #KEYS .. ', expected 1 KEYS argument')
end
end
local queue = assert(KEYS[1] , 'Pop(): Key "queue" missing')
local key = 'ql:q:' .. queue
local worker = assert(ARGV[1] , 'Pop(): Arg "worker" missing')
local count = assert(tonumber(ARGV[2]) , 'Pop(): Arg "count" missing or not a number: ' .. (ARGV[2] or 'nil'))
local now = assert(tonumber(ARGV[3]) , 'Pop(): Arg "now" missing or not a number: ' .. (ARGV[3] or 'nil'))
-- We should find the heartbeat interval for this queue, preferring
-- the queue-specific setting, then the global one, then 60 seconds
local _hb, _qhb = unpack(redis.call('hmget', 'ql:config', 'heartbeat', queue .. '-heartbeat'))
local expires = now + tonumber(_qhb or _hb or 60)
-- The bin is midnight of the provided day
-- 24 * 60 * 60 = 86400
local bin = now - (now % 86400)
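-- (e.g. now = 1000000000 gives bin = 1000000000 - 6400 = 999993600)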
-- These are the ids that we're going to return
local keys = {}
-- Make sure we add this worker to the list of seen workers
redis.call('zadd', 'ql:workers', now, worker)
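-- (ql:workers is a zset scored by the last time each worker was seen)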
-- Iterate through all the expired locks and add them to the list
-- of keys that we'll return
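-- (the -locks zset is scored by lock expiration time, so any member
-- with a score at or below `now` holds an expired lock)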
for index, jid in ipairs(redis.call('zrangebyscore', key .. '-locks', 0, now, 'LIMIT', 0, count)) do
-- Remove this job from the jobs that the worker that was running it has
local w = redis.call('hget', 'ql:j:' .. jid, 'worker')
redis.call('zrem', 'ql:w:' .. w .. ':jobs', jid)
-- For each of these, decrement their remaining retries. If any
-- of them have exhausted their retries, then we should mark them
-- as failed.
if redis.call('hincrby', 'ql:j:' .. jid, 'remaining', -1) < 0 then
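-- (hincrby returns the value after the decrement, so a negative
-- result means this job had no retries remaining)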
-- Now remove the instance from the work, locks, and scheduled queues for the queue it's in
redis.call('zrem', 'ql:q:' .. queue .. '-work', jid)
redis.call('zrem', 'ql:q:' .. queue .. '-locks', jid)
redis.call('zrem', 'ql:q:' .. queue .. '-scheduled', jid)
local group = 'failed-retries-' .. queue
-- First things first, we should get the history
local history = redis.call('hget', 'ql:j:' .. jid, 'history')
-- Now mark the most recent history entry as having failed at this time
history = cjson.decode(history or '[]')
history[#history]['failed'] = now
redis.call('hmset', 'ql:j:' .. jid, 'state', 'failed', 'worker', '',
'expires', '', 'history', cjson.encode(history), 'failure', cjson.encode({
['group'] = group,
['message'] = 'Job exhausted retries in queue "' .. queue .. '"',
['when'] = now,
['worker'] = history[#history]['worker']
}))
-- Add this type of failure to the list of failures
redis.call('sadd', 'ql:failures', group)
-- And add this particular instance to the failed types
redis.call('lpush', 'ql:f:' .. group, jid)
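-- If this job is being tracked, publish a failure notification
-- (a nil zscore is returned to Lua as false, so non-false means tracked)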
if redis.call('zscore', 'ql:tracked', jid) ~= false then
redis.call('publish', 'failed', jid)
end
else
table.insert(keys, jid)
if redis.call('zscore', 'ql:tracked', jid) ~= false then
redis.call('publish', 'stalled', jid)
end
end
end
-- Now we've checked __all__ the locks for this queue that could
-- have expired, up to the number of items requested.
-- If we got any expired locks, then we should increment the
-- number of retries for this queue for this bin
redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'retries', #keys)
-- If we still need jobs in order to meet demand, then we should
-- look for any recurring jobs whose next run is now due
if #keys < count then
-- This is how many jobs we've moved so far
local moved = 0
-- These are the recurring jobs that need work
local r = redis.call('zrangebyscore', key .. '-recur', 0, now, 'LIMIT', 0, (count - #keys))
for index, jid in ipairs(r) do
-- For each of the jids that need jobs spawned, first
-- get the time the next job was due, and then keep
-- spawning jobs and advancing that time by the interval
-- for as long as it is not in the future
local klass, data, priority, tags, retries, interval = unpack(redis.call('hmget', 'ql:r:' .. jid, 'klass', 'data', 'priority', 'tags', 'retries', 'interval'))
local _tags = cjson.decode(tags)
-- We're saving this value so that in the history, we can accurately
-- reflect when the job would normally have been scheduled
local score = math.floor(tonumber(redis.call('zscore', key .. '-recur', jid)))
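-- (the -recur score is the epoch time at which the next job is due;
-- we advance it by `interval` below each time we spawn a job)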
while (score <= now) and (moved < (count - #keys)) do
-- Increment the count of how many jobs we've moved from recurring
-- to 'work'
moved = moved + 1
-- This is the count'th job spawned from this recurring job; it is
-- kept under its own name so it doesn't shadow the requested item
-- count used in the loop condition above
local child_count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
-- Add this job to the list of jobs tagged with whatever tags were supplied
for i, tag in ipairs(_tags) do
redis.call('zadd', 'ql:t:' .. tag, now, jid .. '-' .. child_count)
redis.call('zincrby', 'ql:tags', 1, tag)
end
-- First, let's save its data
redis.call('hmset', 'ql:j:' .. jid .. '-' .. child_count,
'jid' , jid .. '-' .. child_count,
'klass' , klass,
'data' , data,
'priority' , priority,
'tags' , tags,
'state' , 'waiting',
'worker' , '',
'expires' , 0,
'queue' , queue,
'retries' , retries,
'remaining', retries,
'history' , cjson.encode({{
-- The job was essentially put in this queue at this time,
-- and not the current time
q = queue,
put = math.floor(score)
}}))
-- Now add it to the work queue, scored by its priority minus a
-- small fraction of its due time so that, among jobs of equal
-- priority, the ones due earlier are popped first. Then advance
-- the recurrence by one interval.
redis.call('zadd', key .. '-work', priority - (score / 10000000000), jid .. '-' .. child_count)
redis.call('zincrby', key .. '-recur', interval, jid)
score = score + interval
end
end
end
-- If we still need values in order to meet the demand, then we
-- should check whether any scheduled items have become valid, and
-- if so, insert them into the work queue before pulling off the
-- next unit of work.
if #keys < count then
-- zadd is a list of arguments that we'll be able to use to
-- insert into the work queue
local zadd = {}
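-- (zadd takes alternating score/member arguments, which is what we
-- accumulate here)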
local r = redis.call('zrangebyscore', key .. '-scheduled', 0, now, 'LIMIT', 0, (count - #keys))
for index, jid in ipairs(r) do
-- With these in hand, we'll have to go out and find the
-- priorities of these jobs, insert them into the work queue,
-- and then, once that's complete, remove them from the
-- scheduled queue
table.insert(zadd, tonumber(redis.call('hget', 'ql:j:' .. jid, 'priority') or 0))
table.insert(zadd, jid)
end
-- Now add these to the work list, and then remove them
-- from the scheduled list
if #zadd > 0 then
redis.call('zadd', key .. '-work', unpack(zadd))
redis.call('zrem', key .. '-scheduled', unpack(r))
end
-- And now we should get up to the maximum number of requested
-- work items from the work queue.
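-- (zrevrange returns the highest-scored members first, so the
-- highest-priority jobs are popped first)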
for index, jid in ipairs(redis.call('zrevrange', key .. '-work', 0, (count - #keys) - 1)) do
table.insert(keys, jid)
end
end
-- Alright, now the `keys` table is filled with all the job
-- ids which we'll be returning. Now we need to get the
-- metadata about each of these, update their metadata to
-- reflect which worker they're on, when the lock expires,
-- etc., add them to the locks queue, and then finally
-- return a list of JSON blobs
local response = {}
local state
local history
for index, jid in ipairs(keys) do
-- First, we should get the state and history of the item
state, history = unpack(redis.call('hmget', 'ql:j:' .. jid, 'state', 'history'))
history = cjson.decode(history or '[]')
history[#history]['worker'] = worker
history[#history]['popped'] = math.floor(now)
----------------------------------------------------------
-- This is the massive stats update that we have to do
----------------------------------------------------------
-- This is how long we've been waiting to get popped
local waiting = math.floor(now) - history[#history]['put']
-- Now we'll update the running count, mean and variance (vk) of
-- wait times for this bin, using Welford's online algorithm
local count, mean, vk = unpack(redis.call('hmget', 'ql:s:wait:' .. bin .. ':' .. queue, 'total', 'mean', 'vk'))
count = tonumber(count) or 0
mean = tonumber(mean) or 0
vk = tonumber(vk) or 0
if count == 0 then
mean = waiting
vk = 0
count = 1
else
count = count + 1
local oldmean = mean
mean = mean + (waiting - mean) / count
vk = vk + (waiting - mean) * (waiting - oldmean)
end
-- Now, update the histogram
-- - `s1`, `s2`, ..., -- second-resolution histogram counts
-- - `m1`, `m2`, ..., -- minute-resolution
-- - `h1`, `h2`, ..., -- hour-resolution
-- - `d1`, `d2`, ..., -- day-resolution
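-- For example, a job that waited 45 seconds increments 's45', and
-- one that waited 90 seconds increments 'm1'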
waiting = math.floor(waiting)
if waiting < 60 then -- seconds
redis.call('hincrby', 'ql:s:wait:' .. bin .. ':' .. queue, 's' .. waiting, 1)
elseif waiting < 3600 then -- minutes
redis.call('hincrby', 'ql:s:wait:' .. bin .. ':' .. queue, 'm' .. math.floor(waiting / 60), 1)
elseif waiting < 86400 then -- hours
redis.call('hincrby', 'ql:s:wait:' .. bin .. ':' .. queue, 'h' .. math.floor(waiting / 3600), 1)
else -- days
redis.call('hincrby', 'ql:s:wait:' .. bin .. ':' .. queue, 'd' .. math.floor(waiting / 86400), 1)
end
redis.call('hmset', 'ql:s:wait:' .. bin .. ':' .. queue, 'total', count, 'mean', mean, 'vk', vk)
----------------------------------------------------------
-- Add this job to the list of jobs handled by this worker
redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid)
-- Update the jobs data, and add its locks, and return the job
redis.call(
'hmset', 'ql:j:' .. jid, 'worker', worker, 'expires', expires,
'state', 'running', 'history', cjson.encode(history))
redis.call('zadd', key .. '-locks', expires, jid)
local job = redis.call(
'hmget', 'ql:j:' .. jid, 'jid', 'klass', 'state', 'queue', 'worker', 'priority',
'expires', 'retries', 'remaining', 'data', 'tags', 'history', 'failure')
local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false
if tracked then
redis.call('publish', 'popped', jid)
end
table.insert(response, cjson.encode({
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
worker = job[5] or '',
tracked = tracked,
priority = tonumber(job[6]),
expires = tonumber(job[7]) or 0,
retries = tonumber(job[8]),
remaining = tonumber(job[9]),
data = cjson.decode(job[10]),
tags = cjson.decode(job[11]),
history = cjson.decode(job[12]),
failure = cjson.decode(job[13] or '{}'),
dependents = redis.call('smembers', 'ql:j:' .. jid .. '-dependents'),
-- A job that has just been popped can not have outstanding
-- dependencies, because being popped off of a queue
-- means all of its dependencies have been satisfied
dependencies = {}
}))
end
if #keys > 0 then
redis.call('zrem', key .. '-work', unpack(keys))
end
return response