The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
-- Put(1, queue, jid, klass, data, now, delay, [priority, p], [tags, t], [retries, r], [depends, '[...]'])
-- -------------------------------------------------------------------------------------------------------
-- This script takes the name of the queue and then the 
-- info about the work item, and makes sure that it's 
-- enqueued.
--
-- At some point, I'd like to able to provide functionality
-- that enables this to generate a unique ID for this piece
-- of work. As such, client libraries should not expose 
-- setting the id from the user, as this is an implementation
-- detail that's likely to change and users should not grow
-- to depend on it.
--
-- Keys:
--    1) queue name
-- Args:
--    1) jid
--    2) klass
--    3) data
--    4) now
--    5) delay
--    *) [priority, p], [tags, t], [retries, r], [depends, '[...]']

if #KEYS ~= 1 then
	if #KEYS < 1 then
		error('Put(): Expected 1 KEYS argument')
	else
		error('Put(): Got ' .. #KEYS .. ', expected 1 KEYS argument')
	end
end

local queue    = assert(KEYS[1]               , 'Put(): Key "queue" missing')
local jid      = assert(ARGV[1]               , 'Put(): Arg "jid" missing')
local klass    = assert(ARGV[2]               , 'Put(): Arg "klass" missing')
local data     = assert(cjson.decode(ARGV[3]) , 'Put(): Arg "data" missing or not JSON: '    .. tostring(ARGV[3]))
local now      = assert(tonumber(ARGV[4])     , 'Put(): Arg "now" missing or not a number: ' .. tostring(ARGV[4]))
local delay    = assert(tonumber(ARGV[5])     , 'Put(): Arg "delay" not a number: '          .. tostring(ARGV[5]))

-- Read in all the optional parameters
local options = {}
for i = 6, #ARGV, 2 do options[ARGV[i]] = ARGV[i + 1] end

-- Let's see what the old priority, history and tags were
local history, priority, tags, oldqueue, state, failure, retries, worker = unpack(redis.call('hmget', 'ql:j:' .. jid, 'history', 'priority', 'tags', 'queue', 'state', 'failure', 'retries', 'worker'))

-- Sanity check on optional args
retries  = assert(tonumber(options['retries']  or retries or 5) , 'Put(): Arg "retries" not a number: ' .. tostring(options['retries']))
tags     = assert(cjson.decode(options['tags'] or tags or '[]' ), 'Put(): Arg "tags" not JSON'          .. tostring(options['tags']))
priority = assert(tonumber(options['priority'] or priority or 0), 'Put(): Arg "priority" not a number'  .. tostring(options['priority']))
local depends = assert(cjson.decode(options['depends'] or '[]') , 'Put(): Arg "depends" not JSON: '     .. tostring(options['depends']))

-- Delay and depends are not allowed together
if delay > 0 and #depends > 0 then
	error('Put(): "delay" and "depends" are not allowed to be used together')
end

-- Update the history to include this new change
local history = cjson.decode(history or '{}')
table.insert(history, {
	q     = queue,
	put   = math.floor(now)
})

-- If this item was previously in another queue, then we should remove it from there
if oldqueue then
	redis.call('zrem', 'ql:q:' .. oldqueue .. '-work', jid)
	redis.call('zrem', 'ql:q:' .. oldqueue .. '-locks', jid)
	redis.call('zrem', 'ql:q:' .. oldqueue .. '-scheduled', jid)
	redis.call('zrem', 'ql:q:' .. oldqueue .. '-depends', jid)
end

-- If this had previously been given out to a worker,
-- make sure to remove it from that worker's jobs
if worker then
	redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)
end

-- If the job was previously in the 'completed' state, then we should remove
-- it from being enqueued for destructination
if state == 'complete' then
	redis.call('zrem', 'ql:completed', jid)
end

-- Add this job to the list of jobs tagged with whatever tags were supplied
for i, tag in ipairs(tags) do
	redis.call('zadd', 'ql:t:' .. tag, now, jid)
	redis.call('zincrby', 'ql:tags', 1, tag)
end

-- If we're in the failed state, remove all of our data
if state == 'failed' then
	failure = cjson.decode(failure)
	-- We need to make this remove it from the failed queues
	redis.call('lrem', 'ql:f:' .. failure.group, 0, jid)
	if redis.call('llen', 'ql:f:' .. failure.group) == 0 then
		redis.call('srem', 'ql:failures', failure.group)
	end
	-- The bin is midnight of the provided day
	-- 24 * 60 * 60 = 86400
	local bin = failure.when - (failure.when % 86400)
	-- We also need to decrement the stats about the queue on
	-- the day that this failure actually happened.
	redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'failed'  , -1)
end

-- First, let's save its data
redis.call('hmset', 'ql:j:' .. jid,
    'jid'      , jid,
	'klass'    , klass,
    'data'     , cjson.encode(data),
    'priority' , priority,
    'tags'     , cjson.encode(tags),
    'state'    , ((delay > 0) and 'scheduled') or 'waiting',
    'worker'   , '',
	'expires'  , 0,
    'queue'    , queue,
	'retries'  , retries,
	'remaining', retries,
    'history'  , cjson.encode(history))

-- These are the jids we legitimately have to wait on
for i, j in ipairs(depends) do
	-- Make sure it's something other than 'nil' or complete.
	local state = redis.call('hget', 'ql:j:' .. j, 'state')
	if (state and state ~= 'complete') then
		redis.call('sadd', 'ql:j:' .. j .. '-dependents'  , jid)
		redis.call('sadd', 'ql:j:' .. jid .. '-dependencies', j)
	end
end

-- Now, if a delay was provided, and if it's in the future,
-- then we'll have to schedule it. Otherwise, we're just
-- going to add it to the work queue.
if delay > 0 then
    redis.call('zadd', 'ql:q:' .. queue .. '-scheduled', now + delay, jid)
else
	if redis.call('scard', 'ql:j:' .. jid .. '-dependencies') > 0 then
		redis.call('zadd', 'ql:q:' .. queue .. '-depends', now, jid)
		redis.call('hset', 'ql:j:' .. jid, 'state', 'depends')
	else
		redis.call('zadd', 'ql:q:' .. queue .. '-work', priority - (now / 10000000000), jid)
	end
end

-- Lastly, we're going to make sure that this item is in the
-- set of known queues. We should keep this sorted by the 
-- order in which we saw each of these queues
if redis.call('zscore', 'ql:queues', queue) == false then
	redis.call('zadd', 'ql:queues', now, queue)
end

if redis.call('zscore', 'ql:tracked', jid) ~= false then
	redis.call('publish', 'put', jid)
end

return jid