pulse/orchestrator/orchestrator.ts
Edo Limburg 78a2b27f6d feat: implement orchestrator module with feed source management
Add FeedOrchestrator that coordinates fetch→parse→dedup→store pipeline:
- FeedSource type for managing RSS/Atom feed configurations
- Feed source CRUD operations in IStorage interface
- Database schema migration for feed_sources table
- Exponential backoff retry with configurable delays
- Per-feed poll intervals with health tracking
- Concurrency-limited parallel feed processing
- ProcessResult and FeedHealth interfaces for status monitoring

Files added:
- orchestrator/orchestrator.ts - main orchestrator class
- orchestrator/scheduler.ts - backoff calculation utilities
- orchestrator/index.ts - module exports
- orchestrator/orchestrator.test.ts - comprehensive test suite

Files modified:
- interfaces/feed.types.ts - add FeedSource type
- interfaces/storage.interface.ts - extend with feed source methods
- infrastructure/db/database.ts - add FeedSourceTable interface
- infrastructure/db/schema.ts - add feed_sources table migration
- modules/storage/storage.ts - implement feed source CRUD
- modules/storage/storage.test.ts - add feed source tests
2026-05-05 22:17:16 +02:00

305 lines
8.6 KiB
TypeScript

/**
* Orchestrator module.
* Coordinates fetching, parsing, deduplication, and storage of feed items.
* Runs on a schedule with exponential backoff for failed feeds.
*/
import type { IStorage } from '../interfaces/storage.interface.js';
import type { IFetcher } from '../interfaces/fetcher.interface.js';
import type { IParser } from '../interfaces/parser.interface.js';
import type { IDedup } from '../interfaces/dedup.interface.js';
import type { FeedSource, FetchError, FeedItem } from '../interfaces/feed.types.js';
import { calculateNextDelay, createConcurrencyLimit } from './scheduler.js';
/**
 * Constructor options for FeedOrchestrator.
 *
 * The four collaborators are required; the numeric settings are optional and
 * fall back to the defaults noted on each field.
 */
export interface OrchestratorOptions {
// Persistence for feed sources, their status fields, and new feed items.
storage: IStorage;
// Retrieves the raw feed for a source URL.
fetcher: IFetcher;
// Turns each fetched response body into feed items.
parser: IParser;
// Filters out already-seen items and records newly stored ones as seen.
dedup: IDedup;
maxRetries?: number; // Consecutive-failure threshold used only for the isHealthy flag; polling is not stopped when it is reached (default: 3)
baseRetryDelayMs?: number; // Lower bound applied to the next-poll delay after a failed poll (default: 5000, i.e. 5s)
maxRetryDelayMs?: number; // Passed to calculateNextDelay as its third argument, presumably the delay ceiling (default: 300000, i.e. 5min)
concurrency?: number; // Passed to createConcurrencyLimit to bound parallel feed processing (default: 5)
}
/**
 * Outcome of one fetch → parse → dedup → store pass over a single feed source.
 */
export interface ProcessResult {
// ID of the feed source that was processed.
sourceId: string;
// false when the fetcher reported at least one error or the parser threw; true otherwise.
success: boolean;
// Number of items parsed from the feed before deduplication (0 on failure).
itemsFound: number;
// Number of items left after the dedup filter (0 on failure).
itemsNew: number;
// Set only on failure: the first fetcher error, or a synthesized error with code 'PARSE'.
error?: FetchError;
// Delay in milliseconds the scheduler waits before polling this source again.
nextPollDelayMs: number;
}
/**
 * Health snapshot of one feed source, as returned by FeedOrchestrator.getFeedHealth().
 * Every field except isHealthy is copied from the stored feed source.
 */
export interface FeedHealth {
// ID of the feed source.
sourceId: string;
// URL of the feed.
url: string;
// Name of the feed source, or null when none is stored.
name: string | null;
// Active flag of the source; start() and processAllFeeds() only load active sources.
isActive: boolean;
// Time of the most recent fetch attempt, or null.
lastFetchedAt: Date | null;
// Time of the most recent fully successful pass, or null.
lastSuccessAt: Date | null;
// Failed passes in a row; reset to 0 by a successful pass.
consecutiveFailures: number;
isHealthy: boolean; // true if consecutiveFailures < maxRetries
}
/**
 * Coordinates the fetch → parse → dedup → store pipeline for feed sources.
 *
 * Two modes are offered:
 * - `start()` / `stop()`: recurring polling, one timer per feed source, with a
 *   backoff delay after failed polls.
 * - `processFeed()` / `processAllFeeds()`: one-shot processing without timers.
 *
 * All feed processing started by `start()` and `processAllFeeds()` goes
 * through a shared concurrency limiter.
 */
