pulse/modules/fetcher/fetcher.ts

179 lines
5.1 KiB
TypeScript

import { request, type Dispatcher } from 'undici';
import type { FetchInput, FetchResult, FeedResponse, FetchError } from '../../interfaces/feed.types.js';
import type { IFetcher } from '../../interfaces/fetcher.interface.js';
/** Generic XML media types accepted for both RSS and Atom feeds. */
const GENERIC_XML_MEDIA_TYPES = ['application/xml', 'text/xml'];
/** Media types (lower-case, parameters stripped) accepted for an RSS feed response. */
const VALID_RSS_CONTENT_TYPES = [...GENERIC_XML_MEDIA_TYPES, 'application/rss+xml'];
/** Media types (lower-case, parameters stripped) accepted for an Atom feed response. */
const VALID_ATOM_CONTENT_TYPES = [...GENERIC_XML_MEDIA_TYPES, 'application/atom+xml'];
/**
 * Selects the list of acceptable Content-Type media types for a feed format.
 *
 * @param expectedFormat - `'rss'` selects the RSS list; any other value selects the Atom list.
 * @returns The shared module-level list (not a copy).
 */
function getValidTypes(expectedFormat: FetchInput['expectedFormat']): string[] {
  if (expectedFormat === 'rss') {
    return VALID_RSS_CONTENT_TYPES;
  }
  return VALID_ATOM_CONTENT_TYPES;
}
/**
 * Reduces a Content-Type header value to its bare, lower-cased media type by dropping
 * any parameters such as `charset` (e.g. `"Text/XML; charset=utf-8"` -> `"text/xml"`).
 *
 * @param contentType - Raw header value; may be empty.
 * @returns The media type, or `''` when nothing precedes the first `;`.
 */
function normalizeContentType(contentType: string): string {
  // `split` always yields at least one element; the default only exists so this also
  // type-checks under `noUncheckedIndexedAccess`. The limit of 1 avoids splitting the rest.
  const [mediaType = ''] = contentType.split(';', 1);
  return mediaType.trim().toLowerCase();
}
/**
 * Reports whether a Content-Type header value is acceptable for the expected feed format.
 * Header parameters (e.g. `charset`) and letter case are ignored before the comparison.
 *
 * @param expectedFormat - Feed format the caller asked for.
 * @param contentType - Raw Content-Type header value.
 */
function isValidFormat(expectedFormat: FetchInput['expectedFormat'], contentType: string): boolean {
  return getValidTypes(expectedFormat).includes(normalizeContentType(contentType));
}
/**
 * Heuristic check that a response body starts like an XML, RSS or Atom document.
 *
 * Leading whitespace, including a UTF-8 BOM (U+FEFF is ECMAScript whitespace), is
 * skipped. The check is then case-insensitive against `<?xml`, `<rss` and `<feed`.
 *
 * @param body - Full response body text.
 */
function bodyLooksLikeXml(body: string): boolean {
  // Only a short prefix is inspected, so lower-case just that slice rather than
  // copying the entire (possibly very large) body.
  const head = body.trimStart().slice(0, 16).toLowerCase();
  return head.startsWith('<?xml') || head.startsWith('<rss') || head.startsWith('<feed');
}
/** undici error codes raised when one of its own connect/headers/body timers fires. */
const TIMEOUT_ERROR_CODES = [
  'UND_ERR_CONNECT_TIMEOUT',
  'UND_ERR_REQUEST_TIMEOUT',
  'UND_ERR_HEADERS_TIMEOUT',
  'UND_ERR_BODY_TIMEOUT',
];

/** Node.js system error codes and undici socket errors treated as transport failures. */
const NETWORK_ERROR_CODES = [
  'ECONNREFUSED',
  'ENOTFOUND',
  'ETIMEDOUT',
  'EAI_AGAIN',
  'ECONNRESET',
  'EPIPE',
  'ENETUNREACH',
  'UND_ERR_SOCKET',
];

