import type { Database } from 'bun:sqlite';

/**
 * Requested behavior for the financial-ingestion schema guard.
 * - `auto`: inspect, then repair drift (delete duplicate rows, create/recreate indexes).
 * - `check-only` / `off`: inspect and report only; no writes are made. In this module the two
 *   values behave identically (every mode other than `auto` skips repair).
 */
export type FinancialSchemaRepairMode = 'auto' | 'check-only' | 'off';

/**
 * Outcome reported by `ensureFinancialIngestionSchemaHealthy`:
 * - `healthy`: no drift was found.
 * - `repaired`: drift was found and automatic repair left the schema healthy.
 * - `drifted`: drift was found but repair was not requested (mode other than `auto`).
 * - `failed`: repair left drift behind, or an error was thrown while checking/repairing.
 */
export type FinancialIngestionHealthMode = 'healthy' | 'repaired' | 'drifted' | 'failed';

/** Tables whose indexes and uniqueness invariants are guarded by this module. */
type FinancialTableName =
  | 'company_financial_bundle'
  | 'filing_taxonomy_snapshot'
  | 'filing_taxonomy_metric_validation';

/** An index the ingestion code relies on, together with the DDL that (re)creates it. */
type CriticalIndexDefinition = {
  name: string;
  table: FinancialTableName;
  unique: boolean;
  columns: string[];
  createSql: string;
};

/** Column set that must be unique within a table; used to detect and delete duplicate rows. */
type DuplicateRule = {
  table: FinancialTableName;
  partitionColumns: string[];
};

/**
 * Inspection result for one critical index. `unique` is the uniqueness the definition expects,
 * not the observed one; `healthy` is true only when the index exists with the expected
 * uniqueness and column order.
 */
export type FinancialIngestionIndexStatus = {
  name: string;
  table: string;
  expectedColumns: string[];
  actualColumns: string[];
  unique: boolean;
  present: boolean;
  healthy: boolean;
};

/** Duplicate counts for one table: groups of rows sharing a key, and the surplus rows in them. */
export type FinancialIngestionDuplicateStatus = {
  table: string;
  duplicateGroups: number;
  duplicateRows: number;
};

/** Snapshot of index and duplicate health across all guarded tables. */
export type FinancialIngestionSchemaReport = {
  ok: boolean;
  checkedAt: string;
  indexes: FinancialIngestionIndexStatus[];
  missingIndexes: string[];
  duplicateGroups: number;
  duplicateRows: number;
  duplicates: FinancialIngestionDuplicateStatus[];
};

/** What `repairFinancialIngestionSchema` did, plus a fresh report taken afterwards. */
export type FinancialIngestionSchemaRepairResult = {
  attempted: boolean;
  requestedMode: FinancialSchemaRepairMode;
  missingIndexesBefore: string[];
  duplicateGroupsBefore: number;
  indexesCreated: string[];
  indexesRecreated: string[];
  duplicateRowsDeleted: number;
  duplicateGroupsResolved: number;
  bundleCacheCleared: boolean;
  snapshotDuplicateRowsDeleted: number;
  reportAfter: FinancialIngestionSchemaReport;
};

/** Summary produced (and cached process-wide) by `ensureFinancialIngestionSchemaHealthy`. */
export type FinancialIngestionSchemaEnsureResult = {
  ok: boolean;
  mode: FinancialIngestionHealthMode;
  requestedMode: FinancialSchemaRepairMode;
  missingIndexes: string[];
  duplicateGroups: number;
  lastCheckedAt: string;
  repair: FinancialIngestionSchemaRepairResult | null;
  error: string | null;
};

declare global {
  // eslint-disable-next-line no-var
  var __financialIngestionSchemaStatus: FinancialIngestionSchemaEnsureResult | undefined;
}

