Add FeedOrchestrator to coordinate the fetch → parse → dedup → store pipeline:

- FeedSource type for managing RSS/Atom feed configurations
- Feed source CRUD operations in the IStorage interface
- Database schema migration for the feed_sources table
- Exponential backoff retry with configurable delays
- Per-feed poll intervals with health tracking
- Concurrency-limited parallel feed processing
- ProcessResult and FeedHealth interfaces for status monitoring

Files added:
- orchestrator/orchestrator.ts - main orchestrator class
- orchestrator/scheduler.ts - backoff calculation utilities
- orchestrator/index.ts - module exports
- orchestrator/orchestrator.test.ts - comprehensive test suite

Files modified:
- interfaces/feed.types.ts - add FeedSource type
- interfaces/storage.interface.ts - extend with feed source methods
- infrastructure/db/database.ts - add FeedSourceTable interface
- infrastructure/db/schema.ts - add feed_sources table migration
- modules/storage/storage.ts - implement feed source CRUD
- modules/storage/storage.test.ts - add feed source tests

475 lines · 14 KiB · TypeScript
/**
 * Orchestrator tests.
 *
 * Tests the FeedOrchestrator class and the scheduler utilities
 * (calculateNextDelay, calculateJitter, createConcurrencyLimit) using mocked
 * dependencies.
 */
|
|
|
|
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
|
import { FeedOrchestrator, type OrchestratorOptions } from './orchestrator.js';
|
|
import { calculateNextDelay, calculateJitter, createConcurrencyLimit } from './scheduler.js';
|
|
import type { IStorage } from '../interfaces/storage.interface.js';
|
|
import type { IFetcher } from '../interfaces/fetcher.interface.js';
|
|
import type { IParser } from '../interfaces/parser.interface.js';
|
|
import type { IDedup } from '../interfaces/dedup.interface.js';
|
|
import type { FeedSource, FeedItem, FetchResult, FetchError } from '../interfaces/feed.types.js';
|
|
|
|
// Mock implementations
|
|
/**
 * Build an in-memory IStorage double.
 *
 * Feed sources live in a Map keyed by id, and feed items live in an array.
 * Every method is a `vi.fn`, so tests can assert on calls.
 *
 * Feed sources are stored and handed out as shallow copies. This keeps the
 * mock from aliasing objects owned by the caller (for example a FeedSource
 * built with createTestFeedSource). Status updates therefore change only the
 * stored record, never the test's fixture.
 */
function createMockStorage(): IStorage {
  const sources: Map<string, FeedSource> = new Map();
  const items: FeedItem[] = [];

  return {
    save: vi.fn(async (newItems: FeedItem[]) => {
      items.push(...newItems);
    }),

    getRecent: vi.fn(async (limit: number) => items.slice(0, limit)),

    getBySource: vi.fn(async (source: string, limit: number) =>
      items.filter(i => i.source === source).slice(0, limit)
    ),

    search: vi.fn(async (query: string) =>
      items.filter(i => i.title.includes(query) || (i.summary?.includes(query) ?? false))
    ),

    getFeedSources: vi.fn(async (activeOnly?: boolean) =>
      Array.from(sources.values())
        .filter(s => !activeOnly || s.isActive)
        .map(s => ({ ...s }))
    ),

    getFeedSourceById: vi.fn(async (id: string) => {
      const stored = sources.get(id);
      return stored ? { ...stored } : null;
    }),

    saveFeedSource: vi.fn(async (source: FeedSource) => {
      // Copy so later status updates never mutate the caller's object.
      sources.set(source.id, { ...source });
    }),

    updateFeedSourceStatus: vi.fn(
      async (id: string, updates: Parameters<IStorage['updateFeedSourceStatus']>[1]) => {
        const stored = sources.get(id);
        // Unknown ids are ignored rather than rejected.
        if (!stored) return;

        if (updates.lastFetchedAt !== undefined) stored.lastFetchedAt = updates.lastFetchedAt;
        if (updates.lastSuccessAt !== undefined) stored.lastSuccessAt = updates.lastSuccessAt;
        if (updates.consecutiveFailures !== undefined) stored.consecutiveFailures = updates.consecutiveFailures;
        if (updates.isActive !== undefined) stored.isActive = updates.isActive;
        stored.updatedAt = new Date();
      }
    ),

    deleteFeedSource: vi.fn(async (id: string) => {
      sources.delete(id);
    }),
  };
}
|
|
|
|
/**
 * Build an IFetcher double.
 *
 * Both `fetch` and `fetchMany` resolve to an empty result (no responses, no
 * errors) whose `fetchedAt` is the time of the call. Individual tests override
 * this through `vi.mocked(...)`.
 */
