/**
 * Lua source of the `moveToActive` Redis script.
 *
 * The script is kept with one statement/comment per line on purpose: Lua
 * `--` line comments extend to the end of the physical line, so collapsing
 * newlines would comment out the code that follows them.
 *
 * Script result is always a 4-element array:
 *   [jobData, jobId, rateLimitTTL, nextDelayedTimestamp]
 * where unused slots are 0.
 */
const content = `--[[
  Move next job to be processed to active, lock it and fetch its data. The job
  may be delayed, in that case we need to move it to the delayed set instead.
  This operation guarantees that the worker owns the job during the lock
  expiration time. The worker is responsible for keeping the lock fresh
  so that no other worker picks this job again.
  Input:
    KEYS[1] wait key
    KEYS[2] active key
    KEYS[3] prioritized key
    KEYS[4] stream events key
    KEYS[5] stalled key
    -- Rate limiting
    KEYS[6] rate limiter key
    KEYS[7] delayed key
    -- Delayed jobs
    KEYS[8] paused key
    KEYS[9] meta key
    KEYS[10] pc priority counter
    -- Marker
    KEYS[11] marker key
    -- Arguments
    ARGV[1] key prefix
    ARGV[2] timestamp
    ARGV[3] opts
    opts - token - lock token
    opts - lockDuration
    opts - limiter
    opts - name - worker name
]]
local rcall = redis.call
local waitKey = KEYS[1]
local activeKey = KEYS[2]
local eventStreamKey = KEYS[4]
local rateLimiterKey = KEYS[6]
local delayedKey = KEYS[7]
local opts = cmsgpack.unpack(ARGV[3])
-- Includes
--[[
  Function to return the next delayed job timestamp.
  Delayed scores are timestamps scaled by 0x1000, so the score is divided
  back before being returned. Returns nil when the delayed set is empty.
]]
local function getNextDelayedTimestamp(delayedKey)
  local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  -- 0 is truthy in Lua, so the length must be compared explicitly.
  if #result > 0 then
    local nextTimestamp = tonumber(result[2])
    if nextTimestamp ~= nil then
      return nextTimestamp / 0x1000
    end
  end
end
--[[
  Function to get current rate limit ttl.
  Returns the remaining PTTL of the rate limiter key when the limit has been
  reached, otherwise 0.
]]
local function getRateLimitTTL(maxJobs, rateLimiterKey)
  if maxJobs and maxJobs <= tonumber(rcall("GET", rateLimiterKey) or 0) then
    local pttl = rcall("PTTL", rateLimiterKey)
    if pttl == 0 then
      rcall("DEL", rateLimiterKey)
    end
    if pttl > 0 then
      return pttl
    end
  end
  return 0
end
--[[
  Function to check for the meta.paused key to decide if we are paused or not
  (since an empty list and !EXISTS are not really the same).
  Returns: target list key, isPausedOrMaxed flag, meta "max", meta "duration".
]]
local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  if queueAttributes[1] then
    return pausedKey, true, queueAttributes[3], queueAttributes[4]
  else
    if queueAttributes[2] then
      local activeCount = rcall("LLEN", activeKey)
      if activeCount >= tonumber(queueAttributes[2]) then
        return waitKey, true, queueAttributes[3], queueAttributes[4]
      else
        return waitKey, false, queueAttributes[3], queueAttributes[4]
      end
    end
  end
  return waitKey, false, queueAttributes[3], queueAttributes[4]
end
--[[
  Function to move job from prioritized state to active.
  When the prioritized set is empty the priority counter is deleted.
]]
local function moveJobFromPrioritizedToActive(priorityKey, activeKey, priorityCounterKey)
  local prioritizedJob = rcall("ZPOPMIN", priorityKey)
  if #prioritizedJob > 0 then
    rcall("LPUSH", activeKey, prioritizedJob[1])
    return prioritizedJob[1]
  else
    rcall("DEL", priorityCounterKey)
  end
end
--[[
  Function to move job from wait state to active.
  Input:
    opts - token - lock token
    opts - lockDuration
    opts - limiter
]]
-- Includes
--[[
  Add marker if needed when a job is available.
]]
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  if not isPausedOrMaxed then
    rcall("ZADD", markerKey, 0, "0")
  end
end
local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
    jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
  local jobKey = keyPrefix .. jobId
  -- Check if we need to perform rate limiting.
  if maxJobs then
    local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
    if jobCounter == 1 then
      local integerDuration = math.floor(math.abs(limiterDuration))
      rcall("PEXPIRE", rateLimiterKey, integerDuration)
    end
  end
  -- get a lock
  if opts['token'] ~= "0" then
    local lockKey = jobKey .. ':lock'
    rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
  end
  local optionalValues = {}
  if opts['name'] then
    -- Set "processedBy" field to the worker name
    table.insert(optionalValues, "pb")
    table.insert(optionalValues, opts['name'])
  end
  rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
  rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
  rcall("HINCRBY", jobKey, "ats", 1)
  addBaseMarkerIfNeeded(markerKey, false)
  -- rate limit delay must be 0 in this case to prevent adding more delay
  -- when job that is moved to active needs to be processed
  return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
end
--[[
  Updates the delay set, by moving delayed jobs that should
  be processed now to "wait".
    Events:
      'waiting'
]]
-- Includes
--[[
  Function to add job in target list and add marker if needed.
]]
-- Includes
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  rcall(pushCmd, targetKey, jobId)
  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
--[[
  Function to add job considering priority.
]]
-- Includes
--[[
  Function to get priority score.
]]
local function getPriorityScore(priority, priorityCounterKey)
  local prioCounter = rcall("INCR", priorityCounterKey)
  return priority * 0x100000000 + prioCounter % 0x100000000
end
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
    isPausedOrMaxed)
  local score = getPriorityScore(priority, priorityCounterKey)
  rcall("ZADD", prioritizedKey, score, jobId)
  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
-- Try to get as many as 1000 jobs at once
local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
    eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
  local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)
  if (#jobs > 0) then
    rcall("ZREM", delayedKey, unpack(jobs))
    for _, jobId in ipairs(jobs) do
      local jobKey = prefix .. jobId
      local priority =
        tonumber(rcall("HGET", jobKey, "priority")) or 0
      if priority == 0 then
        -- LIFO or FIFO
        rcall("LPUSH", targetKey, jobId)
      else
        local score = getPriorityScore(priority, priorityCounterKey)
        rcall("ZADD", prioritizedKey, score, jobId)
      end
      -- Emit waiting event
      rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
        jobId, "prev", "delayed")
      rcall("HSET", jobKey, "delay", 0)
    end
    addBaseMarkerIfNeeded(markerKey, isPaused)
  end
end
local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration = getTargetQueueList(KEYS[9],
  activeKey, waitKey, KEYS[8])
-- Check if there are delayed jobs that we can move to wait.
local markerKey = KEYS[11]
promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1],
  ARGV[2], KEYS[10], isPausedOrMaxed)
local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))
local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
-- Check if we are rate limited first.
if expireTime > 0 then
  return {0, 0, expireTime, 0}
end
-- paused or maxed queue
if isPausedOrMaxed then
  return {0, 0, 0, 0}
end
local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration
-- no job ID, try non-blocking move from wait to active
local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
-- Markers in waitlist DEPRECATED in v5: Will be completely removed in v6.
if jobId and string.sub(jobId, 1, 2) == "0:" then
  rcall("LREM", activeKey, 1, jobId)
  jobId = rcall("RPOPLPUSH", waitKey, activeKey)
end
if jobId then
  return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
    maxJobs, limiterDuration, markerKey, opts)
else
  jobId = moveJobFromPrioritizedToActive(KEYS[3], activeKey, KEYS[10])
  if jobId then
    return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
      maxJobs, limiterDuration, markerKey, opts)
  end
end
-- Return the timestamp for the next delayed job if any.
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if nextTimestamp ~= nil then
  return {0, 0, 0, nextTimestamp}
end
return {0, 0, 0, 0}
`;

/**
 * Script descriptor for the `moveToActive` command.
 *
 * - `name`: command name.
 * - `content`: Lua source of the script.
 * - `keys`: number of KEYS the script receives (KEYS[1]..KEYS[11]).
 */
export const moveToActive = {
  name: 'moveToActive',
  content,
  keys: 11,
};
//# sourceMappingURL=moveToActive-11.js.map