export class FeedOrchestrator {
  private readonly storage: IStorage;
  private readonly fetcher: IFetcher;
  private readonly parser: IParser;
  private readonly dedup: IDedup;
  private readonly maxRetries: number;
  private readonly baseRetryDelayMs: number;
  private readonly maxRetryDelayMs: number;
  private readonly concurrency: number;
  private readonly limit: <T>(fn: () => Promise<T>) => Promise<T>;
  private isRunning = false;
  /**
   * Incremented by every start() and stop(). A poll loop remembers the value
   * it was started with and gives up once it no longer matches, so a loop
   * that was mid-flight during stop() cannot come back to life after a later
   * start().
   */
  private generation = 0;
  /** Pending next-poll timer per feed source ID. */
  private readonly timers = new Map<string, ReturnType<typeof setTimeout>>();

  constructor(options: OrchestratorOptions) {
    this.storage = options.storage;
    this.fetcher = options.fetcher;
    this.parser = options.parser;
    this.dedup = options.dedup;
    this.maxRetries = options.maxRetries ?? 3;
    this.baseRetryDelayMs = options.baseRetryDelayMs ?? 5000;
    this.maxRetryDelayMs = options.maxRetryDelayMs ?? 300000;
    this.concurrency = options.concurrency ?? 5;
    this.limit = createConcurrencyLimit(this.concurrency);
  }

  /**
   * Start the orchestrator.
   * Loads all active feed sources, polls each of them once (bounded by the
   * concurrency limit) and schedules recurring polls. Resolves when every
   * initial poll has been attempted.
   *
   * An unexpected error while polling an individual feed does not reject
   * this call; it is logged and the feed is retried with backoff.
   *
   * @throws Error if the orchestrator is already running
   * @throws whatever `storage.getFeedSources` rejects with; the orchestrator
   *         is left stopped in that case so start() can be called again
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      throw new Error('Orchestrator is already running');
    }
    this.isRunning = true;
    this.generation += 1;
    const generation = this.generation;

    let sources: readonly FeedSource[];
    try {
      sources = await this.storage.getFeedSources(true);
    } catch (error) {
      // Undo the "running" flag, unless stop()/start() was called meanwhile
      // and the flag now belongs to a different run.
      if (this.generation === generation) {
        this.isRunning = false;
      }
      throw error;
    }

    await Promise.all(sources.map((source) => this.schedulePoll(source, generation)));
  }

  /**
   * Stop the orchestrator.
   * Clears all scheduled timers. Polls that are already in flight run to
   * completion but do not schedule a follow-up poll.
   */
  async stop(): Promise<void> {
    this.isRunning = false;
    this.generation += 1;
    for (const timer of this.timers.values()) {
      clearTimeout(timer);
    }
    this.timers.clear();
  }

  /**
   * Process a single feed immediately.
   * This still runs through the deduplication pipeline.
   *
   * @param sourceId The ID of the feed source to process
   * @returns ProcessResult with details of the fetch
   * @throws Error if no feed source with that ID exists
   */
  async processFeed(sourceId: string): Promise<ProcessResult> {
    const source = await this.storage.getFeedSourceById(sourceId);
    if (!source) {
      throw new Error(`Feed source not found: ${sourceId}`);
    }
    return this.processFeedInternal(source);
  }

