179 lines
5.1 KiB
TypeScript
179 lines
5.1 KiB
TypeScript
import { request, type Dispatcher } from 'undici';
|
|
import type { FetchInput, FetchResult, FeedResponse, FetchError } from '../../interfaces/feed.types.js';
|
|
import type { IFetcher } from '../../interfaces/fetcher.interface.js';
|
|
|
|
/** Generic XML media types that are acceptable for either feed format. */
const GENERIC_XML_CONTENT_TYPES = ['application/xml', 'text/xml'];

/** Content-Type media types accepted when an RSS feed is expected. */
const VALID_RSS_CONTENT_TYPES = [...GENERIC_XML_CONTENT_TYPES, 'application/rss+xml'];

/** Content-Type media types accepted when an Atom feed is expected. */
const VALID_ATOM_CONTENT_TYPES = [...GENERIC_XML_CONTENT_TYPES, 'application/atom+xml'];
|
|
|
|
/**
 * Selects the list of acceptable Content-Type media types for a feed format.
 * `'rss'` selects the RSS list; every other value falls back to the Atom list.
 */
function getValidTypes(expectedFormat: FetchInput['expectedFormat']): string[] {
  if (expectedFormat === 'rss') {
    return VALID_RSS_CONTENT_TYPES;
  }
  return VALID_ATOM_CONTENT_TYPES;
}
|
|
|
|
/**
 * Reduces a Content-Type header value to its bare, lower-cased media type by dropping
 * any parameters, e.g. `'Application/RSS+XML; charset=UTF-8'` -> `'application/rss+xml'`.
 *
 * @param contentType - Raw Content-Type header value.
 * @returns The trimmed, lower-cased media type (empty string for an empty header).
 */
function normalizeContentType(contentType: string): string {
  // Destructuring with a default keeps this well-typed under `noUncheckedIndexedAccess`
  // (indexing `[0]` yields `string | undefined`). `split` always returns at least one
  // element, so the default never applies at runtime.
  const [mediaType = ''] = contentType.split(';');
  return mediaType.trim().toLowerCase();
}
|
|
|
|
/**
 * Checks whether a response's Content-Type header is acceptable for the expected feed format.
 * Media-type parameters (such as `; charset=utf-8`) and letter case are ignored.
 */
function isValidFormat(expectedFormat: FetchInput['expectedFormat'], contentType: string): boolean {
  const mediaType = normalizeContentType(contentType);
  return getValidTypes(expectedFormat).some((accepted) => accepted === mediaType);
}
|
|
|
|
/** Lower-cased leading markers of an XML declaration or an RSS/Atom root element. */
const XML_BODY_PREFIXES = ['<?xml', '<rss', '<feed'];

/**
 * Sniffs whether a response body starts like an XML feed document, ignoring leading
 * whitespace and letter case. Used when the Content-Type header is missing or does not
 * match the expected format.
 */
function bodyLooksLikeXml(body: string): boolean {
  const head = body.trimStart().toLowerCase();
  return XML_BODY_PREFIXES.some((prefix) => head.startsWith(prefix));
}
|
|
|
|
/**
 * Error `name`s that mean the request was aborted. `AbortSignal.timeout()` aborts with a
 * `DOMException` named `'TimeoutError'`; older undici versions surface `'AbortError'`.
 */
const TIMEOUT_ERROR_NAMES = ['AbortError', 'TimeoutError'];

/** undici error codes raised when one of its internal timers fires. */
const TIMEOUT_ERROR_CODES = [
  'UND_ERR_CONNECT_TIMEOUT',
  'UND_ERR_REQUEST_TIMEOUT',
  'UND_ERR_HEADERS_TIMEOUT',
  'UND_ERR_BODY_TIMEOUT',
];

/** System (and undici socket) error codes that indicate a connectivity failure. */
const NETWORK_ERROR_CODES = [
  'ECONNREFUSED',
  'ENOTFOUND',
  'ETIMEDOUT',
  'EAI_AGAIN',
  'ECONNRESET',
  'EPIPE',
  'ENETUNREACH',
  'UND_ERR_SOCKET',
];

/**
 * Reads a string-valued property from an arbitrary thrown value.
 * Returns `undefined` for non-objects (including `null`) and for non-string properties.
 */
