pulse/cli.ts
Edo Limburg c79eb6d76d Add CLI entry point, RSS content extraction, and image support
Features:
- Add CLI with commands: start, add, remove, list, fetch, status, items
- Auto-detect RSS format when adding feeds
- Auto-run database migrations on startup
- Extract full HTML content from RSS description field (NOS-style feeds)
- Extract image URLs from RSS enclosure tags
- Display images in terminal output with emoji
- Include imageUrl in JSON formatter output

Database:
- Add image_url column to feed_items table
- Update storage layer to persist imageUrl field

Tests:
- Add 10 CLI integration tests
- Add 3 RSS parser tests for image/content extraction
- Add 2 storage tests for imageUrl persistence

Dependencies:
- Add commander for CLI framework

All 144 tests passing
2026-05-05 23:05:30 +02:00

388 lines
11 KiB
JavaScript
Executable File

#!/usr/bin/env node
/**
* Pulse CLI - RSS Feed Aggregator
*
* Commands:
* pulse start Start the orchestrator (foreground mode)
* pulse add <url> Add a new feed source
* pulse remove <id> Remove a feed source
* pulse list List all feed sources
* pulse fetch One-shot fetch of all active feeds
* pulse status Show feed health status
* pulse items [limit] Show recent items (default: 20)
*/
import { Command } from 'commander';
import { getDatabase, migrate } from './infrastructure/db/index.js';
import { SqlStorage } from './modules/storage/index.js';
import { DatabaseDedup } from './modules/dedup/index.js';
import { HttpFetcher } from './modules/fetcher/index.js';
import { RssParser, AtomParser } from './modules/parser/index.js';
import { FeedOrchestrator } from './orchestrator/index.js';
import { Formatter } from './modules/formatter/formatter.js';
import { request } from 'undici';
import type { FeedSource } from './interfaces/feed.types.js';
const program = new Command();

// Top-level program metadata reported by `--help` and `--version`.
program.name('pulse');
program.description('RSS feed aggregator');
program.version('0.1.0');
// Helper to create wired-up modules
/**
 * Open the database, apply pending migrations, and build the module set
 * shared by every CLI command.
 *
 * @returns The database handle plus storage, dedup, fetcher and a combined
 *   RSS-then-Atom parser.
 */
async function createContext() {
  const db = getDatabase();
  // Auto-run migrations so every command sees an up-to-date schema.
  await migrate(db);

  const parser = {
    /**
     * Parse `rawXml` with the RSS parser; if that throws or rejects, retry
     * with the Atom parser and let any Atom error propagate.
     */
    async parse(rawXml: string, source: string) {
      const rss = new RssParser();
      const atom = new AtomParser();
      try {
        return await rss.parse(rawXml, source);
      } catch {
        return await atom.parse(rawXml, source);
      }
    },
    /** Claims every input; the RSS/Atom choice is made inside `parse`. */
    supports() {
      return true;
    },
  };

  return {
    db,
    storage: new SqlStorage(db),
    dedup: new DatabaseDedup(db),
    fetcher: new HttpFetcher(),
    parser,
  };
}
// Helper to generate feed ID from URL
/**
 * Derive a feed ID from its URL with the djb2 string hash
 * (h = h * 33 + UTF-16 code unit, seeded with 5381).
 *
 * The result is the hash reduced to an unsigned 32-bit integer, written as
 * lowercase hex without zero-padding. Being 32-bit, distinct URLs can collide.
 *
 * @param url - Feed URL to hash.
 * @returns Hex-encoded 32-bit hash.
 */
function generateFeedId(url: string): string {
  const seed = 5381;
  // Index over UTF-16 code units (not code points) to match charCodeAt semantics.
  const codeUnits = Array.from({ length: url.length }, (_, pos) => url.charCodeAt(pos));
  // `(acc << 5) + acc` is acc * 33 modulo 2^32; `>>> 0` maps the sum to an unsigned 32-bit value.
  const unsigned = codeUnits.reduce((acc, unit) => (acc << 5) + acc + unit, seed) >>> 0;
  return unsigned.toString(16);
}
// Helper to auto-detect feed format
/**
 * Best-effort detection of whether `url` serves an RSS or an Atom feed.
 *
 * Fetches the URL once (10 s timeout) and decides, in order, by:
 *  1. a Content-Type header containing "atom" (e.g. application/atom+xml);
 *  2. the document's root element (`<feed>` => Atom, anything else => RSS).
 *
 * @param url - Feed URL to probe.
 * @returns 'atom' or 'rss'. Never rejects: any network or read error yields 'rss'.
 */