  /**
   * Process all active feeds once (useful for cron/one-shot mode).
   * Does not start recurring polling.
   *
   * Every feed is allowed to finish before this method settles, so no
   * processing continues in the background after it returns or throws.
   *
   * @returns Map of sourceId to ProcessResult
   * @throws the first unexpected error raised while processing any feed
   */
  async processAllFeeds(): Promise<Map<string, ProcessResult>> {
    const sources = await this.storage.getFeedSources(true);
    const results = new Map<string, ProcessResult>();

    const outcomes = await Promise.allSettled(
      sources.map(async (source) => {
        const result = await this.limit(() => this.processFeedInternal(source));
        results.set(source.id, result);
      })
    );

    const rejected = outcomes.find(
      (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected'
    );
    if (rejected !== undefined) {
      throw rejected.reason;
    }
    return results;
  }

  /**
   * Get health status of all feeds (active and inactive).
   *
   * @returns Array of FeedHealth for all feed sources
   */
  async getFeedHealth(): Promise<FeedHealth[]> {
    const sources = await this.storage.getFeedSources();
    return sources.map((source) => ({
      sourceId: source.id,
      url: source.url,
      name: source.name,
      isActive: source.isActive,
      lastFetchedAt: source.lastFetchedAt,
      lastSuccessAt: source.lastSuccessAt,
      consecutiveFailures: source.consecutiveFailures,
      isHealthy: source.consecutiveFailures < this.maxRetries,
    }));
  }

  /**
   * Poll one feed now and arm the timer for its next poll.
   * Never rejects: unexpected errors are logged and treated as a failed poll.
   *
   * @param source Snapshot of the feed source; its consecutiveFailures value
   *               feeds the backoff calculation
   * @param generation Run this loop belongs to; the loop ends once that run
   *                   has been stopped
   */
  private async schedulePoll(
    source: FeedSource,
    generation: number = this.generation
  ): Promise<void> {
    if (!this.isCurrentRun(generation)) {
      return;
    }

    // Clear any existing timer for this source
    const existingTimer = this.timers.get(source.id);
    if (existingTimer !== undefined) {
      clearTimeout(existingTimer);
      this.timers.delete(source.id);
    }

    let failures: number;
    let delayMs: number;
    try {
      // Recurring polls go through the limiter too, not just the initial ones.
      const result = await this.limit(() => this.processFeedInternal(source));
      failures = result.success ? 0 : source.consecutiveFailures + 1;
      delayMs = result.nextPollDelayMs;
    } catch (error) {
      // Storage or a collaborator threw. Keep the loop alive and back off;
      // the count is only tracked in memory here because storage may be the
      // component that is failing.
      failures = source.consecutiveFailures + 1;
      delayMs = this.failureDelay(failures, source.pollIntervalMs);
      console.error(`Unexpected error while polling feed source ${source.id}:`, error);
    }

    // stop() may have been called while the poll was in flight.
    if (!this.isCurrentRun(generation)) {
      return;
    }

    // Carry the updated failure count forward; reusing the original snapshot
    // would make every failure look like the first one and backoff would
    // never grow.
    const nextSource: FeedSource = { ...source, consecutiveFailures: failures };
    const timer = setTimeout(() => {
      this.timers.delete(source.id);
      void this.schedulePoll(nextSource, generation);
    }, delayMs);
    this.timers.set(source.id, timer);
  }

  /** True while the run identified by `generation` has not been stopped. */
  private isCurrentRun(generation: number): boolean {
    return this.isRunning && this.generation === generation;
  }

  /** Delay before re-polling after `failures` consecutive failures, never below baseRetryDelayMs. */
  private failureDelay(failures: number, pollIntervalMs: number): number {
    const backoff = calculateNextDelay(failures, pollIntervalMs, this.maxRetryDelayMs);
    return Math.max(backoff, this.baseRetryDelayMs);
  }

  /** Persist one more consecutive failure for `source` and build the failed ProcessResult. */
  private async recordFailure(source: FeedSource, error: FetchError): Promise<ProcessResult> {
    const failures = source.consecutiveFailures + 1;
    await this.storage.updateFeedSourceStatus(source.id, {
      consecutiveFailures: failures,
    });
    return {
      sourceId: source.id,
      success: false,
      itemsFound: 0,
      itemsNew: 0,
      error,
      nextPollDelayMs: this.failureDelay(failures, source.pollIntervalMs),
    };
  }

  /**
   * Internal method to process a single feed.
   * Updates the feed status in storage: lastFetchedAt before fetching, then
   * either an incremented consecutiveFailures or lastSuccessAt with
   * consecutiveFailures reset to 0.
   *
   * Errors reported by the fetcher and exceptions thrown by the parser are
   * returned as a failed ProcessResult; anything else (for example a storage
   * error) propagates to the caller.
   */
  private async processFeedInternal(source: FeedSource): Promise<ProcessResult> {
    const now = new Date();

    // Update last fetched time
    await this.storage.updateFeedSourceStatus(source.id, {
      lastFetchedAt: now,
    });

    // Fetch the feed
    const fetchResult = await this.fetcher.fetch({
      url: source.url,
      expectedFormat: source.format,
    });

    // Any reported fetch error fails the whole pass; the first one is surfaced.
    const [firstFetchError] = fetchResult.errors;
    if (firstFetchError !== undefined) {
      return this.recordFailure(source, firstFetchError);
    }

    // Parse the feed
    const items: FeedItem[] = [];
    try {
      for (const response of fetchResult.responses) {
        const parsedItems = await this.parser.parse(response.body, response.source);
        items.push(...parsedItems);
      }
    } catch (parseError) {
      return this.recordFailure(source, {
        source: source.url,
        reason: parseError instanceof Error ? parseError.message : 'Parse error',
        code: 'PARSE',
      });
    }

    // Filter duplicates
    const newItems = await this.dedup.filter(items);

    // Store new items, then mark them as seen
    if (newItems.length > 0) {
      await this.storage.save(newItems);
      await this.dedup.markSeen(newItems);
    }

    // Update status with success
    await this.storage.updateFeedSourceStatus(source.id, {
      lastSuccessAt: now,
      consecutiveFailures: 0,
    });

    return {
      sourceId: source.id,
      success: true,
      itemsFound: items.length,
      itemsNew: newItems.length,
      // Zero failures: base interval, no backoff
      nextPollDelayMs: calculateNextDelay(0, source.pollIntervalMs, this.maxRetryDelayMs),
    };
  }
}