Add FeedOrchestrator that coordinates the fetch→parse→dedup→store pipeline:
- FeedSource type for managing RSS/Atom feed configurations
- Feed source CRUD operations in the IStorage interface
- Database schema migration for the feed_sources table
- Exponential backoff retry with configurable delays
- Per-feed poll intervals with health tracking
- Concurrency-limited parallel feed processing
- ProcessResult and FeedHealth interfaces for status monitoring

Files added:
- orchestrator/orchestrator.ts - main orchestrator class
- orchestrator/scheduler.ts - backoff calculation utilities
- orchestrator/index.ts - module exports
- orchestrator/orchestrator.test.ts - comprehensive test suite

Files modified:
- interfaces/feed.types.ts - add FeedSource type
- interfaces/storage.interface.ts - extend with feed source methods
- infrastructure/db/database.ts - add FeedSourceTable interface
- infrastructure/db/schema.ts - add feed_sources table migration
- modules/storage/storage.ts - implement feed source CRUD
- modules/storage/storage.test.ts - add feed source tests
305 lines
8.6 KiB
TypeScript
305 lines
8.6 KiB
TypeScript
/**
 * Orchestrator module.
 *
 * Coordinates fetching, parsing, deduplication, and storage of feed items.
 * Runs on a schedule with exponential backoff for failed feeds.
 */
|
|
|
|
import type { IStorage } from '../interfaces/storage.interface.js';
|
|
import type { IFetcher } from '../interfaces/fetcher.interface.js';
|
|
import type { IParser } from '../interfaces/parser.interface.js';
|
|
import type { IDedup } from '../interfaces/dedup.interface.js';
|
|
import type { FeedSource, FetchError, FeedItem } from '../interfaces/feed.types.js';
|
|
import { calculateNextDelay, createConcurrencyLimit } from './scheduler.js';
|
|
|
|
/**
 * Dependencies and tuning knobs accepted by the `FeedOrchestrator` constructor.
 *
 * The four collaborators are required; every numeric option is optional and
 * falls back to the default noted on the field.
 */
export interface OrchestratorOptions {
  /** Persistence layer for feed sources and feed items. */
  storage: IStorage;

  /** Component that retrieves the raw feed for a source URL. */
  fetcher: IFetcher;

  /** Component that turns a fetched response body into feed items. */
  parser: IParser;

  /** Component that filters out already-seen items and records new ones. */
  dedup: IDedup;

  /**
   * Failure threshold used for health status tracking only: a feed is
   * reported healthy while its consecutive failures stay below this value.
   * Default: 3.
   */
  maxRetries?: number;

  /**
   * Minimum delay, in milliseconds, before a failed feed is polled again.
   * Default: 5000 (5s).
   */
  baseRetryDelayMs?: number;

  /**
   * Upper bound, in milliseconds, handed to the backoff calculation.
   * Default: 300000 (5min).
   */
  maxRetryDelayMs?: number;

  /** Number of feeds processed in parallel. Default: 5. */
  concurrency?: number;
}
|
|
|
|
/**
 * Outcome of running one feed source through the
 * fetch → parse → dedup → store pipeline.
 */
export interface ProcessResult {
  /** ID of the feed source that was processed. */
  sourceId: string;

  /** `true` when the feed was fetched, parsed and stored without error. */
  success: boolean;

  /** Number of items parsed from the feed (0 on failure). */
  itemsFound: number;

  /** Number of items that survived deduplication (0 on failure). */
  itemsNew: number;

  /** The fetch or parse error that caused the failure; absent on success. */
  error?: FetchError;

  /** Delay, in milliseconds, to wait before polling this feed again. */
  nextPollDelayMs: number;
}
|
|
|
|
/**
 * Point-in-time health snapshot of a single feed source, as returned by
 * `FeedOrchestrator.getFeedHealth()`.
 */
export interface FeedHealth {
  /** ID of the feed source. */
  sourceId: string;