const CRITICAL_INDEX_DEFINITIONS: CriticalIndexDefinition[] = [
  {
    table: 'company_financial_bundle',
    name: 'company_financial_bundle_uidx',
    unique: true,
    columns: ['ticker', 'surface_kind', 'cadence'],
    createSql: 'CREATE UNIQUE INDEX IF NOT EXISTS `company_financial_bundle_uidx` ON `company_financial_bundle` (`ticker`,`surface_kind`,`cadence`);'
  },
  {
    table: 'company_financial_bundle',
    name: 'company_financial_bundle_ticker_idx',
    unique: false,
    columns: ['ticker', 'updated_at'],
    createSql: 'CREATE INDEX IF NOT EXISTS `company_financial_bundle_ticker_idx` ON `company_financial_bundle` (`ticker`,`updated_at`);'
  },
  {
    table: 'filing_taxonomy_snapshot',
    name: 'filing_taxonomy_snapshot_filing_uidx',
    unique: true,
    columns: ['filing_id'],
    createSql: 'CREATE UNIQUE INDEX IF NOT EXISTS `filing_taxonomy_snapshot_filing_uidx` ON `filing_taxonomy_snapshot` (`filing_id`);'
  },
  {
    table: 'filing_taxonomy_snapshot',
    name: 'filing_taxonomy_snapshot_ticker_date_idx',
    unique: false,
    columns: ['ticker', 'filing_date'],
    createSql: 'CREATE INDEX IF NOT EXISTS `filing_taxonomy_snapshot_ticker_date_idx` ON `filing_taxonomy_snapshot` (`ticker`,`filing_date`);'
  },
  {
    table: 'filing_taxonomy_snapshot',
    name: 'filing_taxonomy_snapshot_status_idx',
    unique: false,
    columns: ['parse_status'],
    createSql: 'CREATE INDEX IF NOT EXISTS `filing_taxonomy_snapshot_status_idx` ON `filing_taxonomy_snapshot` (`parse_status`);'
  },
  {
    table: 'filing_taxonomy_metric_validation',
    name: 'filing_taxonomy_metric_validation_uidx',
    unique: true,
    columns: ['snapshot_id', 'metric_key'],
    createSql: 'CREATE UNIQUE INDEX IF NOT EXISTS `filing_taxonomy_metric_validation_uidx` ON `filing_taxonomy_metric_validation` (`snapshot_id`,`metric_key`);'
  },
  {
    table: 'filing_taxonomy_metric_validation',
    name: 'filing_taxonomy_metric_validation_snapshot_idx',
    unique: false,
    columns: ['snapshot_id'],
    createSql: 'CREATE INDEX IF NOT EXISTS `filing_taxonomy_metric_validation_snapshot_idx` ON `filing_taxonomy_metric_validation` (`snapshot_id`);'
  },
  {
    table: 'filing_taxonomy_metric_validation',
    name: 'filing_taxonomy_metric_validation_status_idx',
    unique: false,
    columns: ['snapshot_id', 'status'],
    createSql: 'CREATE INDEX IF NOT EXISTS `filing_taxonomy_metric_validation_status_idx` ON `filing_taxonomy_metric_validation` (`snapshot_id`,`status`);'
  }
];

/** One rule per unique index above: these column sets must not repeat within their table. */
const UNIQUE_DUPLICATE_RULES: DuplicateRule[] = [
  { table: 'company_financial_bundle', partitionColumns: ['ticker', 'surface_kind', 'cadence'] },
  { table: 'filing_taxonomy_snapshot', partitionColumns: ['filing_id'] },
  { table: 'filing_taxonomy_metric_validation', partitionColumns: ['snapshot_id', 'metric_key'] }
];

/** Quote an SQLite identifier with backticks, doubling any embedded backtick. */
function quoteIdentifier(identifier: string): string {
  return `\`${identifier.replace(/`/g, '``')}\``;
}

/** True when `tableName` exists as a table in `sqlite_master`. */
function hasTable(client: Database, tableName: string): boolean {
  const row = client
    .query('SELECT name FROM sqlite_master WHERE type = ? AND name = ? LIMIT 1')
    .get('table', tableName) as { name: string } | null;
  return row !== null;
}

function nowIso(): string {
  return new Date().toISOString();
}

/**
 * Parse a repair mode from free-form text (e.g. an environment variable). Matching ignores case
 * and surrounding whitespace; anything unrecognised, including `undefined`, falls back to `auto`.
 */
export function resolveFinancialSchemaRepairMode(value: string | undefined): FinancialSchemaRepairMode {
  const normalized = value?.trim().toLowerCase();
  if (normalized === 'check-only') {
    return 'check-only';
  }
  if (normalized === 'off') {
    return 'off';
  }
  return 'auto';
}