/**
 * Maps an error thrown while fetching a feed to a `FetchError`.
 *
 * - `TIMEOUT`: abort/timeout errors. This includes the `TimeoutError` DOMException that
 *   `AbortSignal.timeout()` aborts with, and undici's own timeout codes.
 * - `NETWORK`: known connection/DNS/socket error codes, matched on `code` or in the message.
 * - `UNKNOWN`: anything else.
 *
 * @param error - The caught value; it may be any type, including `null`.
 * @param source - URL of the feed being fetched, copied into the result.
 */
function classifyError(error: unknown, source: string): FetchError {
  // A catch clause can receive any value, so read fields defensively instead of
  // assuming an Error instance (the old `as Error` cast threw on `null`).
  const fields: { name?: unknown; code?: unknown; message?: unknown } =
    typeof error === 'object' && error !== null ? error : {};
  const name = typeof fields.name === 'string' ? fields.name : '';
  const code = typeof fields.code === 'string' ? fields.code : '';
  const message = typeof fields.message === 'string' ? fields.message : String(error);

  // Older undici rejects aborted requests with name 'AbortError'. Newer undici
  // rejects with `signal.reason`, which for AbortSignal.timeout() is a 'TimeoutError'.
  if (name === 'AbortError' || name === 'TimeoutError' || TIMEOUT_ERROR_CODES.includes(code)) {
    return { source, reason: message, code: 'TIMEOUT' };
  }

  if (
    NETWORK_ERROR_CODES.includes(code) ||
    NETWORK_ERROR_CODES.some((networkCode) => message.includes(networkCode))
  ) {
    return { source, reason: message, code: 'NETWORK' };
  }

  return { source, reason: message, code: 'UNKNOWN' };
}
/** Construction options for `HttpFetcher`; every field is optional. */
export interface HttpFetcherOptions {
  /**
   * undici dispatcher (agent/pool) forwarded to every request. When omitted, `undefined`
   * is passed and undici presumably falls back to its global dispatcher.
   */
  dispatcher?: Dispatcher;
  /** Maximum number of requests `fetchMany` keeps in flight (HttpFetcher uses 5 if unset). */
  concurrency?: number;
  /** Per-request timeout in milliseconds (HttpFetcher uses 10 000 if unset). */
  timeout?: number;
}
/**
 * Feed fetcher backed by undici's `request`.
 *
 * Each request is bounded by `timeout` milliseconds via `AbortSignal.timeout`, and
 * `fetchMany` runs at most `concurrency` requests at a time. HTTP-status, content-type
 * and request failures are recorded in the `errors` array of the returned
 * `FetchResult` rather than thrown.
 */
export class HttpFetcher implements IFetcher {
  private readonly timeout: number;
  private readonly concurrency: number;
  private readonly dispatcher?: Dispatcher;

  /**
   * @param options - Optional settings:
   *   - `timeout` in ms (default `10_000`);
   *   - `concurrency` for `fetchMany` (default `5`; values below 1 or NaN fall back to 1);
   *   - an undici `dispatcher` forwarded to every request.
   */
  constructor(options: HttpFetcherOptions = {}) {
    this.timeout = options.timeout ?? 10_000;
    const concurrency = options.concurrency ?? 5;
    // A limit below 1 (or NaN) would leave every fetchMany task queued forever.
    this.concurrency = concurrency >= 1 ? concurrency : 1;
    this.dispatcher = options.dispatcher;
  }

