pulse/orchestrator/orchestrator.test.ts
Edo Limburg 78a2b27f6d feat: implement orchestrator module with feed source management
Add FeedOrchestrator that coordinates fetch→parse→dedup→store pipeline:
- FeedSource type for managing RSS/Atom feed configurations
- Feed source CRUD operations in IStorage interface
- Database schema migration for feed_sources table
- Exponential backoff retry with configurable delays
- Per-feed poll intervals with health tracking
- Concurrency-limited parallel feed processing
- ProcessResult and FeedHealth interfaces for status monitoring

Files added:
- orchestrator/orchestrator.ts - main orchestrator class
- orchestrator/scheduler.ts - backoff calculation utilities
- orchestrator/index.ts - module exports
- orchestrator/orchestrator.test.ts - comprehensive test suite

Files modified:
- interfaces/feed.types.ts - add FeedSource type
- interfaces/storage.interface.ts - extend with feed source methods
- infrastructure/db/database.ts - add FeedSourceTable interface
- infrastructure/db/schema.ts - add feed_sources table migration
- modules/storage/storage.ts - implement feed source CRUD
- modules/storage/storage.test.ts - add feed source tests
2026-05-05 22:17:16 +02:00

475 lines
14 KiB
TypeScript

/**
* Orchestrator tests.
* Tests the FeedOrchestrator class with mocked dependencies.
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { FeedOrchestrator, type OrchestratorOptions } from './orchestrator.js';
import { calculateNextDelay, calculateJitter, createConcurrencyLimit } from './scheduler.js';
import type { IStorage } from '../interfaces/storage.interface.js';
import type { IFetcher } from '../interfaces/fetcher.interface.js';
import type { IParser } from '../interfaces/parser.interface.js';
import type { IDedup } from '../interfaces/dedup.interface.js';
import type { FeedSource, FeedItem, FetchResult, FetchError } from '../interfaces/feed.types.js';
// Mock implementations
/**
 * Builds an in-memory IStorage double. Items live in an array and feed sources
 * in a Map keyed by id; every method is a `vi.fn` spy so tests can assert calls.
 *
 * Sources are stored by reference, so `updateFeedSourceStatus` mutates the very
 * object a test passed to `saveFeedSource`.
 */
function createMockStorage(): IStorage {
  const sources = new Map<string, FeedSource>();
  const items: FeedItem[] = [];
  return {
    save: vi.fn(async (newItems: FeedItem[]) => {
      items.push(...newItems);
    }),
    // Insertion order (oldest first); no recency sort is applied.
    getRecent: vi.fn(async (limit: number) => items.slice(0, limit)),
    getBySource: vi.fn(async (source: string, limit: number) =>
      items.filter(i => i.source === source).slice(0, limit),
    ),
    // Case-sensitive substring match on title or summary.
    search: vi.fn(async (query: string) =>
      items.filter(i => i.title.includes(query) || (i.summary?.includes(query) ?? false)),
    ),
    getFeedSources: vi.fn(async (activeOnly?: boolean) =>
      Array.from(sources.values()).filter(s => !activeOnly || s.isActive),
    ),
    getFeedSourceById: vi.fn(async (id: string) => sources.get(id) ?? null),
    saveFeedSource: vi.fn(async (source: FeedSource) => {
      sources.set(source.id, source);
    }),
    // `updates` is typed explicitly: vi.fn() does not reliably propagate the
    // IStorage parameter types into its callback, which would leave it implicitly `any`.
    updateFeedSourceStatus: vi.fn(
      async (id: string, updates: Parameters<IStorage['updateFeedSourceStatus']>[1]) => {
        const source = sources.get(id);
        // Unknown ids are ignored silently, mirroring a no-op UPDATE.
        if (!source) return;
        // Only fields explicitly present in `updates` are applied.
        if (updates.lastFetchedAt !== undefined) source.lastFetchedAt = updates.lastFetchedAt;
        if (updates.lastSuccessAt !== undefined) source.lastSuccessAt = updates.lastSuccessAt;
        if (updates.consecutiveFailures !== undefined) source.consecutiveFailures = updates.consecutiveFailures;
        if (updates.isActive !== undefined) source.isActive = updates.isActive;
        source.updatedAt = new Date();
      },
    ),
    deleteFeedSource: vi.fn(async (id: string) => {
      sources.delete(id);
    }),
  };
}
/**
 * Builds an IFetcher double. Both `fetch` and `fetchMany` are spies that
 * resolve to a fresh empty result (no responses, no errors) stamped with the
 * time of the call.
 */
function createMockFetcher(): IFetcher {
  const emptyResult = () => ({
    responses: [],
    errors: [],
    fetchedAt: new Date(),
  });
  return {
    fetch: vi.fn(async () => emptyResult()),
    fetchMany: vi.fn(async () => emptyResult()),
  };
}
/**
 * Builds an IParser double: `supports` accepts every input and `parse`
 * resolves to an empty item list. Tests override either spy as needed.
 */
