"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
const utils_1 = require("../utils");
|
|
const util_1 = require("./util");
|
|
const calculateSlot = require("cluster-key-slot");
|
|
const ShardedSubscriber_1 = require("./ShardedSubscriber");
|
|
const debug = (0, utils_1.Debug)("cluster:subscriberGroup");
|
|
/**
 * Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature,
 * exactly one subscriber exists per cluster instance because the Redis cluster bus forwards
 * messages between shards. Sharded PubSub removes this limitation by making each shard
 * responsible for its own messages.
 *
 * This class coordinates one ShardedSubscriber per master node in the cluster, providing
 * sharded PubSub support while keeping the public API backward compatible.
 */
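/*
 * For orientation, a hedged usage sketch (hypothetical host/port values): callers of a
 * Cluster client interact with sharded PubSub through SSUBSCRIBE/SPUBLISH and the
 * "smessage" event, while this group handles the per-shard routing internally.
 *
 *   const cluster = new Redis.Cluster([{ host: "127.0.0.1", port: 7000 }]);
 *   cluster.on("smessage", (channel, message) => console.log(channel, message));
 *   cluster.ssubscribe("news:sports");
 */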
class ClusterSubscriberGroup {
    /**
     * Registers callbacks on the given emitter.
     *
     * @param subscriberGroupEmitter
     */
    constructor(subscriberGroupEmitter) {
        this.subscriberGroupEmitter = subscriberGroupEmitter;
        this.shardedSubscribers = new Map();
        this.clusterSlots = [];
        // Simple [min, max] slot ranges aren't enough because individual slots can be migrated
        this.subscriberToSlotsIndex = new Map();
        this.channels = new Map();
        this.failedAttemptsByNode = new Map();
        // Only the latest pending reset is kept; throttling relies on refreshSlotsCache's isRefreshing flag and the backoff delay
        this.isResetting = false;
        this.pendingReset = null;
        /**
         * Handles failed subscriber connections by emitting an event to refresh the slots cache
         * after a backoff period.
         *
         * @param error
         * @param nodeKey
         */
        this.handleSubscriberConnectFailed = (error, nodeKey) => {
            const currentAttempts = this.failedAttemptsByNode.get(nodeKey) || 0;
            const failedAttempts = currentAttempts + 1;
            this.failedAttemptsByNode.set(nodeKey, failedAttempts);
            const attempts = Math.min(failedAttempts, ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS);
            const backoff = Math.min(ClusterSubscriberGroup.BASE_BACKOFF_MS * 2 ** attempts, ClusterSubscriberGroup.MAX_BACKOFF_MS);
            const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5));
            const delay = Math.max(0, backoff + jitter);
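            // Worked example (illustrative values): on the third consecutive failure,
            // attempts = 3, backoff = min(100 * 2^3, 2000) = 800 ms, jitter falls in
            // roughly [-200, 200) ms, so the requested refresh delay is about 600-1000 ms.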
            debug("Failed to connect subscriber for %s. Refreshing slots in %dms", nodeKey, delay);
            this.subscriberGroupEmitter.emit("subscriberConnectFailed", {
                delay,
                error,
            });
        };
        /**
         * Handles successful subscriber connections by resetting the failed attempts counter.
         *
         * @param nodeKey
         */
        this.handleSubscriberConnectSucceeded = (nodeKey) => {
            this.failedAttemptsByNode.delete(nodeKey);
        };
    }
    /**
     * Get the responsible subscriber.
     *
     * @param slot
     */
    getResponsibleSubscriber(slot) {
        const nodeKey = this.clusterSlots[slot][0];
        return this.shardedSubscribers.get(nodeKey);
    }
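    // For illustration (hypothetical addresses): calculateSlot("news:sports") yields a slot
    // in the range 0..16383, and clusterSlots[slot][0] holds that slot's master node key,
    // e.g. "127.0.0.1:7000", which is what getResponsibleSubscriber looks up in the map.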
    /**
     * Adds channels for which this subscriber group is responsible.
     *
     * @param channels
     */
    addChannels(channels) {
        const slot = calculateSlot(channels[0]);
        // Check that all channels belong to the same slot; otherwise reject the operation
        for (const c of channels) {
            if (calculateSlot(c) !== slot) {
                return -1;
            }
        }
        const currChannels = this.channels.get(slot);
        if (!currChannels) {
            this.channels.set(slot, channels);
        }
        else {
            this.channels.set(slot, currChannels.concat(channels));
        }
        return Array.from(this.channels.values()).reduce((sum, array) => sum + array.length, 0);
    }
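    // Hedged note with hypothetical channel names: callers can satisfy the same-slot
    // requirement by using Redis hash tags, e.g. addChannels(["{user:42}:news", "{user:42}:alerts"]),
    // where only "user:42" is hashed, so both channels map to one slot; mixing untagged channels
    // such as "alpha" and "beta" would typically hit different slots and return -1.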
    /**
     * Removes channels for which the subscriber group is responsible, optionally unsubscribing from them.
     *
     * @param channels
     */
    removeChannels(channels) {
        const slot = calculateSlot(channels[0]);
        // Check that all channels belong to the same slot; otherwise reject the operation
        for (const c of channels) {
            if (calculateSlot(c) !== slot) {
                return -1;
            }
        }
        const slotChannels = this.channels.get(slot);
        if (slotChannels) {
            const updatedChannels = slotChannels.filter((c) => !channels.includes(c));
            this.channels.set(slot, updatedChannels);
        }
        return Array.from(this.channels.values()).reduce((sum, array) => sum + array.length, 0);
    }
    /**
     * Disconnects all subscribers and clears part of the internal state.
     */
    stop() {
        for (const s of this.shardedSubscribers.values()) {
            s.stop();
        }
        // Clear subscriber instances and pending operations.
        // Channels are preserved for resubscription on reconnect.
        this.pendingReset = null;
        this.shardedSubscribers.clear();
        this.subscriberToSlotsIndex.clear();
    }
    /**
     * Starts all subscribers that have not been started yet.
     */
    start() {
        const startPromises = [];
        for (const s of this.shardedSubscribers.values()) {
            if (!s.isStarted()) {
                startPromises.push(
                    s.start()
                        .then(() => {
                            this.handleSubscriberConnectSucceeded(s.getNodeKey());
                        })
                        .catch((err) => {
                            this.handleSubscriberConnectFailed(err, s.getNodeKey());
                        })
                );
            }
        }
        return Promise.all(startPromises);
    }
    /**
     * Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones.
     */
    async reset(clusterSlots, clusterNodes) {
        if (this.isResetting) {
            this.pendingReset = { slots: clusterSlots, nodes: clusterNodes };
            return;
        }
        this.isResetting = true;
        try {
            const hasTopologyChanged = this._refreshSlots(clusterSlots);
            const hasFailedSubscribers = this.hasUnhealthySubscribers();
            if (!hasTopologyChanged && !hasFailedSubscribers) {
                debug("No topology change and no failed subscribers detected. Skipping reset.");
                return;
            }
            // For each of the sharded subscribers
            for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
                // If the subscriber is still responsible for a slot range and is running, keep it
                if (this.subscriberToSlotsIndex.has(nodeKey) &&
                    shardedSubscriber.isStarted()) {
                    debug("Skipping deleting subscriber for %s", nodeKey);
                    continue;
                }
                debug("Removing subscriber for %s", nodeKey);
                // Otherwise stop the subscriber and remove it
                shardedSubscriber.stop();
                this.shardedSubscribers.delete(nodeKey);
                this.subscriberGroupEmitter.emit("-subscriber");
            }
            const startPromises = [];
            // For each node in the slots cache
            for (const [nodeKey, _] of this.subscriberToSlotsIndex) {
                // If we already have a subscriber for this node, keep it
                if (this.shardedSubscribers.has(nodeKey)) {
                    debug("Skipping creating new subscriber for %s", nodeKey);
                    continue;
                }
                debug("Creating new subscriber for %s", nodeKey);
                // Otherwise create a new subscriber
                const redis = clusterNodes.find((node) => {
                    return (0, util_1.getNodeKey)(node.options) === nodeKey;
                });
                if (!redis) {
                    debug("Failed to find node for key %s", nodeKey);
                    continue;
                }
                const sub = new ShardedSubscriber_1.default(this.subscriberGroupEmitter, redis.options);
                this.shardedSubscribers.set(nodeKey, sub);
                startPromises.push(
                    sub.start()
                        .then(() => {
                            this.handleSubscriberConnectSucceeded(nodeKey);
                        })
                        .catch((error) => {
                            this.handleSubscriberConnectFailed(error, nodeKey);
                        })
                );
                this.subscriberGroupEmitter.emit("+subscriber");
            }
            // It's vital to await the start promises before resubscribing.
            // Otherwise we might try to resubscribe on a subscriber that is not yet connected,
            // which can cause a race condition.
            await Promise.all(startPromises);
            this._resubscribe();
            this.subscriberGroupEmitter.emit("subscribersReady");
        }
        finally {
            this.isResetting = false;
            if (this.pendingReset) {
                const { slots, nodes } = this.pendingReset;
                this.pendingReset = null;
                await this.reset(slots, nodes);
            }
        }
    }
    /**
     * Refreshes the subscriber-related slot ranges.
     *
     * Returns false if no refresh was needed.
     *
     * @param targetSlots
     */
    _refreshSlots(targetSlots) {
        // If there was an actual change, then reassign the slot ranges
        if (this._slotsAreEqual(targetSlots)) {
            debug("Nothing to refresh because the new cluster map is equal to the previous one.");
            return false;
        }
        debug("Refreshing the slots of the subscriber group.");
        // Rebuild the slots index
        this.subscriberToSlotsIndex = new Map();
        for (let slot = 0; slot < targetSlots.length; slot++) {
            const node = targetSlots[slot][0];
            if (!this.subscriberToSlotsIndex.has(node)) {
                this.subscriberToSlotsIndex.set(node, []);
            }
            this.subscriberToSlotsIndex.get(node).push(Number(slot));
        }
        // Update the cached slots map
        this.clusterSlots = JSON.parse(JSON.stringify(targetSlots));
        return true;
    }
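    // For illustration (hypothetical 3-master topology): after _refreshSlots the index maps
    // each master node key to the slots it serves, e.g.
    //   "127.0.0.1:7000" -> [0, 1, ..., 5460]
    //   "127.0.0.1:7001" -> [5461, ..., 10922]
    //   "127.0.0.1:7002" -> [10923, ..., 16383]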
    /**
     * Resubscribes to the previous channels.
     *
     * @private
     */
    _resubscribe() {
        if (this.shardedSubscribers) {
            this.shardedSubscribers.forEach((s, nodeKey) => {
                const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey);
                if (subscriberSlots) {
                    // Resubscribe on the underlying connection
                    subscriberSlots.forEach((ss) => {
                        // Might return null while the subscriber is being disconnected
                        const redis = s.getInstance();
                        const channels = this.channels.get(ss);
                        if (redis && channels && channels.length > 0) {
                            if (redis.status === "end") {
                                return;
                            }
                            if (redis.status === "ready") {
                                redis.ssubscribe(...channels).catch((err) => {
                                    // TODO: Should we emit an error event here?
                                    debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
                                });
                            }
                            else {
                                redis.once("ready", () => {
                                    redis.ssubscribe(...channels).catch((err) => {
                                        // TODO: Should we emit an error event here?
                                        debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
                                    });
                                });
                            }
                        }
                    });
                }
            });
        }
    }
    /**
     * Deep equality of the cluster slots objects.
     *
     * @param other
     * @private
     */
    _slotsAreEqual(other) {
        if (this.clusterSlots === undefined) {
            return false;
        }
        else {
            return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
        }
    }
    /**
     * Checks if any subscribers are in an unhealthy state.
     *
     * A subscriber is considered unhealthy if:
     * - It exists but is not started (failed/disconnected)
     * - It's missing entirely for a node that should have one
     *
     * @returns true if any subscribers need to be recreated
     */
    hasUnhealthySubscribers() {
        const hasFailedSubscribers = Array.from(this.shardedSubscribers.values()).some((sub) => !sub.isStarted());
        const hasMissingSubscribers = Array.from(this.subscriberToSlotsIndex.keys()).some((nodeKey) => !this.shardedSubscribers.has(nodeKey));
        return hasFailedSubscribers || hasMissingSubscribers;
    }
}
exports.default = ClusterSubscriberGroup;
// Retry strategy
ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS = 10;
ClusterSubscriberGroup.MAX_BACKOFF_MS = 2000;
ClusterSubscriberGroup.BASE_BACKOFF_MS = 100;
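// A hedged wiring sketch (illustrative, not part of this module): the group reports its
// lifecycle through the emitter passed to the constructor, using the events emitted above.
//
//   const { EventEmitter } = require("events");
//   const emitter = new EventEmitter();
//   const group = new ClusterSubscriberGroup(emitter);
//   emitter.on("subscriberConnectFailed", ({ delay, error }) => {
//       // e.g. schedule a slots-cache refresh after `delay` ms
//   });
//   emitter.on("subscribersReady", () => {
//       // all sharded subscribers are connected and resubscribed
//   });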