pulse/modules/storage/storage.ts
Edo Limburg c79eb6d76d Add CLI entry point, RSS content extraction, and image support
Features:
- Add CLI with commands: start, add, remove, list, fetch, status, items
- Auto-detect RSS format when adding feeds
- Auto-run database migrations on startup
- Extract full HTML content from RSS description field (NOS-style feeds)
- Extract image URLs from RSS enclosure tags
- Display images in terminal output with emoji
- Include imageUrl in JSON formatter output

Database:
- Add image_url column to feed_items table
- Update storage layer to persist imageUrl field

Tests:
- Add 10 CLI integration tests
- Add 3 RSS parser tests for image/content extraction
- Add 2 storage tests for imageUrl persistence

Dependencies:
- Add commander for CLI framework

All 144 tests passing
2026-05-05 23:05:30 +02:00

216 lines
6.4 KiB
TypeScript

/**
 * Storage module implementation.
 * Persists feed items and feed source records to the database using Kysely.
 */
import type { Kysely } from 'kysely';
import type { FeedItem, FeedSource } from '../../interfaces/feed.types.js';
import type { IStorage } from '../../interfaces/storage.interface.js';
import type { Database, FeedItemTable, FeedSourceTable } from '../../infrastructure/db/database.js';
/**
 * Kysely-backed implementation of `IStorage`.
 *
 * Reads and writes two tables: `feed_items` (fetched entries) and
 * `feed_sources` (the feeds being polled plus their polling status).
 * Timestamps are written as ISO-8601 strings and booleans as 0/1 integers
 * (the SQLite representation), and are converted back to `Date` / `boolean`
 * when rows are mapped to domain objects.
 */
export class SqlStorage implements IStorage {
  private readonly db: Kysely<Database>;

  /**
   * @param db Kysely instance used for every query issued by this class.
   */
  constructor(db: Kysely<Database>) {
    this.db = db;
  }

  /**
   * Inserts the given items, updating existing rows that share an `id`.
   *
   * On an `id` conflict only `title`, `content`, `summary` and `image_url`
   * are overwritten; `source`, `url`, `published_at` and `created_at` keep
   * the values already stored.
   *
   * @param items Items to persist. An empty array is a no-op (no query is issued).
   */
  async save(items: FeedItem[]): Promise<void> {
    if (items.length === 0) {
      return;
    }

    // One timestamp for the whole batch, so rows written together share the
    // same created_at instead of drifting by however long the mapping takes.
    const createdAt = new Date().toISOString();

    const rows: FeedItemTable[] = items.map((item) => ({
      id: item.id,
      source: item.source,
      title: item.title,
      url: item.url,
      published_at: item.publishedAt.toISOString(),
      content: item.content ?? null,
      summary: item.summary ?? null,
      image_url: item.imageUrl ?? null,
      created_at: createdAt,
    }));

    // Upsert: `excluded.<col>` refers to the value from the row that was
    // being inserted when the conflict on `id` occurred.
    await this.db
      .insertInto('feed_items')
      .values(rows)
      .onConflict((oc) =>
        oc.column('id').doUpdateSet({
          title: (eb) => eb.ref('excluded.title'),
          content: (eb) => eb.ref('excluded.content'),
          summary: (eb) => eb.ref('excluded.summary'),
          image_url: (eb) => eb.ref('excluded.image_url'),
        })
      )
      .execute();
  }

  /**
   * Returns stored items ordered by `published_at`, newest first.
   *
   * @param limit Maximum number of rows to request; passed to the query as-is.
   */
  async getRecent(limit: number): Promise<FeedItem[]> {
    const rows = await this.db
      .selectFrom('feed_items')
      .selectAll()
      .orderBy('published_at', 'desc')
      .limit(limit)
      .execute();

    return rows.map((row) => this.rowToFeedItem(row));
  }

  /**
   * Returns items whose `source` column equals `source`, newest first.
   *
   * @param source Exact source value to match.
   * @param limit Maximum number of rows to request; passed to the query as-is.
   */
  async getBySource(source: string, limit: number): Promise<FeedItem[]> {
    const rows = await this.db
      .selectFrom('feed_items')
      .selectAll()
      .where('source', '=', source)
      .orderBy('published_at', 'desc')
      .limit(limit)
      .execute();

    return rows.map((row) => this.rowToFeedItem(row));
  }

  /**
   * Returns items whose title, summary or content contains `query`,
   * newest first. No row limit is applied.
   *
   * The match is a SQL `LIKE '%query%'`. `%` and `_` inside `query` are not
   * escaped, so they act as LIKE wildcards rather than literal characters.
   * NOTE(review): case sensitivity is whatever the database's LIKE provides.
   *
   * @param query Substring to look for.
   */
  async search(query: string): Promise<FeedItem[]> {
    const searchTerm = `%${query}%`;

    const rows = await this.db
      .selectFrom('feed_items')
      .selectAll()
      .where((eb) =>
        eb.or([
          eb('title', 'like', searchTerm),
          eb('summary', 'like', searchTerm),
          eb('content', 'like', searchTerm),
        ])
      )
      .orderBy('published_at', 'desc')
      .execute();

    return rows.map((row) => this.rowToFeedItem(row));
  }

  /**
   * Converts a `feed_items` row into a `FeedItem`.
   * NULL columns become `undefined`; `published_at` is parsed into a `Date`.
   * `created_at` is not part of the returned object.
   */
  private rowToFeedItem(row: FeedItemTable): FeedItem {
    return {
      id: row.id,
      source: row.source,
      title: row.title,
      url: row.url,
      publishedAt: new Date(row.published_at),
      content: row.content ?? undefined,
      summary: row.summary ?? undefined,
      imageUrl: row.image_url ?? undefined,
    };
  }