function createMockFetcher(): IFetcher {
  const resolveEmpty = async () => ({
    responses: [],
    errors: [],
    fetchedAt: new Date(),
  });

  return {
    fetch: vi.fn(resolveEmpty),
    fetchMany: vi.fn(resolveEmpty),
  };
}
|
|
|
|
/**
 * Build an IParser double.
 *
 * `parse` resolves to no items and `supports` accepts every input.
 */
function createMockParser(): IParser {
  const parseToNothing = async () => [];
  const acceptAnything = () => true;

  return { parse: vi.fn(parseToNothing), supports: vi.fn(acceptAnything) };
}
|
|
|
|
/**
 * Build an IDedup double.
 *
 * `filter` treats every item as new and returns its input unchanged.
 * `markSeen` is a no-op.
 */
function createMockDedup(): IDedup {
  const keepEverything = async (items: FeedItem[]) => items;
  const doNothing = async () => {};

  return { filter: vi.fn(keepEverything), markSeen: vi.fn(doNothing) };
}
|
|
|
|
/**
 * Build a FeedSource fixture.
 *
 * The default is an active RSS feed, polled once a minute, that has never
 * been fetched. `createdAt` and `updatedAt` are the same Date instance.
 * Any field can be replaced via `overrides`.
 */
function createTestFeedSource(overrides: Partial<FeedSource> = {}): FeedSource {
  const timestamp = new Date();

  const defaults: FeedSource = {
    id: 'test-feed-1',
    url: 'https://example.com/feed.xml',
    name: 'Test Feed',
    format: 'rss',
    pollIntervalMs: 60 * 1000, // 1 minute
    isActive: true,
    lastFetchedAt: null,
    lastSuccessAt: null,
    consecutiveFailures: 0,
    createdAt: timestamp,
    updatedAt: timestamp,
  };

  return { ...defaults, ...overrides };
}
|
|
|
|
/**
 * Build a FeedItem fixture.
 *
 * The item is published "now". Its `source` equals the default URL used by
 * createTestFeedSource. Any field can be replaced via `overrides`.
 */
function createTestFeedItem(overrides: Partial<FeedItem> = {}): FeedItem {
  const base: FeedItem = {
    id: 'item-1',
    source: 'https://example.com/feed.xml',
    title: 'Test Article',
    url: 'https://example.com/article',
    publishedAt: new Date(),
  };

  return Object.assign(base, overrides);
}
|
|
|
|
/**
 * FeedOrchestrator behaviour against in-memory mocks: lifecycle (start/stop),
 * single-feed processing, batch processing and health reporting.
 */
