Files
simple-mail-cleaner/backend/node_modules/bullmq/dist/esm/scripts/addPrioritizedJob-9.js
2026-01-22 15:49:12 +01:00

447 lines
17 KiB
JavaScript

const content = `--[[
Adds a prioritized job to the queue by doing the following:
- Increases the job counter if needed.
- Creates a new job key with the job data.
- Adds the job to the "added" list so that workers get notified.
Input:
KEYS[1] 'marker',
KEYS[2] 'meta'
KEYS[3] 'id'
KEYS[4] 'prioritized'
KEYS[5] 'delayed'
KEYS[6] 'completed'
KEYS[7] 'active'
KEYS[8] events stream key
KEYS[9] 'pc' priority counter
ARGV[1] msgpacked arguments array
[1] key prefix,
[2] custom id (will not generate one automatically)
[3] name
[4] timestamp
[5] parentKey?
[6] parent dependencies key.
[7] parent? {id, queueKey}
[8] repeat job key
[9] deduplication key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Output:
jobId - OK
-5 - Missing parent key
-7 - Job id already exists under a different parent key whose parent job still exists
]]
-- Queue keys taken from KEYS (positions documented in the header comment).
-- KEYS[1] ('marker') and KEYS[5] ('delayed') are not bound to locals here;
-- they are read directly where the script needs them.
local metaKey = KEYS[2]
local idKey = KEYS[3]
local priorityKey = KEYS[4]
local completedKey = KEYS[6]
local activeKey = KEYS[7]
local eventsKey = KEYS[8]
local priorityCounterKey = KEYS[9]
-- Assigned later in the main body, once it is known whether a custom id is used.
local jobId
local jobIdKey
local rcall = redis.call
-- ARGV[1] and ARGV[3] arrive msgpack-encoded; ARGV[2] is the JSON job data.
local args = cmsgpack.unpack(ARGV[1])
local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])
-- Positional fields of the unpacked arguments array (see header comment).
local parentKey = args[5]
local parent = args[7]
local repeatJobKey = args[8]
local deduplicationKey = args[9]
-- Filled with the JSON-encoded parent in the main body when parentKey is set.
local parentData
-- Includes
--[[
Function to add job considering priority.
]]
-- Includes
--[[
  Writes the base marker (member "0" with score 0) into the marker sorted
  set when a job has become available. Nothing is written while the queue
  is paused or maxed.
]]
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  if isPausedOrMaxed then
    return
  end
  rcall("ZADD", markerKey, 0, "0")
end
--[[
  Builds the sorted-set score for a prioritized job.
  The priority is multiplied by 2^32 and the low part carries a counter
  (INCR on priorityCounterKey, taken modulo 2^32), so every call for the
  same priority yields a distinct score until the counter wraps.
]]
local function getPriorityScore(priority, priorityCounterKey)
  local counterSpan = 0x100000000
  local sequence = rcall("INCR", priorityCounterKey) % counterSpan
  return priority * counterSpan + sequence
end
-- Inserts jobId into the prioritized sorted set using a score derived from
-- its priority, then adds the base marker unless the queue is paused or maxed.
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
    isPausedOrMaxed)
  rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
--[[
Function to debounce a job.
]]
-- Includes
--[[
  Deletes a job hash together with its auxiliary keys (logs, dependencies,
  processed, failed, unsuccessful) in a single DEL call and returns the
  reply of that call.
]]
local function removeJobKeys(jobKey)
  local suffixes = {':logs', ':dependencies', ':processed', ':failed', ':unsuccessful'}
  local keysToDelete = {jobKey}
  for i = 1, #suffixes do
    keysToDelete[i + 1] = jobKey .. suffixes[i]
  end
  return rcall("DEL", unpack(keysToDelete))
end
--[[
  Tries to drop the currently deduplicated job from the delayed set.
  When the job was in the delayed set, its keys are deleted and three
  events are emitted: "removed" for the old job, then "debounced" and
  "deduplicated" for the new job id.
  Returns true if the old job was removed, false otherwise.
  Note: the deduplicationKey parameter is not read by this function.
]]
local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
    jobId, deduplicationId, prefix)
  local removedCount = rcall("ZREM", delayedKey, currentDeduplicatedJobId)
  if removedCount <= 0 then
    -- The old job is not delayed, so it cannot be replaced.
    return false
  end

  removeJobKeys(prefix .. currentDeduplicatedJobId)
  rcall("XADD", eventsKey, "*", "event", "removed",
    "jobId", currentDeduplicatedJobId, "prev", "delayed")
  -- TODO remove debounced event in next breaking change
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
    "jobId", jobId, "debounceId", deduplicationId)
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated",
    "jobId", jobId, "deduplicationId", deduplicationId,
    "deduplicatedJobId", currentDeduplicatedJobId)
  return true
end
--[[
  Applies the deduplication options (opts['de']) to a job that is about to be added.
  Reads the options 'id', 'ttl', 'replace' and 'extend'.
  Returns the id of the job that already owns the deduplication key when the
  new job must NOT be added, or nothing when the caller should go on and
  store the new job (the deduplication key then points at jobId).
  Does nothing when no deduplication id is given.
]]
local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
prefix)
local deduplicationId = deduplicationOpts and deduplicationOpts['id']
if deduplicationId then
local ttl = deduplicationOpts['ttl']
if deduplicationOpts['replace'] then
-- Replace mode: the new job takes over if the current owner can be
-- removed from the delayed set.
if ttl and ttl > 0 then
local currentDebounceJobId = rcall('GET', deduplicationKey)
if currentDebounceJobId then
local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
currentDebounceJobId, jobId, deduplicationId, prefix)
if isRemoved then
-- 'extend' restarts the TTL; otherwise the remaining TTL is kept.
if deduplicationOpts['extend'] then
rcall('SET', deduplicationKey, jobId, 'PX', ttl)
else
rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
end
return
else
-- The owner was not in the delayed set: keep it and reject the new job.
return currentDebounceJobId
end
else
-- No current owner: claim the key with the requested TTL.
rcall('SET', deduplicationKey, jobId, 'PX', ttl)
return
end
else
-- Replace mode without a positive TTL: same flow, but the key never expires.
local currentDebounceJobId = rcall('GET', deduplicationKey)
if currentDebounceJobId then
local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
currentDebounceJobId, jobId, deduplicationId, prefix)
if isRemoved then
rcall('SET', deduplicationKey, jobId)
return
else
return currentDebounceJobId
end
else
rcall('SET', deduplicationKey, jobId)
return
end
end
else
-- Non-replace mode: the first job to claim the key wins.
local deduplicationKeyExists
if ttl and ttl > 0 then
if deduplicationOpts['extend'] then
local currentDebounceJobId = rcall('GET', deduplicationKey)
if currentDebounceJobId then
-- Keep the current owner but restart its TTL, then report the new
-- job as deduplicated.
rcall('SET', deduplicationKey, currentDebounceJobId, 'PX', ttl)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
"jobId", currentDebounceJobId, "debounceId", deduplicationId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
return currentDebounceJobId
else
rcall('SET', deduplicationKey, jobId, 'PX', ttl)
return
end
else
-- SET ... NX gives a falsy reply when the key already exists.
deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
end
else
deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
end
if deduplicationKeyExists then
-- Another job already owns the key: emit the events and return its id.
local currentDebounceJobId = rcall('GET', deduplicationKey)
-- TODO remove debounced event in next breaking change
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
currentDebounceJobId, "debounceId", deduplicationId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
return currentDebounceJobId
end
end
end
end
--[[
  Writes the job hash and emits the "added" event.
  The hash always receives name, data, opts (JSON-encoded), timestamp,
  delay and priority. parentKey/parent, rjk (repeat job key) and
  deid (deduplication id) are appended only when present.
  Returns the delay and priority that were stored (both default to 0).
]]
local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
    parentKey, parentData, repeatJobKey)
  local encodedOpts = cjson.encode(opts)
  local delay = opts['delay'] or 0
  local priority = opts['priority'] or 0
  local deduplicationOpts = opts['de']
  local debounceId = deduplicationOpts and deduplicationOpts['id']

  -- Optional field/value pairs, appended in a fixed order.
  local extraFields = {}
  local function appendField(field, value)
    extraFields[#extraFields + 1] = field
    extraFields[#extraFields + 1] = value
  end

  if parentKey ~= nil then
    appendField("parentKey", parentKey)
    appendField("parent", parentData)
  end
  if repeatJobKey then
    appendField("rjk", repeatJobKey)
  end
  if debounceId then
    appendField("deid", debounceId)
  end

  rcall("HMSET", jobIdKey,
    "name", name,
    "data", data,
    "opts", encodedOpts,
    "timestamp", timestamp,
    "delay", delay,
    "priority", priority,
    unpack(extraFields))
  rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)

  return delay, priority
end
--[[
  Returns the "opts.maxLenEvents" field of the queue meta hash.
  When the field is missing it is initialised to 10000 and that default
  is returned.
]]
local function getOrSetMaxEvents(metaKey)
  local storedMax = rcall("HGET", metaKey, "opts.maxLenEvents")
  if storedMax then
    return storedMax
  end
  local defaultMax = 10000
  rcall("HSET", metaKey, "opts.maxLenEvents", defaultMax)
  return defaultMax
end
--[[
Function to handle the case when job is duplicated.
]]
-- Includes
--[[
This function is used to update the parent's dependencies if the job
is already completed and about to be ignored. The parent must get its
dependencies updated to avoid the parent job being stuck forever in
the waiting-children state.
]]
-- Includes
--[[
Validate and move or add dependencies to parent.
]]
-- Includes
--[[
Validate and move parent to a wait status (waiting, delayed or prioritized)
if no pending dependencies.
]]
-- Includes
--[[
Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
]]
-- Includes
--[[
Move parent to a wait status (wait, prioritized or delayed)
]]
-- Includes
--[[
Add delay marker if needed.
]]
-- Includes
--[[
  Returns the timestamp of the earliest entry in the delayed sorted set,
  or nil when the set is empty or the score is not numeric.
  The score is divided by 0x1000 to get the timestamp back.
]]
local function getNextDelayedTimestamp(delayedKey)
  local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  -- In Lua 0 is truthy, so a bare "#result" test can never fail; compare the
  -- length explicitly. A hit is a {member, score} pair.
  if #result < 2 then
    return nil
  end
  local nextTimestamp = tonumber(result[2])
  if nextTimestamp == nil then
    return nil
  end
  return nextTimestamp / 0x1000
end
-- Sets the delay marker (member "1") in the marker sorted set to the
-- timestamp of the next delayed job. Does nothing when no such timestamp
-- is available.
local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  local upcoming = getNextDelayedTimestamp(delayedKey)
  if upcoming == nil then
    return
  end
  -- ZADD overwrites the previous score, so the marker always carries the
  -- newest known next timestamp.
  rcall("ZADD", markerKey, upcoming, "1")
end
--[[
Function to add job in target list and add marker if needed.
]]
-- Includes
-- Pushes jobId onto targetKey with the given push command (for example
-- "RPUSH"), then adds the base marker unless the queue is paused or maxed.
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  rcall(pushCmd, targetKey, jobId)

  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
--[[
  Tells whether the queue cannot take more work right now.
  Returns true when the meta hash has a "paused" field, or when it has a
  "concurrency" field and the active list already holds at least that
  many jobs. Returns false otherwise.
]]
local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  local paused, concurrency = attrs[1], attrs[2]
  if paused then
    return true
  end
  if not concurrency then
    return false
  end
  return rcall("LLEN", activeKey) >= tonumber(concurrency)
end
--[[
  Picks the list a job should be pushed to, based on the queue meta hash.
  Returns four values:
    1. pausedKey when the "paused" field is set, otherwise waitKey
    2. true when the queue is paused or the active list has reached the
       "concurrency" value, false otherwise
    3. the "max" meta field
    4. the "duration" meta field
]]
local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  local paused, concurrency = attrs[1], attrs[2]
  local maxValue, durationValue = attrs[3], attrs[4]

  if paused then
    return pausedKey, true, maxValue, durationValue
  end

  local isMaxed = false
  if concurrency then
    isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
  end
  return waitKey, isMaxed, maxValue, durationValue
end
--[[
  Places a parent job into the state matching its stored "delay" and
  "priority" fields:
    - delay > 0     -> delayed set (score = (timestamp + delay) * 0x1000),
                       emits "delayed" and refreshes the delay marker
    - priority == 0 -> wait or paused list, emits "waiting"
    - otherwise     -> prioritized set, emits "waiting"
]]
local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  local parentEventsKey = parentQueueKey .. ":events"
  local parentMarkerKey = parentQueueKey .. ":marker"
  local parentMetaKey = parentQueueKey .. ":meta"
  local parentActiveKey = parentQueueKey .. ":active"

  local attrs = rcall("HMGET", parentKey, "priority", "delay")
  local priority = tonumber(attrs[1]) or 0
  local delay = tonumber(attrs[2]) or 0

  if delay > 0 then
    local parentDelayedKey = parentQueueKey .. ":delayed"
    local delayedTimestamp = tonumber(timestamp) + delay
    rcall("ZADD", parentDelayedKey, delayedTimestamp * 0x1000, parentId)
    rcall("XADD", parentEventsKey, "*", "event", "delayed", "jobId", parentId,
      "delay", delayedTimestamp)
    addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
    return
  end

  if priority == 0 then
    local targetKey, targetPausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
      parentQueueKey .. ":wait", parentQueueKey .. ":paused")
    addJobInTargetList(targetKey, parentMarkerKey, "RPUSH", targetPausedOrMaxed, parentId)
  else
    local pausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
    addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
      parentQueueKey .. ":pc", pausedOrMaxed)
  end
  rcall("XADD", parentEventsKey, "*", "event", "waiting", "jobId", parentId,
    "prev", "waiting-children")
end
-- Moves the parent out of the waiting-children set into a wait state, but
-- only when the parent job still exists and is currently in that set.
local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  if rcall("EXISTS", parentKey) ~= 1 then
    return
  end
  local waitingChildrenKey = parentQueueKey .. ":waiting-children"
  if not rcall("ZSCORE", waitingChildrenKey, parentId) then
    return
  end
  rcall("ZREM", waitingChildrenKey, parentId)
  moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
end
-- Releases the parent from waiting-children once its dependencies set is
-- empty (SCARD returns 0).
local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
    parentId, timestamp)
  local pendingCount = rcall("SCARD", parentDependenciesKey)
  if pendingCount == 0 then
    moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  end
end
-- Records the child's return value in the parent's ":processed" hash
-- (field = child job key), then moves the parent to a wait state if it has
-- no pending dependencies left.
local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
    parentId, jobIdKey, returnvalue, timestamp)
  rcall("HSET", parentKey .. ":processed", jobIdKey, returnvalue)
  moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey,
    parentKey, parentId, timestamp)
end
--[[
  Links an already existing job to the given parent.
  - If the job is in the completed set, its return value is written to the
    parent's processed hash and the parent is released if nothing else is
    pending.
  - Otherwise the job key is added to the parent's dependencies set (when
    a dependencies key was supplied).
  In both cases the job hash gets its parentKey/parent fields set.
  Does nothing when parentKey is nil.
]]
local function updateExistingJobsParent(parentKey, parent, parentData,
    parentDependenciesKey, completedKey,
    jobIdKey, jobId, timestamp)
  if parentKey == nil then
    return
  end

  local completedScore = rcall("ZSCORE", completedKey, jobId)
  if completedScore then
    local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
    updateParentDepsIfNeeded(parentKey, parent['queueKey'], parentDependenciesKey,
      parent['id'], jobIdKey, returnvalue, timestamp)
  elseif parentDependenciesKey ~= nil then
    rcall("SADD", parentDependenciesKey, jobIdKey)
  end

  rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
end
--[[
  Handles an add request whose job id already exists.
  - When the stored job has no parent, or the same parent as requested,
    the parent linkage is refreshed.
  - When a different parent is requested and the stored parent job still
    exists, -7 is returned and nothing else happens.
  Otherwise a "duplicated" event is emitted and the job id is returned as
  a string.
]]
local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
    parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
  local existedParentKey = rcall("HGET", jobKey, "parentKey")
  local parentUnchanged = (not existedParentKey) or existedParentKey == currentParentKey

  if parentUnchanged then
    updateExistingJobsParent(currentParentKey, currentParent, parentData,
      parentDependenciesKey, completedKey, jobKey, jobId, timestamp)
  elseif currentParentKey ~= nil and rcall("EXISTS", existedParentKey) == 1 then
    -- This branch is only reached when the stored parent differs from the
    -- requested one, so no extra inequality check is needed.
    return -7
  end

  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
    "duplicated", "jobId", jobId)
  return jobId .. "" -- convert to string
end
-- A declared parent must exist; otherwise report "missing parent key".
if parentKey ~= nil then
  local parentExists = rcall("EXISTS", parentKey) == 1
  if not parentExists then
    return -5
  end
  parentData = cjson.encode(parent)
end

local keyPrefix = args[1]
local customId = args[2]
local timestamp = args[4]
local parentDependenciesKey = args[6]

-- The id counter is incremented on every call, including custom-id calls.
local jobCounter = rcall("INCR", idKey)
local maxEvents = getOrSetMaxEvents(metaKey)

-- An empty custom id means "use the generated counter value".
local usesCustomId = customId ~= ""
if usesCustomId then
  jobId = customId
else
  jobId = jobCounter
end
jobIdKey = keyPrefix .. jobId

-- Only custom ids are checked for an already existing job.
if usesCustomId and rcall("EXISTS", jobIdKey) == 1 then
  return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
    parentData, parentDependenciesKey, completedKey, eventsKey,
    maxEvents, timestamp)
end

-- Stop here when deduplication says another job already covers this one.
local existingDeduplicatedId = deduplicateJob(opts['de'], jobId, KEYS[5],
  deduplicationKey, eventsKey, maxEvents, keyPrefix)
if existingDeduplicatedId then
  return existingDeduplicatedId
end

-- Store the job.
local _, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
  opts, timestamp, parentKey, parentData, repeatJobKey)

-- Add the job to the prioritized set
local queuePausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
addJobWithPriority(KEYS[1], priorityKey, priority, jobId, priorityCounterKey,
  queuePausedOrMaxed)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  "jobId", jobId)

-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
  rcall("SADD", parentDependenciesKey, jobIdKey)
end

return jobId .. "" -- convert to string
`;
/**
 * Descriptor for the addPrioritizedJob Lua script.
 * - name: identifier of the script
 * - content: the Lua source held in the `content` template literal above
 * - keys: presumably the number of KEYS entries the script takes (its header
 *   documents KEYS[1] through KEYS[9]) — confirm against the script loader
 */
export const addPrioritizedJob = {
    name: 'addPrioritizedJob',
    content,
    keys: 9,
};
//# sourceMappingURL=addPrioritizedJob-9.js.map