function createMockParser(): IParser {
  const parse = vi.fn(async () => []);
  const supports = vi.fn(() => true);
  return { parse, supports };
}
/**
 * Builds an IDedup double that treats every item as new: `filter` returns
 * its input unchanged and `markSeen` resolves without recording anything.
 */
function createMockDedup(): IDedup {
  const passThrough = async (items: FeedItem[]) => items;
  return {
    filter: vi.fn(passThrough),
    markSeen: vi.fn(async () => undefined),
  };
}
/**
 * Builds an active, never-fetched RSS FeedSource with a one-minute poll
 * interval. Any field present in `overrides` replaces the default.
 * `createdAt` and `updatedAt` share one Date instance.
 */
function createTestFeedSource(overrides: Partial<FeedSource> = {}): FeedSource {
  const timestamp = new Date();
  const defaults: FeedSource = {
    id: 'test-feed-1',
    url: 'https://example.com/feed.xml',
    name: 'Test Feed',
    format: 'rss',
    pollIntervalMs: 60000, // 1 minute
    isActive: true,
    lastFetchedAt: null,
    lastSuccessAt: null,
    consecutiveFailures: 0,
    createdAt: timestamp,
    updatedAt: timestamp,
  };
  return { ...defaults, ...overrides };
}
/**
 * Builds a FeedItem from the example.com test feed, published "now".
 * Any field present in `overrides` replaces the default.
 */