function readStringProperty(value: unknown, key: string): string | undefined {
  if (typeof value !== 'object' || value === null) {
    return undefined;
  }
  const prop: unknown = Reflect.get(value, key);
  return typeof prop === 'string' ? prop : undefined;
}

/**
 * Maps an error thrown while fetching `source` to a {@link FetchError}.
 *
 * - `TIMEOUT`: abort/timeout errors (by `name`, or by undici timeout `code`).
 * - `NETWORK`: a known connectivity error code, found either in `code` or in the message text.
 * - `UNKNOWN`: anything else.
 *
 * Safe for any thrown value, including `null`, `undefined` and plain strings.
 *
 * @param error - The value caught from the request.
 * @param source - The URL being fetched; copied into the result.
 * @returns A `FetchError` whose `reason` is the error message (or `String(error)` if there is none).
 */
function classifyError(error: unknown, source: string): FetchError {
  const name = readStringProperty(error, 'name') ?? '';
  const code = readStringProperty(error, 'code') ?? '';
  const message = readStringProperty(error, 'message') ?? String(error);

  if (TIMEOUT_ERROR_NAMES.includes(name) || TIMEOUT_ERROR_CODES.includes(code)) {
    return { source, reason: message, code: 'TIMEOUT' };
  }

  // Some errors only mention the system code in their message text, so check both.
  if (
    NETWORK_ERROR_CODES.includes(code) ||
    NETWORK_ERROR_CODES.some((networkCode) => message.includes(networkCode))
  ) {
    return { source, reason: message, code: 'NETWORK' };
  }

  return { source, reason: message, code: 'UNKNOWN' };
}
|
|
|
|
/** Construction options for `HttpFetcher`; every field is optional. */
export interface HttpFetcherOptions {
  /** Per-request timeout in milliseconds, passed to `AbortSignal.timeout`. `HttpFetcher` defaults it to 10 000. */
  timeout?: number;
  /** Maximum number of requests `fetchMany` keeps in flight at once. `HttpFetcher` defaults it to 5. */
  concurrency?: number;
  /** Custom undici dispatcher (e.g. an agent, pool or mock) used for every request. */
  dispatcher?: Dispatcher;
}
|
|
|
|
/**
 * `IFetcher` implementation that downloads RSS/Atom feeds over HTTP(S) using undici.
 *
 * Expected failures are reported as entries in `FetchResult.errors` rather than thrown.
 * These include non-2xx statuses, payloads that do not look like the expected feed,
 * network errors and timeouts.
 */
export class HttpFetcher implements IFetcher {
  /** Per-request timeout in milliseconds, applied through `AbortSignal.timeout`. */
  private readonly timeout: number;

  /** Maximum number of requests `fetchMany` keeps in flight at once (always >= 1). */
  private readonly concurrency: number;

  /** Optional undici dispatcher; when unset, undici's default dispatcher is used. */
  private readonly dispatcher?: Dispatcher;

  /**
   * @param options - Optional overrides. `timeout` defaults to 10 000 ms and `concurrency` to 5.
   */
  constructor(options: HttpFetcherOptions = {}) {
    this.timeout = options.timeout ?? 10_000;
    const requestedConcurrency = options.concurrency ?? 5;
    // With a limit below 1 (or NaN), the limiter would never start a task and
    // `fetchMany` would hang, so clamp it to 1.
    this.concurrency = requestedConcurrency >= 1 ? requestedConcurrency : 1;
    this.dispatcher = options.dispatcher;
  }

