Aktueller Stand

This commit is contained in:
2026-01-23 14:01:49 +01:00
parent 2766dd12c5
commit e16f6d50fb
46 changed files with 5482 additions and 311 deletions

View File

@@ -0,0 +1,2 @@
ALTER TABLE "CleanupJobCandidate"
ADD COLUMN "listUnsubscribePost" TEXT;

View File

@@ -0,0 +1,2 @@
ALTER TABLE "CleanupJobCandidate"
ADD COLUMN "unsubscribeDetails" JSONB;

View File

@@ -0,0 +1,16 @@
CREATE TABLE "TenantMetric" (
"id" TEXT NOT NULL,
"tenantId" TEXT NOT NULL,
"avgProcessingRate" DOUBLE PRECISION,
"sampleCount" INTEGER NOT NULL DEFAULT 0,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "TenantMetric_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX "TenantMetric_tenantId_key" ON "TenantMetric"("tenantId");
ALTER TABLE "TenantMetric"
ADD CONSTRAINT "TenantMetric_tenantId_fkey"
FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id")
ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -0,0 +1 @@
ALTER TYPE "RuleConditionType" ADD VALUE IF NOT EXISTS 'HEADER_MISSING';

View File

@@ -0,0 +1,15 @@
ALTER TABLE "Rule" ADD COLUMN "position" INTEGER NOT NULL DEFAULT 0;
WITH ordered AS (
SELECT
"id",
"tenantId",
ROW_NUMBER() OVER (PARTITION BY "tenantId" ORDER BY "createdAt" ASC, "id" ASC) - 1 AS pos
FROM "Rule"
)
UPDATE "Rule" r
SET "position" = ordered.pos
FROM ordered
WHERE ordered.id = r.id;
CREATE INDEX "Rule_tenantId_position_idx" ON "Rule"("tenantId", "position");

View File

@@ -0,0 +1 @@
ALTER TABLE "Rule" ADD COLUMN "stopOnMatch" BOOLEAN NOT NULL DEFAULT false;

View File

@@ -0,0 +1,6 @@
ALTER TABLE "CleanupJob" ADD COLUMN "listingSeconds" INTEGER;
ALTER TABLE "CleanupJob" ADD COLUMN "processingSeconds" INTEGER;
ALTER TABLE "CleanupJob" ADD COLUMN "unsubscribeSeconds" INTEGER;
ALTER TABLE "CleanupJob" ADD COLUMN "routingSeconds" INTEGER;
ALTER TABLE "CleanupJob" ADD COLUMN "unsubscribeAttempts" INTEGER;
ALTER TABLE "CleanupJob" ADD COLUMN "actionAttempts" INTEGER;

View File

@@ -0,0 +1,23 @@
CREATE TABLE "TenantProviderMetric" (
"id" TEXT NOT NULL,
"tenantId" TEXT NOT NULL,
"provider" "MailProvider" NOT NULL,
"avgListingRate" DOUBLE PRECISION,
"avgProcessingRate" DOUBLE PRECISION,
"avgUnsubscribeRate" DOUBLE PRECISION,
"avgRoutingRate" DOUBLE PRECISION,
"listingSampleCount" INTEGER NOT NULL DEFAULT 0,
"processingSampleCount" INTEGER NOT NULL DEFAULT 0,
"unsubscribeSampleCount" INTEGER NOT NULL DEFAULT 0,
"routingSampleCount" INTEGER NOT NULL DEFAULT 0,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "TenantProviderMetric_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX "TenantProviderMetric_tenantId_provider_key" ON "TenantProviderMetric"("tenantId", "provider");
CREATE INDEX "TenantProviderMetric_tenantId_idx" ON "TenantProviderMetric"("tenantId");
ALTER TABLE "TenantProviderMetric"
ADD CONSTRAINT "TenantProviderMetric_tenantId_fkey"
FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -0,0 +1,4 @@
ALTER TABLE "TenantProviderMetric" ADD COLUMN "avgListingSecondsPerMessage" DOUBLE PRECISION;
ALTER TABLE "TenantProviderMetric" ADD COLUMN "avgProcessingSecondsPerMessage" DOUBLE PRECISION;
ALTER TABLE "TenantProviderMetric" ADD COLUMN "avgUnsubscribeSecondsPerMessage" DOUBLE PRECISION;
ALTER TABLE "TenantProviderMetric" ADD COLUMN "avgRoutingSecondsPerMessage" DOUBLE PRECISION;

View File

@@ -0,0 +1,33 @@
-- CreateTable
CREATE TABLE "CleanupJobCandidate" (
"id" TEXT NOT NULL,
"jobId" TEXT NOT NULL,
"mailboxAccountId" TEXT NOT NULL,
"provider" "MailProvider" NOT NULL,
"externalId" TEXT NOT NULL,
"subject" TEXT,
"from" TEXT,
"fromDomain" TEXT,
"listId" TEXT,
"listUnsubscribe" TEXT,
"score" INTEGER NOT NULL,
"signals" JSONB NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "CleanupJobCandidate_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "CleanupJobCandidate_jobId_externalId_key" ON "CleanupJobCandidate"("jobId", "externalId");
-- CreateIndex
CREATE INDEX "CleanupJobCandidate_jobId_idx" ON "CleanupJobCandidate"("jobId");
-- CreateIndex
CREATE INDEX "CleanupJobCandidate_jobId_fromDomain_idx" ON "CleanupJobCandidate"("jobId", "fromDomain");
-- AddForeignKey
ALTER TABLE "CleanupJobCandidate" ADD CONSTRAINT "CleanupJobCandidate_jobId_fkey" FOREIGN KEY ("jobId") REFERENCES "CleanupJob"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "CleanupJobCandidate" ADD CONSTRAINT "CleanupJobCandidate_mailboxAccountId_fkey" FOREIGN KEY ("mailboxAccountId") REFERENCES "MailboxAccount"("id") ON DELETE RESTRICT ON UPDATE CASCADE;

View File

@@ -0,0 +1,7 @@
-- AlterTable
ALTER TABLE "CleanupJobCandidate"
ADD COLUMN "receivedAt" TIMESTAMP(3),
ADD COLUMN "actions" JSONB,
ADD COLUMN "unsubscribeStatus" TEXT,
ADD COLUMN "unsubscribeMessage" TEXT,
ADD COLUMN "unsubscribeTarget" TEXT;

View File

@@ -0,0 +1,5 @@
-- AlterTable
ALTER TABLE "UnsubscribeAttempt" ADD COLUMN "dedupeKey" TEXT;
-- CreateIndex
CREATE UNIQUE INDEX "UnsubscribeAttempt_jobId_dedupeKey_key" ON "UnsubscribeAttempt"("jobId", "dedupeKey");

View File

@@ -0,0 +1,6 @@
-- AlterEnum
ALTER TYPE "RuleActionType" ADD VALUE IF NOT EXISTS 'MARK_READ';
ALTER TYPE "RuleActionType" ADD VALUE IF NOT EXISTS 'MARK_UNREAD';
-- AlterEnum
ALTER TYPE "RuleConditionType" ADD VALUE IF NOT EXISTS 'UNSUBSCRIBE_STATUS';

View File

@@ -0,0 +1,2 @@
-- AlterEnum
ALTER TYPE "RuleConditionType" ADD VALUE IF NOT EXISTS 'SCORE';

View File

@@ -0,0 +1,17 @@
-- CreateTable
CREATE TABLE "UnsubscribeHistory" (
"id" TEXT NOT NULL,
"tenantId" TEXT NOT NULL,
"dedupeKey" TEXT NOT NULL,
"target" TEXT NOT NULL,
"status" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "UnsubscribeHistory_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "UnsubscribeHistory_tenantId_dedupeKey_key" ON "UnsubscribeHistory"("tenantId", "dedupeKey");
-- CreateIndex
CREATE INDEX "UnsubscribeHistory_tenantId_idx" ON "UnsubscribeHistory"("tenantId");

View File

@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "CleanupJobCandidate" ADD COLUMN "reviewed" BOOLEAN NOT NULL DEFAULT false;

View File

@@ -0,0 +1,5 @@
-- CreateEnum
CREATE TYPE "RuleMatchMode" AS ENUM ('ALL', 'ANY');
-- AlterTable
ALTER TABLE "Rule" ADD COLUMN "matchMode" "RuleMatchMode" NOT NULL DEFAULT 'ALL';

View File

@@ -31,14 +31,19 @@ enum RuleActionType {
DELETE
ARCHIVE
LABEL
MARK_READ
MARK_UNREAD
}
enum RuleConditionType {
HEADER
HEADER_MISSING
SUBJECT
FROM
LIST_UNSUBSCRIBE
LIST_ID
UNSUBSCRIBE_STATUS
SCORE
}
enum ExportStatus {
@@ -60,6 +65,42 @@ model Tenant {
mailboxAccounts MailboxAccount[]
rules Rule[]
jobs CleanupJob[]
metric TenantMetric?
providerMetrics TenantProviderMetric[]
}
model TenantMetric {
id String @id @default(cuid())
tenantId String @unique
avgProcessingRate Float?
sampleCount Int @default(0)
updatedAt DateTime @updatedAt
tenant Tenant @relation(fields: [tenantId], references: [id])
}
model TenantProviderMetric {
id String @id @default(cuid())
tenantId String
provider MailProvider
avgListingRate Float?
avgProcessingRate Float?
avgUnsubscribeRate Float?
avgRoutingRate Float?
avgListingSecondsPerMessage Float?
avgProcessingSecondsPerMessage Float?
avgUnsubscribeSecondsPerMessage Float?
avgRoutingSecondsPerMessage Float?
listingSampleCount Int @default(0)
processingSampleCount Int @default(0)
unsubscribeSampleCount Int @default(0)
routingSampleCount Int @default(0)
updatedAt DateTime @updatedAt
tenant Tenant @relation(fields: [tenantId], references: [id])
@@unique([tenantId, provider])
@@index([tenantId])
}
model ExportJob {
@@ -120,6 +161,7 @@ model MailboxAccount {
tenant Tenant @relation(fields: [tenantId], references: [id])
folders MailboxFolder[]
jobs CleanupJob[]
candidates CleanupJobCandidate[]
@@index([tenantId])
}
@@ -161,6 +203,9 @@ model Rule {
tenantId String
name String
enabled Boolean @default(true)
matchMode RuleMatchMode @default(ALL)
position Int @default(0)
stopOnMatch Boolean @default(false)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@ -169,6 +214,12 @@ model Rule {
actions RuleAction[]
@@index([tenantId])
@@index([tenantId, position])
}
enum RuleMatchMode {
ALL
ANY
}
model RuleCondition {
@@ -205,6 +256,12 @@ model CleanupJob {
checkpointUpdatedAt DateTime?
processedMessages Int?
totalMessages Int?
listingSeconds Int?
processingSeconds Int?
unsubscribeSeconds Int?
routingSeconds Int?
unsubscribeAttempts Int?
actionAttempts Int?
startedAt DateTime?
finishedAt DateTime?
createdAt DateTime @default(now())
@@ -212,17 +269,50 @@ model CleanupJob {
tenant Tenant @relation(fields: [tenantId], references: [id])
mailboxAccount MailboxAccount @relation(fields: [mailboxAccountId], references: [id])
unsubscribeAttempts UnsubscribeAttempt[]
unsubscribeAttemptItems UnsubscribeAttempt[]
events CleanupJobEvent[]
candidates CleanupJobCandidate[]
@@index([tenantId])
@@index([mailboxAccountId])
}
model CleanupJobCandidate {
id String @id @default(cuid())
jobId String
mailboxAccountId String
provider MailProvider
externalId String
subject String?
from String?
fromDomain String?
receivedAt DateTime?
listId String?
listUnsubscribe String?
listUnsubscribePost String?
score Int
signals Json
actions Json?
unsubscribeStatus String?
unsubscribeMessage String?
unsubscribeTarget String?
unsubscribeDetails Json?
reviewed Boolean @default(false)
createdAt DateTime @default(now())
job CleanupJob @relation(fields: [jobId], references: [id])
mailboxAccount MailboxAccount @relation(fields: [mailboxAccountId], references: [id])
@@unique([jobId, externalId])
@@index([jobId])
@@index([jobId, fromDomain])
}
model UnsubscribeAttempt {
id String @id @default(cuid())
jobId String
mailItemId String?
dedupeKey String?
method String
target String
status String
@@ -231,6 +321,19 @@ model UnsubscribeAttempt {
job CleanupJob @relation(fields: [jobId], references: [id])
@@index([jobId])
@@unique([jobId, dedupeKey])
}
model UnsubscribeHistory {
id String @id @default(cuid())
tenantId String
dedupeKey String
target String
status String
createdAt DateTime @default(now())
@@unique([tenantId, dedupeKey])
@@index([tenantId])
}
model CleanupJobEvent {

View File

@@ -1,6 +1,7 @@
import { FastifyInstance } from "fastify";
import { z } from "zod";
import { prisma } from "../db.js";
import { config } from "../config.js";
import { logJobEvent } from "../queue/jobEvents.js";
import { queueCleanupJob, removeQueueJob, queueExportJob } from "../queue/queue.js";
import { createReadStream } from "node:fs";
@@ -28,7 +29,17 @@ const allowedSettings = [
"google.client_id",
"google.client_secret",
"google.redirect_uri",
"cleanup.scan_limit"
"cleanup.scan_limit",
"newsletter.threshold",
"newsletter.subject_tokens",
"newsletter.from_tokens",
"newsletter.header_keys",
"newsletter.weight_header",
"newsletter.weight_precedence",
"newsletter.weight_subject",
"newsletter.weight_from",
"unsubscribe.history_ttl_days",
"unsubscribe.method_preference"
] as const;
export async function adminRoutes(app: FastifyInstance) {
@@ -41,7 +52,17 @@ export async function adminRoutes(app: FastifyInstance) {
"google.client_id": process.env.GOOGLE_CLIENT_ID ?? null,
"google.client_secret": process.env.GOOGLE_CLIENT_SECRET ?? null,
"google.redirect_uri": process.env.GOOGLE_REDIRECT_URI ?? null,
"cleanup.scan_limit": process.env.CLEANUP_SCAN_LIMIT ?? null
"cleanup.scan_limit": process.env.CLEANUP_SCAN_LIMIT ?? String(config.CLEANUP_SCAN_LIMIT),
"newsletter.threshold": process.env.NEWSLETTER_THRESHOLD ?? String(config.NEWSLETTER_THRESHOLD),
"newsletter.subject_tokens": process.env.NEWSLETTER_SUBJECT_TOKENS ?? config.NEWSLETTER_SUBJECT_TOKENS,
"newsletter.from_tokens": process.env.NEWSLETTER_FROM_TOKENS ?? config.NEWSLETTER_FROM_TOKENS,
"newsletter.header_keys": process.env.NEWSLETTER_HEADER_KEYS ?? config.NEWSLETTER_HEADER_KEYS,
"newsletter.weight_header": process.env.NEWSLETTER_WEIGHT_HEADER ?? String(config.NEWSLETTER_WEIGHT_HEADER),
"newsletter.weight_precedence": process.env.NEWSLETTER_WEIGHT_PRECEDENCE ?? String(config.NEWSLETTER_WEIGHT_PRECEDENCE),
"newsletter.weight_subject": process.env.NEWSLETTER_WEIGHT_SUBJECT ?? String(config.NEWSLETTER_WEIGHT_SUBJECT),
"newsletter.weight_from": process.env.NEWSLETTER_WEIGHT_FROM ?? String(config.NEWSLETTER_WEIGHT_FROM),
"unsubscribe.history_ttl_days": process.env.UNSUBSCRIBE_HISTORY_TTL_DAYS ?? String(config.UNSUBSCRIBE_HISTORY_TTL_DAYS),
"unsubscribe.method_preference": process.env.UNSUBSCRIBE_METHOD_PREFERENCE ?? config.UNSUBSCRIBE_METHOD_PREFERENCE
};
const settings = keys.reduce<Record<string, { value: string | null; source: "db" | "env" | "unset" }>>((acc, key) => {
const dbValue = stored[key];
@@ -223,6 +244,7 @@ export async function adminRoutes(app: FastifyInstance) {
const jobIds = jobs.map((job) => job.id);
await tx.cleanupJobEvent.deleteMany({ where: { jobId: { in: jobIds } } });
await tx.unsubscribeAttempt.deleteMany({ where: { jobId: { in: jobIds } } });
await tx.cleanupJobCandidate.deleteMany({ where: { jobId: { in: jobIds } } });
await tx.cleanupJob.deleteMany({ where: { tenantId: tenant.id } });
await tx.ruleAction.deleteMany({ where: { rule: { tenantId: tenant.id } } });
await tx.ruleCondition.deleteMany({ where: { rule: { tenantId: tenant.id } } });
@@ -427,6 +449,7 @@ export async function adminRoutes(app: FastifyInstance) {
await prisma.$transaction(async (tx) => {
await tx.cleanupJobEvent.deleteMany({ where: { jobId: job.id } });
await tx.unsubscribeAttempt.deleteMany({ where: { jobId: job.id } });
await tx.cleanupJobCandidate.deleteMany({ where: { jobId: job.id } });
await tx.cleanupJob.delete({ where: { id: job.id } });
});

View File

@@ -30,6 +30,21 @@ const envSchema = z.object({
SSE_TOKEN_TTL_SECONDS: z.coerce.number().default(300),
OAUTH_STATE_TTL_SECONDS: z.coerce.number().default(600),
CLEANUP_SCAN_LIMIT: z.coerce.number().default(0),
NEWSLETTER_THRESHOLD: z.coerce.number().default(2),
NEWSLETTER_SUBJECT_TOKENS: z.string().default("newsletter,unsubscribe,update,news,digest"),
NEWSLETTER_FROM_TOKENS: z.string().default("newsletter,no-reply,noreply,news,updates"),
NEWSLETTER_HEADER_KEYS: z.string().default("list-unsubscribe,list-id,list-help,list-archive,list-post,list-owner,list-subscribe,list-unsubscribe-post"),
NEWSLETTER_WEIGHT_HEADER: z.coerce.number().default(1),
NEWSLETTER_WEIGHT_PRECEDENCE: z.coerce.number().default(1),
NEWSLETTER_WEIGHT_SUBJECT: z.coerce.number().default(1),
NEWSLETTER_WEIGHT_FROM: z.coerce.number().default(1),
UNSUBSCRIBE_HISTORY_TTL_DAYS: z.coerce.number().default(180),
UNSUBSCRIBE_METHOD_PREFERENCE: z.preprocess(
(value) => (typeof value === "string" ? value.toLowerCase() : value),
z.enum(["auto", "http", "mailto"]).default("http")
),
METRICS_EMA_ALPHA: z.coerce.number().default(0.3),
ATTACHMENT_MAX_BYTES: z.coerce.number().default(10 * 1024 * 1024),
ALLOW_CUSTOM_MAIL_HOSTS: envBoolean(false),
BLOCK_PRIVATE_NETWORKS: envBoolean(true),
ENCRYPTION_KEY: z.string().optional(),
@@ -67,6 +82,18 @@ const parsed = envSchema.safeParse({
SSE_TOKEN_TTL_SECONDS: process.env.SSE_TOKEN_TTL_SECONDS,
OAUTH_STATE_TTL_SECONDS: process.env.OAUTH_STATE_TTL_SECONDS,
CLEANUP_SCAN_LIMIT: process.env.CLEANUP_SCAN_LIMIT,
NEWSLETTER_THRESHOLD: process.env.NEWSLETTER_THRESHOLD,
NEWSLETTER_SUBJECT_TOKENS: process.env.NEWSLETTER_SUBJECT_TOKENS,
NEWSLETTER_FROM_TOKENS: process.env.NEWSLETTER_FROM_TOKENS,
NEWSLETTER_HEADER_KEYS: process.env.NEWSLETTER_HEADER_KEYS,
NEWSLETTER_WEIGHT_HEADER: process.env.NEWSLETTER_WEIGHT_HEADER,
NEWSLETTER_WEIGHT_PRECEDENCE: process.env.NEWSLETTER_WEIGHT_PRECEDENCE,
NEWSLETTER_WEIGHT_SUBJECT: process.env.NEWSLETTER_WEIGHT_SUBJECT,
NEWSLETTER_WEIGHT_FROM: process.env.NEWSLETTER_WEIGHT_FROM,
UNSUBSCRIBE_HISTORY_TTL_DAYS: process.env.UNSUBSCRIBE_HISTORY_TTL_DAYS,
UNSUBSCRIBE_METHOD_PREFERENCE: process.env.UNSUBSCRIBE_METHOD_PREFERENCE,
METRICS_EMA_ALPHA: process.env.METRICS_EMA_ALPHA,
ATTACHMENT_MAX_BYTES: process.env.ATTACHMENT_MAX_BYTES,
ALLOW_CUSTOM_MAIL_HOSTS: process.env.ALLOW_CUSTOM_MAIL_HOSTS,
BLOCK_PRIVATE_NETWORKS: process.env.BLOCK_PRIVATE_NETWORKS,
ENCRYPTION_KEY: process.env.ENCRYPTION_KEY,

View File

@@ -5,7 +5,7 @@ import { createImapClient, fetchHeadersByUids, listMailboxes } from "./imap.js";
import { detectNewsletter } from "./newsletter.js";
import { matchRules } from "./rules.js";
import { unsubscribeFromHeader } from "./unsubscribe.js";
import { applyGmailAction, gmailClientForAccount } from "./gmail.js";
import { gmailClientForAccount } from "./gmail.js";
export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string) => {
const account = await prisma.mailboxAccount.findUnique({ where: { id: mailboxAccountId } });
@@ -27,9 +27,73 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
const rules = await prisma.rule.findMany({
where: { tenantId: job.tenantId },
include: { conditions: true, actions: true }
include: { conditions: true, actions: true },
orderBy: [{ position: "asc" }, { createdAt: "asc" }]
});
const parseList = (value: string | null | undefined, fallback: string[]) => {
if (!value) return fallback;
return value.split(",").map((item) => item.trim().toLowerCase()).filter(Boolean);
};
const parseThreshold = (value: string | null | undefined, fallback: number) => {
if (!value) return fallback;
const parsed = Number.parseInt(value, 10);
return Number.isFinite(parsed) ? parsed : fallback;
};
const defaultHeaderKeys = [
"list-unsubscribe",
"list-id",
"list-help",
"list-archive",
"list-post",
"list-owner",
"list-subscribe",
"list-unsubscribe-post"
];
const defaultSubjectTokens = ["newsletter", "unsubscribe", "update", "news", "digest"];
const defaultFromTokens = ["newsletter", "no-reply", "noreply", "news", "updates"];
const newsletterSettings = await prisma.appSetting.findMany({
where: { key: { in: ["newsletter.threshold", "newsletter.subject_tokens", "newsletter.from_tokens", "newsletter.header_keys"] } }
});
const newsletterMap = new Map(newsletterSettings.map((setting) => [setting.key, setting.value]));
const newsletterConfig = {
threshold: parseThreshold(
newsletterMap.get("newsletter.threshold"),
config.NEWSLETTER_THRESHOLD
),
weightHeader: parseThreshold(
newsletterMap.get("newsletter.weight_header"),
config.NEWSLETTER_WEIGHT_HEADER
),
weightPrecedence: parseThreshold(
newsletterMap.get("newsletter.weight_precedence"),
config.NEWSLETTER_WEIGHT_PRECEDENCE
),
weightSubject: parseThreshold(
newsletterMap.get("newsletter.weight_subject"),
config.NEWSLETTER_WEIGHT_SUBJECT
),
weightFrom: parseThreshold(
newsletterMap.get("newsletter.weight_from"),
config.NEWSLETTER_WEIGHT_FROM
),
subjectTokens: parseList(
newsletterMap.get("newsletter.subject_tokens") ?? config.NEWSLETTER_SUBJECT_TOKENS,
defaultSubjectTokens
),
fromTokens: parseList(
newsletterMap.get("newsletter.from_tokens") ?? config.NEWSLETTER_FROM_TOKENS,
defaultFromTokens
),
headerKeys: parseList(
newsletterMap.get("newsletter.header_keys") ?? config.NEWSLETTER_HEADER_KEYS,
defaultHeaderKeys
)
};
await logJobEvent(cleanupJobId, "info", `Connecting to ${account.email}`, 5);
const isGmail = account.provider === "GMAIL";
@@ -94,106 +158,329 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
});
};
const extractDomain = (value?: string | null) => {
if (!value) return null;
const match = value.match(/@([^>\s]+)/);
if (!match) return null;
return match[1].toLowerCase();
};
const normalizeListUnsubscribe = (value: string) => {
const tokens = value
.split(",")
.map((token) => token.trim())
.map((token) => token.replace(/^<|>$/g, ""))
.filter(Boolean)
.map((token) => token.toLowerCase());
if (!tokens.length) return null;
return tokens.sort().join(",");
};
type GmailClient = Awaited<ReturnType<typeof gmailClientForAccount>>["gmail"];
const processMessage = async (msg: {
uid: number;
subject?: string;
from?: string;
receivedAt?: Date;
headers: Map<string, string>;
gmailMessageId?: string;
}) => {
mailbox?: string;
}, gmailContext?: { gmail: GmailClient; resolveLabelId: (label: string) => Promise<string> }) => {
const ctx = {
headers: msg.headers,
subject: msg.subject ?? "",
from: msg.from ?? ""
};
const result = detectNewsletter(ctx);
const result = detectNewsletter({ ...ctx, config: newsletterConfig });
if (!result.isNewsletter) {
return false;
}
const actions = job.routingEnabled ? matchRules(rules, ctx) : [];
const listId = msg.headers.get("list-id") ?? null;
const listUnsubscribe = msg.headers.get("list-unsubscribe") ?? null;
const listUnsubscribePost = msg.headers.get("list-unsubscribe-post") ?? null;
const externalId = msg.gmailMessageId
? `gmail:${msg.gmailMessageId}`
: `imap:${msg.mailbox ?? account.email}:${msg.uid}`;
const candidate = await prisma.cleanupJobCandidate.upsert({
where: { jobId_externalId: { jobId: cleanupJobId, externalId } },
update: {
subject: msg.subject ?? null,
from: msg.from ?? null,
fromDomain: extractDomain(msg.from),
receivedAt: msg.receivedAt ?? null,
listId,
listUnsubscribe,
listUnsubscribePost,
score: result.score,
signals: result.signals
},
create: {
jobId: cleanupJobId,
mailboxAccountId: account.id,
provider: account.provider,
externalId,
subject: msg.subject ?? null,
from: msg.from ?? null,
fromDomain: extractDomain(msg.from),
receivedAt: msg.receivedAt ?? null,
listId,
listUnsubscribe,
listUnsubscribePost,
score: result.score,
signals: result.signals
}
});
let unsubscribeStatus = job.unsubscribeEnabled ? "pending" : "disabled";
let unsubscribeMessage: string | null = null;
let unsubscribeDetails: Record<string, any> | null = null;
const unsubscribeTarget = listUnsubscribe;
if (job.unsubscribeEnabled) {
if (listUnsubscribe) {
const dedupeKey = listId
? `list-id:${listId.toLowerCase()}`
: normalizeListUnsubscribe(listUnsubscribe);
if (dedupeKey) {
const existing = await prisma.unsubscribeAttempt.findFirst({
where: { jobId: cleanupJobId, dedupeKey }
});
const history = await prisma.unsubscribeHistory.findFirst({
where: { tenantId: job.tenantId, dedupeKey }
});
const historyAgeLimit = config.UNSUBSCRIBE_HISTORY_TTL_DAYS;
const historyEnabled = historyAgeLimit > 0;
const isHistoryFresh = historyEnabled && history
? history.status !== "dry-run" &&
Date.now() - history.createdAt.getTime() <= historyAgeLimit * 24 * 3600 * 1000
: false;
if (existing || isHistoryFresh) {
unsubscribeStatus = "skipped-duplicate";
unsubscribeMessage = "Duplicate unsubscribe target";
unsubscribeDetails = { reason: "duplicate" };
} else {
const attempt = await prisma.unsubscribeAttempt.create({
data: {
jobId: cleanupJobId,
mailItemId: externalId,
dedupeKey,
method: "list-unsubscribe",
target: listUnsubscribe,
status: "pending"
}
});
unsubscribeAttempts += 1;
const attemptStart = Date.now();
try {
if (job.dryRun) {
await prisma.unsubscribeAttempt.update({
where: { id: attempt.id },
data: { status: "dry-run" }
});
await logJobEvent(cleanupJobId, "info", `DRY RUN: unsubscribe ${listUnsubscribe}`);
unsubscribeStatus = "dry-run";
unsubscribeDetails = { reason: "dry-run" };
} else {
const response = await unsubscribeFromHeader({
account,
listUnsubscribe,
listUnsubscribePost,
subject: msg.subject,
from: msg.from
});
await prisma.unsubscribeAttempt.update({
where: { id: attempt.id },
data: { status: response.status }
});
await logJobEvent(cleanupJobId, "info", `Unsubscribe ${response.status}: ${response.message}`);
unsubscribeStatus = response.status;
unsubscribeMessage = response.message;
unsubscribeDetails = response.details ?? null;
await prisma.unsubscribeHistory.upsert({
where: { tenantId_dedupeKey: { tenantId: job.tenantId, dedupeKey } },
update: { status: response.status, target: listUnsubscribe, createdAt: new Date() },
create: { tenantId: job.tenantId, dedupeKey, target: listUnsubscribe, status: response.status }
});
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await prisma.unsubscribeAttempt.update({
where: { id: attempt.id },
data: { status: "failed" }
});
await logJobEvent(cleanupJobId, "error", `Unsubscribe failed: ${message}`);
unsubscribeStatus = "failed";
unsubscribeMessage = message;
unsubscribeDetails = { reason: "exception", error: message };
await prisma.unsubscribeHistory.upsert({
where: { tenantId_dedupeKey: { tenantId: job.tenantId, dedupeKey } },
update: { status: "failed", target: listUnsubscribe, createdAt: new Date() },
create: { tenantId: job.tenantId, dedupeKey, target: listUnsubscribe, status: "failed" }
});
}
unsubscribeSeconds += (Date.now() - attemptStart) / 1000;
}
} else {
unsubscribeStatus = "skipped";
unsubscribeMessage = "No usable List-Unsubscribe target";
unsubscribeDetails = { reason: "missing-target" };
}
} else {
unsubscribeStatus = "skipped";
unsubscribeDetails = { reason: "disabled" };
}
}
const routingCtx = { ...ctx, unsubscribeStatus, newsletterScore: result.score };
const actions = job.routingEnabled ? matchRules(rules, routingCtx) : [];
const actionLog: { type: string; target?: string | null; status: string; error?: string }[] = [];
if (actions.length > 0) {
for (const action of actions) {
if (job.dryRun) {
await logJobEvent(cleanupJobId, "info", `DRY RUN: ${action.type} ${action.target ?? ""}`);
actionLog.push({ type: action.type, target: action.target ?? null, status: "dry-run" });
continue;
}
if (account.provider === "GMAIL" && msg.gmailMessageId) {
await applyGmailAction({
account,
gmailMessageId: msg.gmailMessageId,
action: action.type,
target: action.target
});
await logJobEvent(cleanupJobId, "info", `Gmail action ${action.type} applied`);
continue;
if (account.provider === "GMAIL" && msg.gmailMessageId && gmailContext) {
const actionStart = Date.now();
actionAttempts += 1;
const actionLogItems = actions.map((item) => ({
type: item.type,
target: item.target ?? null,
status: "pending" as const,
error: undefined as string | undefined
}));
const hasDelete = actions.some((item) => item.type === "DELETE");
try {
if (hasDelete) {
await gmailContext.gmail.users.messages.delete({ userId: "me", id: msg.gmailMessageId });
await logJobEvent(cleanupJobId, "info", "Gmail action DELETE applied");
for (const item of actionLogItems) {
if (item.type === "DELETE") {
item.status = "applied";
} else {
item.status = "skipped";
}
}
} else {
const addLabelIds = new Set<string>();
const removeLabelIds = new Set<string>();
for (const item of actions) {
if ((item.type === "MOVE" || item.type === "LABEL") && item.target) {
const labelId = await gmailContext.resolveLabelId(item.target);
addLabelIds.add(labelId);
if (item.type === "MOVE") {
removeLabelIds.add("INBOX");
}
}
if (item.type === "ARCHIVE") {
removeLabelIds.add("INBOX");
}
if (item.type === "MARK_READ") {
removeLabelIds.add("UNREAD");
}
if (item.type === "MARK_UNREAD") {
addLabelIds.add("UNREAD");
}
}
if (addLabelIds.size === 0 && removeLabelIds.size === 0) {
await logJobEvent(cleanupJobId, "info", "Gmail action skipped: no label changes");
for (const item of actionLogItems) {
item.status = "skipped";
}
} else {
await gmailContext.gmail.users.messages.modify({
userId: "me",
id: msg.gmailMessageId,
requestBody: {
addLabelIds: Array.from(addLabelIds),
removeLabelIds: Array.from(removeLabelIds)
}
});
await logJobEvent(cleanupJobId, "info", `Gmail action applied: ${actions.map((a) => a.type).join(", ")}`);
for (const item of actionLogItems) {
item.status = "applied";
}
}
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await logJobEvent(cleanupJobId, "error", `Gmail action failed: ${message}`);
for (const item of actionLogItems) {
if (item.status === "pending") {
item.status = "failed";
item.error = message;
}
}
} finally {
routingSeconds += (Date.now() - actionStart) / 1000;
}
actionLog.push(...actionLogItems);
break;
}
if (!imapClient) {
await logJobEvent(cleanupJobId, "info", "Skipping IMAP action: no IMAP client");
actionLog.push({ type: action.type, target: action.target ?? null, status: "skipped" });
} else {
if ((action.type === "MOVE" || action.type === "ARCHIVE" || action.type === "LABEL") && action.target) {
await imapClient.mailboxCreate(action.target).catch(() => undefined);
await imapClient.messageMove(msg.uid, action.target);
await logJobEvent(cleanupJobId, "info", `Moved message ${msg.uid} to ${action.target}`);
}
if (action.type === "DELETE") {
await imapClient.messageDelete(msg.uid);
await logJobEvent(cleanupJobId, "info", `Deleted message ${msg.uid}`);
const actionStart = Date.now();
actionAttempts += 1;
try {
if ((action.type === "MOVE" || action.type === "ARCHIVE" || action.type === "LABEL") && action.target) {
await imapClient.mailboxCreate(action.target).catch(() => undefined);
await imapClient.messageMove(msg.uid, action.target);
await logJobEvent(cleanupJobId, "info", `Moved message ${msg.uid} to ${action.target}`);
}
if (action.type === "DELETE") {
await imapClient.messageDelete(msg.uid);
await logJobEvent(cleanupJobId, "info", `Deleted message ${msg.uid}`);
}
if (action.type === "MARK_READ") {
await imapClient.messageFlagsAdd(msg.uid, ["\\Seen"]);
await logJobEvent(cleanupJobId, "info", `Marked message ${msg.uid} as read`);
}
if (action.type === "MARK_UNREAD") {
await imapClient.messageFlagsRemove(msg.uid, ["\\Seen"]);
await logJobEvent(cleanupJobId, "info", `Marked message ${msg.uid} as unread`);
}
actionLog.push({ type: action.type, target: action.target ?? null, status: "applied" });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await logJobEvent(cleanupJobId, "error", `IMAP action ${action.type} failed: ${message}`);
actionLog.push({ type: action.type, target: action.target ?? null, status: "failed", error: message });
} finally {
routingSeconds += (Date.now() - actionStart) / 1000;
}
}
}
}
if (job.unsubscribeEnabled) {
const listUnsubscribe = msg.headers.get("list-unsubscribe") ?? null;
const listUnsubscribePost = msg.headers.get("list-unsubscribe-post") ?? null;
if (listUnsubscribe) {
const attempt = await prisma.unsubscribeAttempt.create({
data: {
jobId: cleanupJobId,
method: "list-unsubscribe",
target: listUnsubscribe,
status: "pending"
}
});
try {
if (job.dryRun) {
await prisma.unsubscribeAttempt.update({
where: { id: attempt.id },
data: { status: "dry-run" }
});
await logJobEvent(cleanupJobId, "info", `DRY RUN: unsubscribe ${listUnsubscribe}`);
} else {
const response = await unsubscribeFromHeader({
account,
listUnsubscribe,
listUnsubscribePost,
subject: msg.subject,
from: msg.from
});
await prisma.unsubscribeAttempt.update({
where: { id: attempt.id },
data: { status: response.status }
});
await logJobEvent(cleanupJobId, "info", `Unsubscribe ${response.status}: ${response.message}`);
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await prisma.unsubscribeAttempt.update({
where: { id: attempt.id },
data: { status: "failed" }
});
await logJobEvent(cleanupJobId, "error", `Unsubscribe failed: ${message}`);
if (actionLog.length || unsubscribeStatus !== "pending" || unsubscribeTarget) {
await prisma.cleanupJobCandidate.update({
where: { id: candidate.id },
data: {
actions: actionLog.length ? actionLog : undefined,
unsubscribeStatus,
unsubscribeMessage,
unsubscribeTarget,
unsubscribeDetails
}
}
});
}
const cleanupBefore = new Date(Date.now() - config.UNSUBSCRIBE_HISTORY_TTL_DAYS * 24 * 3600 * 1000);
await prisma.unsubscribeHistory.deleteMany({
where: { tenantId: job.tenantId, createdAt: { lt: cleanupBefore } }
});
return true;
};
@@ -204,9 +491,38 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
let nextIndex = 0;
let messageIds: string[] = [];
let imapUids: number[] = [];
let listingSeconds: number | null = null;
let processingSeconds: number | null = null;
let unsubscribeSeconds = 0;
let routingSeconds = 0;
let unsubscribeAttempts = 0;
let actionAttempts = 0;
const imapMailboxCache = new Set<string>();
if (isGmail && hasGmailOAuth) {
const { gmail } = await gmailClientForAccount(account);
const labelCache = new Map<string, string>();
let labelsLoaded = false;
const resolveLabelId = async (labelName: string) => {
if (labelCache.has(labelName)) return labelCache.get(labelName)!;
if (!labelsLoaded) {
const list = await gmail.users.labels.list({ userId: "me" });
for (const label of list.data.labels ?? []) {
if (label.name && label.id) {
labelCache.set(label.name, label.id);
}
}
labelsLoaded = true;
if (labelCache.has(labelName)) return labelCache.get(labelName)!;
}
const created = await gmail.users.labels.create({
userId: "me",
requestBody: { name: labelName, labelListVisibility: "labelShow", messageListVisibility: "show" }
});
const id = created.data.id ?? labelName;
labelCache.set(labelName, id);
return id;
};
if (checkpoint?.provider === "GMAIL") {
messageIds = checkpoint.messageIds ?? [];
total = checkpoint.total ?? messageIds.length;
@@ -215,6 +531,7 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
}
if (!messageIds.length) {
const listingStart = Date.now();
const ids: string[] = [];
let pageToken: string | undefined;
let pageCount = 0;
@@ -250,6 +567,7 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
total
};
await saveCheckpoint(checkpoint, nextIndex, total);
listingSeconds = Math.max(1, Math.round((Date.now() - listingStart) / 1000));
await logJobEvent(cleanupJobId, "info", `Prepared ${total} Gmail messages`, 12);
} else {
await logJobEvent(cleanupJobId, "info", `Resuming Gmail cleanup at ${nextIndex}/${total}`, 12);
@@ -268,13 +586,33 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
if (total === 0) {
await logJobEvent(cleanupJobId, "info", "No Gmail messages to process", 90);
await prisma.cleanupJob.update({
where: { id: cleanupJobId },
data: {
listingSeconds,
processingSeconds: processingSeconds ?? null,
unsubscribeSeconds: unsubscribeSeconds ? Math.max(1, Math.round(unsubscribeSeconds)) : null,
routingSeconds: routingSeconds ? Math.max(1, Math.round(routingSeconds)) : null,
unsubscribeAttempts: unsubscribeAttempts || null,
actionAttempts: actionAttempts || null
}
});
return;
}
await logJobEvent(cleanupJobId, "info", `Processing ${total} Gmail messages`, 35);
const processingStart = Date.now();
let newsletterCount = 0;
const headersWanted = ["Subject", "From", "List-Id", "List-Unsubscribe", "List-Unsubscribe-Post", "Message-Id"];
const normalizeHeaderName = (value: string) =>
value
.split("-")
.map((part) => (part ? part[0].toUpperCase() + part.slice(1) : part))
.join("-");
const headerSet = new Set<string>(["Subject", "From", "Message-Id", "Precedence", "X-Precedence"]);
for (const key of newsletterConfig.headerKeys) {
headerSet.add(normalizeHeaderName(key));
}
const headersWanted = Array.from(headerSet);
for (let index = nextIndex; index < messageIds.length; index++) {
const statusCheck = await prisma.cleanupJob.findUnique({ where: { id: cleanupJobId } });
@@ -300,11 +638,13 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
uid: 0,
subject: headers.get("subject"),
from: headers.get("from"),
receivedAt: meta.data.internalDate ? new Date(Number(meta.data.internalDate)) : undefined,
headers,
gmailMessageId: id
gmailMessageId: id,
mailbox: "INBOX"
};
const isNewsletter = await processMessage(msg);
const isNewsletter = await processMessage(msg, { gmail, resolveLabelId });
if (isNewsletter) newsletterCount += 1;
const processed = index + 1;
@@ -321,6 +661,18 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
}
await logJobEvent(cleanupJobId, "info", `Detected ${newsletterCount} newsletter candidates`, 92);
processingSeconds = Math.max(1, Math.round((Date.now() - processingStart) / 1000));
await prisma.cleanupJob.update({
where: { id: cleanupJobId },
data: {
listingSeconds,
processingSeconds,
unsubscribeSeconds: unsubscribeSeconds ? Math.max(1, Math.round(unsubscribeSeconds)) : null,
routingSeconds: routingSeconds ? Math.max(1, Math.round(routingSeconds)) : null,
unsubscribeAttempts: unsubscribeAttempts || null,
actionAttempts: actionAttempts || null
}
});
return;
}
@@ -342,6 +694,7 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
await imapClient.mailboxOpen(targetMailbox, { readOnly: job.dryRun });
if (!imapUids.length) {
const listingStart = Date.now();
await logJobEvent(cleanupJobId, "info", `Scanning ${targetMailbox}`, 15);
const search = await imapClient.search({ all: true });
const limited = scanLimit && scanLimit > 0 ? search.slice(-scanLimit) : search;
@@ -357,6 +710,7 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
total
};
await saveCheckpoint(checkpoint, nextIndex, total);
listingSeconds = Math.max(1, Math.round((Date.now() - listingStart) / 1000));
await logJobEvent(cleanupJobId, "info", `Prepared ${total} IMAP messages`, 18);
} else {
await logJobEvent(cleanupJobId, "info", `Resuming IMAP cleanup at ${nextIndex}/${total}`, 18);
@@ -376,6 +730,17 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
if (total === 0) {
await logJobEvent(cleanupJobId, "info", "No IMAP messages to process", 90);
await prisma.cleanupJob.update({
where: { id: cleanupJobId },
data: {
listingSeconds,
processingSeconds: processingSeconds ?? null,
unsubscribeSeconds: unsubscribeSeconds ? Math.max(1, Math.round(unsubscribeSeconds)) : null,
routingSeconds: routingSeconds ? Math.max(1, Math.round(routingSeconds)) : null,
unsubscribeAttempts: unsubscribeAttempts || null,
actionAttempts: actionAttempts || null
}
});
return;
}
@@ -401,7 +766,7 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
processed += 1;
continue;
}
const isNewsletter = await processMessage(msg);
const isNewsletter = await processMessage({ ...msg, mailbox: targetMailbox });
if (isNewsletter) newsletterCount += 1;
processed += 1;
}
@@ -418,6 +783,18 @@ export const runCleanup = async (cleanupJobId: string, mailboxAccountId: string)
}
await logJobEvent(cleanupJobId, "info", `Detected ${newsletterCount} newsletter candidates`, 92);
processingSeconds = Math.max(1, Math.round((Date.now() - processingStart) / 1000));
await prisma.cleanupJob.update({
where: { id: cleanupJobId },
data: {
listingSeconds,
processingSeconds,
unsubscribeSeconds: unsubscribeSeconds ? Math.max(1, Math.round(unsubscribeSeconds)) : null,
routingSeconds: routingSeconds ? Math.max(1, Math.round(routingSeconds)) : null,
unsubscribeAttempts: unsubscribeAttempts || null,
actionAttempts: actionAttempts || null
}
});
} finally {
await imapClient?.logout().catch(() => undefined);
}

View File

@@ -105,7 +105,7 @@ export const ensureGmailLabel = async (gmail: ReturnType<typeof google.gmail>, l
export const applyGmailAction = async (params: {
account: MailboxAccount;
gmailMessageId: string;
action: "LABEL" | "MOVE" | "ARCHIVE" | "DELETE";
action: "LABEL" | "MOVE" | "ARCHIVE" | "DELETE" | "MARK_READ" | "MARK_UNREAD";
target?: string | null;
}) => {
const { gmail } = await gmailClientForAccount(params.account);
@@ -124,13 +124,34 @@ export const applyGmailAction = async (params: {
return;
}
if (params.action === "MARK_READ") {
await gmail.users.messages.modify({
userId: "me",
id: params.gmailMessageId,
requestBody: { removeLabelIds: ["UNREAD"] }
});
return;
}
if (params.action === "MARK_UNREAD") {
await gmail.users.messages.modify({
userId: "me",
id: params.gmailMessageId,
requestBody: { addLabelIds: ["UNREAD"] }
});
return;
}
if (params.action === "MOVE" || params.action === "LABEL") {
const labelName = params.target ?? "Newsletter";
const labelId = await ensureGmailLabel(gmail, labelName);
await gmail.users.messages.modify({
userId: "me",
id: params.gmailMessageId,
requestBody: { addLabelIds: [labelId] }
requestBody: {
addLabelIds: [labelId],
...(params.action === "MOVE" ? { removeLabelIds: ["INBOX"] } : {})
}
});
}
};

View File

@@ -35,6 +35,7 @@ export const fetchHeaders = async (
uid: number;
subject?: string;
from?: string;
receivedAt?: Date;
headers: Map<string, string>;
gmailMessageId?: string;
}[];
@@ -55,6 +56,7 @@ export const fetchHeaders = async (
uid: msg.uid,
subject: parsed.subject,
from: parsed.from?.text,
receivedAt: parsed.date ?? undefined,
headers,
gmailMessageId: (msg as { gmailMessageId?: string }).gmailMessageId
});
@@ -72,6 +74,7 @@ export const fetchHeadersByUids = async (client: ImapFlow, uids: number[]) => {
uid: number;
subject?: string;
from?: string;
receivedAt?: Date;
headers: Map<string, string>;
gmailMessageId?: string;
}[];
@@ -88,6 +91,7 @@ export const fetchHeadersByUids = async (client: ImapFlow, uids: number[]) => {
uid: msg.uid,
subject: parsed.subject,
from: parsed.from?.text,
receivedAt: parsed.date ?? undefined,
headers,
gmailMessageId: (msg as { gmailMessageId?: string }).gmailMessageId
});

View File

@@ -1,5 +1,33 @@
const headerIncludes = (headers: Map<string, string>, key: string) =>
headers.has(key.toLowerCase());
export type NewsletterConfig = {
threshold: number;
headerKeys: string[];
subjectTokens: string[];
fromTokens: string[];
weightHeader: number;
weightPrecedence: number;
weightSubject: number;
weightFrom: number;
};
const DEFAULT_CONFIG: NewsletterConfig = {
threshold: 2,
headerKeys: [
"list-unsubscribe",
"list-id",
"list-help",
"list-archive",
"list-post",
"list-owner",
"list-subscribe",
"list-unsubscribe-post"
],
subjectTokens: ["newsletter", "unsubscribe", "update", "news", "digest"],
fromTokens: ["newsletter", "no-reply", "noreply", "news", "updates"],
weightHeader: 1,
weightPrecedence: 1,
weightSubject: 1,
weightFrom: 1
};
const headerValue = (headers: Map<string, string>, key: string) =>
headers.get(key.toLowerCase()) ?? "";
@@ -7,39 +35,64 @@ const headerValue = (headers: Map<string, string>, key: string) =>
const containsAny = (value: string, tokens: string[]) =>
tokens.some((token) => value.includes(token));
const normalizeList = (items: string[]) =>
items.map((item) => item.trim().toLowerCase()).filter(Boolean);
export const detectNewsletter = (params: {
headers: Map<string, string>;
subject?: string | null;
from?: string | null;
config?: Partial<NewsletterConfig>;
}) => {
const subject = (params.subject ?? "").toLowerCase();
const from = (params.from ?? "").toLowerCase();
const headers = params.headers;
const config: NewsletterConfig = {
threshold: params.config?.threshold ?? DEFAULT_CONFIG.threshold,
headerKeys: normalizeList(params.config?.headerKeys ?? DEFAULT_CONFIG.headerKeys),
subjectTokens: normalizeList(params.config?.subjectTokens ?? DEFAULT_CONFIG.subjectTokens),
fromTokens: normalizeList(params.config?.fromTokens ?? DEFAULT_CONFIG.fromTokens),
weightHeader: params.config?.weightHeader ?? DEFAULT_CONFIG.weightHeader,
weightPrecedence: params.config?.weightPrecedence ?? DEFAULT_CONFIG.weightPrecedence,
weightSubject: params.config?.weightSubject ?? DEFAULT_CONFIG.weightSubject,
weightFrom: params.config?.weightFrom ?? DEFAULT_CONFIG.weightFrom
};
const hasListUnsubscribe = headerIncludes(headers, "list-unsubscribe");
const hasListId = headerIncludes(headers, "list-id");
const matchedHeaderKeys = config.headerKeys.filter((key) => headers.has(key));
const precedence = headerValue(headers, "precedence").toLowerCase();
const bulkHeader = headerValue(headers, "x-precedence").toLowerCase();
const precedenceHint = containsAny(precedence, ["bulk", "list"]) || containsAny(bulkHeader, ["bulk", "list"]);
const headerHints = containsAny(precedence, ["bulk", "list"]) ||
containsAny(bulkHeader, ["bulk", "list"]) ||
headerIncludes(headers, "list-unsubscribe-post");
const subjectMatches = config.subjectTokens.filter((token) => subject.includes(token));
const fromMatches = config.fromTokens.filter((token) => from.includes(token));
const subjectHints = containsAny(subject, ["newsletter", "unsubscribe", "update", "news", "digest"]);
const fromHints = containsAny(from, ["newsletter", "no-reply", "noreply", "news", "updates"]);
const score = [hasListUnsubscribe, hasListId, headerHints, subjectHints, fromHints].filter(Boolean).length;
const headerScore = matchedHeaderKeys.length * config.weightHeader;
const precedenceScore = precedenceHint ? config.weightPrecedence : 0;
const subjectScore = subjectMatches.length ? config.weightSubject : 0;
const fromScore = fromMatches.length ? config.weightFrom : 0;
const score = headerScore + precedenceScore + subjectScore + fromScore;
return {
isNewsletter: score >= 2,
isNewsletter: score >= config.threshold,
score,
signals: {
hasListUnsubscribe,
hasListId,
headerHints,
subjectHints,
fromHints
headerKeys: matchedHeaderKeys,
precedenceHint,
subjectTokens: subjectMatches,
fromTokens: fromMatches,
scoreBreakdown: {
headerMatches: matchedHeaderKeys.length,
headerWeight: config.weightHeader,
headerScore,
precedenceWeight: config.weightPrecedence,
precedenceScore,
subjectMatches: subjectMatches.length,
subjectWeight: config.weightSubject,
subjectScore,
fromMatches: fromMatches.length,
fromWeight: config.weightFrom,
fromScore
}
}
};
};

View File

@@ -203,6 +203,9 @@ export async function mailRoutes(app: FastifyInstance) {
await tx.unsubscribeAttempt.deleteMany({
where: { job: { mailboxAccountId: account.id } }
});
await tx.cleanupJobCandidate.deleteMany({
where: { mailboxAccountId: account.id }
});
await tx.cleanupJob.deleteMany({
where: { mailboxAccountId: account.id }
});

View File

@@ -6,25 +6,66 @@ const getHeader = (headers: Map<string, string>, name: string) =>
const contains = (value: string, needle: string) =>
value.toLowerCase().includes(needle.toLowerCase());
const isWildcard = (value: string) => value.trim() === "*";
const matchCondition = (condition: RuleCondition, ctx: {
subject: string;
from: string;
headers: Map<string, string>;
unsubscribeStatus?: string | null;
newsletterScore?: number | null;
}) => {
const value = condition.value;
switch (condition.type) {
case "SUBJECT":
return contains(ctx.subject, value);
return isWildcard(value) ? ctx.subject.trim().length > 0 : contains(ctx.subject, value);
case "FROM":
return contains(ctx.from, value);
return isWildcard(value) ? ctx.from.trim().length > 0 : contains(ctx.from, value);
case "LIST_ID":
return contains(getHeader(ctx.headers, "list-id"), value);
return isWildcard(value)
? getHeader(ctx.headers, "list-id").trim().length > 0
: contains(getHeader(ctx.headers, "list-id"), value);
case "LIST_UNSUBSCRIBE":
return contains(getHeader(ctx.headers, "list-unsubscribe"), value);
return isWildcard(value)
? getHeader(ctx.headers, "list-unsubscribe").trim().length > 0
: contains(getHeader(ctx.headers, "list-unsubscribe"), value);
case "HEADER": {
const [headerName, headerValue] = value.split(":");
if (!headerName || !headerValue) return false;
return contains(getHeader(ctx.headers, headerName.trim()), headerValue.trim());
const parts = value.split(":");
const headerName = parts[0]?.trim();
const headerValue = parts.slice(1).join(":").trim();
if (!headerName) return false;
const headerContent = getHeader(ctx.headers, headerName);
if (!headerValue || isWildcard(headerValue)) {
return headerContent.trim().length > 0;
}
return contains(headerContent, headerValue);
}
case "HEADER_MISSING": {
const parts = value.split(":");
const headerName = parts[0]?.trim();
if (!headerName) return false;
const headerContent = getHeader(ctx.headers, headerName);
return headerContent.trim().length === 0;
}
case "UNSUBSCRIBE_STATUS": {
if (!ctx.unsubscribeStatus) return false;
return isWildcard(value)
? ctx.unsubscribeStatus.trim().length > 0
: ctx.unsubscribeStatus.toLowerCase() === value.toLowerCase();
}
case "SCORE": {
if (ctx.newsletterScore === null || ctx.newsletterScore === undefined) return false;
const trimmed = value.trim();
const match = trimmed.match(/^(>=|<=|>|<|=)?\s*(\d+)$/);
if (!match) return false;
const op = match[1] ?? ">=";
const threshold = Number.parseInt(match[2], 10);
if (!Number.isFinite(threshold)) return false;
if (op === ">") return ctx.newsletterScore > threshold;
if (op === "<") return ctx.newsletterScore < threshold;
if (op === "<=") return ctx.newsletterScore <= threshold;
if (op === "=") return ctx.newsletterScore === threshold;
return ctx.newsletterScore >= threshold;
}
default:
return false;
@@ -35,14 +76,21 @@ export const matchRules = (rules: (Rule & { conditions: RuleCondition[]; actions
subject: string;
from: string;
headers: Map<string, string>;
unsubscribeStatus?: string | null;
newsletterScore?: number | null;
}) => {
const matched: RuleAction[] = [];
for (const rule of rules) {
if (!rule.enabled) continue;
const allMatch = rule.conditions.every((condition) => matchCondition(condition, ctx));
if (allMatch) {
const anyMatch = rule.conditions.some((condition) => matchCondition(condition, ctx));
const shouldApply = rule.matchMode === "ANY" ? anyMatch : allMatch;
if (shouldApply) {
matched.push(...rule.actions);
if (rule.stopOnMatch) {
break;
}
}
}

View File

@@ -3,6 +3,8 @@ import { MailboxAccount } from "@prisma/client";
import { isPrivateHost } from "../security/ssrf.js";
import { decryptSecret } from "../security/crypto.js";
import { config } from "../config.js";
import { gmailClientForAccount } from "./gmail.js";
import { getSetting } from "../admin/settings.js";
const parseListUnsubscribe = (value: string) => {
const tokens = value
@@ -24,67 +26,123 @@ export const unsubscribeFromHeader = async (params: {
from?: string | null;
}) => {
if (!params.listUnsubscribe) {
return { status: "skipped", message: "No List-Unsubscribe header" };
return { status: "skipped", message: "No List-Unsubscribe header", details: { method: "NONE" } };
}
const { httpLinks, mailtoLinks } = parseListUnsubscribe(params.listUnsubscribe);
const postHint = (params.listUnsubscribePost ?? "").toLowerCase();
const preference = ((await getSetting("unsubscribe.method_preference")) ?? config.UNSUBSCRIBE_METHOD_PREFERENCE ?? "auto").toLowerCase();
if (httpLinks.length > 0) {
const target = httpLinks[0];
const tryHttp = async (target: string) => {
let parsed: URL;
try {
parsed = new URL(target);
} catch {
return { status: "failed", message: "Invalid unsubscribe URL" };
return { status: "failed", message: "Invalid unsubscribe URL", details: { method: "HTTP", url: target } };
}
if (!["http:", "https:"].includes(parsed.protocol)) {
return { status: "failed", message: "Unsupported URL scheme" };
return { status: "failed", message: "Unsupported URL scheme", details: { method: "HTTP", url: target } };
}
if (config.BLOCK_PRIVATE_NETWORKS && await isPrivateHost(parsed.hostname)) {
return { status: "failed", message: "Blocked private network URL" };
return { status: "failed", message: "Blocked private network URL", details: { method: "HTTP", url: target } };
}
const usePost = postHint.includes("one-click");
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 8000);
const response = await fetch(target, {
method: usePost ? "POST" : "GET",
headers: usePost ? { "Content-Type": "application/x-www-form-urlencoded" } : undefined,
body: usePost ? "List-Unsubscribe=One-Click" : undefined,
redirect: "manual",
signal: controller.signal
});
clearTimeout(timeout);
try {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 8000);
const response = await fetch(target, {
method: usePost ? "POST" : "GET",
headers: usePost ? { "Content-Type": "application/x-www-form-urlencoded" } : undefined,
body: usePost ? "List-Unsubscribe=One-Click" : undefined,
redirect: "manual",
signal: controller.signal
});
clearTimeout(timeout);
if (response.status >= 300 && response.status < 400) {
const location = response.headers.get("location");
if (!location) {
return { status: "failed", message: `HTTP ${response.status}` };
}
try {
const redirected = new URL(location, parsed);
if (config.BLOCK_PRIVATE_NETWORKS && await isPrivateHost(redirected.hostname)) {
return { status: "failed", message: "Blocked private redirect" };
if (response.status >= 300 && response.status < 400) {
const location = response.headers.get("location");
if (!location) {
return { status: "failed", message: `HTTP ${response.status}`, details: { method: usePost ? "POST" : "GET", url: target, status: response.status } };
}
try {
const redirected = new URL(location, parsed);
if (config.BLOCK_PRIVATE_NETWORKS && await isPrivateHost(redirected.hostname)) {
return { status: "failed", message: "Blocked private redirect", details: { method: usePost ? "POST" : "GET", url: target, redirect: location } };
}
} catch {
return { status: "failed", message: "Invalid redirect URL", details: { method: usePost ? "POST" : "GET", url: target, redirect: location } };
}
} catch {
return { status: "failed", message: "Invalid redirect URL" };
}
return {
status: response.ok ? "ok" : "failed",
message: `HTTP ${response.status}`,
details: { method: usePost ? "POST" : "GET", url: target, status: response.status }
};
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return { status: "failed", message: "HTTP request failed", details: { method: "HTTP", url: target, error: message } };
}
};
const tryMailto = async (target: string) => {
const mailtoUrl = (() => {
try {
return new URL(target);
} catch {
return null;
}
})();
const to = mailtoUrl ? decodeURIComponent(mailtoUrl.pathname) : target.replace("mailto:", "");
const mailSubject = mailtoUrl?.searchParams.get("subject") ?? params.subject ?? "Unsubscribe";
const mailBody = mailtoUrl?.searchParams.get("body") ?? "Please unsubscribe me from this mailing list.";
if (params.account.provider === "GMAIL" && (params.account.oauthAccessToken || params.account.oauthRefreshToken)) {
const { gmail } = await gmailClientForAccount(params.account);
const rawMessage = [
`From: ${params.account.email}`,
`To: ${to}`,
`Subject: ${mailSubject}`,
...(params.from ? [`Reply-To: ${params.from}`] : []),
...(params.listUnsubscribe ? [`List-Unsubscribe: ${params.listUnsubscribe}`] : []),
"MIME-Version: 1.0",
"Content-Type: text/plain; charset=UTF-8",
"",
mailBody
].join("\r\n");
const raw = Buffer.from(rawMessage)
.toString("base64")
.replace(/\+/g, "-")
.replace(/\//g, "_")
.replace(/=+$/g, "");
await gmail.users.messages.send({
userId: "me",
requestBody: { raw }
});
return {
status: "ok",
message: "Unsubscribe email sent via Gmail API",
details: {
method: "MAILTO",
via: "gmail",
to,
subject: mailSubject,
body: mailBody,
replyTo: params.from ?? null,
listUnsubscribe: params.listUnsubscribe ?? null
}
};
}
return { status: response.ok ? "ok" : "failed", message: `HTTP ${response.status}` };
}
if (mailtoLinks.length > 0) {
const target = mailtoLinks[0];
const smtpHost = params.account.smtpHost;
const smtpPort = params.account.smtpPort ?? 587;
const smtpTLS = params.account.smtpTLS ?? true;
if (!smtpHost || !params.account.appPassword) {
return { status: "failed", message: "SMTP credentials missing" };
return { status: "failed", message: "SMTP credentials missing", details: { method: "MAILTO", via: "smtp", to } };
}
const transporter = nodemailer.createTransport({
@@ -99,13 +157,57 @@ export const unsubscribeFromHeader = async (params: {
await transporter.sendMail({
from: params.account.email,
to: target.replace("mailto:", ""),
subject: params.subject ?? "Unsubscribe",
text: "Please unsubscribe me from this mailing list."
to,
subject: mailSubject,
text: mailBody,
replyTo: params.from ?? undefined,
headers: params.listUnsubscribe ? { "List-Unsubscribe": params.listUnsubscribe } : undefined
});
return { status: "ok", message: "Unsubscribe email sent" };
return {
status: "ok",
message: "Unsubscribe email sent",
details: {
method: "MAILTO",
via: "smtp",
to,
subject: mailSubject,
body: mailBody,
replyTo: params.from ?? null,
listUnsubscribe: params.listUnsubscribe ?? null
}
};
};
const hasHttp = httpLinks.length > 0;
const hasMailto = mailtoLinks.length > 0;
const fallbackToMailto = async (httpResult: { status: string; message: string; details?: Record<string, unknown> }) => {
if (!hasMailto) return httpResult;
const mailResult = await tryMailto(mailtoLinks[0]);
return {
...mailResult,
message: mailResult.status === "ok" ? `${mailResult.message} (HTTP failed)` : `${mailResult.message} (HTTP failed)`
};
};
if (preference === "mailto") {
if (hasMailto) {
return tryMailto(mailtoLinks[0]);
}
if (hasHttp) {
return tryHttp(httpLinks[0]);
}
} else {
if (hasHttp) {
const httpResult = await tryHttp(httpLinks[0]);
if (httpResult.status === "ok") return httpResult;
return fallbackToMailto(httpResult);
}
if (hasMailto) {
return tryMailto(mailtoLinks[0]);
}
}
return { status: "failed", message: "No supported unsubscribe link" };
return { status: "failed", message: "No supported unsubscribe link", details: { method: "NONE" } };
};

View File

@@ -1,6 +1,10 @@
import { FastifyInstance } from "fastify";
import { prisma } from "../db.js";
import { config } from "../config.js";
import { createImapClient } from "../mail/imap.js";
import { applyGmailAction, gmailClientForAccount } from "../mail/gmail.js";
import { createImapClient } from "../mail/imap.js";
import { simpleParser } from "mailparser";
export async function queueRoutes(app: FastifyInstance) {
app.addHook("preHandler", app.authenticate);
@@ -35,7 +39,35 @@ export async function queueRoutes(app: FastifyInstance) {
orderBy: { createdAt: "desc" }
});
return { jobs };
const metric = await prisma.tenantMetric.findUnique({
where: { tenantId: request.user.tenantId }
});
const providerMetrics = await prisma.tenantProviderMetric.findMany({
where: { tenantId: request.user.tenantId }
});
return {
jobs,
meta: {
avgProcessingRate: metric?.avgProcessingRate ?? null,
sampleCount: metric?.sampleCount ?? 0,
providerMetrics: providerMetrics.map((item) => ({
provider: item.provider,
avgListingRate: item.avgListingRate,
avgProcessingRate: item.avgProcessingRate,
avgUnsubscribeRate: item.avgUnsubscribeRate,
avgRoutingRate: item.avgRoutingRate,
avgListingSecondsPerMessage: item.avgListingSecondsPerMessage,
avgProcessingSecondsPerMessage: item.avgProcessingSecondsPerMessage,
avgUnsubscribeSecondsPerMessage: item.avgUnsubscribeSecondsPerMessage,
avgRoutingSecondsPerMessage: item.avgRoutingSecondsPerMessage,
listingSampleCount: item.listingSampleCount,
processingSampleCount: item.processingSampleCount,
unsubscribeSampleCount: item.unsubscribeSampleCount,
routingSampleCount: item.routingSampleCount
}))
}
};
});
app.get("/:id", async (request, reply) => {
@@ -71,6 +103,704 @@ export async function queueRoutes(app: FastifyInstance) {
return { events };
});
app.get("/:id/candidates", async (request, reply) => {
const params = request.params as { id: string };
const query = request.query as {
groupBy?: string;
groupValue?: string;
limit?: string;
cursor?: string;
q?: string;
status?: string;
reviewed?: string;
};
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const groupBy = query.groupBy ?? "none";
const limitRaw = query.limit ? Number.parseInt(query.limit, 10) : 200;
const limit = Number.isFinite(limitRaw) ? Math.min(Math.max(limitRaw, 1), 500) : 200;
const cursor = query.cursor ?? null;
type GroupField = "fromDomain" | "from" | "listId";
const field: GroupField | null =
groupBy === "domain"
? "fromDomain"
: groupBy === "from"
? "from"
: groupBy === "listId"
? "listId"
: null;
const baseWhere: Record<string, unknown> = { jobId: job.id };
if (query.q) {
baseWhere.OR = [
{ subject: { contains: query.q, mode: "insensitive" } },
{ from: { contains: query.q, mode: "insensitive" } },
{ listId: { contains: query.q, mode: "insensitive" } }
];
}
if (query.status) {
baseWhere.unsubscribeStatus = query.status;
}
if (query.reviewed === "true") {
baseWhere.reviewed = true;
}
if (query.reviewed === "false") {
baseWhere.reviewed = false;
}
if (field && query.groupValue === undefined) {
const groups = await prisma.cleanupJobCandidate.groupBy({
by: [field],
where: baseWhere,
_count: { id: true },
orderBy: { _count: { id: "desc" } }
});
return {
groups: groups.map((item) => ({
key: (item as Record<GroupField, string | null>)[field] ?? "",
count: item._count.id
}))
};
}
const where: Record<string, unknown> = { ...baseWhere };
if (field) {
const groupValue = query.groupValue ?? "";
where[field] = groupValue ? groupValue : null;
}
const total = await prisma.cleanupJobCandidate.count({ where });
const items = await prisma.cleanupJobCandidate.findMany({
where,
orderBy: { id: "desc" },
take: limit + 1,
...(cursor ? { cursor: { id: cursor }, skip: 1 } : {})
});
const hasMore = items.length > limit;
const slice = hasMore ? items.slice(0, limit) : items;
const dedupeKeys = items
.map((item) => {
if (item.listId) return `list-id:${item.listId.toLowerCase()}`;
if (item.listUnsubscribe) {
return item.listUnsubscribe
.split(",")
.map((token) => token.trim().replace(/^<|>$/g, "").toLowerCase())
.filter(Boolean)
.sort()
.join(",");
}
return null;
})
.filter(Boolean) as string[];
const histories = dedupeKeys.length
? await prisma.unsubscribeHistory.findMany({
where: { tenantId: job.tenantId, dedupeKey: { in: dedupeKeys } }
})
: [];
const historyMap = new Map(histories.map((item) => [item.dedupeKey, item]));
return {
total,
nextCursor: hasMore ? slice[slice.length - 1]?.id ?? null : null,
items: slice.map((item) => ({
id: item.id,
subject: item.subject,
from: item.from,
fromDomain: item.fromDomain,
receivedAt: item.receivedAt,
listId: item.listId,
listUnsubscribe: item.listUnsubscribe,
listUnsubscribePost: item.listUnsubscribePost,
score: item.score,
signals: item.signals,
actions: item.actions,
unsubscribeStatus: item.unsubscribeStatus,
unsubscribeMessage: item.unsubscribeMessage,
unsubscribeTarget: item.unsubscribeTarget,
unsubscribeDetails: item.unsubscribeDetails,
history: (() => {
const key = item.listId
? `list-id:${item.listId.toLowerCase()}`
: item.listUnsubscribe
? item.listUnsubscribe
.split(",")
.map((token) => token.trim().replace(/^<|>$/g, "").toLowerCase())
.filter(Boolean)
.sort()
.join(",")
: null;
if (!key) return null;
const history = historyMap.get(key);
if (!history) return null;
return {
status: history.status,
createdAt: history.createdAt,
target: history.target
};
})(),
reviewed: item.reviewed
}))
};
});
app.patch("/:id/candidates/:candidateId", async (request, reply) => {
const params = request.params as { id: string; candidateId: string };
const body = request.body as { reviewed?: boolean };
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const updated = await prisma.cleanupJobCandidate.updateMany({
where: { id: params.candidateId, jobId: job.id },
data: { reviewed: body.reviewed ?? false }
});
if (!updated.count) {
return reply.code(404).send({ message: "Candidate not found" });
}
return { success: true };
});
app.post("/:id/candidates/mark-reviewed", async (request, reply) => {
const params = request.params as { id: string };
const query = request.query as { q?: string; status?: string; reviewed?: string };
const body = request.body as { reviewed?: boolean };
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const where: Record<string, unknown> = { jobId: job.id };
if (query.q) {
where.OR = [
{ subject: { contains: query.q, mode: "insensitive" } },
{ from: { contains: query.q, mode: "insensitive" } },
{ listId: { contains: query.q, mode: "insensitive" } }
];
}
if (query.status) {
where.unsubscribeStatus = query.status;
}
if (query.reviewed === "true") {
where.reviewed = true;
}
if (query.reviewed === "false") {
where.reviewed = false;
}
const updated = await prisma.cleanupJobCandidate.updateMany({
where,
data: { reviewed: body.reviewed ?? true }
});
return { updated: updated.count };
});
app.post("/:id/candidates/batch", async (request, reply) => {
const params = request.params as { id: string };
const body = request.body as { ids?: string[]; reviewed?: boolean };
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const ids = (body.ids ?? []).filter(Boolean);
if (!ids.length) {
return reply.code(400).send({ message: "No candidate ids provided" });
}
const updated = await prisma.cleanupJobCandidate.updateMany({
where: { jobId: job.id, id: { in: ids } },
data: { reviewed: body.reviewed ?? true }
});
return { updated: updated.count };
});
app.post("/:id/candidates/delete-gmail", async (request, reply) => {
const params = request.params as { id: string };
const body = request.body as { ids?: string[] };
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const ids = (body.ids ?? []).filter(Boolean);
if (!ids.length) {
return reply.code(400).send({ message: "No candidate ids provided" });
}
const account = await prisma.mailboxAccount.findUnique({ where: { id: job.mailboxAccountId } });
if (!account || account.provider !== "GMAIL") {
return reply.code(400).send({ message: "Gmail account required" });
}
const candidates = await prisma.cleanupJobCandidate.findMany({
where: { jobId: job.id, id: { in: ids } }
});
let deleted = 0;
let missing = 0;
let failed = 0;
for (const candidate of candidates) {
const externalId = candidate.externalId ?? "";
const gmailMessageId = externalId.startsWith("gmail:") ? externalId.slice(6) : null;
if (!gmailMessageId) {
failed += 1;
continue;
}
try {
await applyGmailAction({
account,
gmailMessageId,
action: "DELETE"
});
deleted += 1;
} catch (err) {
const status = (err as { code?: number; response?: { status?: number } })?.response?.status
?? (err as { code?: number }).code;
if (status === 404 || status === 410) {
missing += 1;
} else {
failed += 1;
}
}
}
return { deleted, missing, failed, total: candidates.length };
});
app.post("/:id/candidates/delete", async (request, reply) => {
const params = request.params as { id: string };
const body = request.body as { ids?: string[] };
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const ids = (body.ids ?? []).filter(Boolean);
if (!ids.length) {
return reply.code(400).send({ message: "No candidate ids provided" });
}
const account = await prisma.mailboxAccount.findUnique({ where: { id: job.mailboxAccountId } });
if (!account) {
return reply.code(404).send({ message: "Mailbox account not found" });
}
const candidates = await prisma.cleanupJobCandidate.findMany({
where: { jobId: job.id, id: { in: ids } }
});
let deleted = 0;
let missing = 0;
let failed = 0;
if (account.provider === "GMAIL") {
for (const candidate of candidates) {
const externalId = candidate.externalId ?? "";
const gmailMessageId = externalId.startsWith("gmail:") ? externalId.slice(6) : null;
if (!gmailMessageId) {
failed += 1;
continue;
}
try {
await applyGmailAction({
account,
gmailMessageId,
action: "DELETE"
});
deleted += 1;
} catch (err) {
const status = (err as { response?: { status?: number }; code?: number })?.response?.status
?? (err as { code?: number }).code;
if (status === 404 || status === 410) {
missing += 1;
} else {
failed += 1;
}
}
}
return { deleted, missing, failed, total: candidates.length };
}
const imapClient = createImapClient(account);
let currentMailbox: string | null = null;
try {
await imapClient.connect();
for (const candidate of candidates) {
const externalId = candidate.externalId ?? "";
if (!externalId.startsWith("imap:")) {
failed += 1;
continue;
}
const rest = externalId.slice(5);
const lastColon = rest.lastIndexOf(":");
if (lastColon === -1) {
failed += 1;
continue;
}
const mailbox = rest.slice(0, lastColon);
const uidRaw = rest.slice(lastColon + 1);
const uid = Number.parseInt(uidRaw, 10);
if (!Number.isFinite(uid)) {
failed += 1;
continue;
}
try {
if (mailbox && mailbox !== currentMailbox) {
await imapClient.mailboxOpen(mailbox, { readOnly: false });
currentMailbox = mailbox;
}
await imapClient.messageDelete(uid);
deleted += 1;
} catch (err) {
const message = (err as Error).message?.toLowerCase?.() ?? "";
if (message.includes("no such message") || message.includes("not found") || message.includes("does not exist")) {
missing += 1;
} else {
failed += 1;
}
}
}
} finally {
await imapClient.logout().catch(() => undefined);
}
return { deleted, missing, failed, total: candidates.length };
});
app.get("/:id/candidates/export", async (request, reply) => {
const params = request.params as { id: string };
const query = request.query as { q?: string; status?: string; reviewed?: string };
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const where: Record<string, unknown> = { jobId: job.id };
if (query.status === "group") {
// reserved for group export when used with dedicated endpoint
}
if (query.q) {
where.OR = [
{ subject: { contains: query.q, mode: "insensitive" } },
{ from: { contains: query.q, mode: "insensitive" } },
{ listId: { contains: query.q, mode: "insensitive" } }
];
}
if (query.status) {
where.unsubscribeStatus = query.status;
}
if (query.reviewed === "true") {
where.reviewed = true;
}
if (query.reviewed === "false") {
where.reviewed = false;
}
const candidates = await prisma.cleanupJobCandidate.findMany({
where,
orderBy: { createdAt: "desc" }
});
const header = [
"subject",
"from",
"receivedAt",
"listId",
"unsubscribeStatus",
"unsubscribeMessage",
"score",
"reviewed"
].join(",");
const rows = candidates.map((item) => [
JSON.stringify(item.subject ?? ""),
JSON.stringify(item.from ?? ""),
JSON.stringify(item.receivedAt ? item.receivedAt.toISOString() : ""),
JSON.stringify(item.listId ?? ""),
JSON.stringify(item.unsubscribeStatus ?? ""),
JSON.stringify(item.unsubscribeMessage ?? ""),
item.score,
item.reviewed ? "true" : "false"
].join(","));
reply.header("Content-Type", "text/csv");
return reply.send([header, ...rows].join("\\n"));
});
app.get("/:id/candidates/export-group", async (request, reply) => {
const params = request.params as { id: string };
const query = request.query as { groupBy?: string; groupValue?: string; q?: string; status?: string; reviewed?: string };
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const where: Record<string, unknown> = { jobId: job.id };
const field =
query.groupBy === "domain"
? "fromDomain"
: query.groupBy === "from"
? "from"
: query.groupBy === "listId"
? "listId"
: null;
if (field && query.groupValue !== undefined) {
const value = query.groupValue ?? "";
where[field] = value ? value : null;
}
if (query.q) {
where.OR = [
{ subject: { contains: query.q, mode: "insensitive" } },
{ from: { contains: query.q, mode: "insensitive" } },
{ listId: { contains: query.q, mode: "insensitive" } }
];
}
if (query.status) {
where.unsubscribeStatus = query.status;
}
if (query.reviewed === "true") {
where.reviewed = true;
}
if (query.reviewed === "false") {
where.reviewed = false;
}
const candidates = await prisma.cleanupJobCandidate.findMany({
where,
orderBy: { createdAt: "desc" }
});
const header = [
"subject",
"from",
"receivedAt",
"listId",
"unsubscribeStatus",
"unsubscribeMessage",
"score",
"reviewed"
].join(",");
const rows = candidates.map((item) => [
JSON.stringify(item.subject ?? ""),
JSON.stringify(item.from ?? ""),
JSON.stringify(item.receivedAt ? item.receivedAt.toISOString() : ""),
JSON.stringify(item.listId ?? ""),
JSON.stringify(item.unsubscribeStatus ?? ""),
JSON.stringify(item.unsubscribeMessage ?? ""),
item.score,
item.reviewed ? "true" : "false"
].join(","));
reply.header("Content-Type", "text/csv");
return reply.send([header, ...rows].join("\\n"));
});
app.get("/:id/candidates/:candidateId/preview", async (request, reply) => {
const params = request.params as { id: string; candidateId: string };
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const candidate = await prisma.cleanupJobCandidate.findFirst({
where: { id: params.candidateId, jobId: job.id }
});
if (!candidate) {
return reply.code(404).send({ message: "Candidate not found" });
}
const account = await prisma.mailboxAccount.findUnique({
where: { id: candidate.mailboxAccountId }
});
if (!account) {
return reply.code(404).send({ message: "Mailbox account not found" });
}
const trimPayload = (value?: string | null) => {
if (!value) return null;
const limit = 200000;
return value.length > limit ? `${value.slice(0, limit)}...` : value;
};
if (account.provider === "GMAIL" && candidate.externalId.startsWith("gmail:")) {
const messageId = candidate.externalId.replace("gmail:", "");
const { gmail } = await gmailClientForAccount(account);
const raw = await gmail.users.messages.get({ userId: "me", id: messageId, format: "raw" });
const rawValue = raw.data.raw ?? "";
const normalized = rawValue.replace(/-/g, "+").replace(/_/g, "/");
const padding = normalized.length % 4 ? "=".repeat(4 - (normalized.length % 4)) : "";
const buffer = Buffer.from(normalized + padding, "base64");
const parsed = await simpleParser(buffer);
return {
subject: parsed.subject ?? candidate.subject,
from: parsed.from?.text ?? candidate.from,
to: parsed.to?.text ?? null,
date: parsed.date?.toISOString() ?? null,
text: trimPayload(parsed.text ?? null),
html: trimPayload(typeof parsed.html === "string" ? parsed.html : null),
attachments: (parsed.attachments ?? []).map((item, index) => ({
id: index,
filename: item.filename ?? null,
contentType: item.contentType ?? null,
size: item.size ?? null
}))
};
}
if (candidate.externalId.startsWith("imap:")) {
const rest = candidate.externalId.replace("imap:", "");
const lastIndex = rest.lastIndexOf(":");
const mailbox = lastIndex === -1 ? "INBOX" : rest.slice(0, lastIndex);
const uid = lastIndex === -1 ? Number.parseInt(rest, 10) : Number.parseInt(rest.slice(lastIndex + 1), 10);
if (!Number.isFinite(uid)) {
return reply.code(400).send({ message: "Invalid IMAP message reference" });
}
const client = createImapClient(account);
await client.connect();
try {
await client.mailboxOpen(mailbox, { readOnly: true });
let source: Buffer | null = null;
for await (const msg of client.fetch([uid], { source: true, envelope: true })) {
source = msg.source ?? null;
}
if (!source) {
return reply.code(404).send({ message: "Message not found" });
}
const parsed = await simpleParser(source);
return {
subject: parsed.subject ?? candidate.subject,
from: parsed.from?.text ?? candidate.from,
to: parsed.to?.text ?? null,
date: parsed.date?.toISOString() ?? null,
text: trimPayload(parsed.text ?? null),
html: trimPayload(typeof parsed.html === "string" ? parsed.html : null),
attachments: (parsed.attachments ?? []).map((item, index) => ({
id: index,
filename: item.filename ?? null,
contentType: item.contentType ?? null,
size: item.size ?? null
}))
};
} finally {
await client.logout().catch(() => undefined);
}
}
return reply.code(400).send({ message: "Unsupported provider" });
});
app.get("/:id/candidates/:candidateId/attachments/:index", async (request, reply) => {
const params = request.params as { id: string; candidateId: string; index: string };
const attachmentIndex = Number.parseInt(params.index, 10);
if (!Number.isFinite(attachmentIndex) || attachmentIndex < 0) {
return reply.code(400).send({ message: "Invalid attachment index" });
}
const job = await prisma.cleanupJob.findFirst({
where: { id: params.id, tenantId: request.user.tenantId }
});
if (!job) {
return reply.code(404).send({ message: "Job not found" });
}
const candidate = await prisma.cleanupJobCandidate.findFirst({
where: { id: params.candidateId, jobId: job.id }
});
if (!candidate) {
return reply.code(404).send({ message: "Candidate not found" });
}
const account = await prisma.mailboxAccount.findUnique({
where: { id: candidate.mailboxAccountId }
});
if (!account) {
return reply.code(404).send({ message: "Mailbox account not found" });
}
const resolveAttachment = async () => {
if (account.provider === "GMAIL" && candidate.externalId.startsWith("gmail:")) {
const messageId = candidate.externalId.replace("gmail:", "");
const { gmail } = await gmailClientForAccount(account);
const raw = await gmail.users.messages.get({ userId: "me", id: messageId, format: "raw" });
const rawValue = raw.data.raw ?? "";
const normalized = rawValue.replace(/-/g, "+").replace(/_/g, "/");
const padding = normalized.length % 4 ? "=".repeat(4 - (normalized.length % 4)) : "";
const buffer = Buffer.from(normalized + padding, "base64");
const parsed = await simpleParser(buffer);
return parsed.attachments ?? [];
}
if (candidate.externalId.startsWith("imap:")) {
const rest = candidate.externalId.replace("imap:", "");
const lastIndex = rest.lastIndexOf(":");
const mailbox = lastIndex === -1 ? "INBOX" : rest.slice(0, lastIndex);
const uid = lastIndex === -1 ? Number.parseInt(rest, 10) : Number.parseInt(rest.slice(lastIndex + 1), 10);
if (!Number.isFinite(uid)) {
return null;
}
const client = createImapClient(account);
await client.connect();
try {
await client.mailboxOpen(mailbox, { readOnly: true });
let source: Buffer | null = null;
for await (const msg of client.fetch([uid], { source: true })) {
source = msg.source ?? null;
}
if (!source) {
return null;
}
const parsed = await simpleParser(source);
return parsed.attachments ?? [];
} finally {
await client.logout().catch(() => undefined);
}
}
return null;
};
const attachments = await resolveAttachment();
if (!attachments) {
return reply.code(400).send({ message: "Unsupported provider" });
}
const attachment = attachments[attachmentIndex];
if (!attachment || !attachment.content) {
return reply.code(404).send({ message: "Attachment not found" });
}
const size = attachment.size ?? attachment.content.length ?? 0;
if (size > config.ATTACHMENT_MAX_BYTES) {
return reply.code(413).send({ message: "Attachment too large" });
}
const safeName = (attachment.filename ?? `attachment-${attachmentIndex}`)
.replace(/[^a-zA-Z0-9._-]/g, "_")
.slice(0, 180);
reply.header("Content-Type", "application/octet-stream");
reply.header("Content-Disposition", `attachment; filename=\"${safeName}\"`);
reply.header("Content-Length", size);
return reply.send(attachment.content);
});
app.get("/:id/stream-token", async (request, reply) => {
const params = request.params as { id: string };
const job = await prisma.cleanupJob.findFirst({

View File

@@ -5,12 +5,14 @@ import { prisma } from "../db.js";
const ruleSchema = z.object({
name: z.string().min(2),
enabled: z.boolean().optional(),
matchMode: z.enum(["ALL", "ANY"]).optional(),
stopOnMatch: z.boolean().optional(),
conditions: z.array(z.object({
type: z.enum(["HEADER", "SUBJECT", "FROM", "LIST_UNSUBSCRIBE", "LIST_ID"]),
type: z.enum(["HEADER", "HEADER_MISSING", "SUBJECT", "FROM", "LIST_UNSUBSCRIBE", "LIST_ID", "UNSUBSCRIBE_STATUS", "SCORE"]),
value: z.string().min(1)
})),
actions: z.array(z.object({
type: z.enum(["MOVE", "DELETE", "ARCHIVE", "LABEL"]),
type: z.enum(["MOVE", "DELETE", "ARCHIVE", "LABEL", "MARK_READ", "MARK_UNREAD"]),
target: z.string().optional()
}))
});
@@ -21,18 +23,28 @@ export async function rulesRoutes(app: FastifyInstance) {
app.get("/", async (request) => {
const rules = await prisma.rule.findMany({
where: { tenantId: request.user.tenantId },
include: { conditions: true, actions: true }
include: { conditions: true, actions: true },
orderBy: [{ position: "asc" }, { createdAt: "asc" }]
});
return { rules };
});
app.post("/", async (request, reply) => {
const input = ruleSchema.parse(request.body);
const lastRule = await prisma.rule.findFirst({
where: { tenantId: request.user.tenantId },
orderBy: { position: "desc" },
select: { position: true }
});
const nextPosition = (lastRule?.position ?? -1) + 1;
const rule = await prisma.rule.create({
data: {
tenantId: request.user.tenantId,
name: input.name,
enabled: input.enabled ?? true,
matchMode: input.matchMode ?? "ALL",
position: nextPosition,
stopOnMatch: input.stopOnMatch ?? false,
conditions: {
create: input.conditions
},
@@ -45,6 +57,37 @@ export async function rulesRoutes(app: FastifyInstance) {
return reply.code(201).send({ rule });
});
app.put("/reorder", async (request) => {
const input = z.object({
orderedIds: z.array(z.string()).min(1)
}).parse(request.body);
const uniqueIds = Array.from(new Set(input.orderedIds));
if (uniqueIds.length !== input.orderedIds.length) {
return { success: false, message: "Duplicate ids provided" };
}
const existing = await prisma.rule.findMany({
where: { tenantId: request.user.tenantId, id: { in: input.orderedIds } },
select: { id: true }
});
if (existing.length !== input.orderedIds.length) {
return { success: false, message: "One or more rules not found" };
}
await prisma.$transaction(
input.orderedIds.map((id, index) =>
prisma.rule.update({
where: { id },
data: { position: index }
})
)
);
return { success: true };
});
app.put("/:id", async (request, reply) => {
const params = request.params as { id: string };
const input = ruleSchema.parse(request.body);
@@ -64,6 +107,8 @@ export async function rulesRoutes(app: FastifyInstance) {
data: {
name: input.name,
enabled: input.enabled ?? true,
matchMode: input.matchMode ?? "ALL",
stopOnMatch: input.stopOnMatch ?? false,
conditions: { create: input.conditions },
actions: { create: input.actions }
},

View File

@@ -8,6 +8,10 @@ import { runExportJob, startExportCleanupLoop } from "./admin/exportWorker.js";
const connection = new IORedis(config.REDIS_URL, { maxRetriesPerRequest: null });
const clampAlpha = (value: number) => Math.min(0.95, Math.max(0.05, value));
const ema = (prev: number | null | undefined, next: number, alpha: number) =>
prev === null || prev === undefined ? next : prev + alpha * (next - prev);
const worker = new Worker(
"cleanup",
async (job) => {
@@ -35,6 +39,138 @@ const worker = new Worker(
data: { status: "SUCCEEDED", finishedAt: new Date() }
});
await logJobEvent(cleanupJobId, "info", "Cleanup finished", 100);
if (latest?.tenantId && latest.startedAt && latest.finishedAt && latest.processedMessages) {
const durationSeconds = Math.max(1, (latest.finishedAt.getTime() - latest.startedAt.getTime()) / 1000);
const rate = latest.processedMessages / durationSeconds;
if (Number.isFinite(rate) && rate > 0) {
const metric = await prisma.tenantMetric.findUnique({ where: { tenantId: latest.tenantId } });
const alpha = clampAlpha(config.METRICS_EMA_ALPHA);
if (!metric) {
await prisma.tenantMetric.create({
data: { tenantId: latest.tenantId, avgProcessingRate: rate, sampleCount: 1 }
});
} else {
const newCount = metric.sampleCount + 1;
const blended = ema(metric.avgProcessingRate, rate, alpha);
await prisma.tenantMetric.update({
where: { tenantId: latest.tenantId },
data: { avgProcessingRate: blended, sampleCount: newCount }
});
}
}
}
if (latest?.tenantId) {
const account = await prisma.mailboxAccount.findUnique({ where: { id: latest.mailboxAccountId } });
const provider = account?.provider ?? null;
if (provider) {
const listingRate = latest.listingSeconds && latest.totalMessages
? latest.totalMessages / latest.listingSeconds
: null;
const processingRate = latest.processingSeconds && latest.processedMessages
? latest.processedMessages / latest.processingSeconds
: null;
const allowSideEffects = !latest.dryRun;
const unsubscribeRate = allowSideEffects && latest.unsubscribeSeconds && latest.unsubscribeAttempts
? latest.unsubscribeAttempts / latest.unsubscribeSeconds
: null;
const routingRate = allowSideEffects && latest.routingSeconds && latest.actionAttempts
? latest.actionAttempts / latest.routingSeconds
: null;
const listingSecPerMsg = latest.listingSeconds && latest.totalMessages
? latest.listingSeconds / latest.totalMessages
: null;
const processingSecPerMsg = latest.processingSeconds && latest.processedMessages
? latest.processingSeconds / latest.processedMessages
: null;
const unsubscribeSecPerMsg = allowSideEffects && latest.unsubscribeSeconds && latest.processedMessages
? latest.unsubscribeSeconds / latest.processedMessages
: null;
const routingSecPerMsg = allowSideEffects && latest.routingSeconds && latest.processedMessages
? latest.routingSeconds / latest.processedMessages
: null;
const existing = await prisma.tenantProviderMetric.findUnique({
where: { tenantId_provider: { tenantId: latest.tenantId, provider } }
});
const alpha = clampAlpha(config.METRICS_EMA_ALPHA);
if (!existing) {
await prisma.tenantProviderMetric.create({
data: {
tenantId: latest.tenantId,
provider,
avgListingRate: listingRate ?? undefined,
avgProcessingRate: processingRate ?? undefined,
avgUnsubscribeRate: unsubscribeRate ?? undefined,
avgRoutingRate: routingRate ?? undefined,
avgListingSecondsPerMessage: listingSecPerMsg ?? undefined,
avgProcessingSecondsPerMessage: processingSecPerMsg ?? undefined,
avgUnsubscribeSecondsPerMessage: unsubscribeSecPerMsg ?? undefined,
avgRoutingSecondsPerMessage: routingSecPerMsg ?? undefined,
listingSampleCount: listingRate ? 1 : 0,
processingSampleCount: processingRate ? 1 : 0,
unsubscribeSampleCount: unsubscribeRate ? 1 : 0,
routingSampleCount: routingRate ? 1 : 0
}
});
} else {
const updates: Record<string, number> = {};
const counts: Record<string, number> = {};
const hasListingSample = Boolean((listingRate && listingRate > 0) || (listingSecPerMsg && listingSecPerMsg > 0));
if (hasListingSample) {
counts.listingSampleCount = existing.listingSampleCount + 1;
if (listingRate && listingRate > 0) {
updates.avgListingRate = ema(existing.avgListingRate, listingRate, alpha);
}
if (listingSecPerMsg && listingSecPerMsg > 0) {
updates.avgListingSecondsPerMessage = ema(existing.avgListingSecondsPerMessage, listingSecPerMsg, alpha);
}
}
const hasProcessingSample = Boolean((processingRate && processingRate > 0) || (processingSecPerMsg && processingSecPerMsg > 0));
if (hasProcessingSample) {
counts.processingSampleCount = existing.processingSampleCount + 1;
if (processingRate && processingRate > 0) {
updates.avgProcessingRate = ema(existing.avgProcessingRate, processingRate, alpha);
}
if (processingSecPerMsg && processingSecPerMsg > 0) {
updates.avgProcessingSecondsPerMessage = ema(existing.avgProcessingSecondsPerMessage, processingSecPerMsg, alpha);
}
}
const hasUnsubscribeSample = Boolean((unsubscribeRate && unsubscribeRate > 0) || (unsubscribeSecPerMsg && unsubscribeSecPerMsg > 0));
if (hasUnsubscribeSample) {
counts.unsubscribeSampleCount = existing.unsubscribeSampleCount + 1;
if (unsubscribeRate && unsubscribeRate > 0) {
updates.avgUnsubscribeRate = ema(existing.avgUnsubscribeRate, unsubscribeRate, alpha);
}
if (unsubscribeSecPerMsg && unsubscribeSecPerMsg > 0) {
updates.avgUnsubscribeSecondsPerMessage = ema(existing.avgUnsubscribeSecondsPerMessage, unsubscribeSecPerMsg, alpha);
}
}
const hasRoutingSample = Boolean((routingRate && routingRate > 0) || (routingSecPerMsg && routingSecPerMsg > 0));
if (hasRoutingSample) {
counts.routingSampleCount = existing.routingSampleCount + 1;
if (routingRate && routingRate > 0) {
updates.avgRoutingRate = ema(existing.avgRoutingRate, routingRate, alpha);
}
if (routingSecPerMsg && routingSecPerMsg > 0) {
updates.avgRoutingSecondsPerMessage = ema(existing.avgRoutingSecondsPerMessage, routingSecPerMsg, alpha);
}
}
if (Object.keys(updates).length > 0) {
await prisma.tenantProviderMetric.update({
where: { tenantId_provider: { tenantId: latest.tenantId, provider } },
data: { ...updates, ...counts }
});
}
}
}
}
} catch {
return { ok: false, skipped: true };
}