async function detectFeedFormat(url: string): Promise<'rss' | 'atom'> {
  try {
    const { headers, body } = await request(url, {
      method: 'GET',
      headers: { 'User-Agent': 'Pulse-RSS-Fetcher/1.0' },
      signal: AbortSignal.timeout(10000),
    });
    // A repeated header can arrive as string[]; join so every value is inspected.
    const rawContentType = headers['content-type'];
    const contentType = Array.isArray(rawContentType) ? rawContentType.join(',') : rawContentType ?? '';
    // Always consume the body, even when the header alone decides, so the
    // connection is released back to undici.
    const responseBody = await body.text();
    if (contentType.toLowerCase().includes('atom')) {
      return 'atom';
    }
    return detectFormatFromXml(responseBody);
  } catch {
    // Default to RSS if detection fails
    return 'rss';
  }
}

/**
 * Classify a feed document by its root element name.
 *
 * `<feed>` (optionally namespace-prefixed, e.g. `<atom:feed>`) is Atom.
 * Anything else, such as `<rss>`, `<rdf:RDF>` or no recognisable root, is RSS.
 * Checking only the root avoids the false positive a plain substring search
 * gives on RSS feeds that contain elements like `<feedburner:origLink>`.
 */
function detectFormatFromXml(xml: string): 'rss' | 'atom' {
  // Strip comments so a commented-out tag cannot be mistaken for the root.
  const withoutComments = xml.replace(/<!--[\s\S]*?-->/g, '');
  // The first tag whose name starts with a letter or underscore is the root;
  // this skips `<?xml ...?>` declarations and `<!DOCTYPE ...>`.
  const root = /<([A-Za-z_][\w.:-]*)/.exec(withoutComments)?.[1];
  if (root === undefined) {
    return 'rss';
  }
  const localName = root.slice(root.lastIndexOf(':') + 1).toLowerCase();
  return localName === 'feed' ? 'atom' : 'rss';
}
// Start command
/**
 * `pulse start [-c N]`: run the orchestrator in the foreground until SIGINT/SIGTERM.
 * Exits with code 1 on invalid options, a startup failure, or a failing shutdown.
 */
program
  .command('start')
  .description('Start the orchestrator (runs in foreground)')
  .option('-c, --concurrency <number>', 'Number of concurrent fetches', '5')
  .action(async (options) => {
    // Validate before touching the database. parseInt() let "abc" through as NaN.
    const concurrency = Number(options.concurrency);
    if (!Number.isInteger(concurrency) || concurrency < 1) {
      console.error(`Invalid concurrency: ${options.concurrency}. Use a positive integer.`);
      process.exit(1);
    }
    try {
      const { storage, dedup, fetcher, parser } = await createContext();
      const orchestrator = new FeedOrchestrator({
        storage,
        fetcher,
        parser,
        dedup,
        concurrency,
      });
      console.log('Starting Pulse orchestrator...');

      // Handle graceful shutdown. The flag stops a second Ctrl+C from calling
      // stop() again while the first shutdown is still in progress.
      let shuttingDown = false;
      const shutdown = async (): Promise<void> => {
        if (shuttingDown) {
          return;
        }
        shuttingDown = true;
        console.log('\nShutting down gracefully...');
        try {
          await orchestrator.stop();
        } catch (error) {
          // Without this, a failing stop() becomes an unhandled rejection inside a signal handler.
          console.error('Error during shutdown:', error instanceof Error ? error.message : error);
          process.exit(1);
        }
        // NOTE(review): if getDatabase() returns a Kysely instance, `await db.destroy()`
        // would release its connection pool here. Confirm and wire it in.
        console.log('Shutdown complete.');
        process.exit(0);
      };
      process.on('SIGINT', () => void shutdown());
      process.on('SIGTERM', () => void shutdown());

      await orchestrator.start();
      console.log('Orchestrator running. Press Ctrl+C to stop.');
      // Park this action forever; the process ends via shutdown().
      // NOTE(review): a never-settling promise does not by itself keep Node's event
      // loop alive. This presumably relies on timers/sockets held by the orchestrator; confirm.
      await new Promise(() => {});
    } catch (error) {
      console.error('Failed to start orchestrator:', error instanceof Error ? error.message : error);
      process.exit(1);
    }
  });
// Add command
/**
 * `pulse add <url> [-n name] [-f rss|atom|auto] [-i ms]`: register a new feed source.
 *
 * All arguments are validated before the database is opened or the network is
 * probed, so bad input fails fast with exit code 1.
 */