  /**
   * Fetches a single feed.
   *
   * @param input - Feed URL plus the format the caller expects (`expectedFormat`).
   * @returns A result holding exactly one entry, either in `responses` or in `errors`.
   */
  async fetch(input: FetchInput): Promise<FetchResult> {
    const result: FetchResult = { responses: [], errors: [], fetchedAt: new Date() };

    try {
      // NOTE(review): undici's `request` does not follow redirects by default, so a feed that
      // has moved (301/302/307/308) is reported below as `HTTP 3xx`. Confirm this is intended.
      const { statusCode, headers, body } = await request(input.url, {
        method: 'GET',
        headers: { 'User-Agent': 'Pulse-RSS-Fetcher/1.0' },
        signal: AbortSignal.timeout(this.timeout),
        dispatcher: this.dispatcher,
      });

      // Read the body before inspecting the status so it is consumed on every path.
      // undici recommends always consuming or cancelling response bodies.
      const responseBody = await body.text();

      if (statusCode < 200 || statusCode >= 300) {
        result.errors.push({
          source: input.url,
          reason: `HTTP ${statusCode}`,
          code: 'UNKNOWN',
        });
        return result;
      }

      const contentType = HttpFetcher.firstHeaderValue(headers['content-type']);
      const payloadError = HttpFetcher.checkPayload(input, contentType, responseBody);
      if (payloadError) {
        result.errors.push(payloadError);
        return result;
      }

      result.responses.push({
        source: input.url,
        body: responseBody,
        contentType: contentType || 'application/xml',
        statusCode,
      });
    } catch (error) {
      result.errors.push(classifyError(error, input.url));
    }

    return result;
  }

  /**
   * Fetches several feeds with at most `concurrency` requests in flight, then merges the
   * per-feed results. Entries appear in the same order as `inputs`.
   *
   * @param inputs - Feeds to fetch.
   * @returns The merged result; `fetchedAt` is the time this call started.
   */
  async fetchMany(inputs: FetchInput[]): Promise<FetchResult> {
    const result: FetchResult = { responses: [], errors: [], fetchedAt: new Date() };
    const limit = createConcurrencyLimit(this.concurrency);

    const results = await Promise.all(inputs.map((input) => limit(() => this.fetch(input))));

    for (const r of results) {
      result.responses.push(...r.responses);
      result.errors.push(...r.errors);
    }

    return result;
  }

  /**
   * Collapses a raw header value into one string. A repeated header may arrive as an
   * array; in that case the first occurrence wins. A missing header becomes `''`.
   */
  private static firstHeaderValue(value: string | string[] | undefined): string {
    if (Array.isArray(value)) {
      return value[0] ?? '';
    }
    return value ?? '';
  }

  /**
   * Decides whether a 2xx payload is acceptable for `input.expectedFormat`.
   *
   * A declared content type that matches the expected format is accepted. Otherwise, and
   * also when no content type was sent, the body is accepted only if it starts like an
   * XML/RSS/Atom document.
   *
   * @returns A `PARSE` error describing the rejection, or `undefined` if the payload is acceptable.
   */
  private static checkPayload(
    input: FetchInput,
    contentType: string,
    responseBody: string,
  ): FetchError | undefined {
    if (contentType) {
      if (isValidFormat(input.expectedFormat, contentType) || bodyLooksLikeXml(responseBody)) {
        return undefined;
      }
      return {
        source: input.url,
        reason: `Unexpected content type: ${contentType}`,
        code: 'PARSE',
      };
    }

    if (bodyLooksLikeXml(responseBody)) {
      return undefined;
    }
    return {
      source: input.url,
      reason: 'Response does not appear to be XML',
      code: 'PARSE',
    };
  }
}
|
|
|
|
/**
 * Creates a limiter that runs at most `concurrency` async tasks at once. Tasks beyond the
 * limit wait in FIFO order until a running task settles.
 *
 * @param concurrency - Maximum number of tasks in flight. Values below 1 (and NaN) are
 *   treated as 1; otherwise no task would ever start and every returned promise would hang.
 * @returns A scheduling function. Each returned promise resolves or rejects with the
 *   outcome of the task passed to it.
 */
function createConcurrencyLimit(concurrency: number) {
  const maxActive = concurrency >= 1 ? concurrency : 1;
  const queue: (() => void)[] = [];
  let activeCount = 0;

  /** Starts the oldest queued task, if any. */
  const runNext = (): void => {
    const next = queue.shift();
    if (next) {
      next();
    }
  };

  return function <T>(fn: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      // `run` never rejects: every outcome of `fn` is routed to resolve/reject above.
      const run = async (): Promise<void> => {
        activeCount++;
        try {
          resolve(await fn());
        } catch (error) {
          reject(error);
        } finally {
          activeCount--;
          runNext();
        }
      };

      if (activeCount < maxActive) {
        void run();
      } else {
        queue.push(run);
      }
    });
  };
}
|