  /**
   * Fetches one feed with a GET request.
   *
   * A response is accepted when the status is 2xx and one of these holds:
   * - its Content-Type is allowed for `input.expectedFormat`;
   * - the body itself starts like an XML/RSS/Atom document.
   * Non-2xx statuses are recorded with code `UNKNOWN` and rejected content with code
   * `PARSE`. Thrown errors are mapped by `classifyError`.
   *
   * NOTE(review): undici's `request` does not follow redirects by itself. A moved feed
   * (301/302) is therefore reported as `HTTP 3xx` unless the injected dispatcher handles
   * redirects. Confirm this is intended.
   *
   * @param input - Feed URL and the format the caller expects.
   * @returns A `FetchResult` holding either one response or one error.
   */
  async fetch(input: FetchInput): Promise<FetchResult> {
    const result: FetchResult = { responses: [], errors: [], fetchedAt: new Date() };
    try {
      const { statusCode, headers, body } = await request(input.url, {
        method: 'GET',
        headers: { 'User-Agent': 'Pulse-RSS-Fetcher/1.0' },
        signal: AbortSignal.timeout(this.timeout),
        dispatcher: this.dispatcher,
      });
      // Read the body before inspecting the status: undici expects every response body
      // to be consumed (or cancelled), error responses included.
      const responseBody = await body.text();

      if (statusCode < 200 || statusCode >= 300) {
        result.errors.push({
          source: input.url,
          reason: `HTTP ${statusCode}`,
          code: 'UNKNOWN',
        });
        return result;
      }

      // undici reports a repeated header as an array of values. Take the first value
      // rather than assuming a string (calling `split` on an array would throw).
      const rawContentType = headers['content-type'];
      const contentType =
        (Array.isArray(rawContentType) ? rawContentType[0] : rawContentType) ?? '';

      // A missing or non-matching Content-Type is tolerated when the body itself looks like XML.
      const accepted =
        (contentType !== '' && isValidFormat(input.expectedFormat, contentType)) ||
        bodyLooksLikeXml(responseBody);
      if (!accepted) {
        result.errors.push({
          source: input.url,
          reason: contentType
            ? `Unexpected content type: ${contentType}`
            : 'Response does not appear to be XML',
          code: 'PARSE',
        });
        return result;
      }

      result.responses.push({
        source: input.url,
        body: responseBody,
        contentType: contentType || 'application/xml',
        statusCode,
      });
    } catch (error) {
      result.errors.push(classifyError(error, input.url));
    }
    return result;
  }

  /**
   * Fetches several feeds with at most `concurrency` requests in flight. All responses
   * and errors are merged into a single `FetchResult`, each list in input order.
   *
   * @param inputs - Feeds to fetch; an empty list yields an empty result.
   */
  async fetchMany(inputs: FetchInput[]): Promise<FetchResult> {
    const result: FetchResult = { responses: [], errors: [], fetchedAt: new Date() };
    const limit = createConcurrencyLimit(this.concurrency);
    const perInput = await Promise.all(inputs.map((input) => limit(() => this.fetch(input))));
    for (const single of perInput) {
      result.responses.push(...single.responses);
      result.errors.push(...single.errors);
    }
    return result;
  }
}
/**
 * Creates a limiter that runs at most `concurrency` async tasks at a time.
 *
 * Tasks submitted while the limit is reached are queued and started in FIFO order as
 * running tasks settle. Each returned promise settles with its own task's result or
 * rejection, and a rejection still frees the slot for the next queued task.
 *
 * @param concurrency - Maximum number of tasks in flight. Values below 1, and NaN, are
 *   treated as 1, because a limit of 0 would never start any task.
 * @returns A function that schedules `fn` under the limit.
 */
function createConcurrencyLimit(concurrency: number): <T>(fn: () => Promise<T>) => Promise<T> {
  const maxActive = concurrency >= 1 ? concurrency : 1;
  const queue: (() => void)[] = [];
  let activeCount = 0;

  return function <T>(fn: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const run = async (): Promise<void> => {
        activeCount++;
        try {
          resolve(await fn());
        } catch (error) {
          reject(error);
        } finally {
          activeCount--;
          // Hand the freed slot to the oldest waiting task, if any.
          const next = queue.shift();
          if (next) {
            next();
          }
        }
      };

      if (activeCount < maxActive) {
        // `run` settles the outer promise itself and never rejects, so it is not awaited.
        void run();
      } else {
        queue.push(run);
      }
    });
  };
}