/**
 * HTTP fetcher for RSS/Atom feeds built on undici's `request`.
 *
 * Contents of this module:
 * - VALID_RSS_CONTENT_TYPES / VALID_ATOM_CONTENT_TYPES: media types accepted
 *   without sniffing the body. getValidTypes picks the RSS list when
 *   expectedFormat === 'rss' and the Atom list for any other value.
 * - normalizeContentType: drops parameters after ';' (e.g. a charset), trims
 *   and lower-cases. isValidFormat checks the normalized value for
 *   membership in the list chosen by getValidTypes.
 * - bodyLooksLikeXml: lower-cases the left-trimmed body and checks its
 *   prefix. The exact prefixes are not legible in this copy (see NOTE below).
 * - classifyError: maps a caught error to a FetchError for `source`. The
 *   visible tail returns code 'NETWORK' when the error message contains one
 *   of a list of codes, and 'UNKNOWN' otherwise.
 * - HttpFetcher: IFetcher implementation. Defaults are timeout 10_000 ms
 *   (passed to AbortSignal.timeout) and concurrency 5. An optional undici
 *   Dispatcher is forwarded to every request.
 *   - fetch(input): performs one GET with User-Agent 'Pulse-RSS-Fetcher/1.0'.
 *     - The body is always read in full before the status is checked.
 *     - A status outside 200-299 records the error "HTTP {status}" with
 *       code 'UNKNOWN'.
 *     - A Content-Type outside the allowed list, or a missing one, is
 *       tolerated only if bodyLooksLikeXml passes. Otherwise a 'PARSE' error
 *       is recorded.
 *     - Anything thrown inside the try is converted via classifyError. So,
 *       assuming classifyError itself does not throw, failures are reported
 *       in result.errors rather than thrown.
 *     - On success the response is stored with contentType defaulting to
 *       'application/xml'.
 *   - fetchMany(inputs): runs fetch for every input through
 *     createConcurrencyLimit(this.concurrency). It concatenates responses
 *     and errors in input order. Its fetchedAt is taken before any request
 *     starts; the per-input fetchedAt values are discarded.
 * - createConcurrencyLimit(n): returns a wrapper that behaves as follows.
 *   - It starts a task immediately while fewer than n tasks are active, and
 *     otherwise queues it (FIFO via queue.shift).
 *   - Each finishing task starts the next queued one.
 *   - The wrapper's promise settles with the task's own result or rejection.
 *
 * NOTE(review): this copy appears to have lost every span of text that
 * started with '<' followed by a letter or '?', up to the next '>'. This
 * looks like HTML-style stripping. It affected:
 *   - the prefix checks in bodyLooksLikeXml;
 *   - the head of classifyError;
 *   - the generic type arguments (fetch/fetchMany return `Promise` with no
 *     type argument, and the limiter's type parameter is gone).
 * As shown, the file does not compile. Restore it from version control
 * before making code changes here.
 *
 * NOTE(review): issues to address once the source is restored:
 * - createConcurrencyLimit never starts any task when concurrency is 0,
 *   negative or NaN. The check `activeCount < concurrency` is never true, and
 *   the queue is only drained when a running task finishes. As a result,
 *   fetchMany would never resolve. HttpFetcher passes options.concurrency
 *   through unvalidated, so clamp it to at least 1.
 * - headers['content-type'] is cast to `string | undefined`. undici's header
 *   typing presumably also allows string[] (confirm). An array value would
 *   make normalizeContentType's .split throw. The failure would then be
 *   reported through classifyError instead of as a content-type problem.
 * - Non-2xx statuses are reported with code 'UNKNOWN'. Consider a dedicated
 *   code if FetchError's code type permits one; its definition is not
 *   visible here.
 * - The response body is buffered with body.text() with no size cap, even
 *   for error statuses.
 */
import { request, type Dispatcher } from 'undici'; import type { FetchInput, FetchResult, FeedResponse, FetchError } from '../../interfaces/feed.types.js'; import type { IFetcher } from '../../interfaces/fetcher.interface.js'; const VALID_RSS_CONTENT_TYPES = [ 'application/xml', 'text/xml', 'application/rss+xml', ]; const VALID_ATOM_CONTENT_TYPES = [ 'application/xml', 'text/xml', 'application/atom+xml', ]; function getValidTypes(expectedFormat: FetchInput['expectedFormat']): string[] { return expectedFormat === 'rss' ? VALID_RSS_CONTENT_TYPES : VALID_ATOM_CONTENT_TYPES; } function normalizeContentType(contentType: string): string { return contentType.split(';')[0].trim().toLowerCase(); } function isValidFormat(expectedFormat: FetchInput['expectedFormat'], contentType: string): boolean { const normalized = normalizeContentType(contentType); const validTypes = getValidTypes(expectedFormat); return validTypes.includes(normalized); } function bodyLooksLikeXml(body: string): boolean { const trimmed = body.trimStart().toLowerCase(); return trimmed.startsWith(' message.includes(code)) ) { return { source, reason: message, code: 'NETWORK' }; } return { source, reason: message, code: 'UNKNOWN' }; } export interface HttpFetcherOptions { timeout?: number; concurrency?: number; dispatcher?: Dispatcher; } export class HttpFetcher implements IFetcher { private readonly timeout: number; private readonly concurrency: number; private readonly dispatcher?: Dispatcher; constructor(options: HttpFetcherOptions = {}) { this.timeout = options.timeout ?? 10_000; this.concurrency = options.concurrency ?? 
5; this.dispatcher = options.dispatcher; } async fetch(input: FetchInput): Promise { const result: FetchResult = { responses: [], errors: [], fetchedAt: new Date() }; try { const { statusCode, headers, body } = await request(input.url, { method: 'GET', headers: { 'User-Agent': 'Pulse-RSS-Fetcher/1.0' }, signal: AbortSignal.timeout(this.timeout), dispatcher: this.dispatcher, }); const responseBody = await body.text(); if (statusCode < 200 || statusCode >= 300) { result.errors.push({ source: input.url, reason: `HTTP ${statusCode}`, code: 'UNKNOWN', }); return result; } const contentType = (headers['content-type'] as string | undefined) ?? ''; if (contentType) { if (!isValidFormat(input.expectedFormat, contentType)) { if (!bodyLooksLikeXml(responseBody)) { result.errors.push({ source: input.url, reason: `Unexpected content type: ${contentType}`, code: 'PARSE', }); return result; } } } else if (!bodyLooksLikeXml(responseBody)) { result.errors.push({ source: input.url, reason: 'Response does not appear to be XML', code: 'PARSE', }); return result; } result.responses.push({ source: input.url, body: responseBody, contentType: contentType || 'application/xml', statusCode, }); } catch (error) { result.errors.push(classifyError(error, input.url)); } return result; } async fetchMany(inputs: FetchInput[]): Promise { const result: FetchResult = { responses: [], errors: [], fetchedAt: new Date() }; const limit = createConcurrencyLimit(this.concurrency); const results = await Promise.all(inputs.map((input) => limit(() => this.fetch(input)))); for (const r of results) { result.responses.push(...r.responses); result.errors.push(...r.errors); } return result; } } function createConcurrencyLimit(concurrency: number) { const queue: (() => void)[] = []; let activeCount = 0; return function (fn: () => Promise): Promise { return new Promise((resolve, reject) => { const run = async () => { activeCount++; try { const result = await fn(); resolve(result); } catch (error) { reject(error); } 
finally { activeCount--; if (queue.length > 0) { const next = queue.shift()!; next(); } } }; if (activeCount < concurrency) { run(); } else { queue.push(run); } }); }; }