/**
 * Lua source of the `cleanJobsInSet` Redis script.
 *
 * The text must keep one Lua statement per line: Lua short comments (`-- ...`)
 * extend to the end of the line, so collapsing the script onto fewer lines
 * turns the code that follows a comment into comment text.
 *
 * The script reads KEYS[1]..KEYS[3] and ARGV[1]..ARGV[4] (see the header
 * comment inside the script), emits a "cleaned" event on the events stream
 * and returns the list of removed job ids.
 */
const content = `--[[
  Remove jobs from the specific set.

  Input:
    KEYS[1]  set key,
    KEYS[2]  events stream key
    KEYS[3]  repeat key

    ARGV[1]  jobKey prefix
    ARGV[2]  timestamp
    ARGV[3]  limit the number of jobs to be removed. 0 is unlimited
    ARGV[4]  set name, can be any of 'wait', 'active', 'paused', 'delayed',
             'prioritized', 'completed', or 'failed'
]]
local rcall = redis.call
local repeatKey = KEYS[3]
local rangeStart = 0
local rangeEnd = -1

local limit = tonumber(ARGV[3])

-- If we're only deleting _n_ items, avoid retrieving all items
-- for faster performance
--
-- Start from the tail of the list, since that's where oldest elements
-- are generally added for FIFO lists
if limit > 0 then
  rangeStart = -1 - limit + 1
  rangeEnd = -1
end

-- Includes
--[[
  Function to clean job list.
  Returns jobIds and deleted count number.
]]

-- Includes
--[[
  Function to get the latest saved timestamp.
]]
local function getTimestamp(jobKey, attributes)
  if #attributes == 1 then
    return rcall("HGET", jobKey, attributes[1])
  end

  local jobTs
  for _, ts in ipairs(rcall("HMGET", jobKey, unpack(attributes))) do
    if (ts) then
      jobTs = ts
      break
    end
  end

  return jobTs
end

--[[
  Function to decide if a job timestamp is old enough to be cleaned.
  Hash fields and ARGV values reach the script as strings, and comparing
  strings is lexicographic, which misorders values that have a different
  number of digits. Compare numerically whenever both values convert to
  numbers and keep the plain comparison as a fallback otherwise.
  A missing timestamp is always considered expired.
]]
local function isTimestampExpired(jobTS, maxTimestamp)
  if not jobTS then
    return true
  end

  local jobMillis = tonumber(jobTS)
  local maxMillis = tonumber(maxTimestamp)
  if jobMillis and maxMillis then
    return jobMillis <= maxMillis
  end

  return jobTS <= maxTimestamp
end

--[[
  Function to check if the job belongs to a job scheduler and
  current delayed job matches with jobId
]]
local function isJobSchedulerJob(jobId, jobKey, jobSchedulersKey)
  local repeatJobKey = rcall("HGET", jobKey, "rjk")
  if repeatJobKey then
    local prevMillis = rcall("ZSCORE", jobSchedulersKey, repeatJobKey)
    if prevMillis then
      local currentDelayedJobId = "repeat:" .. repeatJobKey .. ":" .. prevMillis
      return jobId == currentDelayedJobId
    end
  end
  return false
end

--[[
  Function to remove job.
]]

-- Includes
--[[
  Function to remove deduplication key if needed
  when a job is being removed.
]]
local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
  jobId, deduplicationId)
  if deduplicationId then
    local deduplicationKey = prefixKey .. "de:" .. deduplicationId
    local currentJobId = rcall('GET', deduplicationKey)
    if currentJobId and currentJobId == jobId then
      return rcall("DEL", deduplicationKey)
    end
  end
end

--[[
  Function to remove job keys.
]]
local function removeJobKeys(jobKey)
  return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
    jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
end

--[[
  Check if this job has a parent. If so we will just remove it from
  the parent child list, but if it is the last child we should move the parent to "wait/paused"
  which requires code from "moveToFinished"
]]

-- Includes
--[[
  Function to add job in target list and add marker if needed.
]]

-- Includes
--[[
  Add marker if needed when a job is available.
]]
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  if not isPausedOrMaxed then
    rcall("ZADD", markerKey, 0, "0")
  end
end

local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  rcall(pushCmd, targetKey, jobId)
  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end

--[[
  Functions to destructure job key.
  Just a bit of warning, these functions may be a bit slow and affect performance significantly.
]]
local getJobIdFromKey = function (jobKey)
  return string.match(jobKey, ".*:(.*)")
end

local getJobKeyPrefix = function (jobKey, jobId)
  return string.sub(jobKey, 0, #jobKey - #jobId)
end

--[[
  Function to check for the meta.paused key to decide if we are paused or not
  (since an empty list and !EXISTS are not really the same).
]]
local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")

  if queueAttributes[1] then
    return pausedKey, true, queueAttributes[3], queueAttributes[4]
  else
    if queueAttributes[2] then
      local activeCount = rcall("LLEN", activeKey)
      if activeCount >= tonumber(queueAttributes[2]) then
        return waitKey, true, queueAttributes[3], queueAttributes[4]
      else
        return waitKey, false, queueAttributes[3], queueAttributes[4]
      end
    end
  end
  return waitKey, false, queueAttributes[3], queueAttributes[4]
end

local function _moveParentToWait(parentPrefix, parentId, emitEvent)
  local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active",
    parentPrefix .. "wait", parentPrefix .. "paused")
  addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)

  if emitEvent then
    local parentEventStream = parentPrefix .. "events"
    rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  end
end

local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
  if parentKey then
    local parentDependenciesKey = parentKey .. ":dependencies"
    local result = rcall("SREM", parentDependenciesKey, jobKey)
    if result > 0 then
      local pendingDependencies = rcall("SCARD", parentDependenciesKey)
      if pendingDependencies == 0 then
        local parentId = getJobIdFromKey(parentKey)
        local parentPrefix = getJobKeyPrefix(parentKey, parentId)

        local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)

        if numRemovedElements == 1 then
          if hard then -- remove parent in same queue
            if parentPrefix == baseKey then
              removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
              removeJobKeys(parentKey)
              if debounceId then
                rcall("DEL", parentPrefix .. "de:" .. debounceId)
              end
            else
              _moveParentToWait(parentPrefix, parentId)
            end
          else
            _moveParentToWait(parentPrefix, parentId, true)
          end
        end
      end
      return true
    end
  else
    local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
    local missedParentKey = parentAttributes[1]
    if( (type(missedParentKey) == "string") and missedParentKey ~= ""
      and (rcall("EXISTS", missedParentKey) == 1)) then
      local parentDependenciesKey = missedParentKey .. ":dependencies"
      local result = rcall("SREM", parentDependenciesKey, jobKey)
      if result > 0 then
        local pendingDependencies = rcall("SCARD", parentDependenciesKey)
        if pendingDependencies == 0 then
          local parentId = getJobIdFromKey(missedParentKey)
          local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)

          local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)

          if numRemovedElements == 1 then
            if hard then
              if parentPrefix == baseKey then
                removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil)
                removeJobKeys(missedParentKey)
                if parentAttributes[2] then
                  rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2])
                end
              else
                _moveParentToWait(parentPrefix, parentId)
              end
            else
              _moveParentToWait(parentPrefix, parentId, true)
            end
          end
        end
        return true
      end
    end
  end
  return false
end

local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
  local jobKey = baseKey .. jobId
  removeParentDependencyKey(jobKey, hard, nil, baseKey)
  if shouldRemoveDeduplicationKey then
    local deduplicationId = rcall("HGET", jobKey, "deid")
    removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, deduplicationId)
  end
  removeJobKeys(jobKey)
end

local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
  timestamp, isWaiting, jobSchedulersKey)
  local jobs = rcall("LRANGE", listKey, rangeStart, rangeEnd)
  local deleted = {}
  local deletedCount = 0
  local jobTS
  local deletionMarker = ''
  local jobIdsLen = #jobs
  for i, job in ipairs(jobs) do
    if limit > 0 and deletedCount >= limit then
      break
    end

    local jobKey = jobKeyPrefix .. job
    -- Jobs outside the waiting lists are skipped while their lock key exists;
    -- the current job of a job scheduler is never cleaned.
    if (isWaiting or rcall("EXISTS", jobKey .. ":lock") == 0) and
      not isJobSchedulerJob(job, jobKey, jobSchedulersKey) then
      -- Find the right timestamp of the job to compare to maxTimestamp:
      -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
      -- * processedOn represents when the job was last attempted, but it doesn't get populated until
      --   the job is first tried
      -- * timestamp is the original job submission time
      -- Fetch all three of these (in that order) and use the first one that is set so that we'll leave jobs
      -- that have been active within the grace period:
      jobTS = getTimestamp(jobKey, {"finishedOn", "processedOn", "timestamp"})
      if isTimestampExpired(jobTS, timestamp) then
        -- replace the entry with a deletion marker; the actual deletion will
        -- occur at the end of the script
        rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
        removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
        deletedCount = deletedCount + 1
        table.insert(deleted, job)
      end
    end
  end

  rcall("LREM", listKey, 0, deletionMarker)

  return {deleted, deletedCount}
end

--[[
  Function to clean job set.
  Returns jobIds and deleted count number.
]]

-- Includes
--[[
  Function to loop in batches.
  Just a bit of warning, some commands as ZREM
  could receive a maximum of 7000 parameters per call.
]]
local function batches(n, batchSize)
  local i = 0

  return function()
    local from = i * batchSize + 1
    i = i + 1
    if (from <= n) then
      local to = math.min(from + batchSize - 1, n)
      return from, to
    end
  end
end

--[[
  We use ZRANGEBYSCORE to make the case where we're deleting a limited number
  of items in a sorted set only run a single iteration. If we simply used
  ZRANGE, we may take a long time traversing through jobs that are within the
  grace period.
]]
local function getJobsInZset(zsetKey, rangeEnd, limit)
  if limit > 0 then
    return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd, "LIMIT", 0, limit)
  else
    return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd)
  end
end

local function cleanSet(
  setKey,
  jobKeyPrefix,
  rangeEnd,
  timestamp,
  limit,
  attributes,
  isFinished,
  jobSchedulersKey)
  local jobs = getJobsInZset(setKey, rangeEnd, limit)
  local deleted = {}
  local deletedCount = 0
  local jobTS
  for i, job in ipairs(jobs) do
    if limit > 0 and deletedCount >= limit then
      break
    end

    local jobKey = jobKeyPrefix .. job
    -- Extract a Job Scheduler Id from jobId ("repeat:job-scheduler-id:millis")
    -- and check if it is in the scheduled jobs
    if not (jobSchedulersKey and isJobSchedulerJob(job, jobKey, jobSchedulersKey)) then
      if isFinished then
        removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
        deletedCount = deletedCount + 1
        table.insert(deleted, job)
      else
        -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
        jobTS = getTimestamp(jobKey, attributes)
        if isTimestampExpired(jobTS, timestamp) then
          removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
          deletedCount = deletedCount + 1
          table.insert(deleted, job)
        end
      end
    end
  end

  if (#deleted > 0) then
    for from, to in batches(#deleted, 7000) do
      rcall("ZREM", setKey, unpack(deleted, from, to))
    end
  end

  return {deleted, deletedCount}
end

local result
if ARGV[4] == "active" then
  result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false --[[ isWaiting ]],
    repeatKey)
elseif ARGV[4] == "delayed" then
  rangeEnd = "+inf"
  result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
    {"processedOn", "timestamp"}, false --[[ isFinished ]], repeatKey)
elseif ARGV[4] == "prioritized" then
  rangeEnd = "+inf"
  result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
    {"timestamp"}, false --[[ isFinished ]], repeatKey)
elseif ARGV[4] == "wait" or ARGV[4] == "paused" then
  result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true --[[ isWaiting ]],
    repeatKey)
else
  rangeEnd = ARGV[2]
  -- No need to pass repeat key as in that moment job won't be related to a job scheduler
  result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
    {"finishedOn"}, true --[[ isFinished ]])
end

rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2])

return result[1]
`;

/**
 * Script descriptor for `cleanJobsInSet`.
 *
 * - `name`: identifier of the script.
 * - `content`: the Lua source defined above.
 * - `keys`: 3 — presumably the number of KEYS the script expects (the Lua
 *   code reads KEYS[1] through KEYS[3]); confirm against the script loader.
 */
export const cleanJobsInSet = {
    name: 'cleanJobsInSet',
    content,
    keys: 3,
};
//# sourceMappingURL=cleanJobsInSet-3.js.map