/** Element-wise, order-sensitive equality of two string arrays. */
function arrayEqual(left: string[], right: string[]): boolean {
  return left.length === right.length && left.every((value, index) => value === right[index]);
}

/** Column names of an index, in index key order (sorted by `seqno` from `PRAGMA index_info`). */
function queryIndexColumns(client: Database, indexName: string): string[] {
  const rows = client.query(`PRAGMA index_info(${quoteIdentifier(indexName)})`).all() as Array<{
    seqno: number;
    name: string;
  }>;
  return rows.sort((left, right) => left.seqno - right.seqno).map((row) => row.name);
}

/** Compare one critical index definition against what the database actually contains. */
function inspectIndex(client: Database, definition: CriticalIndexDefinition): FinancialIngestionIndexStatus {
  if (!hasTable(client, definition.table)) {
    return {
      name: definition.name,
      table: definition.table,
      expectedColumns: [...definition.columns],
      actualColumns: [],
      unique: definition.unique,
      present: false,
      healthy: false
    };
  }

  const indexList = client.query(`PRAGMA index_list(${quoteIdentifier(definition.table)})`).all() as Array<{
    name: string;
    unique: number;
  }>;
  const existing = indexList.find((row) => row.name === definition.name) ?? null;
  const actualColumns = existing ? queryIndexColumns(client, definition.name) : [];
  const isUnique = existing ? existing.unique === 1 : false;
  const healthy = Boolean(existing) && isUnique === definition.unique && arrayEqual(actualColumns, definition.columns);

  return {
    name: definition.name,
    table: definition.table,
    expectedColumns: [...definition.columns],
    actualColumns,
    unique: definition.unique,
    present: existing !== null,
    healthy
  };
}

/**
 * SQL predicate that keeps only rows whose key columns are all non-NULL. SQLite UNIQUE indexes
 * treat NULLs as distinct, so rows with a NULL key component never conflict. Without this filter,
 * GROUP BY / PARTITION BY would lump those rows together and report them as duplicates.
 */
function nonNullKeyPredicate(columns: string[]): string {
  return columns.map((column) => `${quoteIdentifier(column)} IS NOT NULL`).join(' AND ');
}

/** Count key groups (and surplus rows) that would violate the unique index for `rule`. */
function inspectDuplicates(client: Database, rule: DuplicateRule): FinancialIngestionDuplicateStatus {
  if (!hasTable(client, rule.table)) {
    return { table: rule.table, duplicateGroups: 0, duplicateRows: 0 };
  }

  const groupBy = rule.partitionColumns.map(quoteIdentifier).join(', ');
  const row = client.query(`
    SELECT COUNT(*) AS duplicate_groups, COALESCE(SUM(cnt - 1), 0) AS duplicate_rows
    FROM (
      SELECT COUNT(*) AS cnt
      FROM ${quoteIdentifier(rule.table)}
      WHERE ${nonNullKeyPredicate(rule.partitionColumns)}
      GROUP BY ${groupBy}
      HAVING COUNT(*) > 1
    )
  `).get() as { duplicate_groups: number | null; duplicate_rows: number | null } | null;

  return {
    table: rule.table,
    duplicateGroups: Number(row?.duplicate_groups ?? 0),
    duplicateRows: Number(row?.duplicate_rows ?? 0)
  };
}

/** Inspect every critical index and every uniqueness rule. Read-only. */
export function inspectFinancialIngestionSchema(client: Database): FinancialIngestionSchemaReport {
  const indexes = CRITICAL_INDEX_DEFINITIONS.map((definition) => inspectIndex(client, definition));
  const duplicates = UNIQUE_DUPLICATE_RULES.map((rule) => inspectDuplicates(client, rule));
  const missingIndexes = indexes.filter((index) => !index.healthy).map((index) => index.name);

  return {
    ok: missingIndexes.length === 0 && duplicates.every((entry) => entry.duplicateGroups === 0),
    checkedAt: nowIso(),
    indexes,
    missingIndexes,
    duplicateGroups: duplicates.reduce((sum, entry) => sum + entry.duplicateGroups, 0),
    duplicateRows: duplicates.reduce((sum, entry) => sum + entry.duplicateRows, 0),
    duplicates
  };
}