describe('FeedOrchestrator', () => {
  let storage: IStorage;
  let fetcher: IFetcher;
  let parser: IParser;
  let dedup: IDedup;
  let orchestrator: FeedOrchestrator;

  /**
   * Make `fetcher.fetch` resolve with a single HTTP 200 RSS response for `url`.
   * The current `fetcher` is read at call time, i.e. after beforeEach ran.
   */
  function mockFetchSuccess(url: string, body = '<rss></rss>'): void {
    vi.mocked(fetcher.fetch).mockResolvedValue({
      responses: [{
        source: url,
        body,
        contentType: 'application/rss+xml',
        statusCode: 200,
      }],
      errors: [],
      fetchedAt: new Date(),
    });
  }

  beforeEach(() => {
    storage = createMockStorage();
    fetcher = createMockFetcher();
    parser = createMockParser();
    dedup = createMockDedup();

    orchestrator = new FeedOrchestrator({
      storage,
      fetcher,
      parser,
      dedup,
      maxRetries: 3,
      baseRetryDelayMs: 5000,
      maxRetryDelayMs: 300000,
      concurrency: 2,
    });
  });

  afterEach(async () => {
    await orchestrator.stop();
    // Some tests switch to fake timers; always hand the real clock back.
    vi.useRealTimers();
  });

  describe('start/stop', () => {
    it('should start and poll active feeds immediately', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);

      mockFetchSuccess(
        source.url,
        '<rss><channel><item><title>Test</title><link>https://example.com/article</link></item></channel></rss>'
      );
      vi.mocked(parser.parse).mockResolvedValue([createTestFeedItem()]);

      await orchestrator.start();

      // Should have fetched
      expect(fetcher.fetch).toHaveBeenCalledWith({
        url: source.url,
        expectedFormat: source.format,
      });

      // Should have updated status
      expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith(
        source.id,
        expect.objectContaining({ lastFetchedAt: expect.any(Date) })
      );
    });

    it('should throw error if already running', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      await orchestrator.start();

      await expect(orchestrator.start()).rejects.toThrow('Orchestrator is already running');
    });

    it('should clear all timers on stop', async () => {
      // Fake timers let us fast-forward past every poll and backoff delay.
      vi.useFakeTimers();

      const source = createTestFeedSource();
      await storage.saveFeedSource(source);
      await orchestrator.start();

      await orchestrator.stop();
      const fetchCountAtStop = vi.mocked(fetcher.fetch).mock.calls.length;

      // Nothing may remain scheduled once stopped.
      expect(vi.getTimerCount()).toBe(0);

      // Advance well beyond the poll interval and maxRetryDelayMs (300000).
      // A surviving timer would have fired and fetched again.
      await vi.advanceTimersByTimeAsync(source.pollIntervalMs * 10);
      expect(vi.mocked(fetcher.fetch).mock.calls.length).toBe(fetchCountAtStop);
    });
  });

  describe('processFeed', () => {
    it('should process a feed successfully', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);

      const item = createTestFeedItem();

      mockFetchSuccess(source.url);
      vi.mocked(parser.parse).mockResolvedValue([item]);
      vi.mocked(dedup.filter).mockResolvedValue([item]);

      const result = await orchestrator.processFeed(source.id);

      expect(result.success).toBe(true);
      expect(result.itemsFound).toBe(1);
      expect(result.itemsNew).toBe(1);
      expect(result.sourceId).toBe(source.id);

      // Should have saved items
      expect(storage.save).toHaveBeenCalledWith([item]);
      expect(dedup.markSeen).toHaveBeenCalledWith([item]);

      // Should have updated status with success
      expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith(
        source.id,
        expect.objectContaining({
          lastSuccessAt: expect.any(Date),
          consecutiveFailures: 0,
        })
      );
    });

    it('should throw error for non-existent feed', async () => {
      await expect(orchestrator.processFeed('non-existent')).rejects.toThrow('Feed source not found');
    });

    it('should handle fetch errors with retry backoff', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);

      const error: FetchError = {
        source: source.url,
        reason: 'Network timeout',
        code: 'TIMEOUT',
      };

      vi.mocked(fetcher.fetch).mockResolvedValue({
        responses: [],
        errors: [error],
        fetchedAt: new Date(),
      });

      const result = await orchestrator.processFeed(source.id);

      expect(result.success).toBe(false);
      expect(result.error).toEqual(error);
      expect(result.itemsFound).toBe(0);
      expect(result.itemsNew).toBe(0);

      // Should have incremented failure count
      expect(storage.updateFeedSourceStatus).toHaveBeenCalledWith(
        source.id,
        expect.objectContaining({ consecutiveFailures: 1 })
      );

      // Should have delay with backoff
      expect(result.nextPollDelayMs).toBeGreaterThan(source.pollIntervalMs);
    });

    it('should handle parse errors', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);

      mockFetchSuccess(source.url, 'invalid xml');
      vi.mocked(parser.parse).mockRejectedValue(new Error('Parse failed'));

      const result = await orchestrator.processFeed(source.id);

      expect(result.success).toBe(false);
      expect(result.error?.code).toBe('PARSE');
    });

    it('should deduplicate items', async () => {
      const source = createTestFeedSource();
      await storage.saveFeedSource(source);

      const items = [
        createTestFeedItem({ id: 'item-1' }),
        createTestFeedItem({ id: 'item-2' }),
      ];

      mockFetchSuccess(source.url);
      vi.mocked(parser.parse).mockResolvedValue(items);
      vi.mocked(dedup.filter).mockResolvedValue([items[0]!]); // Only first item is new

      const result = await orchestrator.processFeed(source.id);

      expect(result.itemsFound).toBe(2);
      expect(result.itemsNew).toBe(1);

      // Should only save the new item
      expect(storage.save).toHaveBeenCalledWith([items[0]]);
      expect(dedup.markSeen).toHaveBeenCalledWith([items[0]]);
    });
  });

  describe('processAllFeeds', () => {
    it('should process all active feeds', async () => {
      const source1 = createTestFeedSource({ id: 'feed-1', url: 'https://example1.com/feed.xml' });
      const source2 = createTestFeedSource({ id: 'feed-2', url: 'https://example2.com/feed.xml', isActive: false });

      await storage.saveFeedSource(source1);
      await storage.saveFeedSource(source2);

      // Echo the requested URL so each response is attributed to its own feed.
      vi.mocked(fetcher.fetch).mockImplementation(async request => ({
        responses: [{
          source: request.url,
          body: '<rss></rss>',
          contentType: 'application/rss+xml',
          statusCode: 200,
        }],
        errors: [],
        fetchedAt: new Date(),
      }));
      vi.mocked(parser.parse).mockResolvedValue([createTestFeedItem()]);

      const results = await orchestrator.processAllFeeds();

      // Only active feeds should be processed
      expect(results.size).toBe(1);
      expect(results.has('feed-1')).toBe(true);
      expect(results.has('feed-2')).toBe(false);

      // The inactive feed must not have been fetched at all.
      expect(fetcher.fetch).toHaveBeenCalledTimes(1);
      expect(fetcher.fetch).toHaveBeenCalledWith({
        url: source1.url,
        expectedFormat: source1.format,
      });
    });
  });

  describe('getFeedHealth', () => {
    it('should return health status for all feeds', async () => {
      const healthyFeed = createTestFeedSource({
        id: 'healthy',
        url: 'https://healthy.com/feed.xml',
        consecutiveFailures: 0,
      });

      const unhealthyFeed = createTestFeedSource({
        id: 'unhealthy',
        url: 'https://unhealthy.com/feed.xml',
        consecutiveFailures: 5, // More than maxRetries (3)
      });

      await storage.saveFeedSource(healthyFeed);
      await storage.saveFeedSource(unhealthyFeed);

      const health = await orchestrator.getFeedHealth();

      expect(health).toHaveLength(2);

      const healthy = health.find(h => h.sourceId === 'healthy');
      const unhealthy = health.find(h => h.sourceId === 'unhealthy');

      expect(healthy?.isHealthy).toBe(true);
      expect(unhealthy?.isHealthy).toBe(false);
    });
  });
});
|
|
|
|
/**
 * Unit tests for the scheduler helpers: exponential backoff with jitter, the
 * jitter function itself, and the promise concurrency limiter.
 */
