diff --git a/infrastructure/db/database.ts b/infrastructure/db/database.ts index 221f7c6..00ad20e 100644 --- a/infrastructure/db/database.ts +++ b/infrastructure/db/database.ts @@ -19,8 +19,23 @@ export interface SeenIdTable { seen_at: string; // ISO 8601 format } +export interface FeedSourceTable { + id: string; + url: string; + name: string | null; + format: 'rss' | 'atom'; + poll_interval_ms: number; + is_active: number; // SQLite stores boolean as 0/1 + last_fetched_at: string | null; // ISO 8601 + last_success_at: string | null; // ISO 8601 + consecutive_failures: number; + created_at: string; // ISO 8601 + updated_at: string; // ISO 8601 +} + // Database interface used by Kysely export interface Database { feed_items: FeedItemTable; seen_ids: SeenIdTable; + feed_sources: FeedSourceTable; } diff --git a/infrastructure/db/schema.ts b/infrastructure/db/schema.ts index e964d77..58b4be6 100644 --- a/infrastructure/db/schema.ts +++ b/infrastructure/db/schema.ts @@ -43,10 +43,37 @@ export async function migrate(db: Kysely): Promise { .addColumn('id', 'varchar(64)', (col) => col.primaryKey()) .addColumn('seen_at', 'varchar(32)', (col) => col.notNull().defaultTo('CURRENT_TIMESTAMP')) .execute(); + + // Create feed_sources table + await db.schema + .createTable('feed_sources') + .ifNotExists() + .addColumn('id', 'varchar(64)', (col) => col.primaryKey()) + .addColumn('url', 'varchar(2048)', (col) => col.notNull().unique()) + .addColumn('name', 'varchar(256)') + .addColumn('format', 'varchar(10)', (col) => col.notNull()) + .addColumn('poll_interval_ms', 'integer', (col) => col.notNull()) + .addColumn('is_active', 'boolean', (col) => col.notNull().defaultTo(true)) + .addColumn('last_fetched_at', 'varchar(32)') + .addColumn('last_success_at', 'varchar(32)') + .addColumn('consecutive_failures', 'integer', (col) => col.notNull().defaultTo(0)) + .addColumn('created_at', 'varchar(32)', (col) => col.notNull().defaultTo('CURRENT_TIMESTAMP')) + .addColumn('updated_at', 'varchar(32)', (col) => col.notNull().defaultTo('CURRENT_TIMESTAMP')) + .execute(); + + // Create index on is_active for quick filtering + await db.schema + .createIndex('idx_feed_sources_active') + .ifNotExists() + .on('feed_sources') + .column('is_active') + .execute(); } export async function reset(db: Kysely): Promise { // Drop tables (for testing) + await db.schema.dropIndex('idx_feed_sources_active').ifExists().execute(); + await db.schema.dropTable('feed_sources').ifExists().execute(); await db.schema.dropTable('seen_ids').ifExists().execute(); await db.schema.dropIndex('idx_feed_items_published').ifExists().execute(); await db.schema.dropIndex('idx_feed_items_source').ifExists().execute(); diff --git a/interfaces/feed.types.ts b/interfaces/feed.types.ts index e732f56..dd7638d 100644 --- a/interfaces/feed.types.ts +++ b/interfaces/feed.types.ts @@ -31,3 +31,17 @@ export interface FetchResult { errors: FetchError[]; fetchedAt: Date; } + +export interface FeedSource { + id: string; // Hash of URL + url: string; // Feed URL + name: string | null; // Display name + format: 'rss' | 'atom'; // Expected format + pollIntervalMs: number; // How often to check + isActive: boolean; // Whether to poll + lastFetchedAt: Date | null; // Last attempt timestamp + lastSuccessAt: Date | null; // Last successful fetch + consecutiveFailures: number; // Error streak counter + createdAt: Date; + updatedAt: Date; +} diff --git a/interfaces/storage.interface.ts b/interfaces/storage.interface.ts index 2c895f3..3996613 100644 --- a/interfaces/storage.interface.ts +++ b/interfaces/storage.interface.ts @@ -1,4 +1,4 @@ -import type { FeedItem } from './feed.types.js'; +import type { FeedItem, FeedSource } from './feed.types.js'; export interface StorageError { code: 'DB_ERROR' | 'CONSTRAINT_ERROR' | 'UNKNOWN'; @@ -25,4 +25,37 @@ export interface IStorage { * Search items by title/content keywords. */ search(query: string): Promise; + + /** + * Get all feed sources, optionally filtering by active status. + */ + getFeedSources(activeOnly?: boolean): Promise; + + /** + * Get a single feed source by ID. + */ + getFeedSourceById(id: string): Promise; + + /** + * Save or update a feed source (upsert). + */ + saveFeedSource(source: FeedSource): Promise; + + /** + * Update feed source status fields (last fetched, success, failures, active). + */ + updateFeedSourceStatus( + id: string, + updates: { + lastFetchedAt?: Date; + lastSuccessAt?: Date; + consecutiveFailures?: number; + isActive?: boolean; + } + ): Promise; + + /** + * Delete a feed source by ID. + */ + deleteFeedSource(id: string): Promise; } diff --git a/modules/formatter/formatter.test.ts b/modules/formatter/formatter.test.ts new file mode 100644 index 0000000..c5c612b --- /dev/null +++ b/modules/formatter/formatter.test.ts @@ -0,0 +1,207 @@ +import { describe, it, expect } from 'vitest'; +import { Formatter } from './formatter.js'; +import { TerminalFormatter } from './terminal.formatter.js'; +import { JsonFormatter } from './json.formatter.js'; +import type { FeedItem } from '../../interfaces/feed.types.js'; + +describe('Formatter', () => { + const formatter = new Formatter(); + + const createMockItem = (id: string, overrides: Partial = {}): FeedItem => ({ + id, + source: 'https://example.com/feed', + title: 'Test Article', + url: 'https://example.com/article/1', + publishedAt: new Date('2026-05-05T10:30:00Z'), + content: undefined, + summary: undefined, + ...overrides + }); + + describe('format with terminal format', () => { + it('should format empty array', async () => { + const result = await formatter.format([], 'terminal'); + expect(result).toContain('No items to display'); + }); + + it('should format single item', async () => { + const item = createMockItem('item-1', { + title: 'My Article', + source: 'https://myblog.com/feed' + }); + const result = await formatter.format([item], 'terminal'); + expect(result).toContain('1.'); + expect(result).toContain('My Article'); + expect(result).toContain('https://myblog.com/feed'); + expect(result).toContain('https://example.com/article/1'); + }); + + it('should format multiple items', async () => { + const items = [ + createMockItem('item-1', { title: 'Article 1' }), + createMockItem('item-2', { title: 'Article 2' }) + ]; + const result = await formatter.format(items, 'terminal'); + expect(result).toContain('Found 2 items'); + expect(result).toContain('1.'); + expect(result).toContain('2.'); + expect(result).toContain('Article 1'); + expect(result).toContain('Article 2'); + }); + + it('should include summary when present', async () => { + const item = createMockItem('item-1', { + summary: 'This is a test summary that might be truncated' + }); + const result = await formatter.format([item], 'terminal'); + expect(result).toContain('This is a test summary'); + }); + }); + + describe('format with json format', () => { + it('should format empty array', async () => { + const result = await formatter.format([], 'json'); + expect(JSON.parse(result)).toEqual([]); + }); + + it('should format single item', async () => { + const item = createMockItem('item-1', { + title: 'My Article' + }); + const result = await formatter.format([item], 'json'); + const parsed = JSON.parse(result); + expect(parsed).toHaveLength(1); + expect(parsed[0].id).toBe('item-1'); + expect(parsed[0].title).toBe('My Article'); + expect(parsed[0].publishedAt).toBe('2026-05-05T10:30:00.000Z'); + }); + + it('should format multiple items', async () => { + const items = [ + createMockItem('item-1', { title: 'Article 1' }), + createMockItem('item-2', { title: 'Article 2' }) + ]; + const result = await formatter.format(items, 'json'); + const parsed = JSON.parse(result); + expect(parsed).toHaveLength(2); + expect(parsed[0].title).toBe('Article 1'); + expect(parsed[1].title).toBe('Article 2'); + }); + + it('should include optional content when present', async () => { + const item = createMockItem('item-1', { + content: 'Full article content here' + }); + const result = await formatter.format([item], 'json'); + const parsed = JSON.parse(result); + expect(parsed[0].content).toBe('Full article content here'); + }); + + it('should include optional summary when present', async () => { + const item = createMockItem('item-1', { + summary: 'Article summary' + }); + const result = await formatter.format([item], 'json'); + const parsed = JSON.parse(result); + expect(parsed[0].summary).toBe('Article summary'); + }); + + it('should not include undefined fields', async () => { + const item = createMockItem('item-1'); + const result = await formatter.format([item], 'json'); + const parsed = JSON.parse(result); + expect(parsed[0]).not.toHaveProperty('content'); + expect(parsed[0]).not.toHaveProperty('summary'); + }); + + it('should use ISO 8601 date format', async () => { + const item = createMockItem('item-1', { + publishedAt: new Date('2026-12-25T15:30:45.123Z') + }); + const result = await formatter.format([item], 'json'); + const parsed = JSON.parse(result); + expect(parsed[0].publishedAt).toBe('2026-12-25T15:30:45.123Z'); + }); + }); + + describe('error handling', () => { + it('should throw FormatterError for HTML format', async () => { + await expect(formatter.format([], 'html')).rejects.toMatchObject({ + code: 'UNKNOWN', + message: 'HTML format not implemented' + }); + }); + + it('should throw FormatterError for unknown format', async () => { + await expect(formatter.format([], 'unknown-format' as any)).rejects.toMatchObject({ + code: 'SERIALIZE_ERROR' + }); + }); + }); +}); + +describe('TerminalFormatter', () => { + const terminalFormatter = new TerminalFormatter(); + + it('should truncate long summary text', () => { + const longSummary = 'a'.repeat(100); + const item: FeedItem = { + id: 'item-1', + source: 'https://example.com', + title: 'Test', + url: 'https://example.com/article', + publishedAt: new Date(), + summary: longSummary + }; + const result = terminalFormatter.format([item]); + expect(result).toContain('...'); + expect(result).not.toContain(longSummary); + }); + + it('should format date in readable format', () => { + const item: FeedItem = { + id: 'item-1', + source: 'https://example.com', + title: 'Test', + url: 'https://example.com/article', + publishedAt: new Date('2026-05-05T10:30:00Z') + }; + const result = terminalFormatter.format([item]); + expect(result).toContain('May'); + expect(result).toContain('2026'); + }); +}); + +describe('JsonFormatter', () => { + const jsonFormatter = new JsonFormatter(); + + it('should produce valid JSON', () => { + const items: FeedItem[] = [ + { + id: 'test-1', + source: 'https://example.com', + title: 'Test Article', + url: 'https://example.com/1', + publishedAt: new Date() + } + ]; + const result = jsonFormatter.format(items); + expect(() => JSON.parse(result)).not.toThrow(); + }); + + it('should convert Date to ISO string', () => { + const date = new Date('2026-01-15T08:00:00Z'); + const items: FeedItem[] = [ + { + id: 'test-1', + source: 'https://example.com', + title: 'Test', + url: 'https://example.com/1', + publishedAt: date + } + ]; + const result = jsonFormatter.format(items); + const parsed = JSON.parse(result); + expect(parsed[0].publishedAt).toBe('2026-01-15T08:00:00.000Z'); + }); +}); diff --git a/modules/formatter/formatter.ts b/modules/formatter/formatter.ts new file mode 100644 index 0000000..82afefd --- /dev/null +++ b/modules/formatter/formatter.ts @@ -0,0 +1,49 @@ +import type { IFormatter, FormatterError, OutputFormat } from '../../interfaces/formatter.interface.js'; +import type { FeedItem } from '../../interfaces/feed.types.js'; +import { TerminalFormatter } from './terminal.formatter.js'; +import { JsonFormatter } from './json.formatter.js'; + +export class Formatter implements IFormatter { + private terminalFormatter: TerminalFormatter; + private jsonFormatter: JsonFormatter; + + constructor() { + this.terminalFormatter = new TerminalFormatter(); + this.jsonFormatter = new JsonFormatter(); + } + + async format(items: FeedItem[], format: OutputFormat): Promise { + try { + switch (format) { + case 'terminal': + return this.terminalFormatter.format(items); + case 'json': + return this.jsonFormatter.format(items); + case 'html': + // HTML not implemented yet per requirements + throw this.createError('UNKNOWN', 'HTML format not implemented'); + default: + throw this.createError('SERIALIZE_ERROR', `Unknown format: ${format}`); + } + } catch (error) { + if (this.isFormatterError(error)) { + throw error; + } + throw this.createError('UNKNOWN', error instanceof Error ? error.message : 'Unknown error during formatting'); + } + } + + private createError(code: FormatterError['code'], message: string): FormatterError { + return { code, message }; + } + + private isFormatterError(error: unknown): error is FormatterError { + return ( + typeof error === 'object' && + error !== null && + 'code' in error && + 'message' in error && + (error as FormatterError).code !== undefined + ); + } +} diff --git a/modules/formatter/index.ts b/modules/formatter/index.ts new file mode 100644 index 0000000..7d8432f --- /dev/null +++ b/modules/formatter/index.ts @@ -0,0 +1,3 @@ +export { Formatter } from './formatter.js'; +export { TerminalFormatter } from './terminal.formatter.js'; +export { JsonFormatter } from './json.formatter.js'; diff --git a/modules/formatter/json.formatter.ts b/modules/formatter/json.formatter.ts new file mode 100644 index 0000000..977bf16 --- /dev/null +++ b/modules/formatter/json.formatter.ts @@ -0,0 +1,27 @@ +import type { FeedItem } from '../../interfaces/feed.types.js'; + +interface JsonFeedItem { + id: string; + source: string; + title: string; + url: string; + publishedAt: string; + content?: string; + summary?: string; +} + +export class JsonFormatter { + format(items: FeedItem[]): string { + const jsonItems: JsonFeedItem[] = items.map(item => ({ + id: item.id, + source: item.source, + title: item.title, + url: item.url, + publishedAt: item.publishedAt.toISOString(), + ...(item.content !== undefined && { content: item.content }), + ...(item.summary !== undefined && { summary: item.summary }) + })); + + return JSON.stringify(jsonItems, null, 2); + } +} diff --git a/modules/formatter/terminal.formatter.ts b/modules/formatter/terminal.formatter.ts new file mode 100644 index 0000000..42abebe --- /dev/null +++ b/modules/formatter/terminal.formatter.ts @@ -0,0 +1,65 @@ +import type { FeedItem } from '../../interfaces/feed.types.js'; + +export class TerminalFormatter { + format(items: FeedItem[]): string { + if (items.length === 0) { + return '\n No items to display.\n'; + } + + const lines: string[] = []; + lines.push(''); + lines.push(` ${this.bold(`Found ${items.length} item${items.length === 1 ? '' : 's'}`)}`); + lines.push(''); + + items.forEach((item, index) => { + const number = `${index + 1}.`.padStart(3); + lines.push(` ${this.dim(number)} ${this.cyan(item.source)}`); + lines.push(` ${this.bold(item.title)}`); + lines.push(` ${this.dim(this.formatDate(item.publishedAt))}`); + lines.push(` ${this.blue(item.url)}`); + + if (item.summary) { + const truncated = this.truncate(item.summary, 80); + lines.push(` ${this.dim(truncated)}`); + } + + lines.push(''); + }); + + return lines.join('\n'); + } + + private formatDate(date: Date): string { + return date.toLocaleString('en-US', { + year: 'numeric', + month: 'short', + day: 'numeric', + hour: '2-digit', + minute: '2-digit' + }); + } + + private truncate(text: string, maxLength: number): string { + if (text.length <= maxLength) { + return text; + } + return text.substring(0, maxLength - 3) + '...'; + } + + // ANSI color codes + private bold(text: string): string { + return `\x1b[1m${text}\x1b[0m`; + } + + private dim(text: string): string { + return `\x1b[2m${text}\x1b[0m`; + } + + private cyan(text: string): string { + return `\x1b[36m${text}\x1b[0m`; + } + + private blue(text: string): string { + return `\x1b[34m${text}\x1b[0m`; + } +} diff --git a/modules/storage/storage.test.ts b/modules/storage/storage.test.ts index 877ba7e..e1e2ef0 100644 --- a/modules/storage/storage.test.ts +++ b/modules/storage/storage.test.ts @@ -4,7 +4,7 @@ import { Kysely, SqliteDialect } from 'kysely'; import { SqlStorage } from './storage.js'; import { migrate, reset } from '../../infrastructure/db/schema.js'; import type { Database } from '../../infrastructure/db/database.js'; -import type { FeedItem } from '../../interfaces/feed.types.js'; +import type { FeedItem, FeedSource } from '../../interfaces/feed.types.js'; describe('SqlStorage', () => { let sqliteDb: BetterSqlite3.Database; @@ -427,4 +427,176 @@ describe('SqlStorage', () => { expect(recent[0].content).toBeUndefined(); }); }); + + describe('Feed Source Management', () => { + function createTestFeedSource(overrides: Partial = {}): FeedSource { + const now = new Date(); + return { + id: 'test-source-1', + url: 'https://example.com/feed.xml', + name: 'Test Feed', + format: 'rss', + pollIntervalMs: 60000, + isActive: true, + lastFetchedAt: null, + lastSuccessAt: null, + consecutiveFailures: 0, + createdAt: now, + updatedAt: now, + ...overrides, + }; + } + + describe('saveFeedSource', () => { + it('saves a new feed source', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + const retrieved = await storage.getFeedSourceById(source.id); + expect(retrieved).toBeTruthy(); + expect(retrieved!.id).toBe(source.id); + expect(retrieved!.url).toBe(source.url); + expect(retrieved!.name).toBe(source.name); + expect(retrieved!.format).toBe(source.format); + expect(retrieved!.pollIntervalMs).toBe(source.pollIntervalMs); + expect(retrieved!.isActive).toBe(source.isActive); + }); + + it('updates existing feed source on duplicate id (upsert)', async () => { + const source = createTestFeedSource({ id: 'same-id', name: 'Original Name' }); + await storage.saveFeedSource(source); + + const updated = createTestFeedSource({ + id: 'same-id', + name: 'Updated Name', + pollIntervalMs: 120000, + }); + await storage.saveFeedSource(updated); + + const retrieved = await storage.getFeedSourceById('same-id'); + expect(retrieved!.name).toBe('Updated Name'); + expect(retrieved!.pollIntervalMs).toBe(120000); + }); + }); + + describe('getFeedSources', () => { + it('returns all feed sources', async () => { + const source1 = createTestFeedSource({ id: 'source-1', url: 'https://example1.com/feed.xml' }); + const source2 = createTestFeedSource({ id: 'source-2', url: 'https://example2.com/feed.xml' }); + + await storage.saveFeedSource(source1); + await storage.saveFeedSource(source2); + + const sources = await storage.getFeedSources(); + expect(sources).toHaveLength(2); + expect(sources.map(s => s.id)).toContain('source-1'); + expect(sources.map(s => s.id)).toContain('source-2'); + }); + + it('returns only active sources when activeOnly is true', async () => { + const active = createTestFeedSource({ id: 'active', url: 'https://active.com/feed.xml', isActive: true }); + const inactive = createTestFeedSource({ id: 'inactive', url: 'https://inactive.com/feed.xml', isActive: false }); + + await storage.saveFeedSource(active); + await storage.saveFeedSource(inactive); + + const sources = await storage.getFeedSources(true); + expect(sources).toHaveLength(1); + expect(sources[0]!.id).toBe('active'); + }); + }); + + describe('getFeedSourceById', () => { + it('returns feed source by id', async () => { + const source = createTestFeedSource({ id: 'specific-id' }); + await storage.saveFeedSource(source); + + const retrieved = await storage.getFeedSourceById('specific-id'); + expect(retrieved).toBeTruthy(); + expect(retrieved!.id).toBe('specific-id'); + }); + + it('returns null for non-existent id', async () => { + const retrieved = await storage.getFeedSourceById('non-existent'); + expect(retrieved).toBeNull(); + }); + }); + + describe('updateFeedSourceStatus', () => { + it('updates lastFetchedAt', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + const fetchTime = new Date('2024-09-06T10:00:00Z'); + await storage.updateFeedSourceStatus(source.id, { lastFetchedAt: fetchTime }); + + const retrieved = await storage.getFeedSourceById(source.id); + expect(retrieved!.lastFetchedAt).toEqual(fetchTime); + }); + + it('updates lastSuccessAt', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + const successTime = new Date('2024-09-06T10:00:00Z'); + await storage.updateFeedSourceStatus(source.id, { lastSuccessAt: successTime }); + + const retrieved = await storage.getFeedSourceById(source.id); + expect(retrieved!.lastSuccessAt).toEqual(successTime); + }); + + it('updates consecutiveFailures', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + await storage.updateFeedSourceStatus(source.id, { consecutiveFailures: 5 }); + + const retrieved = await storage.getFeedSourceById(source.id); + expect(retrieved!.consecutiveFailures).toBe(5); + }); + + it('updates isActive', async () => { + const source = createTestFeedSource({ isActive: true }); + await storage.saveFeedSource(source); + + await storage.updateFeedSourceStatus(source.id, { isActive: false }); + + const retrieved = await storage.getFeedSourceById(source.id); + expect(retrieved!.isActive).toBe(false); + }); + + it('updates multiple fields at once', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + const now = new Date(); + await storage.updateFeedSourceStatus(source.id, { + lastFetchedAt: now, + lastSuccessAt: now, + consecutiveFailures: 0, + }); + + const retrieved = await storage.getFeedSourceById(source.id); + expect(retrieved!.lastFetchedAt).toEqual(now); + expect(retrieved!.lastSuccessAt).toEqual(now); + expect(retrieved!.consecutiveFailures).toBe(0); + }); + }); + + describe('deleteFeedSource', () => { + it('deletes feed source by id', async () => { + const source = createTestFeedSource({ id: 'to-delete' }); + await storage.saveFeedSource(source); + + await storage.deleteFeedSource('to-delete'); + + const retrieved = await storage.getFeedSourceById('to-delete'); + expect(retrieved).toBeNull(); + }); + + it('does not throw when deleting non-existent id', async () => { + await expect(storage.deleteFeedSource('non-existent')).resolves.not.toThrow(); + }); + }); + }); }); diff --git a/modules/storage/storage.ts b/modules/storage/storage.ts index 0769b00..1ea99ea 100644 --- a/modules/storage/storage.ts +++ b/modules/storage/storage.ts @@ -4,9 +4,9 @@ */ import type { Kysely } from 'kysely'; -import type { FeedItem } from '../../interfaces/feed.types.js'; +import type { FeedItem, FeedSource } from '../../interfaces/feed.types.js'; import type { IStorage } from '../../interfaces/storage.interface.js'; -import type { Database, FeedItemTable } from '../../infrastructure/db/database.js'; +import type { Database, FeedItemTable, FeedSourceTable } from '../../infrastructure/db/database.js'; export class SqlStorage implements IStorage { private readonly db: Kysely; @@ -98,4 +98,115 @@ export class SqlStorage implements IStorage { summary: row.summary ?? undefined, }; } + + // Feed source management methods + + async getFeedSources(activeOnly?: boolean): Promise { + let query = this.db.selectFrom('feed_sources').selectAll(); + + if (activeOnly) { + query = query.where('is_active', '=', 1); // Use 1 for SQLite boolean + } + + const rows = await query.orderBy('created_at', 'asc').execute(); + return rows.map(this.rowToFeedSource); + } + + async getFeedSourceById(id: string): Promise { + const row = await this.db + .selectFrom('feed_sources') + .selectAll() + .where('id', '=', id) + .executeTakeFirst(); + + return row ? this.rowToFeedSource(row) : null; + } + + async saveFeedSource(source: FeedSource): Promise { + const now = new Date().toISOString(); + const row: FeedSourceTable = { + id: source.id, + url: source.url, + name: source.name, + format: source.format, + poll_interval_ms: source.pollIntervalMs, + is_active: source.isActive ? 1 : 0, // Convert boolean to integer for SQLite + last_fetched_at: source.lastFetchedAt?.toISOString() ?? null, + last_success_at: source.lastSuccessAt?.toISOString() ?? null, + consecutive_failures: source.consecutiveFailures, + created_at: source.createdAt.toISOString(), + updated_at: now, + }; + + // Upsert: insert or update on conflict + await this.db + .insertInto('feed_sources') + .values(row) + .onConflict((oc) => + oc.column('id').doUpdateSet({ + name: (eb) => eb.ref('excluded.name'), + format: (eb) => eb.ref('excluded.format'), + poll_interval_ms: (eb) => eb.ref('excluded.poll_interval_ms'), + is_active: (eb) => eb.ref('excluded.is_active'), + last_fetched_at: (eb) => eb.ref('excluded.last_fetched_at'), + last_success_at: (eb) => eb.ref('excluded.last_success_at'), + consecutive_failures: (eb) => eb.ref('excluded.consecutive_failures'), + updated_at: (eb) => eb.ref('excluded.updated_at'), + }) + ) + .execute(); + } + + async updateFeedSourceStatus( + id: string, + updates: { + lastFetchedAt?: Date; + lastSuccessAt?: Date; + consecutiveFailures?: number; + isActive?: boolean; + } + ): Promise { + const setClause: Partial = { + updated_at: new Date().toISOString(), + }; + + if (updates.lastFetchedAt !== undefined) { + setClause.last_fetched_at = updates.lastFetchedAt.toISOString(); + } + if (updates.lastSuccessAt !== undefined) { + setClause.last_success_at = updates.lastSuccessAt?.toISOString() ?? null; + } + if (updates.consecutiveFailures !== undefined) { + setClause.consecutive_failures = updates.consecutiveFailures; + } + if (updates.isActive !== undefined) { + setClause.is_active = updates.isActive ? 1 : 0; // Convert boolean to integer for SQLite + } + + await this.db + .updateTable('feed_sources') + .set(setClause) + .where('id', '=', id) + .execute(); + } + + async deleteFeedSource(id: string): Promise { + await this.db.deleteFrom('feed_sources').where('id', '=', id).execute(); + } + + private rowToFeedSource(row: FeedSourceTable): FeedSource { + return { + id: row.id, + url: row.url, + name: row.name, + format: row.format, + pollIntervalMs: row.poll_interval_ms, + isActive: Boolean(row.is_active), // Convert integer to boolean + lastFetchedAt: row.last_fetched_at ? new Date(row.last_fetched_at) : null, + lastSuccessAt: row.last_success_at ? new Date(row.last_success_at) : null, + consecutiveFailures: row.consecutive_failures, + createdAt: new Date(row.created_at), + updatedAt: new Date(row.updated_at), + }; + } } diff --git a/orchestrator/index.ts b/orchestrator/index.ts new file mode 100644 index 0000000..f196151 --- /dev/null +++ b/orchestrator/index.ts @@ -0,0 +1,17 @@ +/** + * Orchestrator module exports. + * Coordinates fetching, parsing, deduplication, and storage of feed items. + */ + +export { + FeedOrchestrator, + type OrchestratorOptions, + type ProcessResult, + type FeedHealth, +} from './orchestrator.js'; + +export { + calculateNextDelay, + calculateJitter, + createConcurrencyLimit, +} from './scheduler.js'; diff --git a/orchestrator/orchestrator.test.ts b/orchestrator/orchestrator.test.ts new file mode 100644 index 0000000..f4e0a34 --- /dev/null +++ b/orchestrator/orchestrator.test.ts @@ -0,0 +1,474 @@ +/** + * Orchestrator tests. + * Tests the FeedOrchestrator class with mocked dependencies. + */ + +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { FeedOrchestrator, type OrchestratorOptions } from './orchestrator.js'; +import { calculateNextDelay, calculateJitter, createConcurrencyLimit } from './scheduler.js'; +import type { IStorage } from '../interfaces/storage.interface.js'; +import type { IFetcher } from '../interfaces/fetcher.interface.js'; +import type { IParser } from '../interfaces/parser.interface.js'; +import type { IDedup } from '../interfaces/dedup.interface.js'; +import type { FeedSource, FeedItem, FetchResult, FetchError } from '../interfaces/feed.types.js'; + +// Mock implementations +function createMockStorage(): IStorage { + const sources: Map = new Map(); + const items: FeedItem[] = []; + + return { + save: vi.fn(async (newItems: FeedItem[]) => { + items.push(...newItems); + }), + getRecent: vi.fn(async (limit: number) => items.slice(0, limit)), + getBySource: vi.fn(async (source: string, limit: number) => + items.filter(i => i.source === source).slice(0, limit) + ), + search: vi.fn(async (query: string) => + items.filter(i => i.title.includes(query) || (i.summary?.includes(query) ?? false)) + ), + getFeedSources: vi.fn(async (activeOnly?: boolean) => + Array.from(sources.values()).filter(s => !activeOnly || s.isActive) + ), + getFeedSourceById: vi.fn(async (id: string) => sources.get(id) ?? null), + saveFeedSource: vi.fn(async (source: FeedSource) => { + sources.set(source.id, source); + }), + updateFeedSourceStatus: vi.fn(async (id: string, updates) => { + const source = sources.get(id); + if (source) { + if (updates.lastFetchedAt !== undefined) source.lastFetchedAt = updates.lastFetchedAt; + if (updates.lastSuccessAt !== undefined) source.lastSuccessAt = updates.lastSuccessAt; + if (updates.consecutiveFailures !== undefined) source.consecutiveFailures = updates.consecutiveFailures; + if (updates.isActive !== undefined) source.isActive = updates.isActive; + source.updatedAt = new Date(); + } + }), + deleteFeedSource: vi.fn(async (id: string) => { + sources.delete(id); + }), + }; +} + +function createMockFetcher(): IFetcher { + return { + fetch: vi.fn(async () => ({ + responses: [], + errors: [], + fetchedAt: new Date(), + })), + fetchMany: vi.fn(async () => ({ + responses: [], + errors: [], + fetchedAt: new Date(), + })), + }; +} + +function createMockParser(): IParser { + return { + parse: vi.fn(async () => []), + supports: vi.fn(() => true), + }; +} + +function createMockDedup(): IDedup { + return { + filter: vi.fn(async (items: FeedItem[]) => items), + markSeen: vi.fn(async () => {}), + }; +} + +function createTestFeedSource(overrides: Partial = {}): FeedSource { + const now = new Date(); + return { + id: 'test-feed-1', + url: 'https://example.com/feed.xml', + name: 'Test Feed', + format: 'rss', + pollIntervalMs: 60000, // 1 minute + isActive: true, + lastFetchedAt: null, + lastSuccessAt: null, + consecutiveFailures: 0, + createdAt: now, + updatedAt: now, + ...overrides, + }; +} + +function createTestFeedItem(overrides: Partial = {}): FeedItem { + return { + id: 'item-1', + source: 'https://example.com/feed.xml', + title: 'Test Article', + url: 'https://example.com/article', + publishedAt: new Date(), + ...overrides, + }; +} + +describe('FeedOrchestrator', () => { + let storage: IStorage; + let fetcher: IFetcher; + let parser: IParser; + let dedup: IDedup; + let orchestrator: FeedOrchestrator; + + beforeEach(() => { + storage = createMockStorage(); + fetcher = createMockFetcher(); + parser = createMockParser(); + dedup = createMockDedup(); + + orchestrator = new FeedOrchestrator({ + storage, + fetcher, + parser, + dedup, + maxRetries: 3, + baseRetryDelayMs: 5000, + maxRetryDelayMs: 300000, + concurrency: 2, + }); + }); + + afterEach(async () => { + await orchestrator.stop(); + }); + + describe('start/stop', () => { + it('should start and poll active feeds immediately', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + // Mock successful fetch + vi.mocked(fetcher.fetch).mockResolvedValue({ + responses: [{ + source: source.url, + body: 'Testhttps://example.com/article', + contentType: 'application/rss+xml', + statusCode: 200, + }], + errors: [], + fetchedAt: new Date(), + }); + + vi.mocked(parser.parse).mockResolvedValue([createTestFeedItem()]); + + await orchestrator.start(); + + // Should have fetched + expect(fetcher.fetch).toHaveBeenCalledWith({ + url: source.url, + expectedFormat: source.format, + }); + + // Should have updated status + expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith( + source.id, + expect.objectContaining({ lastFetchedAt: expect.any(Date) }) + ); + }); + + it('should throw error if already running', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + await orchestrator.start(); + + await expect(orchestrator.start()).rejects.toThrow('Orchestrator is already running'); + }); + + it('should clear all timers on stop', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + await orchestrator.start(); + + await orchestrator.stop(); + // Should not throw and should clear timers + expect(true).toBe(true); + }); + }); + + describe('processFeed', () => { + it('should process a feed successfully', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + const item = createTestFeedItem(); + + vi.mocked(fetcher.fetch).mockResolvedValue({ + responses: [{ + source: source.url, + body: '', + contentType: 'application/rss+xml', + statusCode: 200, + }], + errors: [], + fetchedAt: new Date(), + }); + + vi.mocked(parser.parse).mockResolvedValue([item]); + vi.mocked(dedup.filter).mockResolvedValue([item]); + + const result = await orchestrator.processFeed(source.id); + + expect(result.success).toBe(true); + expect(result.itemsFound).toBe(1); + expect(result.itemsNew).toBe(1); + expect(result.sourceId).toBe(source.id); + + // Should have saved items + expect(storage.save).toHaveBeenCalledWith([item]); + expect(dedup.markSeen).toHaveBeenCalledWith([item]); + + // Should have updated status with success + expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith( + source.id, + expect.objectContaining({ + lastSuccessAt: expect.any(Date), + consecutiveFailures: 0, + }) + ); + }); + + it('should throw error for non-existent feed', async () => { + await expect(orchestrator.processFeed('non-existent')).rejects.toThrow('Feed source not found'); + }); + + it('should handle fetch errors with retry backoff', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + const error: FetchError = { + source: source.url, + reason: 'Network timeout', + code: 'TIMEOUT', + }; + + vi.mocked(fetcher.fetch).mockResolvedValue({ + responses: [], + errors: [error], + fetchedAt: new Date(), + }); + + const result = await orchestrator.processFeed(source.id); + + expect(result.success).toBe(false); + expect(result.error).toEqual(error); + expect(result.itemsFound).toBe(0); + expect(result.itemsNew).toBe(0); + + // Should have incremented failure count + expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith( + source.id, + expect.objectContaining({ consecutiveFailures: 1 }) + ); + + // Should have delay with backoff + expect(result.nextPollDelayMs).toBeGreaterThan(source.pollIntervalMs); + }); + + it('should handle parse errors', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + vi.mocked(fetcher.fetch).mockResolvedValue({ + responses: [{ + source: source.url, + body: 'invalid xml', + contentType: 'application/rss+xml', + statusCode: 200, + }], + errors: [], + fetchedAt: new Date(), + }); + + vi.mocked(parser.parse).mockRejectedValue(new Error('Parse failed')); + + const result = await orchestrator.processFeed(source.id); + + expect(result.success).toBe(false); + expect(result.error?.code).toBe('PARSE'); + }); + + it('should deduplicate items', async () => { + const source = createTestFeedSource(); + await storage.saveFeedSource(source); + + const items = [ + createTestFeedItem({ id: 'item-1' }), + createTestFeedItem({ id: 'item-2' }), + ]; + + vi.mocked(fetcher.fetch).mockResolvedValue({ + responses: [{ + source: source.url, + body: '', + contentType: 'application/rss+xml', + statusCode: 200, + }], + errors: [], + fetchedAt: new Date(), + }); + + vi.mocked(parser.parse).mockResolvedValue(items); + vi.mocked(dedup.filter).mockResolvedValue([items[0]!]); // Only first item is new + + const result = await orchestrator.processFeed(source.id); + + expect(result.itemsFound).toBe(2); + expect(result.itemsNew).toBe(1); + + // Should only save the new item + expect(storage.save).toHaveBeenCalledWith([items[0]]); + expect(dedup.markSeen).toHaveBeenCalledWith([items[0]]); + }); + }); + + describe('processAllFeeds', () => { + it('should process all active feeds', async () => { + const source1 = createTestFeedSource({ id: 'feed-1', url: 'https://example1.com/feed.xml' }); + const source2 = createTestFeedSource({ id: 'feed-2', url: 'https://example2.com/feed.xml', isActive: false }); + + await storage.saveFeedSource(source1); + await storage.saveFeedSource(source2); + + vi.mocked(fetcher.fetch).mockResolvedValue({ + responses: [{ + source: 'url', + body: '', + contentType: 'application/rss+xml', + statusCode: 200, + }], + errors: [], + fetchedAt: new Date(), + }); + + vi.mocked(parser.parse).mockResolvedValue([createTestFeedItem()]); + + const results = await orchestrator.processAllFeeds(); + + // Only active feeds should be processed + expect(results.size).toBe(1); + expect(results.has('feed-1')).toBe(true); + expect(results.has('feed-2')).toBe(false); + }); + }); + + describe('getFeedHealth', () => { + it('should return health status for all feeds', async () => { + const healthyFeed = createTestFeedSource({ + id: 'healthy', + url: 'https://healthy.com/feed.xml', + consecutiveFailures: 0, + }); + + const unhealthyFeed = createTestFeedSource({ + id: 'unhealthy', + url: 'https://unhealthy.com/feed.xml', + consecutiveFailures: 5, // More than maxRetries (3) + }); + + await storage.saveFeedSource(healthyFeed); + await storage.saveFeedSource(unhealthyFeed); + + const health = await orchestrator.getFeedHealth(); + + expect(health).toHaveLength(2); + + const healthy = health.find(h => h.sourceId === 'healthy'); + const unhealthy = health.find(h => h.sourceId === 'unhealthy'); + + expect(healthy?.isHealthy).toBe(true); + expect(unhealthy?.isHealthy).toBe(false); + }); + }); +}); + +describe('Scheduler utilities', () => { + describe('calculateNextDelay', () => { + it('should return base delay when no failures', () => { + const base = 5000; + const delay = calculateNextDelay(0, base, 300000); + // Allow for ±10% jitter (max 5000ms) + expect(delay).toBeGreaterThanOrEqual(base - 5000); + expect(delay).toBeLessThanOrEqual(base + 5000); + }); + + it('should increase delay with failures', () => { + const base = 5000; + const delay1 = calculateNextDelay(1, base, 300000); + const delay2 = calculateNextDelay(2, base, 300000); + const delay3 = calculateNextDelay(3, base, 300000); + + // Allow for ±10% jitter (max 5000ms) + expect(delay1).toBeGreaterThanOrEqual(base * 2 - 5000); + expect(delay2).toBeGreaterThanOrEqual(base * 4 - 5000); + expect(delay3).toBeGreaterThanOrEqual(base * 8 - 5000); + + // Should generally be increasing (with some tolerance for jitter) + expect(delay1).toBeLessThan(delay3); + }); + + it('should cap delay at maxDelay', () => { + const maxDelay = 300000; + const delay = calculateNextDelay(10, 5000, maxDelay); + + expect(delay).toBeLessThanOrEqual(maxDelay + 5000); // Max + max jitter + }); + }); + + describe('calculateJitter', () => { + it('should return jitter within ±10% of delay', () => { + const delay = 5000; + const jitter = calculateJitter(delay); + + expect(Math.abs(jitter)).toBeLessThanOrEqual(delay * 0.1); + }); + + it('should cap jitter at 5000ms', () => { + const largeDelay = 100000; + const jitter = calculateJitter(largeDelay); + + expect(Math.abs(jitter)).toBeLessThanOrEqual(5000); + }); + }); + + describe('createConcurrencyLimit', () => { + it('should limit concurrent operations', async () => { + const limit = createConcurrencyLimit(2); + let running = 0; + let maxRunning = 0; + + const promises = Array.from({ length: 5 }, async () => { + return limit(async () => { + running++; + maxRunning = Math.max(maxRunning, running); + await new Promise(resolve => setTimeout(resolve, 50)); + running--; + }); + }); + + await Promise.all(promises); + + expect(maxRunning).toBe(2); + expect(running).toBe(0); + }); + + it('should handle errors correctly', async () => { + const limit = createConcurrencyLimit(2); + + const promises = [ + limit(async () => 'success'), + limit(async () => { throw new Error('fail'); }), + ]; + + const results = await Promise.allSettled(promises); + + expect(results[0]?.status).toBe('fulfilled'); + expect(results[1]?.status).toBe('rejected'); + }); + }); +}); diff --git a/orchestrator/orchestrator.ts b/orchestrator/orchestrator.ts new file mode 100644 index 0000000..d572e2c --- /dev/null +++ b/orchestrator/orchestrator.ts @@ -0,0 +1,304 @@ +/** + * Orchestrator module. + * Coordinates fetching, parsing, deduplication, and storage of feed items. + * Runs on a schedule with exponential backoff for failed feeds. + */ + +import type { IStorage } from '../interfaces/storage.interface.js'; +import type { IFetcher } from '../interfaces/fetcher.interface.js'; +import type { IParser } from '../interfaces/parser.interface.js'; +import type { IDedup } from '../interfaces/dedup.interface.js'; +import type { FeedSource, FetchError, FeedItem } from '../interfaces/feed.types.js'; +import { calculateNextDelay, createConcurrencyLimit } from './scheduler.js'; + +export interface OrchestratorOptions { + storage: IStorage; + fetcher: IFetcher; + parser: IParser; + dedup: IDedup; + maxRetries?: number; // For health status tracking only (default: 3) + baseRetryDelayMs?: number; // Default: 5000 (5s) + maxRetryDelayMs?: number; // Default: 300000 (5min) + concurrency?: number; // Default: 5 (parallel feed processing) +} + +export interface ProcessResult { + sourceId: string; + success: boolean; + itemsFound: number; + itemsNew: number; + error?: FetchError; + nextPollDelayMs: number; +} + +export interface FeedHealth { + sourceId: string; + url: string; + name: string | null; + isActive: boolean; + lastFetchedAt: Date | null; + lastSuccessAt: Date | null; + consecutiveFailures: number; + isHealthy: boolean; // true if consecutiveFailures < maxRetries +} + +export class FeedOrchestrator { + private readonly storage: IStorage; + private readonly fetcher: IFetcher; + private readonly parser: IParser; + private readonly dedup: IDedup; + private readonly maxRetries: number; + private readonly baseRetryDelayMs: number; + private readonly maxRetryDelayMs: number; + private readonly concurrency: number; + private readonly limit: (fn: () => Promise) => Promise; + + private isRunning: boolean = false; + private timers: Map = new Map(); + + constructor(options: OrchestratorOptions) { + this.storage = options.storage; + this.fetcher = options.fetcher; + this.parser = options.parser; + this.dedup = options.dedup; + this.maxRetries = options.maxRetries ?? 3; + this.baseRetryDelayMs = options.baseRetryDelayMs ?? 5000; + this.maxRetryDelayMs = options.maxRetryDelayMs ?? 300000; + this.concurrency = options.concurrency ?? 5; + this.limit = createConcurrencyLimit(this.concurrency); + } + + /** + * Start the orchestrator. + * Loads all active feed sources and immediately polls them. + * Schedules recurring polls based on each feed's pollIntervalMs. + */ + async start(): Promise { + if (this.isRunning) { + throw new Error('Orchestrator is already running'); + } + + this.isRunning = true; + + // Load all active feed sources + const sources = await this.storage.getFeedSources(true); + + // Immediately poll all feeds (with concurrency limit) + await Promise.all( + sources.map((source) => + this.limit(() => this.schedulePoll(source)) + ) + ); + } + + /** + * Stop the orchestrator. + * Clears all scheduled timers. + */ + async stop(): Promise { + this.isRunning = false; + + // Clear all timers + for (const [id, timer] of this.timers) { + clearTimeout(timer); + this.timers.delete(id); + } + } + + /** + * Process a single feed immediately. + * This still runs through the deduplication pipeline. + * + * @param sourceId The ID of the feed source to process + * @returns ProcessResult with details of the fetch + */ + async processFeed(sourceId: string): Promise { + const source = await this.storage.getFeedSourceById(sourceId); + if (!source) { + throw new Error(`Feed source not found: ${sourceId}`); + } + + return this.processFeedInternal(source); + } + + /** + * Process all feeds once (useful for cron/one-shot mode). + * Does not start recurring polling. + * + * @returns Map of sourceId to ProcessResult + */ + async processAllFeeds(): Promise> { + const sources = await this.storage.getFeedSources(true); + const results = new Map(); + + // Process all feeds with concurrency limit + const processPromises = sources.map(async (source) => { + const result = await this.limit(() => this.processFeedInternal(source)); + results.set(source.id, result); + }); + + await Promise.all(processPromises); + return results; + } + + /** + * Get health status of all feeds. + * + * @returns Array of FeedHealth for all feed sources + */ + async getFeedHealth(): Promise { + const sources = await this.storage.getFeedSources(); + + return sources.map((source) => ({ + sourceId: source.id, + url: source.url, + name: source.name, + isActive: source.isActive, + lastFetchedAt: source.lastFetchedAt, + lastSuccessAt: source.lastSuccessAt, + consecutiveFailures: source.consecutiveFailures, + isHealthy: source.consecutiveFailures < this.maxRetries, + })); + } + + /** + * Internal method to schedule the next poll for a feed. + */ + private async schedulePoll(source: FeedSource): Promise { + if (!this.isRunning) { + return; + } + + // Clear any existing timer for this source + const existingTimer = this.timers.get(source.id); + if (existingTimer) { + clearTimeout(existingTimer); + } + + // Process the feed + const result = await this.processFeedInternal(source); + + // Schedule next poll based on result + const timer = setTimeout(() => { + this.timers.delete(source.id); + this.schedulePoll(source); + }, result.nextPollDelayMs); + + this.timers.set(source.id, timer); + } + + /** + * Internal method to process a single feed. + * Updates the feed status in storage. + */ + private async processFeedInternal(source: FeedSource): Promise { + const now = new Date(); + + // Update last fetched time + await this.storage.updateFeedSourceStatus(source.id, { + lastFetchedAt: now, + }); + + // Fetch the feed + const fetchInput = { + url: source.url, + expectedFormat: source.format, + }; + + const fetchResult = await this.fetcher.fetch(fetchInput); + + // Handle fetch errors + if (fetchResult.errors.length > 0) { + const error = fetchResult.errors[0]!; + const newConsecutiveFailures = source.consecutiveFailures + 1; + + // Update status with failure + await this.storage.updateFeedSourceStatus(source.id, { + consecutiveFailures: newConsecutiveFailures, + }); + + // Calculate next delay with backoff + const nextDelay = calculateNextDelay( + newConsecutiveFailures, + source.pollIntervalMs, // Use the configured poll interval as base + this.maxRetryDelayMs + ); + + return { + sourceId: source.id, + success: false, + itemsFound: 0, + itemsNew: 0, + error, + nextPollDelayMs: Math.max(nextDelay, this.baseRetryDelayMs), + }; + } + + // Parse the feed + let items: FeedItem[] = []; + try { + for (const response of fetchResult.responses) { + const parsedItems = await this.parser.parse(response.body, response.source); + items.push(...parsedItems); + } + } catch (parseError) { + const error: FetchError = { + source: source.url, + reason: parseError instanceof Error ? parseError.message : 'Parse error', + code: 'PARSE', + }; + + const newConsecutiveFailures = source.consecutiveFailures + 1; + await this.storage.updateFeedSourceStatus(source.id, { + consecutiveFailures: newConsecutiveFailures, + }); + + const nextDelay = calculateNextDelay( + newConsecutiveFailures, + source.pollIntervalMs, + this.maxRetryDelayMs + ); + + return { + sourceId: source.id, + success: false, + itemsFound: 0, + itemsNew: 0, + error, + nextPollDelayMs: Math.max(nextDelay, this.baseRetryDelayMs), + }; + } + + const itemsFound = items.length; + + // Filter duplicates + const newItems = await this.dedup.filter(items); + + // Store new items + if (newItems.length > 0) { + await this.storage.save(newItems); + await this.dedup.markSeen(newItems); + } + + // Update status with success + await this.storage.updateFeedSourceStatus(source.id, { + lastSuccessAt: now, + consecutiveFailures: 0, + }); + + // Calculate next delay (base interval, no backoff since successful) + const nextDelay = calculateNextDelay( + 0, + source.pollIntervalMs, + this.maxRetryDelayMs + ); + + return { + sourceId: source.id, + success: true, + itemsFound, + itemsNew: newItems.length, + nextPollDelayMs: nextDelay, + }; + } +} diff --git a/orchestrator/scheduler.ts b/orchestrator/scheduler.ts new file mode 100644 index 0000000..ad15991 --- /dev/null +++ b/orchestrator/scheduler.ts @@ -0,0 +1,88 @@ +/** + * Scheduler utilities for calculating poll delays with exponential backoff. + */ + +/** + * Calculate the next poll delay in milliseconds. + * Uses exponential backoff based on consecutive failures. + * + * Formula: delay = baseDelay * (2 ^ consecutiveFailures) + jitter + * Capped at maxDelay. + * + * @param consecutiveFailures Number of consecutive failed attempts + * @param baseDelayMs Base delay in milliseconds (e.g., 5000 for 5 seconds) + * @param maxDelayMs Maximum delay in milliseconds (e.g., 300000 for 5 minutes) + * @returns Delay in milliseconds before next poll + */ +export function calculateNextDelay( + consecutiveFailures: number, + baseDelayMs: number, + maxDelayMs: number +): number { + // If no failures, use base delay + if (consecutiveFailures === 0) { + return baseDelayMs + calculateJitter(baseDelayMs); + } + + // Calculate exponential backoff: base * 2^failures + // Cap the exponent to avoid overflow + const cappedFailures = Math.min(consecutiveFailures, 10); + const exponentialDelay = baseDelayMs * Math.pow(2, cappedFailures); + + // Apply cap + const cappedDelay = Math.min(exponentialDelay, maxDelayMs); + + // Add jitter to prevent thundering herd + return cappedDelay + calculateJitter(cappedDelay); +} + +/** + * Calculate a small random jitter (±10% of delay). + * Prevents multiple feeds from syncing up and hitting the server simultaneously. + * + * @param delayMs Base delay in milliseconds + * @returns Jitter offset in milliseconds (can be positive or negative) + */ +export function calculateJitter(delayMs: number): number { + // ±10% jitter, max ±5000ms (5 seconds) + const maxJitter = Math.min(delayMs * 0.1, 5000); + return (Math.random() * 2 - 1) * maxJitter; +} + +/** + * Create a concurrency limiter function. + * Limits the number of promises running concurrently. + * + * @param concurrency Maximum number of concurrent operations + * @returns A function that wraps promises to enforce concurrency limit + */ +export function createConcurrencyLimit(concurrency: number) { + const queue: (() => void)[] = []; + let activeCount = 0; + + return function (fn: () => Promise): Promise { + return new Promise((resolve, reject) => { + const run = async () => { + activeCount++; + try { + const result = await fn(); + resolve(result); + } catch (error) { + reject(error); + } finally { + activeCount--; + if (queue.length > 0) { + const next = queue.shift()!; + next(); + } + } + }; + + if (activeCount < concurrency) { + run(); + } else { + queue.push(run); + } + }); + }; +}