/**
 * Run `work` inside `BEGIN IMMEDIATE` … `COMMIT`, rolling back if it (or the commit) throws.
 *
 * @returns whatever `work` returns.
 * @throws the original error from `work`/`COMMIT`. A failing `ROLLBACK` does not replace it,
 *   because SQLite may already have rolled the transaction back on its own.
 */
function runTransaction<T>(client: Database, work: () => T): T {
  client.exec('BEGIN IMMEDIATE;');
  try {
    const result = work();
    client.exec('COMMIT;');
    return result;
  } catch (error) {
    try {
      client.exec('ROLLBACK;');
    } catch {
      // Deliberately ignored: the original failure below is the one callers need to see.
    }
    throw error;
  }
}

/**
 * Delete every row that shares its (non-NULL) key with another row. In each key group, the row
 * with the greatest `updated_at` (ties broken by greatest `id`) is kept.
 * NOTE(review): assumes each guarded table has `id` and `updated_at` columns — confirm.
 *
 * @returns the number of rows deleted, as reported by SQLite `changes()`.
 */
function deleteDuplicateRows(client: Database, rule: DuplicateRule): number {
  if (!hasTable(client, rule.table)) {
    return 0;
  }

  const table = quoteIdentifier(rule.table);
  const partitionBy = rule.partitionColumns.map(quoteIdentifier).join(', ');
  client.exec(`
    DELETE FROM ${table}
    WHERE \`id\` IN (
      WITH ranked AS (
        SELECT
          \`id\`,
          ROW_NUMBER() OVER (
            PARTITION BY ${partitionBy}
            ORDER BY \`updated_at\` DESC, \`id\` DESC
          ) AS rn
        FROM ${table}
        WHERE ${nonNullKeyPredicate(rule.partitionColumns)}
      )
      SELECT \`id\` FROM ranked WHERE rn > 1
    );
  `);

  const row = client.query('SELECT changes() AS count').get() as { count: number } | null;
  return Number(row?.count ?? 0);
}

/** Delete all rows from `company_financial_bundle` (if it exists); returns the row count removed. */
function clearBundleCache(client: Database): number {
  if (!hasTable(client, 'company_financial_bundle')) {
    return 0;
  }

  client.exec('DELETE FROM `company_financial_bundle`;');
  const row = client.query('SELECT changes() AS count').get() as { count: number } | null;
  return Number(row?.count ?? 0);
}

/** Create a missing index, or drop and recreate one that exists with the wrong shape. */
function createOrRecreateIndex(client: Database, definition: CriticalIndexDefinition, status: FinancialIngestionIndexStatus): void {
  if (status.present && !status.healthy) {
    client.exec(`DROP INDEX IF EXISTS ${quoteIdentifier(definition.name)};`);
  }
  client.exec(definition.createSql);
}

/** The uniqueness rule registered for `table`; throws if the registry is inconsistent. */
function findDuplicateRule(table: FinancialTableName): DuplicateRule {
  const rule = UNIQUE_DUPLICATE_RULES.find((entry) => entry.table === table);
  if (!rule) {
    throw new Error(`No duplicate rule registered for table ${table}`);
  }
  return rule;
}

/** Duplicate-group count recorded for `table` in `report` (0 when absent). */
function duplicateGroupsFor(report: FinancialIngestionSchemaReport, table: FinancialTableName): number {
  return report.duplicates.find((entry) => entry.table === table)?.duplicateGroups ?? 0;
}

/**
 * Repair index and duplicate drift. Writes happen only when `mode` is `auto` (the default) and
 * the schema is not already healthy; otherwise nothing is written and `attempted` is `false`.
 *
 * Order of operations:
 *  1. Deduplicate `filing_taxonomy_snapshot`. If that deletes rows, clear the bundle table
 *     wholesale, which also resolves any bundle duplicates.
 *  2. Otherwise deduplicate `company_financial_bundle`.
 *  3. Deduplicate `filing_taxonomy_metric_validation`.
 *  4. Create missing indexes and recreate malformed ones, each in its own transaction.
 *
 * @throws any SQLite error raised while repairing (e.g. when a guarded table does not exist).
 */