  /** URL the feed is fetched from. */
  url: string;

  /** Display name of the feed, or `null` when none is set. */
  name: string | null;

  /** Whether the feed source is marked active. */
  isActive: boolean;

  /** Time of the most recent fetch attempt, or `null` if never attempted. */
  lastFetchedAt: Date | null;

  /** Time of the most recent successful run, or `null` if none has succeeded. */
  lastSuccessAt: Date | null;

  /** Number of failed runs since the last success. */
  consecutiveFailures: number;

  /** `true` if `consecutiveFailures` is below the orchestrator's `maxRetries`. */
  isHealthy: boolean;
}
|
|
|
|
/**
 * Coordinates the fetch → parse → dedup → store pipeline for feed sources.
 *
 * Two modes of operation are offered:
 * - scheduled: `start()` polls every active source and keeps re-polling each
 *   one on its own timer until `stop()` is called;
 * - one-shot: `processFeed()` / `processAllFeeds()` run the pipeline once
 *   without arming any timers.
 *
 * Each run updates the source's status in storage (`lastFetchedAt`,
 * `lastSuccessAt`, `consecutiveFailures`).
 */
export class FeedOrchestrator {
  /**
   * Largest delay `setTimeout` honours (2^31 - 1 ms); Node.js replaces larger
   * values with 1 ms, which would turn a long interval into a busy loop.
   */
  private static readonly MAX_TIMER_DELAY_MS = 2147483647;

  private readonly storage: IStorage;
  private readonly fetcher: IFetcher;
  private readonly parser: IParser;
  private readonly dedup: IDedup;
  private readonly maxRetries: number;
  private readonly baseRetryDelayMs: number;
  private readonly maxRetryDelayMs: number;
  private readonly concurrency: number;
  /** Limiter built by `createConcurrencyLimit`; wraps every scheduled and bulk pipeline run. */
  private readonly limit: <T>(fn: () => Promise<T>) => Promise<T>;

  private isRunning = false;
  /** Pending re-poll timers, keyed by feed source ID. */
  private readonly timers = new Map<string, NodeJS.Timeout>();

  /**
   * @param options Collaborators plus optional tuning values; defaults are
   *   maxRetries 3, baseRetryDelayMs 5000, maxRetryDelayMs 300000, concurrency 5.
   */
  constructor(options: OrchestratorOptions) {
    this.storage = options.storage;
    this.fetcher = options.fetcher;
    this.parser = options.parser;
    this.dedup = options.dedup;
    this.maxRetries = options.maxRetries ?? 3;
    this.baseRetryDelayMs = options.baseRetryDelayMs ?? 5000;
    this.maxRetryDelayMs = options.maxRetryDelayMs ?? 300000;
    this.concurrency = options.concurrency ?? 5;
    this.limit = createConcurrencyLimit(this.concurrency);
  }

  /**
   * Start the orchestrator.
   * Loads all active feed sources, polls them immediately (through the
   * concurrency limiter) and schedules recurring polls from each result.
   *
   * If loading the sources fails, the running flag is reset so `start()` can
   * be retried. If an initial poll throws, the returned promise rejects but
   * the orchestrator keeps running (the failing feed is still rescheduled);
   * call `stop()` to shut it down.
   *
   * @throws Error if the orchestrator is already running.
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      throw new Error('Orchestrator is already running');
    }

    this.isRunning = true;

    let initialPolls: Promise<void>[];
    try {
      const sources = await this.storage.getFeedSources(true);
      initialPolls = sources.map((source) => this.schedulePoll(source));
    } catch (error: unknown) {
      // Nothing was scheduled yet; without this reset a transient storage
      // error would leave the orchestrator permanently "already running".
      this.isRunning = false;
      throw error;
    }

    await Promise.all(initialPolls);
  }

  /**
   * Stop the orchestrator.
   * Clears all scheduled timers. Polls that are already in flight finish
   * their current run but do not schedule a follow-up.
   */
  async stop(): Promise<void> {
    this.isRunning = false;

    for (const timer of this.timers.values()) {
      clearTimeout(timer);
    }
    this.timers.clear();
  }

