#!/usr/bin/env node
/**
 * Pulse CLI - RSS Feed Aggregator
 *
 * Commands:
 *   pulse start            Start the orchestrator (foreground mode)
 *   pulse add <url>        Add a new feed source
 *   pulse remove <id>      Remove a feed source
 *   pulse list             List all feed sources
 *   pulse fetch            One-shot fetch of all active feeds
 *   pulse status           Show feed health status
 *   pulse items [limit]    Show recent items (default: 20)
 */
import { Command } from 'commander';
import { getDatabase, migrate } from './infrastructure/db/index.js';
import { SqlStorage } from './modules/storage/index.js';
import { DatabaseDedup } from './modules/dedup/index.js';
import { HttpFetcher } from './modules/fetcher/index.js';
import { RssParser, AtomParser } from './modules/parser/index.js';
import { FeedOrchestrator } from './orchestrator/index.js';
import { Formatter } from './modules/formatter/formatter.js';
import { request } from 'undici';
import type { FeedSource } from './interfaces/feed.types.js';

const program = new Command();

program
  .name('pulse')
  .description('RSS feed aggregator')
  .version('0.1.0');

/**
 * Prints `context` followed by the error's message (or the raw thrown value
 * when it is not an `Error`) to stderr and terminates the process with exit
 * code 1.
 */
function exitWithError(context: string, error: unknown): never {
  console.error(context, error instanceof Error ? error.message : error);
  process.exit(1);
}

/**
 * Parses a CLI argument as a base-10 integer and rejects anything that is not
 * a positive integer, so `NaN` never reaches the orchestrator or storage.
 *
 * @param raw - The string received from the command line.
 * @param label - Human-readable name of the argument, used in the error text.
 * @returns The parsed positive integer.
 * @throws Error when `raw` does not parse to an integer greater than zero.
 */
function parsePositiveInt(raw: string, label: string): number {
  const value = Number.parseInt(raw, 10);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`Invalid ${label}: ${raw}. Expected a positive integer.`);
  }
  return value;
}

/**
 * Builds the wired-up modules every command needs: opens the database, runs
 * migrations, and constructs storage, dedup, fetcher and a combined parser.
 */
async function createContext() {
  const db = getDatabase();

  // Auto-run migrations
  await migrate(db);

  const storage = new SqlStorage(db);
  const dedup = new DatabaseDedup(db);
  const fetcher = new HttpFetcher();

  const parser = {
    async parse(rawXml: string, source: string) {
      // Try RSS first; any RSS parse failure falls through to the Atom parser.
      const rssParser = new RssParser();
      const atomParser = new AtomParser();
      try {
        return await rssParser.parse(rawXml, source);
      } catch {
        return await atomParser.parse(rawXml, source);
      }
    },
    supports() {
      return true;
    },
  };

  return { db, storage, dedup, fetcher, parser };
}

/**
 * Derives a feed ID from its URL: a djb2-style hash (seed 5381,
 * `hash * 33 + charCode`) reduced to an unsigned 32-bit value and rendered as
 * lowercase hex. The `add` command stores this value as the feed's ID, so the
 * algorithm must not change.
 */
function generateFeedId(url: string): string {
  let hash = 5381;
  for (let i = 0; i < url.length; i++) {
    hash = ((hash << 5) + hash) + url.charCodeAt(i);
  }
  return (hash >>> 0).toString(16);
}

/**
 * Auto-detects whether `url` serves an RSS or an Atom feed by downloading it
 * once and inspecting the `content-type` header, then the body.
 *
 * @param url - Feed URL to probe (10 second timeout).
 * @returns `'atom'` when the header or body indicates Atom, otherwise `'rss'`.
 */
async function detectFeedFormat(url: string): Promise<'rss' | 'atom'> {
  try {
    const { headers, body } = await request(url, {
      method: 'GET',
      headers: { 'User-Agent': 'Pulse-RSS-Fetcher/1.0' },
      signal: AbortSignal.timeout(10000),
    });

    // A header value may be a string or an array of strings; normalize to one string.
    const rawContentType = headers['content-type'];
    const contentType = Array.isArray(rawContentType)
      ? rawContentType.join(',')
      : rawContentType ?? '';
    // Always read the body so the response is fully consumed before returning.
    const responseBody = await body.text();

    // Check content-type header
    if (contentType.toLowerCase().includes('atom')) {
      return 'atom';
    }

    // Check body content
    // NOTE(review): the marker string and everything after it in this function
    // were reconstructed (an Atom document's root element is `<feed>`) —
    // confirm against the intended detection rules.
    const trimmed = responseBody.trim().toLowerCase();
    if (trimmed.includes('<feed')) {
      return 'atom';
    }

    return 'rss';
  } catch {
    // NOTE(review): falling back to 'rss' when the probe fails is an
    // assumption — confirm whether the failure should surface to the caller.
    return 'rss';
  }
}