export function repairFinancialIngestionSchema(
  client: Database,
  options: {
    mode?: FinancialSchemaRepairMode;
  } = {}
): FinancialIngestionSchemaRepairResult {
  const requestedMode = options.mode ?? 'auto';
  const before = inspectFinancialIngestionSchema(client);

  if (requestedMode !== 'auto' || before.ok) {
    return {
      attempted: false,
      requestedMode,
      missingIndexesBefore: before.missingIndexes,
      duplicateGroupsBefore: before.duplicateGroups,
      indexesCreated: [],
      indexesRecreated: [],
      duplicateRowsDeleted: 0,
      duplicateGroupsResolved: 0,
      bundleCacheCleared: false,
      snapshotDuplicateRowsDeleted: 0,
      reportAfter: before
    };
  }

  let duplicateRowsDeleted = 0;
  let duplicateGroupsResolved = 0;
  let snapshotDuplicateRowsDeleted = 0;
  let bundleCacheCleared = false;
  const indexesCreated: string[] = [];
  const indexesRecreated: string[] = [];

  const snapshotGroups = duplicateGroupsFor(before, 'filing_taxonomy_snapshot');
  const bundleGroups = duplicateGroupsFor(before, 'company_financial_bundle');
  const metricValidationGroups = duplicateGroupsFor(before, 'filing_taxonomy_metric_validation');

  if (snapshotGroups > 0) {
    snapshotDuplicateRowsDeleted = runTransaction(client, () =>
      deleteDuplicateRows(client, findDuplicateRule('filing_taxonomy_snapshot'))
    );
    duplicateRowsDeleted += snapshotDuplicateRowsDeleted;
    duplicateGroupsResolved += snapshotGroups;

    if (snapshotDuplicateRowsDeleted > 0) {
      // The bundle table is treated as a disposable cache. After snapshot rows are removed it is
      // emptied, presumably so it can be rebuilt from the surviving snapshots — TODO confirm.
      runTransaction(client, () => {
        clearBundleCache(client);
      });
      bundleCacheCleared = true;
      // An empty bundle table has no duplicates left, so its groups count as resolved too.
      duplicateGroupsResolved += bundleGroups;
    }
  }

  if (!bundleCacheCleared && bundleGroups > 0) {
    duplicateRowsDeleted += runTransaction(client, () =>
      deleteDuplicateRows(client, findDuplicateRule('company_financial_bundle'))
    );
    duplicateGroupsResolved += bundleGroups;
  }

  if (metricValidationGroups > 0) {
    duplicateRowsDeleted += runTransaction(client, () =>
      deleteDuplicateRows(client, findDuplicateRule('filing_taxonomy_metric_validation'))
    );
    duplicateGroupsResolved += metricValidationGroups;
  }

  // Duplicates are removed first so that creating the unique indexes cannot fail on conflicts.
  for (const definition of CRITICAL_INDEX_DEFINITIONS) {
    const status = before.indexes.find((entry) => entry.name === definition.name);
    if (!status || status.healthy) {
      continue;
    }

    runTransaction(client, () => {
      createOrRecreateIndex(client, definition, status);
    });

    if (status.present) {
      indexesRecreated.push(definition.name);
    } else {
      indexesCreated.push(definition.name);
    }
  }

  const reportAfter = inspectFinancialIngestionSchema(client);

  return {
    attempted: true,
    requestedMode,
    missingIndexesBefore: before.missingIndexes,
    duplicateGroupsBefore: before.duplicateGroups,
    indexesCreated,
    indexesRecreated,
    duplicateRowsDeleted,
    duplicateGroupsResolved,
    bundleCacheCleared,
    snapshotDuplicateRowsDeleted,
    reportAfter
  };
}

/** Store the latest ensure result on `globalThis` so it survives module reloads in one process. */
function cacheEnsureResult(result: FinancialIngestionSchemaEnsureResult): void {
  globalThis.__financialIngestionSchemaStatus = result;
}

/** Most recent `ensureFinancialIngestionSchemaHealthy` result in this process, or `null`. */
export function getLatestFinancialIngestionSchemaStatus(): FinancialIngestionSchemaEnsureResult | null {
  return globalThis.__financialIngestionSchemaStatus ?? null;
}

/**
 * Inspect the schema and, in `auto` mode, repair drift. Never throws: failures are reported as
 * `mode: 'failed'` with `error` set. Every result is also cached for
 * `getLatestFinancialIngestionSchemaStatus`.
 */