  /**
   * Process a single feed immediately.
   * This still runs through the deduplication pipeline, but does not go
   * through the concurrency limiter and does not arm a timer.
   *
   * @param sourceId The ID of the feed source to process
   * @returns ProcessResult with details of the fetch
   * @throws Error if no feed source with that ID exists.
   */
  async processFeed(sourceId: string): Promise<ProcessResult> {
    const source = await this.storage.getFeedSourceById(sourceId);
    if (!source) {
      throw new Error(`Feed source not found: ${sourceId}`);
    }

    return this.processFeedInternal(source);
  }

  /**
   * Process all active feeds once (useful for cron/one-shot mode).
   * Does not start recurring polling.
   *
   * @returns Map of sourceId to ProcessResult
   */
  async processAllFeeds(): Promise<Map<string, ProcessResult>> {
    const sources = await this.storage.getFeedSources(true);
    const results = new Map<string, ProcessResult>();

    await Promise.all(
      sources.map(async (source) => {
        const result = await this.limit(() => this.processFeedInternal(source));
        results.set(source.id, result);
      })
    );

    return results;
  }

  /**
   * Get health status of all feeds.
   *
   * @returns Array of FeedHealth for all feed sources; a feed is healthy
   *   while its consecutive failures are below `maxRetries`.
   */
  async getFeedHealth(): Promise<FeedHealth[]> {
    const sources = await this.storage.getFeedSources();

    return sources.map((source) => ({
      sourceId: source.id,
      url: source.url,
      name: source.name,
      isActive: source.isActive,
      lastFetchedAt: source.lastFetchedAt,
      lastSuccessAt: source.lastSuccessAt,
      consecutiveFailures: source.consecutiveFailures,
      isHealthy: source.consecutiveFailures < this.maxRetries,
    }));
  }

  /**
   * Poll one feed now and arm the timer for its next poll.
   *
   * The pipeline run goes through the concurrency limiter. The follow-up
   * timer is armed in `finally`, so a run that throws still gets retried
   * (after the failure backoff delay) while the error propagates to the caller.
   */
  private async schedulePoll(source: FeedSource): Promise<void> {
    if (!this.isRunning) {
      return;
    }

    this.clearTimer(source.id);

    // Used only when the run throws before producing a result.
    let nextDelayMs = this.failureDelayMs(source, source.consecutiveFailures + 1);
    try {
      const result = await this.limit(() => this.processFeedInternal(source));
      nextDelayMs = result.nextPollDelayMs;
    } finally {
      // stop() may have been called while the run was in flight; arming a
      // timer now would leak it past shutdown.
      if (this.isRunning) {
        this.armTimer(source, nextDelayMs);
      }
    }
  }

  /**
   * Timer-driven re-poll: reloads the source from storage so the run sees the
   * current failure count and activity flag instead of the snapshot captured
   * when polling started. Stops polling a source that was removed or deactivated.
   */
  private async repoll(lastKnown: FeedSource): Promise<void> {
    if (!this.isRunning) {
      return;
    }

    let current: FeedSource | null | undefined;
    try {
      current = await this.storage.getFeedSourceById(lastKnown.id);
    } catch (error: unknown) {
      // Storage is unavailable: keep the feed alive with the last known data.
      if (this.isRunning) {
        this.armTimer(
          lastKnown,
          this.failureDelayMs(lastKnown, lastKnown.consecutiveFailures + 1)
        );
      }
      throw error;
    }

    if (!current || !current.isActive) {
      return;
    }

    await this.schedulePoll(current);
  }