// Start command
// NOTE(review): the command name and description were reconstructed from the
// file header — confirm.
program
  .command('start')
  .description('Start the orchestrator (foreground mode)')
  .option('-c, --concurrency <n>', 'Number of concurrent fetches', '5')
  .action(async (options: { concurrency: string }) => {
    try {
      const { storage, dedup, fetcher, parser } = await createContext();
      const orchestrator = new FeedOrchestrator({
        storage,
        fetcher,
        parser,
        dedup,
        concurrency: parsePositiveInt(options.concurrency, 'concurrency'),
      });

      console.log('Starting Pulse orchestrator...');

      // Handle graceful shutdown. The flag keeps a second Ctrl+C from running
      // the stop sequence twice while the first is still in flight.
      let shuttingDown = false;
      const shutdown = async (): Promise<void> => {
        if (shuttingDown) {
          return;
        }
        shuttingDown = true;
        console.log('\nShutting down gracefully...');
        try {
          await orchestrator.stop();
          // NOTE(review): the database handle is not released here. If
          // getDatabase() returns a Kysely instance, `destroy()` would do
          // that — confirm.
          console.log('Shutdown complete.');
          process.exit(0);
        } catch (error) {
          exitWithError('Failed to stop orchestrator:', error);
        }
      };

      process.on('SIGINT', () => void shutdown());
      process.on('SIGTERM', () => void shutdown());

      await orchestrator.start();

      console.log('Orchestrator running. Press Ctrl+C to stop.');

      // Park this handler on a promise that never settles; the process ends
      // through the signal handlers above.
      await new Promise<never>(() => {});
    } catch (error) {
      exitWithError('Failed to start orchestrator:', error);
    }
  });

// Add command
program
  .command('add <url>')
  .description('Add a new feed source')
  .option('-n, --name <name>', 'Display name for the feed')
  .option('-f, --format <format>', 'Feed format (rss or atom)', 'auto')
  .option('-i, --interval <ms>', 'Poll interval in milliseconds', '300000')
  .action(async (url: string, options: { name?: string; format: string; interval: string }) => {
    try {
      // Validate before touching the database or the network.
      const pollIntervalMs = parsePositiveInt(options.interval, 'interval');

      const { storage } = await createContext();
      const id = generateFeedId(url);

      // Check if already exists
      const existing = await storage.getFeedSourceById(id);
      if (existing) {
        console.error(`Feed already exists: ${url}`);
        process.exit(1);
      }

      // Auto-detect format if needed
      let format: 'rss' | 'atom';
      if (options.format === 'auto') {
        console.log('Detecting feed format...');
        format = await detectFeedFormat(url);
        console.log(`Detected format: ${format}`);
      } else if (options.format === 'rss' || options.format === 'atom') {
        format = options.format;
      } else {
        console.error(`Invalid format: ${options.format}. Use 'rss', 'atom', or 'auto'.`);
        process.exit(1);
      }

      const now = new Date();
      const source: FeedSource = {
        id,
        url,
        name: options.name ?? null,
        format,
        pollIntervalMs,
        isActive: true,
        lastFetchedAt: null,
        lastSuccessAt: null,
        consecutiveFailures: 0,
        createdAt: now,
        updatedAt: now,
      };

      await storage.saveFeedSource(source);

      console.log(`Added feed: ${url}`);
      console.log(` ID: ${id}`);
      console.log(` Format: ${format}`);
      console.log(` Interval: ${pollIntervalMs}ms`);
    } catch (error) {
      exitWithError('Failed to add feed:', error);
    }
  });

// Remove command
program
  .command('remove <id>')
  .description('Remove a feed source by ID')
  .action(async (id: string) => {
    try {
      const { storage } = await createContext();

      const existing = await storage.getFeedSourceById(id);
      if (!existing) {
        console.error(`Feed not found: ${id}`);
        process.exit(1);
      }

      await storage.deleteFeedSource(id);
      console.log(`Removed feed: ${existing.name ?? existing.url} (${id})`);
    } catch (error) {
      exitWithError('Failed to remove feed:', error);
    }
  });