export function ensureFinancialIngestionSchemaHealthy(
  client: Database,
  options: {
    mode?: FinancialSchemaRepairMode;
  } = {}
): FinancialIngestionSchemaEnsureResult {
  const requestedMode = options.mode ?? 'auto';
  // Last successful inspection; lets the failure branch report what was known before the error.
  let lastReport: FinancialIngestionSchemaReport | null = null;

  try {
    const before = inspectFinancialIngestionSchema(client);
    lastReport = before;

    if (before.ok) {
      const result: FinancialIngestionSchemaEnsureResult = {
        ok: true,
        mode: 'healthy',
        requestedMode,
        missingIndexes: [],
        duplicateGroups: 0,
        lastCheckedAt: before.checkedAt,
        repair: null,
        error: null
      };
      cacheEnsureResult(result);
      return result;
    }

    if (requestedMode !== 'auto') {
      const result: FinancialIngestionSchemaEnsureResult = {
        ok: false,
        mode: 'drifted',
        requestedMode,
        missingIndexes: before.missingIndexes,
        duplicateGroups: before.duplicateGroups,
        lastCheckedAt: before.checkedAt,
        repair: null,
        error: null
      };
      cacheEnsureResult(result);
      return result;
    }

    const repair = repairFinancialIngestionSchema(client, { mode: requestedMode });
    const finalReport = repair.reportAfter;
    const result: FinancialIngestionSchemaEnsureResult = {
      ok: finalReport.ok,
      mode: finalReport.ok ? 'repaired' : 'failed',
      requestedMode,
      missingIndexes: finalReport.missingIndexes,
      duplicateGroups: finalReport.duplicateGroups,
      lastCheckedAt: finalReport.checkedAt,
      repair,
      error: finalReport.ok ? null : 'Financial ingestion schema remains drifted after automatic repair.'
    };
    cacheEnsureResult(result);
    return result;
  } catch (error) {
    const result: FinancialIngestionSchemaEnsureResult = {
      ok: false,
      mode: 'failed',
      requestedMode,
      missingIndexes: lastReport?.missingIndexes ?? [],
      duplicateGroups: lastReport?.duplicateGroups ?? 0,
      lastCheckedAt: nowIso(),
      repair: null,
      error: error instanceof Error ? error.message : String(error)
    };
    cacheEnsureResult(result);
    return result;
  }
}

/** True for SQLite's error when an UPSERT's ON CONFLICT target has no matching unique index. */
export function isMissingOnConflictConstraintError(error: unknown): boolean {
  return error instanceof Error
    && error.message.toLowerCase().includes('on conflict clause does not match any primary key or unique constraint');
}

/**
 * Run `operation`. If it fails with the missing-ON-CONFLICT-constraint error, run the schema
 * guard once (mode taken from `FINANCIAL_SCHEMA_REPAIR_MODE`) and retry `operation` once.
 *
 * @throws the original error when it is not a missing-constraint error. If the retry also fails,
 *   throws an `Error` that combines `context`, the guard outcome, and the retry failure message.
 */
export async function withFinancialIngestionSchemaRetry<T>(input: {
  client: Database;
  operation: () => Promise<T>;
  context: string;
}): Promise<T> {
  try {
    return await input.operation();
  } catch (error) {
    if (!isMissingOnConflictConstraintError(error)) {
      throw error;
    }

    const ensureResult = ensureFinancialIngestionSchemaHealthy(input.client, {
      mode: resolveFinancialSchemaRepairMode(process.env.FINANCIAL_SCHEMA_REPAIR_MODE)
    });

    try {
      return await input.operation();
    } catch (retryError) {
      const suffix = ensureResult.error
        ? ` repair_error=${ensureResult.error}`
        : ` missing_indexes=${ensureResult.missingIndexes.join(',') || 'none'} duplicate_groups=${ensureResult.duplicateGroups}`;
      const reason = retryError instanceof Error ? retryError.message : String(retryError);
      throw new Error(`[${input.context}] ingestion schema retry failed:${suffix}; cause=${reason}`);
    }
  }
}

/** Internal helpers exposed for tests only. */
export const __financialIngestionSchemaInternals = {
  CRITICAL_INDEX_DEFINITIONS,
  UNIQUE_DUPLICATE_RULES,
  clearBundleCache,
  deleteDuplicateRows,
  hasTable,
  inspectDuplicates,
  inspectIndex,
  runTransaction
};