  /** Arm (replacing any existing) the re-poll timer for a source. */
  private armTimer(lastKnown: FeedSource, delayMs: number): void {
    this.clearTimer(lastKnown.id);

    const timer = setTimeout(
      () => {
        this.timers.delete(lastKnown.id);
        // Nobody awaits a timer callback, so the rejection must be handled
        // here or it becomes an unhandled rejection.
        void this.repoll(lastKnown).catch((error: unknown) => {
          console.error(`Scheduled poll failed for feed source ${lastKnown.id}:`, error);
        });
      },
      Math.min(delayMs, FeedOrchestrator.MAX_TIMER_DELAY_MS)
    );

    this.timers.set(lastKnown.id, timer);
  }

  /** Cancel and forget the pending timer for a source, if there is one. */
  private clearTimer(sourceId: string): void {
    const existing = this.timers.get(sourceId);
    if (existing !== undefined) {
      clearTimeout(existing);
      this.timers.delete(sourceId);
    }
  }

  /**
   * Delay before retrying a feed that has failed `failures` times in a row:
   * the value from `calculateNextDelay` (seeded with the feed's poll interval
   * and `maxRetryDelayMs`), but never less than `baseRetryDelayMs`.
   */
  private failureDelayMs(source: FeedSource, failures: number): number {
    const backoffMs = calculateNextDelay(failures, source.pollIntervalMs, this.maxRetryDelayMs);
    return Math.max(backoffMs, this.baseRetryDelayMs);
  }

  /** Persist an incremented failure count and build the failed ProcessResult. */
  private async recordFailure(source: FeedSource, error: FetchError): Promise<ProcessResult> {
    const failures = source.consecutiveFailures + 1;

    await this.storage.updateFeedSourceStatus(source.id, {
      consecutiveFailures: failures,
    });

    return {
      sourceId: source.id,
      success: false,
      itemsFound: 0,
      itemsNew: 0,
      error,
      nextPollDelayMs: this.failureDelayMs(source, failures),
    };
  }

  /**
   * Run the pipeline once for a single feed and update its status in storage.
   *
   * Fetch errors and parse exceptions are reported through the returned
   * `ProcessResult` (only the first fetch error is reported). Exceptions
   * thrown by storage, the fetcher or the dedup component propagate.
   */
  private async processFeedInternal(source: FeedSource): Promise<ProcessResult> {
    const startedAt = new Date();

    // Record the attempt before fetching so it is visible even if the run fails.
    await this.storage.updateFeedSourceStatus(source.id, {
      lastFetchedAt: startedAt,
    });

    const fetchInput = {
      url: source.url,
      expectedFormat: source.format,
    };
    const fetchResult = await this.fetcher.fetch(fetchInput);

    // Any fetch error fails the whole run.
    const fetchError = fetchResult.errors[0];
    if (fetchError !== undefined) {
      return this.recordFailure(source, fetchError);
    }

    const items: FeedItem[] = [];
    try {
      for (const response of fetchResult.responses) {
        const parsedItems = await this.parser.parse(response.body, response.source);
        items.push(...parsedItems);
      }
    } catch (parseError: unknown) {
      return this.recordFailure(source, {
        source: source.url,
        reason: parseError instanceof Error ? parseError.message : 'Parse error',
        code: 'PARSE',
      });
    }

    const itemsFound = items.length;
    const newItems = await this.dedup.filter(items);

    if (newItems.length > 0) {
      // Save first, then mark as seen, so a failed save does not hide items
      // from the next run.
      await this.storage.save(newItems);
      await this.dedup.markSeen(newItems);
    }

    await this.storage.updateFeedSourceStatus(source.id, {
      lastSuccessAt: startedAt,
      consecutiveFailures: 0,
    });

    return {
      sourceId: source.id,
      success: true,
      itemsFound,
      itemsNew: newItems.length,
      // Zero failures: the delay is derived from the feed's own poll interval.
      nextPollDelayMs: calculateNextDelay(0, source.pollIntervalMs, this.maxRetryDelayMs),
    };
  }
}
|