  // Feed source management methods

  /**
   * Lists feed sources ordered by `created_at`, oldest first.
   *
   * @param activeOnly When truthy, only rows with `is_active = 1` are returned.
   */
  async getFeedSources(activeOnly?: boolean): Promise<FeedSource[]> {
    let query = this.db.selectFrom('feed_sources').selectAll();

    if (activeOnly) {
      query = query.where('is_active', '=', 1); // Use 1 for SQLite boolean
    }

    const rows = await query.orderBy('created_at', 'asc').execute();
    return rows.map((row) => this.rowToFeedSource(row));
  }

  /**
   * Looks up a single feed source by primary key.
   *
   * @returns The matching source, or `null` when no row has that `id`.
   */
  async getFeedSourceById(id: string): Promise<FeedSource | null> {
    const row = await this.db
      .selectFrom('feed_sources')
      .selectAll()
      .where('id', '=', id)
      .executeTakeFirst();

    return row ? this.rowToFeedSource(row) : null;
  }

  /**
   * Inserts a feed source, or updates the existing row with the same `id`.
   *
   * `updated_at` is always set to the current time; `source.updatedAt` is
   * not read. On conflict every column except `id`, `url` and `created_at`
   * is overwritten with the new values.
   */
  async saveFeedSource(source: FeedSource): Promise<void> {
    const now = new Date().toISOString();

    const row: FeedSourceTable = {
      id: source.id,
      url: source.url,
      name: source.name,
      format: source.format,
      poll_interval_ms: source.pollIntervalMs,
      is_active: source.isActive ? 1 : 0, // Convert boolean to integer for SQLite
      last_fetched_at: source.lastFetchedAt?.toISOString() ?? null,
      last_success_at: source.lastSuccessAt?.toISOString() ?? null,
      consecutive_failures: source.consecutiveFailures,
      created_at: source.createdAt.toISOString(),
      updated_at: now,
    };

    // Upsert: insert or update on conflict
    await this.db
      .insertInto('feed_sources')
      .values(row)
      .onConflict((oc) =>
        oc.column('id').doUpdateSet({
          name: (eb) => eb.ref('excluded.name'),
          format: (eb) => eb.ref('excluded.format'),
          poll_interval_ms: (eb) => eb.ref('excluded.poll_interval_ms'),
          is_active: (eb) => eb.ref('excluded.is_active'),
          last_fetched_at: (eb) => eb.ref('excluded.last_fetched_at'),
          last_success_at: (eb) => eb.ref('excluded.last_success_at'),
          consecutive_failures: (eb) => eb.ref('excluded.consecutive_failures'),
          updated_at: (eb) => eb.ref('excluded.updated_at'),
        })
      )
      .execute();
  }

  /**
   * Partially updates the status columns of one feed source.
   *
   * Only fields present on `updates` (i.e. not `undefined`) are written;
   * `updated_at` is always refreshed. The comparison is against `undefined`
   * rather than truthiness so that `consecutiveFailures: 0` and
   * `isActive: false` are persisted.
   *
   * @param id Primary key of the feed source to update.
   * @param updates Status fields to change.
   */
  async updateFeedSourceStatus(
    id: string,
    updates: {
      lastFetchedAt?: Date;
      lastSuccessAt?: Date;
      consecutiveFailures?: number;
      isActive?: boolean;
    }
  ): Promise<void> {
    const setClause: Partial<FeedSourceTable> = {
      updated_at: new Date().toISOString(),
    };

    if (updates.lastFetchedAt !== undefined) {
      setClause.last_fetched_at = updates.lastFetchedAt.toISOString();
    }
    if (updates.lastSuccessAt !== undefined) {
      setClause.last_success_at = updates.lastSuccessAt.toISOString();
    }
    if (updates.consecutiveFailures !== undefined) {
      setClause.consecutive_failures = updates.consecutiveFailures;
    }
    if (updates.isActive !== undefined) {
      setClause.is_active = updates.isActive ? 1 : 0; // Convert boolean to integer for SQLite
    }

    await this.db
      .updateTable('feed_sources')
      .set(setClause)
      .where('id', '=', id)
      .execute();
  }

  /**
   * Deletes the feed source with the given `id`.
   * Only the `feed_sources` table is targeted by this statement.
   */
  async deleteFeedSource(id: string): Promise<void> {
    await this.db.deleteFrom('feed_sources').where('id', '=', id).execute();
  }

  /**
   * Converts a `feed_sources` row into a `FeedSource`.
   * `is_active` becomes a boolean, timestamp strings become `Date`s, and
   * empty/NULL `last_*_at` values become `null`.
   */
  private rowToFeedSource(row: FeedSourceTable): FeedSource {
    return {
      id: row.id,
      url: row.url,
      name: row.name,
      format: row.format,
      pollIntervalMs: row.poll_interval_ms,
      isActive: Boolean(row.is_active), // Convert integer to boolean
      lastFetchedAt: row.last_fetched_at ? new Date(row.last_fetched_at) : null,
      lastSuccessAt: row.last_success_at ? new Date(row.last_success_at) : null,
      consecutiveFailures: row.consecutive_failures,
      createdAt: new Date(row.created_at),
      updatedAt: new Date(row.updated_at),
    };
  }
}