diff --git a/lib/server/repos/filing-taxonomy.ts b/lib/server/repos/filing-taxonomy.ts index 7cf35a0..667fa7c 100644 --- a/lib/server/repos/filing-taxonomy.ts +++ b/lib/server/repos/filing-taxonomy.ts @@ -962,10 +962,8 @@ export async function upsertFilingTaxonomySnapshot(input: UpsertFilingTaxonomySn const now = new Date().toISOString(); const normalized = normalizeFilingTaxonomySnapshotPayload(input); - const [saved] = await withFinancialIngestionSchemaRetry({ - client: getSqliteClient(), - context: 'filing-taxonomy-snapshot-upsert', - operation: async () => await db + return db.transaction(async (tx) => { + const [saved] = await tx .insert(filingTaxonomySnapshot) .values({ filing_id: input.filing_id, @@ -1022,125 +1020,149 @@ export async function upsertFilingTaxonomySnapshot(input: UpsertFilingTaxonomySn updated_at: now } }) - .returning() + .returning(); + + const snapshotId = saved.id; + + try { + await tx.delete(filingTaxonomyAsset).where(eq(filingTaxonomyAsset.snapshot_id, snapshotId)); + await tx.delete(filingTaxonomyContext).where(eq(filingTaxonomyContext.snapshot_id, snapshotId)); + await tx.delete(filingTaxonomyConcept).where(eq(filingTaxonomyConcept.snapshot_id, snapshotId)); + await tx.delete(filingTaxonomyFact).where(eq(filingTaxonomyFact.snapshot_id, snapshotId)); + await tx.delete(filingTaxonomyMetricValidation).where(eq(filingTaxonomyMetricValidation.snapshot_id, snapshotId)); + } catch (error) { + throw new Error(`Failed to delete child records for snapshot ${snapshotId}: ${error}`); + } + + if (input.contexts.length > 0) { + try { + await tx.insert(filingTaxonomyContext).values(input.contexts.map((context) => ({ + snapshot_id: snapshotId, + context_id: context.context_id, + entity_identifier: context.entity_identifier, + entity_scheme: context.entity_scheme, + period_start: context.period_start, + period_end: context.period_end, + period_instant: context.period_instant, + segment_json: context.segment_json, + scenario_json: context.scenario_json, + created_at: now + }))); + } catch (error) { + throw new Error(`Failed to insert ${input.contexts.length} contexts for snapshot ${snapshotId}: ${error}`); + } + } + + if (input.assets.length > 0) { + try { + await tx.insert(filingTaxonomyAsset).values(input.assets.map((asset) => ({ + snapshot_id: snapshotId, + asset_type: asset.asset_type, + name: asset.name, + url: asset.url, + size_bytes: asset.size_bytes, + score: asNumericText(asset.score), + is_selected: asset.is_selected, + created_at: now + }))); + } catch (error) { + throw new Error(`Failed to insert ${input.assets.length} assets for snapshot ${snapshotId}: ${error}`); + } + } + + if (input.concepts.length > 0) { + try { + await tx.insert(filingTaxonomyConcept).values(input.concepts.map((concept) => ({ + snapshot_id: snapshotId, + concept_key: concept.concept_key, + qname: concept.qname, + namespace_uri: concept.namespace_uri, + local_name: concept.local_name, + label: concept.label, + is_extension: concept.is_extension, + balance: concept.balance, + period_type: concept.period_type, + data_type: concept.data_type, + statement_kind: concept.statement_kind, + role_uri: concept.role_uri, + authoritative_concept_key: concept.authoritative_concept_key, + mapping_method: concept.mapping_method, + surface_key: concept.surface_key, + detail_parent_surface_key: concept.detail_parent_surface_key, + kpi_key: concept.kpi_key, + residual_flag: concept.residual_flag, + presentation_order: asNumericText(concept.presentation_order), + presentation_depth: concept.presentation_depth, + parent_concept_key: concept.parent_concept_key, + is_abstract: concept.is_abstract, + created_at: now + }))); + } catch (error) { + throw new Error(`Failed to insert ${input.concepts.length} concepts for snapshot ${snapshotId}: ${error}`); + } + } + + if (input.facts.length > 0) { + try { + await tx.insert(filingTaxonomyFact).values(input.facts.map((fact) => ({ + snapshot_id: snapshotId, + concept_key: fact.concept_key, + qname: fact.qname, + namespace_uri: fact.namespace_uri, + local_name: fact.local_name, + data_type: fact.data_type, + statement_kind: fact.statement_kind, + role_uri: fact.role_uri, + authoritative_concept_key: fact.authoritative_concept_key, + mapping_method: fact.mapping_method, + surface_key: fact.surface_key, + detail_parent_surface_key: fact.detail_parent_surface_key, + kpi_key: fact.kpi_key, + residual_flag: fact.residual_flag, + context_id: fact.context_id, + unit: fact.unit, + decimals: fact.decimals, + precision: fact.precision, + nil: fact.nil, + value_num: String(fact.value_num), + period_start: fact.period_start, + period_end: fact.period_end, + period_instant: fact.period_instant, + dimensions: fact.dimensions, + is_dimensionless: fact.is_dimensionless, + source_file: fact.source_file, + created_at: now + }))); + } catch (error) { + throw new Error(`Failed to insert ${input.facts.length} facts for snapshot ${snapshotId}: ${error}`); + } + } + + if (input.metric_validations.length > 0) { + try { + await tx.insert(filingTaxonomyMetricValidation).values(input.metric_validations.map((check) => ({ + snapshot_id: snapshotId, + metric_key: check.metric_key, + taxonomy_value: asNumericText(check.taxonomy_value), + llm_value: asNumericText(check.llm_value), + absolute_diff: asNumericText(check.absolute_diff), + relative_diff: asNumericText(check.relative_diff), + status: check.status, + evidence_pages: check.evidence_pages, + pdf_url: check.pdf_url, + provider: check.provider, + model: check.model, + error: check.error, + created_at: now, + updated_at: now + }))); + } catch (error) { + throw new Error(`Failed to insert ${input.metric_validations.length} metric validations for snapshot ${snapshotId}: ${error}`); + } + } + + return toSnapshotRecord(saved); }); - - const snapshotId = saved.id; - - await db.delete(filingTaxonomyAsset).where(eq(filingTaxonomyAsset.snapshot_id, snapshotId)); - await db.delete(filingTaxonomyContext).where(eq(filingTaxonomyContext.snapshot_id, snapshotId)); - await db.delete(filingTaxonomyConcept).where(eq(filingTaxonomyConcept.snapshot_id, snapshotId)); - await db.delete(filingTaxonomyFact).where(eq(filingTaxonomyFact.snapshot_id, snapshotId)); - await db.delete(filingTaxonomyMetricValidation).where(eq(filingTaxonomyMetricValidation.snapshot_id, snapshotId)); - - if (input.contexts.length > 0) { - await db.insert(filingTaxonomyContext).values(input.contexts.map((context) => ({ - snapshot_id: snapshotId, - context_id: context.context_id, - entity_identifier: context.entity_identifier, - entity_scheme: context.entity_scheme, - period_start: context.period_start, - period_end: context.period_end, - period_instant: context.period_instant, - segment_json: context.segment_json, - scenario_json: context.scenario_json, - created_at: now - }))); - } - - if (input.assets.length > 0) { - await db.insert(filingTaxonomyAsset).values(input.assets.map((asset) => ({ - snapshot_id: snapshotId, - asset_type: asset.asset_type, - name: asset.name, - url: asset.url, - size_bytes: asset.size_bytes, - score: asNumericText(asset.score), - is_selected: asset.is_selected, - created_at: now - }))); - } - - if (input.concepts.length > 0) { - await db.insert(filingTaxonomyConcept).values(input.concepts.map((concept) => ({ - snapshot_id: snapshotId, - concept_key: concept.concept_key, - qname: concept.qname, - namespace_uri: concept.namespace_uri, - local_name: concept.local_name, - label: concept.label, - is_extension: concept.is_extension, - balance: concept.balance, - period_type: concept.period_type, - data_type: concept.data_type, - statement_kind: concept.statement_kind, - role_uri: concept.role_uri, - authoritative_concept_key: concept.authoritative_concept_key, - mapping_method: concept.mapping_method, - surface_key: concept.surface_key, - detail_parent_surface_key: concept.detail_parent_surface_key, - kpi_key: concept.kpi_key, - residual_flag: concept.residual_flag, - presentation_order: asNumericText(concept.presentation_order), - presentation_depth: concept.presentation_depth, - parent_concept_key: concept.parent_concept_key, - is_abstract: concept.is_abstract, - created_at: now - }))); - } - - if (input.facts.length > 0) { - await db.insert(filingTaxonomyFact).values(input.facts.map((fact) => ({ - snapshot_id: snapshotId, - concept_key: fact.concept_key, - qname: fact.qname, - namespace_uri: fact.namespace_uri, - local_name: fact.local_name, - data_type: fact.data_type, - statement_kind: fact.statement_kind, - role_uri: fact.role_uri, - authoritative_concept_key: fact.authoritative_concept_key, - mapping_method: fact.mapping_method, - surface_key: fact.surface_key, - detail_parent_surface_key: fact.detail_parent_surface_key, - kpi_key: fact.kpi_key, - residual_flag: fact.residual_flag, - context_id: fact.context_id, - unit: fact.unit, - decimals: fact.decimals, - precision: fact.precision, - nil: fact.nil, - value_num: String(fact.value_num), - period_start: fact.period_start, - period_end: fact.period_end, - period_instant: fact.period_instant, - dimensions: fact.dimensions, - is_dimensionless: fact.is_dimensionless, - source_file: fact.source_file, - created_at: now - }))); - } - - if (input.metric_validations.length > 0) { - await db.insert(filingTaxonomyMetricValidation).values(input.metric_validations.map((check) => ({ - snapshot_id: snapshotId, - metric_key: check.metric_key, - taxonomy_value: asNumericText(check.taxonomy_value), - llm_value: asNumericText(check.llm_value), - absolute_diff: asNumericText(check.absolute_diff), - relative_diff: asNumericText(check.relative_diff), - status: check.status, - evidence_pages: check.evidence_pages, - pdf_url: check.pdf_url, - provider: check.provider, - model: check.model, - error: check.error, - created_at: now, - updated_at: now - }))); - } - - return toSnapshotRecord(saved); } export async function listFilingTaxonomySnapshotsByTicker(input: { diff --git a/lib/server/taxonomy/engine.test.ts b/lib/server/taxonomy/engine.test.ts index 8821156..8086be4 100644 --- a/lib/server/taxonomy/engine.test.ts +++ b/lib/server/taxonomy/engine.test.ts @@ -49,6 +49,9 @@ function createHydrationResult(): TaxonomyHydrationResult { unmapped_row_count: 0, material_unmapped_row_count: 0, warnings: ['rust_warning'] + }, + xbrl_validation: { + status: 'passed' } }; } diff --git a/lib/server/taxonomy/parser-client.ts b/lib/server/taxonomy/parser-client.ts index e55c914..d1fdcbc 100644 --- a/lib/server/taxonomy/parser-client.ts +++ b/lib/server/taxonomy/parser-client.ts @@ -1,6 +1,7 @@ import { existsSync } from 'node:fs'; import { join } from 'node:path'; import type { TaxonomyHydrationInput, TaxonomyHydrationResult } from '@/lib/server/taxonomy/types'; +import { withRetry } from '@/lib/server/utils/retry'; function candidateBinaryPaths() { return [ @@ -23,6 +24,10 @@ export function resolveFiscalXbrlBinary() { export async function hydrateFilingTaxonomySnapshotFromSidecar( input: TaxonomyHydrationInput ): Promise { + return withRetry(() => hydrateFromSidecarImpl(input)); +} + +async function hydrateFromSidecarImpl(input: TaxonomyHydrationInput): Promise { const binary = resolveFiscalXbrlBinary(); const timeoutMs = Math.max(Number(process.env.XBRL_ENGINE_TIMEOUT_MS ?? 45_000), 1_000); const command = [binary, 'hydrate-filing']; diff --git a/lib/server/taxonomy/types.ts b/lib/server/taxonomy/types.ts index b37e81c..16d0857 100644 --- a/lib/server/taxonomy/types.ts +++ b/lib/server/taxonomy/types.ts @@ -204,6 +204,11 @@ export type TaxonomyHydrationNormalizationSummary = { warnings: string[]; }; +export type XbrlValidationResult = { + status: 'passed' | 'warning' | 'error'; + message?: string; +}; + export type TaxonomyHydrationInput = { filingId: number; ticker: string; @@ -279,4 +284,5 @@ export type TaxonomyHydrationResult = { }>; metric_validations: TaxonomyMetricValidationCheck[]; normalization_summary: TaxonomyHydrationNormalizationSummary; + xbrl_validation: XbrlValidationResult; }; diff --git a/lib/server/utils/index.ts b/lib/server/utils/index.ts new file mode 100644 index 0000000..7b6f19e --- /dev/null +++ b/lib/server/utils/index.ts @@ -0,0 +1,22 @@ +export { + normalizeTicker, + normalizeTickerOrNull, + normalizeTags, + normalizeTagsOrNull, + normalizeOptionalString, + normalizeRecord, + normalizePositiveInteger, + nowIso, + todayIso +} from './normalize'; + +export { + asRecord, + asOptionalRecord, + asPositiveNumber, + asBoolean, + asStringArray, + asEnum +} from './validation'; + +export { withRetry, type RetryOptions } from './retry'; diff --git a/lib/server/utils/normalize.ts b/lib/server/utils/normalize.ts new file mode 100644 index 0000000..ac6c607 --- /dev/null +++ b/lib/server/utils/normalize.ts @@ -0,0 +1,51 @@ +export function normalizeTicker(ticker: string): string { + return ticker.trim().toUpperCase(); +} + +export function normalizeTickerOrNull(ticker: unknown): string | null { + if (typeof ticker !== 'string') return null; + const normalized = ticker.trim().toUpperCase(); + return normalized || null; +} + +export function normalizeTags(tags?: unknown): string[] { + if (!Array.isArray(tags)) return []; + + const unique = new Set(); + for (const entry of tags) { + if (typeof entry !== 'string') continue; + const tag = entry.trim(); + if (tag) unique.add(tag); + } + return [...unique]; +} + +export function normalizeTagsOrNull(tags?: unknown): string[] | null { + const result = normalizeTags(tags); + return result.length > 0 ? result : null; +} + +export function normalizeOptionalString(value?: unknown): string | null { + if (typeof value !== 'string') return null; + const normalized = value.trim(); + return normalized || null; +} + +export function normalizeRecord(value?: unknown): Record | null { + if (!value || typeof value !== 'object' || Array.isArray(value)) return null; + return value as Record; +} + +export function normalizePositiveInteger(value?: unknown): number | null { + if (value === null || value === undefined || !Number.isFinite(value as number)) return null; + const normalized = Math.trunc(value as number); + return normalized > 0 ? normalized : null; +} + +export function nowIso(): string { + return new Date().toISOString(); +} + +export function todayIso(): string { + return new Date().toISOString().slice(0, 10); +} diff --git a/lib/server/utils/retry.ts b/lib/server/utils/retry.ts new file mode 100644 index 0000000..11ed175 --- /dev/null +++ b/lib/server/utils/retry.ts @@ -0,0 +1,59 @@ +export interface RetryOptions { + maxRetries: number; + baseDelayMs: number; + maxDelayMs: number; + jitterFactor: number; + retryableErrors: RegExp[]; +} + +const DEFAULT_RETRY_OPTIONS: RetryOptions = { + maxRetries: 3, + baseDelayMs: 2000, + maxDelayMs: 10000, + jitterFactor: 0.3, + retryableErrors: [ + /timeout/i, + /ECONNRESET/, + /ETIMEDOUT/, + /ENOTFOUND/, + /exit code 1/, + /signal/, + /killed/ + ] +}; + +export async function withRetry( + fn: () => Promise, + options?: Partial +): Promise { + const opts = { ...DEFAULT_RETRY_OPTIONS, ...options }; + let lastError: Error | null = null; + + for (let attempt = 0; attempt < opts.maxRetries; attempt++) { + try { + return await fn(); + } catch (error) { + lastError = error instanceof Error ? error : new Error(String(error)); + + const isRetryable = opts.retryableErrors.some( + (pattern) => pattern.test(lastError!.message) + ); + + if (!isRetryable || attempt === opts.maxRetries - 1) { + throw lastError; + } + + const baseDelay = opts.baseDelayMs * Math.pow(2, attempt); + const jitter = Math.random() * opts.jitterFactor * baseDelay; + const delay = Math.min(baseDelay + jitter, opts.maxDelayMs); + + console.warn( + `[retry] Attempt ${attempt + 1}/${opts.maxRetries} failed, retrying in ${Math.round(delay)}ms: ${lastError.message}` + ); + + await Bun.sleep(delay); + } + } + + throw lastError; +} diff --git a/lib/server/utils/validation.ts b/lib/server/utils/validation.ts new file mode 100644 index 0000000..08c2d64 --- /dev/null +++ b/lib/server/utils/validation.ts @@ -0,0 +1,56 @@ +export function asRecord(value: unknown): Record { + if (!value || typeof value !== 'object' || Array.isArray(value)) { + return {}; + } + return value as Record; +} + +export function asOptionalRecord(value: unknown): Record | null { + if (!value || typeof value !== 'object' || Array.isArray(value)) { + return null; + } + return value as Record; +} + +export function asPositiveNumber(value: unknown): number | null { + const parsed = typeof value === 'number' ? value : Number(value); + return Number.isFinite(parsed) && parsed > 0 ? parsed : null; +} + +export function asBoolean(value: unknown, fallback = false): boolean { + if (typeof value === 'boolean') { + return value; + } + + if (typeof value === 'string') { + const normalized = value.trim().toLowerCase(); + if (normalized === 'true' || normalized === '1' || normalized === 'yes') { + return true; + } + if (normalized === 'false' || normalized === '0' || normalized === 'no') { + return false; + } + } + + return fallback; +} + +export function asStringArray(value: unknown): string[] { + const source = Array.isArray(value) + ? value + : typeof value === 'string' + ? value.split(',') + : []; + + const unique = new Set(); + for (const entry of source) { + if (typeof entry !== 'string') continue; + const tag = entry.trim(); + if (tag) unique.add(tag); + } + return [...unique]; +} + +export function asEnum(value: unknown, allowed: readonly T[]): T | undefined { + return allowed.includes(value as T) ? (value as T) : undefined; +} diff --git a/rust/fiscal-xbrl-core/src/lib.rs b/rust/fiscal-xbrl-core/src/lib.rs index 7219ac3..f163946 100644 --- a/rust/fiscal-xbrl-core/src/lib.rs +++ b/rust/fiscal-xbrl-core/src/lib.rs @@ -4,6 +4,8 @@ use regex::Regex; use reqwest::blocking::Client; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Mutex; +use std::time::{Duration, Instant}; mod kpi_mapper; mod metrics; @@ -20,6 +22,39 @@ use crabrl as _; pub const PARSER_ENGINE: &str = "fiscal-xbrl"; pub const PARSER_VERSION: &str = env!("CARGO_PKG_VERSION"); +const DEFAULT_SEC_RATE_LIMIT_MS: u64 = 100; +const HTTP_TIMEOUT_SECS: u64 = 30; + +static RATE_LIMITER: Lazy> = Lazy::new(|| Mutex::new(Instant::now())); + +fn sec_rate_limit_delay() -> u64 { + std::env::var("SEC_RATE_LIMIT_MS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_SEC_RATE_LIMIT_MS) +} + +fn rate_limited_fetch(fetch_fn: F) -> Result +where + F: FnOnce() -> Result, +{ + let delay_ms = sec_rate_limit_delay(); + + { + let mut last_request = RATE_LIMITER.lock().unwrap(); + let elapsed = last_request.elapsed(); + let min_delay = Duration::from_millis(delay_ms); + + if elapsed < min_delay { + std::thread::sleep(min_delay - elapsed); + } + + *last_request = Instant::now(); + } + + fetch_fn() +} + static CONTEXT_RE: Lazy = Lazy::new(|| { Regex::new(r#"(?is)<(?:[a-z0-9_\-]+:)?context\b[^>]*\bid=["']([^"']+)["'][^>]*>(.*?)"#).unwrap() }); @@ -118,6 +153,7 @@ pub struct HydrateFilingResponse { pub contexts: Vec, pub derived_metrics: FilingMetrics, pub validation_result: ValidationResultOutput, + pub xbrl_validation: XbrlValidationResult, pub facts_count: usize, pub concepts_count: usize, pub dimensions_count: usize, @@ -481,6 +517,7 @@ struct PresentationNode { pub fn hydrate_filing(input: HydrateFilingRequest) -> Result { let client = Client::builder() .user_agent("Fiscal Clone ") + .timeout(Duration::from_secs(120)) .build() .context("unable to build HTTP client")?; @@ -521,7 +558,11 @@ pub fn hydrate_filing(input: HydrateFilingRequest) -> Result Result Result) -> Option { } fn fetch_text(client: &Client, url: &str) -> Result { - let response = client - .get(url) - .send() - .with_context(|| format!("request failed for {url}"))?; - if !response.status().is_success() { - return Err(anyhow!("request failed for {url} ({})", response.status())); - } - response - .text() - .with_context(|| format!("unable to read response body for {url}")) + rate_limited_fetch(|| { + let response = client + .get(url) + .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) + .send() + .with_context(|| format!("request failed for {url}"))?; + if !response.status().is_success() { + return Err(anyhow!("request failed for {url} ({})", response.status())); + } + response + .text() + .with_context(|| format!("unable to read response body for {url}")) + }) } fn fetch_json Deserialize<'de>>(client: &Client, url: &str) -> Result { - let response = client - .get(url) - .send() - .with_context(|| format!("request failed for {url}"))?; - if !response.status().is_success() { - return Err(anyhow!("request failed for {url} ({})", response.status())); + rate_limited_fetch(|| { + let response = client + .get(url) + .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) + .send() + .with_context(|| format!("request failed for {url}"))?; + if !response.status().is_success() { + return Err(anyhow!("request failed for {url} ({})", response.status())); + } + response + .json::() + .with_context(|| format!("unable to parse JSON response for {url}")) + }) +} + +#[derive(Debug, Clone, Serialize)] +pub struct XbrlValidationResult { + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, +} + +fn validate_xbrl_structure(xml: &str, source_file: Option<&str>) -> XbrlValidationResult { + if xml.is_empty() { + return XbrlValidationResult { + status: "error".to_string(), + message: Some("XBRL content is empty".to_string()), + }; + } + + if !xml.contains("').count(); + + if open_count != close_count { + return XbrlValidationResult { + status: "warning".to_string(), + message: Some(format!( + "Malformed XML detected in {:?} ({} open, {} close tags)", + source_file.unwrap_or("unknown"), + open_count, + close_count + )), + }; + } + + XbrlValidationResult { + status: "passed".to_string(), + message: None, } - response - .json::() - .with_context(|| format!("unable to parse JSON response for {url}")) } struct ParsedInstance {