"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); const utils_1 = require("../utils"); const util_1 = require("./util"); const calculateSlot = require("cluster-key-slot"); const ShardedSubscriber_1 = require("./ShardedSubscriber"); const debug = (0, utils_1.Debug)("cluster:subscriberGroup"); /** * Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature, * exactly one subscriber exists per cluster instance because the Redis cluster bus forwards * messages between shards. Sharded PubSub removes this limitation by making each shard * responsible for its own messages. * * This class coordinates one ShardedSubscriber per master node in the cluster, providing * sharded PubSub support while keeping the public API backward compatible. */ class ClusterSubscriberGroup { /** * Register callbacks * * @param cluster */ constructor(subscriberGroupEmitter) { this.subscriberGroupEmitter = subscriberGroupEmitter; this.shardedSubscribers = new Map(); this.clusterSlots = []; // Simple [min, max] slot ranges aren't enough because you can migrate single slots this.subscriberToSlotsIndex = new Map(); this.channels = new Map(); this.failedAttemptsByNode = new Map(); // Only latest pending reset kept; throttled by refreshSlotsCache's isRefreshing + backoff delay this.isResetting = false; this.pendingReset = null; /** * Handles failed subscriber connections by emitting an event to refresh the slots cache * after a backoff period. * * @param error * @param nodeKey */ this.handleSubscriberConnectFailed = (error, nodeKey) => { const currentAttempts = this.failedAttemptsByNode.get(nodeKey) || 0; const failedAttempts = currentAttempts + 1; this.failedAttemptsByNode.set(nodeKey, failedAttempts); const attempts = Math.min(failedAttempts, ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS); const backoff = Math.min(ClusterSubscriberGroup.BASE_BACKOFF_MS * 2 ** attempts, ClusterSubscriberGroup.MAX_BACKOFF_MS); const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5)); const delay = Math.max(0, backoff + jitter); debug("Failed to connect subscriber for %s. Refreshing slots in %dms", nodeKey, delay); this.subscriberGroupEmitter.emit("subscriberConnectFailed", { delay, error, }); }; /** * Handles successful subscriber connections by resetting the failed attempts counter. * * @param nodeKey */ this.handleSubscriberConnectSucceeded = (nodeKey) => { this.failedAttemptsByNode.delete(nodeKey); }; } /** * Get the responsible subscriber. 
    /**
     * Get the responsible subscriber.
     *
     * @param slot
     */
    getResponsibleSubscriber(slot) {
        const nodeKey = this.clusterSlots[slot][0];
        return this.shardedSubscribers.get(nodeKey);
    }
    /**
     * Adds channels for which this subscriber group is responsible
     *
     * @param channels
     */
    addChannels(channels) {
        const slot = calculateSlot(channels[0]);
        // Check that all channels belong to the same slot, otherwise reject the operation
        for (const c of channels) {
            if (calculateSlot(c) !== slot) {
                return -1;
            }
        }
        const currChannels = this.channels.get(slot);
        if (!currChannels) {
            this.channels.set(slot, channels);
        } else {
            this.channels.set(slot, currChannels.concat(channels));
        }
        // Return the total number of tracked channels across all slots
        return Array.from(this.channels.values()).reduce((sum, array) => sum + array.length, 0);
    }
    /**
     * Removes channels for which the subscriber group is responsible
     *
     * @param channels
     */
    removeChannels(channels) {
        const slot = calculateSlot(channels[0]);
        // Check that all channels belong to the same slot, otherwise reject the operation
        for (const c of channels) {
            if (calculateSlot(c) !== slot) {
                return -1;
            }
        }
        const slotChannels = this.channels.get(slot);
        if (slotChannels) {
            const updatedChannels = slotChannels.filter((c) => !channels.includes(c));
            this.channels.set(slot, updatedChannels);
        }
        // Return the total number of tracked channels across all slots
        return Array.from(this.channels.values()).reduce((sum, array) => sum + array.length, 0);
    }
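    /*
     * Illustrative only (channel names are made up): sharded channels are slot-scoped, so
     * addChannels/removeChannels accept a batch only when every channel hashes to the same
     * slot. Redis hash tags make that easy to guarantee:
     *
     *   group.addChannels(["{user:1}:posts", "{user:1}:likes"]); // same hash tag -> same slot -> accepted
     *   group.addChannels(["{user:1}:posts", "{user:2}:posts"]); // slots (almost certainly) differ -> returns -1
     */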
    /**
     * Disconnect all subscribers and clear some of the internal state.
     */
    stop() {
        for (const s of this.shardedSubscribers.values()) {
            s.stop();
        }
        // Clear subscriber instances and pending operations.
        // Channels are preserved for resubscription on reconnect.
        this.pendingReset = null;
        this.shardedSubscribers.clear();
        this.subscriberToSlotsIndex.clear();
    }
    /**
     * Start all not-yet-started subscribers
     */
    start() {
        const startPromises = [];
        for (const s of this.shardedSubscribers.values()) {
            if (!s.isStarted()) {
                startPromises.push(s
                    .start()
                    .then(() => {
                        this.handleSubscriberConnectSucceeded(s.getNodeKey());
                    })
                    .catch((err) => {
                        this.handleSubscriberConnectFailed(err, s.getNodeKey());
                    }));
            }
        }
        return Promise.all(startPromises);
    }
    /**
     * Resets the subscriber group by disconnecting all subscribers that are no longer needed
     * and connecting new ones.
     */
    async reset(clusterSlots, clusterNodes) {
        if (this.isResetting) {
            this.pendingReset = { slots: clusterSlots, nodes: clusterNodes };
            return;
        }
        this.isResetting = true;
        try {
            const hasTopologyChanged = this._refreshSlots(clusterSlots);
            const hasFailedSubscribers = this.hasUnhealthySubscribers();
            if (!hasTopologyChanged && !hasFailedSubscribers) {
                debug("No topology change or failed subscribers detected. Skipping reset.");
                return;
            }
            // For each of the sharded subscribers
            for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
                // If the subscriber is still responsible for a slot range and is running, keep it
                if (this.subscriberToSlotsIndex.has(nodeKey) &&
                    shardedSubscriber.isStarted()) {
                    debug("Skipping deleting subscriber for %s", nodeKey);
                    continue;
                }
                debug("Removing subscriber for %s", nodeKey);
                // Otherwise stop the subscriber and remove it
                shardedSubscriber.stop();
                this.shardedSubscribers.delete(nodeKey);
                this.subscriberGroupEmitter.emit("-subscriber");
            }
            const startPromises = [];
            // For each node in the slots cache
            for (const [nodeKey, _] of this.subscriberToSlotsIndex) {
                // If we already have a subscriber for this node, keep it
                if (this.shardedSubscribers.has(nodeKey)) {
                    debug("Skipping creating new subscriber for %s", nodeKey);
                    continue;
                }
                debug("Creating new subscriber for %s", nodeKey);
                // Otherwise create a new subscriber
                const redis = clusterNodes.find((node) => {
                    return (0, util_1.getNodeKey)(node.options) === nodeKey;
                });
                if (!redis) {
                    debug("Failed to find node for key %s", nodeKey);
                    continue;
                }
                const sub = new ShardedSubscriber_1.default(this.subscriberGroupEmitter, redis.options);
                this.shardedSubscribers.set(nodeKey, sub);
                startPromises.push(sub
                    .start()
                    .then(() => {
                        this.handleSubscriberConnectSucceeded(nodeKey);
                    })
                    .catch((error) => {
                        this.handleSubscriberConnectFailed(error, nodeKey);
                    }));
                this.subscriberGroupEmitter.emit("+subscriber");
            }
            // It's vital to await the start promises before resubscribing.
            // Otherwise we might try to resubscribe on a subscriber that is not yet connected,
            // which can cause a race condition.
            await Promise.all(startPromises);
            this._resubscribe();
            this.subscriberGroupEmitter.emit("subscribersReady");
        }
        finally {
            this.isResetting = false;
            if (this.pendingReset) {
                const { slots, nodes } = this.pendingReset;
                this.pendingReset = null;
                await this.reset(slots, nodes);
            }
        }
    }
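    /*
     * Illustrative only (addresses are made up): clusterSlots is indexed by slot number and
     * holds node keys with the master first, while subscriberToSlotsIndex inverts that
     * mapping, e.g.
     *
     *   clusterSlots[0]        -> ["127.0.0.1:7000", ...]
     *   subscriberToSlotsIndex -> Map { "127.0.0.1:7000" => [0, 1, 2, ...], ... }
     *
     * _refreshSlots below rebuilds the index whenever the slots cache changes.
     */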
    /**
     * Refreshes the subscriber-related slot ranges.
     *
     * Returns false if no refresh was needed.
     *
     * @param targetSlots
     */
    _refreshSlots(targetSlots) {
        // If there was an actual change, then reassign the slot ranges
        if (this._slotsAreEqual(targetSlots)) {
            debug("Nothing to refresh because the new cluster map is equal to the previous one.");
            return false;
        }
        debug("Refreshing the slots of the subscriber group.");
        // Rebuild the slots index
        this.subscriberToSlotsIndex = new Map();
        for (let slot = 0; slot < targetSlots.length; slot++) {
            const node = targetSlots[slot][0];
            if (!this.subscriberToSlotsIndex.has(node)) {
                this.subscriberToSlotsIndex.set(node, []);
            }
            this.subscriberToSlotsIndex.get(node).push(Number(slot));
        }
        // Update the cached slots map
        this.clusterSlots = JSON.parse(JSON.stringify(targetSlots));
        return true;
    }
    /**
     * Resubscribes to the previous channels
     *
     * @private
     */
    _resubscribe() {
        if (this.shardedSubscribers) {
            this.shardedSubscribers.forEach((s, nodeKey) => {
                const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey);
                if (subscriberSlots) {
                    // Resubscribe on the underlying connection
                    subscriberSlots.forEach((ss) => {
                        // Might return null if being disconnected
                        const redis = s.getInstance();
                        if (!redis) {
                            return;
                        }
                        const channels = this.channels.get(ss);
                        if (channels && channels.length > 0) {
                            if (redis.status === "end") {
                                return;
                            }
                            if (redis.status === "ready") {
                                redis.ssubscribe(...channels).catch((err) => {
                                    // TODO: Should we emit an error event here?
                                    debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
                                });
                            } else {
                                redis.once("ready", () => {
                                    redis.ssubscribe(...channels).catch((err) => {
                                        // TODO: Should we emit an error event here?
                                        debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
                                    });
                                });
                            }
                        }
                    });
                }
            });
        }
    }
    /**
     * Deep equality of the cluster slots objects
     *
     * @param other
     * @private
     */
    _slotsAreEqual(other) {
        if (this.clusterSlots === undefined) {
            return false;
        } else {
            return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
        }
    }
    /**
     * Checks if any subscribers are in an unhealthy state.
     *
     * A subscriber is considered unhealthy if:
     * - It exists but is not started (failed/disconnected)
     * - It's missing entirely for a node that should have one
     *
     * @returns true if any subscribers need to be recreated
     */
    hasUnhealthySubscribers() {
        const hasFailedSubscribers = Array.from(this.shardedSubscribers.values()).some((sub) => !sub.isStarted());
        const hasMissingSubscribers = Array.from(this.subscriberToSlotsIndex.keys()).some((nodeKey) => !this.shardedSubscribers.has(nodeKey));
        return hasFailedSubscribers || hasMissingSubscribers;
    }
}
exports.default = ClusterSubscriberGroup;
// Retry strategy
ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS = 10;
ClusterSubscriberGroup.MAX_BACKOFF_MS = 2000;
ClusterSubscriberGroup.BASE_BACKOFF_MS = 100;
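/*
 * Minimal usage sketch (illustrative only; `emitter`, the channel name, the require path, and
 * the `clusterSlots`/`clusterNodes` values are assumptions, not part of this module).
 * Inside an async function:
 *
 *   const { EventEmitter } = require("events");
 *   const ClusterSubscriberGroup = require("./ClusterSubscriberGroup").default;
 *
 *   const emitter = new EventEmitter();
 *   const group = new ClusterSubscriberGroup(emitter);
 *
 *   emitter.on("subscribersReady", () => {
 *     // All sharded subscribers are connected and resubscribed.
 *   });
 *
 *   group.addChannels(["{user:1}:posts"]);
 *   // Align the subscribers with the current topology (slots cache + connected cluster nodes):
 *   await group.reset(clusterSlots, clusterNodes);
 */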