program
  .command('add <url>')
  .description('Add a new feed source')
  .option('-n, --name <name>', 'Display name for the feed')
  .option('-f, --format <format>', 'Feed format (rss or atom)', 'auto')
  .option('-i, --interval <ms>', 'Poll interval in milliseconds', '300000')
  .action(async (url: string, options) => {
    // Only http(s) URLs can be fetched; reject anything else before it is stored.
    let protocol = '';
    try {
      protocol = new URL(url).protocol;
    } catch {
      // Unparseable URL: leave protocol empty so the check below rejects it.
    }
    if (protocol !== 'http:' && protocol !== 'https:') {
      console.error(`Invalid URL: ${url}. Use an http:// or https:// feed URL.`);
      process.exit(1);
    }

    const formatOption: string = options.format;
    if (formatOption !== 'auto' && formatOption !== 'rss' && formatOption !== 'atom') {
      console.error(`Invalid format: ${options.format}. Use 'rss', 'atom', or 'auto'.`);
      process.exit(1);
    }

    // parseInt() accepted "abc" (NaN, then persisted) and "10s" (10); require a positive integer.
    const pollIntervalMs = Number(options.interval);
    if (!Number.isInteger(pollIntervalMs) || pollIntervalMs <= 0) {
      console.error(`Invalid interval: ${options.interval}. Use a positive number of milliseconds.`);
      process.exit(1);
    }

    try {
      const { storage } = await createContext();
      const id = generateFeedId(url);
      // Check if already exists. IDs are a 32-bit hash of the URL, so a hit may
      // also be a different URL that collided; report the stored URL.
      const existing = await storage.getFeedSourceById(id);
      if (existing) {
        console.error(`Feed already exists: ${existing.url}`);
        process.exit(1);
      }

      // Probe the network only when the user did not choose a format explicitly.
      let format: 'rss' | 'atom';
      if (formatOption === 'rss' || formatOption === 'atom') {
        format = formatOption;
      } else {
        console.log('Detecting feed format...');
        format = await detectFeedFormat(url);
        console.log(`Detected format: ${format}`);
      }

      const now = new Date();
      const source: FeedSource = {
        id,
        url,
        name: options.name ?? null,
        format,
        pollIntervalMs,
        isActive: true,
        lastFetchedAt: null,
        lastSuccessAt: null,
        consecutiveFailures: 0,
        createdAt: now,
        updatedAt: now,
      };
      await storage.saveFeedSource(source);
      console.log(`Added feed: ${url}`);
      console.log(` ID: ${id}`);
      console.log(` Format: ${format}`);
      console.log(` Interval: ${pollIntervalMs}ms`);
    } catch (error) {
      console.error('Failed to add feed:', error instanceof Error ? error.message : error);
      process.exit(1);
    }
  });
// Remove command
/**
 * `pulse remove <id>`: delete a feed source.
 * Exits with code 1 when the ID is unknown or storage fails.
 */
program
  .command('remove <id>')
  .description('Remove a feed source by ID')
  .action(async (id) => {
    try {
      const { storage } = await createContext();
      const target = await storage.getFeedSourceById(id);
      if (!target) {
        console.error(`Feed not found: ${id}`);
        process.exit(1);
      }
      await storage.deleteFeedSource(id);
      // Prefer the display name; fall back to the URL for unnamed feeds.
      const label = target.name ?? target.url;
      console.log(`Removed feed: ${label} (${id})`);
    } catch (err) {
      const detail = err instanceof Error ? err.message : err;
      console.error('Failed to remove feed:', detail);
      process.exit(1);
    }
  });
// List command
/**
 * `pulse list`: print every configured feed source with its status, ID, URL,
 * format, poll interval, last success time and (if any) failure count.
 */
program
  .command('list')
  .description('List all feed sources')
  .action(async () => {
    try {
      const { storage } = await createContext();
      const sources = await storage.getFeedSources();
      if (sources.length === 0) {
        console.log('No feed sources configured.');
        return;
      }
      console.log(`\n ${sources.length} feed source(s):\n`);
      for (const source of sources) {
        const status = source.isActive ? '✓' : '✗';
        console.log(` ${status} ${source.name ?? source.url}`);
        console.log(` ID: ${source.id}`);
        console.log(` URL: ${source.url}`);
        console.log(` Format: ${source.format}`);
        console.log(` Interval: ${source.pollIntervalMs}ms`);
        if (source.lastSuccessAt) {
          console.log(` Last success: ${source.lastSuccessAt.toLocaleString()}`);
        }
        // Only print the failure line when there are failures; previously healthy
        // feeds got a stray whitespace-only line here.
        if (source.consecutiveFailures > 0) {
          const health = ` (${source.consecutiveFailures} failures)`;
          console.log(` ${health}`);
        }
        console.log();
      }
    } catch (error) {
      console.error('Failed to list feeds:', error instanceof Error ? error.message : error);
      process.exit(1);
    }
  });