function createTestFeedItem(overrides: Partial<FeedItem> = {}): FeedItem {
  const base: FeedItem = {
    id: 'item-1',
    source: 'https://example.com/feed.xml',
    title: 'Test Article',
    url: 'https://example.com/article',
    publishedAt: new Date(),
  };
  return Object.assign(base, overrides);
}
describe('FeedOrchestrator', () => {
  let storage: IStorage;
  let fetcher: IFetcher;
  let parser: IParser;
  let dedup: IDedup;
  let orchestrator: FeedOrchestrator;

  /**
   * Builds a fetch result holding a single HTTP 200 RSS response for `url`
   * and no errors. The return type is derived from IFetcher so it always
   * matches what `fetcher.fetch` resolves to.
   */
  function fetchSuccess(
    url: string,
    body = '<rss></rss>',
  ): Awaited<ReturnType<IFetcher['fetch']>> {
    return {
      responses: [{
        source: url,
        body,
        contentType: 'application/rss+xml',
        statusCode: 200,
      }],
      errors: [],
      fetchedAt: new Date(),
    };
  }

  beforeEach(() => {
    storage = createMockStorage();
    fetcher = createMockFetcher();
    parser = createMockParser();
    dedup = createMockDedup();
    orchestrator = new FeedOrchestrator({
      storage,
      fetcher,
      parser,
      dedup,
      maxRetries: 3,
      baseRetryDelayMs: 5000,
      maxRetryDelayMs: 300000,
      concurrency: 2,
    });
  });

  afterEach(async () => {
    // Some tests stop the orchestrator themselves, so stop() runs twice there.
    await orchestrator.stop();
    // Undo vi.useFakeTimers() even when a test using it failed midway.
    vi.useRealTimers();
  });

  describe('start/stop', () => {
    it('should start and poll active feeds immediately', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      vi.mocked(fetcher.fetch).mockResolvedValue(
        fetchSuccess(
          source.url,
          '<rss><channel><item><title>Test</title><link>https://example.com/article</link></item></channel></rss>',
        ),
      );
      vi.mocked(parser.parse).mockResolvedValue([createTestFeedItem()]);

      await orchestrator.start();

      // The initial poll is expected to be complete by the time start() resolves.
      expect(fetcher.fetch).toHaveBeenCalledWith({
        url: source.url,
        expectedFormat: source.format,
      });
      expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith(
        source.id,
        expect.objectContaining({ lastFetchedAt: expect.any(Date) }),
      );
    });

    it('should throw error if already running', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      await orchestrator.start();
      await expect(orchestrator.start()).rejects.toThrow('Orchestrator is already running');
    });

    it('should clear all timers on stop', async () => {
      vi.useFakeTimers();
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      vi.mocked(fetcher.fetch).mockResolvedValue(fetchSuccess(source.url));

      await orchestrator.start();
      const fetchCallsBeforeStop = vi.mocked(fetcher.fetch).mock.calls.length;
      await orchestrator.stop();

      // Jump well past several poll intervals: any poll timer that survived
      // stop() would fire and trigger another fetch.
      await vi.advanceTimersByTimeAsync(source.pollIntervalMs * 5);
      expect(fetcher.fetch).toHaveBeenCalledTimes(fetchCallsBeforeStop);
    });
  });

  describe('processFeed', () => {
    it('should process a feed successfully', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      const item = createTestFeedItem();
      vi.mocked(fetcher.fetch).mockResolvedValue(fetchSuccess(source.url));
      vi.mocked(parser.parse).mockResolvedValue([item]);
      vi.mocked(dedup.filter).mockResolvedValue([item]);

      const result = await orchestrator.processFeed(source.id);

      expect(result.success).toBe(true);
      expect(result.itemsFound).toBe(1);
      expect(result.itemsNew).toBe(1);
      expect(result.sourceId).toBe(source.id);
      expect(storage.save).toHaveBeenCalledWith([item]);
      expect(dedup.markSeen).toHaveBeenCalledWith([item]);
      // A success resets the failure streak.
      expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith(
        source.id,
        expect.objectContaining({
          lastSuccessAt: expect.any(Date),
          consecutiveFailures: 0,
        }),
      );
    });

    it('should throw error for non-existent feed', async () => {
      await expect(orchestrator.processFeed('non-existent')).rejects.toThrow('Feed source not found');
    });

    it('should handle fetch errors with retry backoff', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      const error: FetchError = {
        source: source.url,
        reason: 'Network timeout',
        code: 'TIMEOUT',
      };
      vi.mocked(fetcher.fetch).mockResolvedValue({
        responses: [],
        errors: [error],
        fetchedAt: new Date(),
      });

      const result = await orchestrator.processFeed(source.id);

      expect(result.success).toBe(false);
      expect(result.error).toEqual(error);
      expect(result.itemsFound).toBe(0);
      expect(result.itemsNew).toBe(0);
      expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith(
        source.id,
        expect.objectContaining({ consecutiveFailures: 1 }),
      );
      // A failure pushes the next poll out beyond the regular interval.
      expect(result.nextPollDelayMs).toBeGreaterThan(source.pollIntervalMs);
    });

    it('should handle parse errors', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      vi.mocked(fetcher.fetch).mockResolvedValue(fetchSuccess(source.url, 'invalid xml'));
      vi.mocked(parser.parse).mockRejectedValue(new Error('Parse failed'));

      const result = await orchestrator.processFeed(source.id);

      expect(result.success).toBe(false);
      expect(result.error?.code).toBe('PARSE');
    });

    it('should deduplicate items', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      const items = [
        createTestFeedItem({ id: 'item-1' }),
        createTestFeedItem({ id: 'item-2' }),
      ];
      vi.mocked(fetcher.fetch).mockResolvedValue(fetchSuccess(source.url));
      vi.mocked(parser.parse).mockResolvedValue(items);
      vi.mocked(dedup.filter).mockResolvedValue([items[0]!]); // Only first item is new

      const result = await orchestrator.processFeed(source.id);

      expect(result.itemsFound).toBe(2);
      expect(result.itemsNew).toBe(1);
      // Only the new item is persisted and marked seen.
      expect(storage.save).toHaveBeenCalledWith([items[0]]);
      expect(dedup.markSeen).toHaveBeenCalledWith([items[0]]);
    });
  });

  describe('processAllFeeds', () => {
    it('should process all active feeds', async () => {
      const source1 = createTestFeedSource({ id: 'feed-1', url: 'https://example1.com/feed.xml' });
      const source2 = createTestFeedSource({ id: 'feed-2', url: 'https://example2.com/feed.xml', isActive: false });
      await storage.saveFeedSource(source1);
      await storage.saveFeedSource(source2);
      // Respond with the active feed's real URL rather than a placeholder.
      vi.mocked(fetcher.fetch).mockResolvedValue(fetchSuccess(source1.url));
      vi.mocked(parser.parse).mockResolvedValue([createTestFeedItem()]);

      const results = await orchestrator.processAllFeeds();

      // Only the active feed is processed, and the inactive one is never fetched.
      expect(results.size).toBe(1);
      expect(results.has('feed-1')).toBe(true);
      expect(results.has('feed-2')).toBe(false);
      expect(results.get('feed-1')?.success).toBe(true);
      expect(fetcher.fetch).not.toHaveBeenCalledWith(
        expect.objectContaining({ url: source2.url }),
      );
    });
  });

  describe('getFeedHealth', () => {
    it('should return health status for all feeds', async () => {
      const healthyFeed = createTestFeedSource({
        id: 'healthy',
        url: 'https://healthy.com/feed.xml',
        consecutiveFailures: 0,
      });
      const unhealthyFeed = createTestFeedSource({
        id: 'unhealthy',
        url: 'https://unhealthy.com/feed.xml',
        consecutiveFailures: 5, // More than maxRetries (3)
      });
      await storage.saveFeedSource(healthyFeed);
      await storage.saveFeedSource(unhealthyFeed);

      const health = await orchestrator.getFeedHealth();

      expect(health).toHaveLength(2);
      const healthy = health.find(h => h.sourceId === 'healthy');
      const unhealthy = health.find(h => h.sourceId === 'unhealthy');
      expect(healthy?.isHealthy).toBe(true);
      expect(unhealthy?.isHealthy).toBe(false);
    });
  });
});
describe('Scheduler utilities', () => {
  /**
   * Largest jitter magnitude expected on a delay: 10% of the delay, capped at
   * 5000ms — the same bound the calculateJitter tests below pin down.
   */
  const maxJitter = (delay: number): number => Math.min(delay * 0.1, 5000);

  /** Random draws per property check, so one lucky sample cannot hide a bound violation. */
  const SAMPLES = 200;

  describe('calculateNextDelay', () => {
    it('should return base delay when no failures', () => {
      const base = 5000;
      for (let i = 0; i < SAMPLES; i++) {
        const delay = calculateNextDelay(0, base, 300000);
        // base ± 10% jitter, i.e. [4500, 5500]
        expect(delay).toBeGreaterThanOrEqual(base - maxJitter(base));
        expect(delay).toBeLessThanOrEqual(base + maxJitter(base));
      }
    });

    it('should increase delay with failures', () => {
      const base = 5000;
      const max = 300000;
      for (const failures of [1, 2, 3]) {
        // Exponential backoff: base * 2^failures, then ± jitter.
        const expected = Math.min(base * 2 ** failures, max);
        for (let i = 0; i < SAMPLES; i++) {
          const delay = calculateNextDelay(failures, base, max);
          expect(delay).toBeGreaterThanOrEqual(expected - maxJitter(expected));
          expect(delay).toBeLessThanOrEqual(expected + maxJitter(expected));
        }
      }
      // The jitter windows ([9k,11k], [18k,22k], [36k,44k]) do not overlap,
      // so successive failure counts are strictly ordered.
      const delay1 = calculateNextDelay(1, base, max);
      const delay2 = calculateNextDelay(2, base, max);
      const delay3 = calculateNextDelay(3, base, max);
      expect(delay1).toBeLessThan(delay2);
      expect(delay2).toBeLessThan(delay3);
    });

    it('should cap delay at maxDelay', () => {
      const maxDelay = 300000;
      for (let i = 0; i < SAMPLES; i++) {
        // 5000 * 2^10 far exceeds maxDelay, so only the cap ± jitter remains.
        const delay = calculateNextDelay(10, 5000, maxDelay);
        expect(delay).toBeGreaterThanOrEqual(maxDelay - maxJitter(maxDelay));
        expect(delay).toBeLessThanOrEqual(maxDelay + maxJitter(maxDelay));
      }
    });
  });

  describe('calculateJitter', () => {
    it('should return jitter within ±10% of delay', () => {
      const delay = 5000;
      for (let i = 0; i < SAMPLES; i++) {
        expect(Math.abs(calculateJitter(delay))).toBeLessThanOrEqual(delay * 0.1);
      }
    });

    it('should cap jitter at 5000ms', () => {
      const largeDelay = 100000;
      for (let i = 0; i < SAMPLES; i++) {
        expect(Math.abs(calculateJitter(largeDelay))).toBeLessThanOrEqual(5000);
      }
    });
  });

  describe('createConcurrencyLimit', () => {
    it('should limit concurrent operations', async () => {
      const limit = createConcurrencyLimit(2);
      let running = 0;
      let maxRunning = 0;
      const promises = Array.from({ length: 5 }, () =>
        limit(async () => {
          running++;
          maxRunning = Math.max(maxRunning, running);
          await new Promise(resolve => setTimeout(resolve, 50));
          running--;
        }),
      );
      await Promise.all(promises);
      // Exactly 2: never exceeded, and actually reached (tasks do run in parallel).
      expect(maxRunning).toBe(2);
      expect(running).toBe(0);
    });

    it('should handle errors correctly', async () => {
      const limit = createConcurrencyLimit(2);
      const results = await Promise.allSettled([
        limit(async () => 'success'),
        limit(async () => { throw new Error('fail'); }),
      ]);
      expect(results[0]).toEqual({ status: 'fulfilled', value: 'success' });
      expect(results[1]?.status).toBe('rejected');
      // The task's own error propagates to the caller unchanged.
      await expect(limit(async () => { throw new Error('fail'); })).rejects.toThrow('fail');
    });

    it('should release its slot after a rejected task', async () => {
      // With a limit of 1 the second task can only start once the failed task
      // has freed the slot; a leaked slot would leave `following` pending forever.
      const limit = createConcurrencyLimit(1);
      const failing = limit(async () => { throw new Error('fail'); });
      const following = limit(async () => 'ran');
      await expect(failing).rejects.toThrow('fail');
      await expect(following).resolves.toBe('ran');
    });
  });
});