describe('Scheduler utilities', () => {
  /**
   * Largest jitter allowed for a given delay, per the calculateJitter
   * contract tested below: at most ±10% of the delay, capped at 5000ms.
   */
  const maxJitter = (delay: number): number => Math.min(delay * 0.1, 5000);

  describe('calculateNextDelay', () => {
    it('should return base delay when no failures', () => {
      const base = 5000;
      const delay = calculateNextDelay(0, base, 300000);

      expect(delay).toBeGreaterThanOrEqual(base - maxJitter(base));
      expect(delay).toBeLessThanOrEqual(base + maxJitter(base));
    });

    it('should increase delay with failures', () => {
      const base = 5000;
      const delays: number[] = [];

      for (let failures = 1; failures <= 3; failures++) {
        const nominal = base * 2 ** failures;
        const delay = calculateNextDelay(failures, base, 300000);

        // Each failure doubles the nominal delay, within the jitter band.
        expect(delay).toBeGreaterThanOrEqual(nominal - maxJitter(nominal));
        expect(delay).toBeLessThanOrEqual(nominal + maxJitter(nominal));
        delays.push(delay);
      }

      expect(delays[0]).toBeLessThan(delays[2]!);
    });

    it('should cap delay at maxDelay', () => {
      const maxDelay = 300000;
      const delay = calculateNextDelay(10, 5000, maxDelay);

      // 5000 * 2^10 far exceeds maxDelay, so only the capped value (+/- jitter) is valid.
      expect(delay).toBeLessThanOrEqual(maxDelay + maxJitter(maxDelay));
      expect(delay).toBeGreaterThanOrEqual(maxDelay - maxJitter(maxDelay));
    });
  });

  describe('calculateJitter', () => {
    // Jitter is random, so check many draws rather than a single sample.
    const samples = 100;

    it('should return jitter within ±10% of delay', () => {
      const delay = 5000;

      for (let i = 0; i < samples; i++) {
        expect(Math.abs(calculateJitter(delay))).toBeLessThanOrEqual(delay * 0.1);
      }
    });

    it('should cap jitter at 5000ms', () => {
      const largeDelay = 100000;

      for (let i = 0; i < samples; i++) {
        expect(Math.abs(calculateJitter(largeDelay))).toBeLessThanOrEqual(5000);
      }
    });
  });

  describe('createConcurrencyLimit', () => {
    it('should limit concurrent operations', async () => {
      const limit = createConcurrencyLimit(2);
      let running = 0;
      let maxRunning = 0;
      let completed = 0;

      const tasks = Array.from({ length: 5 }, () =>
        limit(async () => {
          running++;
          maxRunning = Math.max(maxRunning, running);
          await new Promise(resolve => setTimeout(resolve, 50));
          running--;
          completed++;
        })
      );

      await Promise.all(tasks);

      expect(maxRunning).toBe(2);
      expect(running).toBe(0);
      expect(completed).toBe(5);
    });

    it('should handle errors correctly', async () => {
      const limit = createConcurrencyLimit(2);

      const results = await Promise.allSettled([
        limit(async () => 'success'),
        limit(async () => { throw new Error('fail'); }),
      ]);

      expect(results[0]).toEqual({ status: 'fulfilled', value: 'success' });
      expect(results[1]?.status).toBe('rejected');
    });

    it('should release the slot when a task fails', async () => {
      // With a single slot, the second task can only start if the failed
      // first task gave its slot back; otherwise this test times out.
      const limit = createConcurrencyLimit(1);

      const results = await Promise.allSettled([
        limit(async () => { throw new Error('fail'); }),
        limit(async () => 'after failure'),
      ]);

      expect(results[0]?.status).toBe('rejected');
      expect(results[1]).toEqual({ status: 'fulfilled', value: 'after failure' });
    });
  });
});
|