// Fetch command
/**
 * `pulse fetch`: run one pass over all active feeds and print per-feed and
 * total item counts. Per-feed failures are reported but do not change the exit code.
 */
program
  .command('fetch')
  .description('One-shot fetch of all active feeds')
  .action(async () => {
    try {
      const { storage, dedup, fetcher, parser } = await createContext();
      const orchestrator = new FeedOrchestrator({
        storage,
        fetcher,
        parser,
        dedup,
      });
      console.log('Fetching all active feeds...\n');
      const results = await orchestrator.processAllFeeds();

      // Resolve display names with one query rather than one getFeedSourceById() per result.
      const sourcesById = new Map(
        (await storage.getFeedSources()).map((source) => [source.id, source] as const),
      );

      let totalFound = 0;
      let totalNew = 0;
      for (const [sourceId, result] of results) {
        const source = sourcesById.get(sourceId);
        const name = source?.name ?? source?.url ?? sourceId;
        console.log(`${name}`);
        if (result.success) {
          console.log(` Found: ${result.itemsFound} items`);
          console.log(` New: ${result.itemsNew} items`);
          totalFound += result.itemsFound;
          totalNew += result.itemsNew;
        } else {
          console.log(` Error: ${result.error?.reason ?? 'Unknown error'}`);
        }
        console.log();
      }
      console.log(`Total: ${totalFound} items found, ${totalNew} new items`);
    } catch (error) {
      console.error('Failed to fetch feeds:', error instanceof Error ? error.message : error);
      process.exit(1);
    }
  });
// Status command
/**
 * `pulse status`: group feeds by the orchestrator's health verdict and print
 * the healthy names, then the unhealthy ones with failure details.
 */
program
  .command('status')
  .description('Show feed health status')
  .action(async () => {
    try {
      const { storage, dedup, fetcher, parser } = await createContext();
      const report = await new FeedOrchestrator({
        storage,
        fetcher,
        parser,
        dedup,
      }).getFeedHealth();

      if (report.length === 0) {
        console.log('No feed sources configured.');
        return;
      }

      console.log(`\n Feed Health Status:\n`);
      const healthy = report.filter((entry) => entry.isHealthy);
      const unhealthy = report.filter((entry) => !entry.isHealthy);

      if (healthy.length > 0) {
        console.log(` Healthy (${healthy.length}):`);
        for (const entry of healthy) {
          console.log(`${entry.name ?? entry.url}`);
        }
        console.log();
      }

      if (unhealthy.length > 0) {
        console.log(` Unhealthy (${unhealthy.length}):`);
        for (const entry of unhealthy) {
          console.log(`${entry.name ?? entry.url}`);
          console.log(` Failures: ${entry.consecutiveFailures}`);
          console.log(
            entry.lastSuccessAt
              ? ` Last success: ${entry.lastSuccessAt.toLocaleString()}`
              : ` Never successfully fetched`,
          );
        }
        console.log();
      }
    } catch (err) {
      const detail = err instanceof Error ? err.message : err;
      console.error('Failed to get status:', detail);
      process.exit(1);
    }
  });
// Items command
/**
 * `pulse items [limit]`: print the most recent items using the terminal formatter.
 * `limit` defaults to 20 and must be a positive integer.
 */
program
  .command('items [limit]')
  .description('Show recent items (default: 20)')
  .action(async (limitStr?: string) => {
    // parseInt() turned "abc" into NaN and accepted 0 or negatives; reject them up front.
    const limit = Number(limitStr ?? '20');
    if (!Number.isInteger(limit) || limit < 1) {
      console.error(`Invalid limit: ${limitStr}. Use a positive integer.`);
      process.exit(1);
    }
    try {
      const { storage } = await createContext();
      const formatter = new Formatter();
      const items = await storage.getRecent(limit);
      if (items.length === 0) {
        console.log('No items found.');
        return;
      }
      const output = await formatter.format(items, 'terminal');
      console.log(output);
    } catch (error) {
      console.error('Failed to get items:', error instanceof Error ? error.message : error);
      process.exit(1);
    }
  });
// Every action handler above is async, and Commander requires parseAsync() for
// async actions so the returned promise tracks handler completion. Anything that
// escapes a handler is reported and turned into exit code 1 rather than an
// unhandled rejection.
program.parseAsync().catch((error: unknown) => {
  console.error(error instanceof Error ? error.message : error);
  process.exit(1);
});