// List command
program
  .command('list')
  .description('List all feed sources')
  .action(async () => {
    try {
      const { storage } = await createContext();
      const sources = await storage.getFeedSources();

      if (sources.length === 0) {
        console.log('No feed sources configured.');
        return;
      }

      console.log(`\n ${sources.length} feed source(s):\n`);

      sources.forEach((source) => {
        const status = source.isActive ? '✓' : '✗';
        const health = source.consecutiveFailures > 0
          ? ` (${source.consecutiveFailures} failures)`
          : '';

        console.log(` ${status} ${source.name ?? source.url}`);
        console.log(` ID: ${source.id}`);
        console.log(` URL: ${source.url}`);
        console.log(` Format: ${source.format}`);
        console.log(` Interval: ${source.pollIntervalMs}ms`);
        if (source.lastSuccessAt) {
          console.log(` Last success: ${source.lastSuccessAt.toLocaleString()}`);
        }
        console.log(` ${health}`);
        console.log();
      });
    } catch (error) {
      exitWithError('Failed to list feeds:', error);
    }
  });

// Fetch command
program
  .command('fetch')
  .description('One-shot fetch of all active feeds')
  .action(async () => {
    try {
      const { storage, dedup, fetcher, parser } = await createContext();
      const orchestrator = new FeedOrchestrator({
        storage,
        fetcher,
        parser,
        dedup,
      });

      console.log('Fetching all active feeds...\n');

      const results = await orchestrator.processAllFeeds();

      let totalFound = 0;
      let totalNew = 0;

      for (const [sourceId, result] of results) {
        // Resolve a display name, falling back to the URL and then the raw ID.
        const source = await storage.getFeedSourceById(sourceId);
        const name = source?.name ?? source?.url ?? sourceId;

        if (result.success) {
          console.log(`✓ ${name}`);
          console.log(` Found: ${result.itemsFound} items`);
          console.log(` New: ${result.itemsNew} items`);
          totalFound += result.itemsFound;
          totalNew += result.itemsNew;
        } else {
          console.log(`✗ ${name}`);
          console.log(` Error: ${result.error?.reason ?? 'Unknown error'}`);
        }
        console.log();
      }

      console.log(`Total: ${totalFound} items found, ${totalNew} new items`);
    } catch (error) {
      exitWithError('Failed to fetch feeds:', error);
    }
  });

// Status command
program
  .command('status')
  .description('Show feed health status')
  .action(async () => {
    try {
      const { storage, dedup, fetcher, parser } = await createContext();
      const orchestrator = new FeedOrchestrator({
        storage,
        fetcher,
        parser,
        dedup,
      });

      const health = await orchestrator.getFeedHealth();

      if (health.length === 0) {
        console.log('No feed sources configured.');
        return;
      }

      console.log(`\n Feed Health Status:\n`);

      const healthy = health.filter((h) => h.isHealthy);
      const unhealthy = health.filter((h) => !h.isHealthy);

      if (healthy.length > 0) {
        console.log(` Healthy (${healthy.length}):`);
        healthy.forEach((h) => {
          console.log(` ✓ ${h.name ?? h.url}`);
        });
        console.log();
      }

      if (unhealthy.length > 0) {
        console.log(` Unhealthy (${unhealthy.length}):`);
        unhealthy.forEach((h) => {
          console.log(` ✗ ${h.name ?? h.url}`);
          console.log(` Failures: ${h.consecutiveFailures}`);
          if (h.lastSuccessAt) {
            console.log(` Last success: ${h.lastSuccessAt.toLocaleString()}`);
          } else {
            console.log(` Never successfully fetched`);
          }
        });
        console.log();
      }
    } catch (error) {
      exitWithError('Failed to get status:', error);
    }
  });

// Items command
program
  .command('items [limit]')
  .description('Show recent items (default: 20)')
  .action(async (limitStr?: string) => {
    try {
      // Validate before opening the database.
      const limit = parsePositiveInt(limitStr ?? '20', 'limit');

      const { storage } = await createContext();
      const formatter = new Formatter();

      const items = await storage.getRecent(limit);

      if (items.length === 0) {
        console.log('No items found.');
        return;
      }

      const output = await formatter.format(items, 'terminal');
      console.log(output);
    } catch (error) {
      exitWithError('Failed to get items:', error);
    }
  });

// Every action handler is async, so use parseAsync and report any rejection
// that escapes a handler instead of leaving it unhandled.
program.parseAsync().catch((error: unknown) => {
  exitWithError('Unexpected error:', error);
});