// Files
// Neon-Desk/lib/server/db/financial-ingestion-schema.ts
//
// 542 lines
// 17 KiB
// TypeScript

import type { Database } from 'bun:sqlite';
/**
 * How detected schema drift should be handled.
 * Only 'auto' triggers a repair; in the code of this module 'check-only' and
 * 'off' are treated identically (drift is reported, nothing is modified).
 */
type FinancialSchemaRepairMode = 'auto' | 'check-only' | 'off';
/**
 * Outcome category of an ensure call:
 * - 'healthy': inspection found no drift.
 * - 'repaired': drift was found and the post-repair inspection is clean.
 * - 'drifted': drift was found but the requested mode was not 'auto'.
 * - 'failed': drift remains after repair, or an exception was thrown.
 */
type FinancialIngestionHealthMode = 'healthy' | 'repaired' | 'drifted' | 'failed';
/** Tables whose indexes and key uniqueness are checked by this module. */
type FinancialIngestionTableName =
  | 'company_financial_bundle'
  | 'filing_taxonomy_snapshot'
  | 'filing_taxonomy_metric_validation';

/**
 * Describes one index that must exist: its name, owning table, whether it is
 * unique, the ordered column list, and the SQL statement used to create it.
 */
type CriticalIndexDefinition = {
  name: string;
  table: FinancialIngestionTableName;
  unique: boolean;
  columns: string[];
  createSql: string;
};

/**
 * Names a table together with the columns that identify a logical row;
 * rows sharing the same values for these columns are considered duplicates.
 */
type DuplicateRule = {
  table: FinancialIngestionTableName;
  partitionColumns: string[];
};
/**
 * Inspection result for a single expected index.
 */
type FinancialIngestionIndexStatus = {
name: string;
table: string;
// Ordered column list taken from the index definition.
expectedColumns: string[];
// Ordered column list read from the database; empty when the index (or its table) is absent.
actualColumns: string[];
// The uniqueness the definition expects (not the uniqueness found in the database).
unique: boolean;
// True when an index with this name is listed for the table.
present: boolean;
// True only when the index is present and both its uniqueness and ordered columns match the definition.
healthy: boolean;
};
/**
 * Duplicate-key summary for one table.
 */
type FinancialIngestionDuplicateStatus = {
table: string;
// Number of key groups that contain more than one row.
duplicateGroups: number;
// Surplus rows across those groups (group size minus one, summed).
duplicateRows: number;
};
/**
 * Full inspection report covering every critical index and every duplicate rule.
 */
type FinancialIngestionSchemaReport = {
// True when every index is healthy and no table has duplicate key groups.
ok: boolean;
// ISO-8601 timestamp taken when the report was assembled.
checkedAt: string;
indexes: FinancialIngestionIndexStatus[];
// Names of indexes that are not healthy; this includes indexes that exist but do not match their definition.
missingIndexes: string[];
// Totals summed over all entries in `duplicates`.
duplicateGroups: number;
duplicateRows: number;
duplicates: FinancialIngestionDuplicateStatus[];
};
/**
 * What a repair pass did (or why it did nothing).
 */
type FinancialIngestionSchemaRepairResult = {
// False when the mode was not 'auto' or the schema was already healthy; no changes are made in that case.
attempted: boolean;
requestedMode: FinancialSchemaRepairMode;
// State observed before any change was made.
missingIndexesBefore: string[];
duplicateGroupsBefore: number;
// Indexes that were absent before the repair and have been created.
indexesCreated: string[];
// Indexes that existed but did not match their definition and were dropped and created again.
indexesRecreated: string[];
// Rows removed by de-duplication, summed over all tables that were de-duplicated.
duplicateRowsDeleted: number;
// Sum of the pre-repair duplicate group counts of the tables that were de-duplicated.
duplicateGroupsResolved: number;
// True when every row of `company_financial_bundle` was deleted after snapshot de-duplication removed rows.
bundleCacheCleared: boolean;
// Rows removed from `filing_taxonomy_snapshot` specifically.
snapshotDuplicateRowsDeleted: number;
// Inspection taken after the repair; equals the pre-repair report when nothing was attempted.
reportAfter: FinancialIngestionSchemaReport;
};
/**
 * Summary returned by ensureFinancialIngestionSchemaHealthy and cached as the latest known status.
 */
type FinancialIngestionSchemaEnsureResult = {
ok: boolean;
mode: FinancialIngestionHealthMode;
requestedMode: FinancialSchemaRepairMode;
// Unhealthy indexes and duplicate groups remaining at the time of the last inspection;
// both are reported as empty/zero when an exception interrupted the check.
missingIndexes: string[];
duplicateGroups: number;
// ISO-8601 timestamp of the inspection this result is based on.
lastCheckedAt: string;
// Details of the repair pass, or null when no repair was run.
repair: FinancialIngestionSchemaRepairResult | null;
// Human-readable failure description, or null when nothing went wrong.
error: string | null;
};
// Global slot holding the most recent ensure result. It is written by
// cacheEnsureResult and read by getLatestFinancialIngestionSchemaStatus.
declare global {
// eslint-disable-next-line no-var
var __financialIngestionSchemaStatus: FinancialIngestionSchemaEnsureResult | undefined;
}
/**
 * Indexes that inspectFinancialIngestionSchema verifies and that
 * repairFinancialIngestionSchema creates or recreates when they are unhealthy.
 *
 * For each entry, `columns` is compared in order against the columns reported
 * by `PRAGMA index_info`, `unique` against the flag from `PRAGMA index_list`,
 * and `createSql` is executed verbatim to build the index.
 */
const CRITICAL_INDEX_DEFINITIONS: CriticalIndexDefinition[] = [
{
table: 'company_financial_bundle',
name: 'company_financial_bundle_uidx',
unique: true,
columns: ['ticker', 'surface_kind', 'cadence'],
createSql: 'CREATE UNIQUE INDEX IF NOT EXISTS `company_financial_bundle_uidx` ON `company_financial_bundle` (`ticker`,`surface_kind`,`cadence`);'
},
{
table: 'company_financial_bundle',
name: 'company_financial_bundle_ticker_idx',
unique: false,
columns: ['ticker', 'updated_at'],
createSql: 'CREATE INDEX IF NOT EXISTS `company_financial_bundle_ticker_idx` ON `company_financial_bundle` (`ticker`,`updated_at`);'
},
{
table: 'filing_taxonomy_snapshot',
name: 'filing_taxonomy_snapshot_filing_uidx',
unique: true,
columns: ['filing_id'],
createSql: 'CREATE UNIQUE INDEX IF NOT EXISTS `filing_taxonomy_snapshot_filing_uidx` ON `filing_taxonomy_snapshot` (`filing_id`);'
},
{
table: 'filing_taxonomy_snapshot',
name: 'filing_taxonomy_snapshot_ticker_date_idx',
unique: false,
columns: ['ticker', 'filing_date'],
createSql: 'CREATE INDEX IF NOT EXISTS `filing_taxonomy_snapshot_ticker_date_idx` ON `filing_taxonomy_snapshot` (`ticker`,`filing_date`);'
},
{
table: 'filing_taxonomy_snapshot',
name: 'filing_taxonomy_snapshot_status_idx',
unique: false,
columns: ['parse_status'],
createSql: 'CREATE INDEX IF NOT EXISTS `filing_taxonomy_snapshot_status_idx` ON `filing_taxonomy_snapshot` (`parse_status`);'
},
{
table: 'filing_taxonomy_metric_validation',
name: 'filing_taxonomy_metric_validation_uidx',
unique: true,
columns: ['snapshot_id', 'metric_key'],
createSql: 'CREATE UNIQUE INDEX IF NOT EXISTS `filing_taxonomy_metric_validation_uidx` ON `filing_taxonomy_metric_validation` (`snapshot_id`,`metric_key`);'
},
{
table: 'filing_taxonomy_metric_validation',
name: 'filing_taxonomy_metric_validation_snapshot_idx',
unique: false,
columns: ['snapshot_id'],
createSql: 'CREATE INDEX IF NOT EXISTS `filing_taxonomy_metric_validation_snapshot_idx` ON `filing_taxonomy_metric_validation` (`snapshot_id`);'
},
{
table: 'filing_taxonomy_metric_validation',
name: 'filing_taxonomy_metric_validation_status_idx',
unique: false,
columns: ['snapshot_id', 'status'],
createSql: 'CREATE INDEX IF NOT EXISTS `filing_taxonomy_metric_validation_status_idx` ON `filing_taxonomy_metric_validation` (`snapshot_id`,`status`);'
}
];
/**
 * Key columns per table used to detect and remove duplicate rows. Each entry
 * lists the same columns as the unique index defined for that table.
 */
const UNIQUE_DUPLICATE_RULES: DuplicateRule[] = [
  { table: 'company_financial_bundle', partitionColumns: ['ticker', 'surface_kind', 'cadence'] },
  { table: 'filing_taxonomy_snapshot', partitionColumns: ['filing_id'] },
  { table: 'filing_taxonomy_metric_validation', partitionColumns: ['snapshot_id', 'metric_key'] }
];
/**
 * Checks `sqlite_master` for a table with the given name.
 *
 * @param client Open database handle.
 * @param tableName Exact table name to look up.
 * @returns True unless the lookup yields `null`.
 */
function hasTable(client: Database, tableName: string) {
  const lookup = client.query('SELECT name FROM sqlite_master WHERE type = ? AND name = ? LIMIT 1');
  const match = lookup.get('table', tableName) as { name: string } | null;
  return match !== null;
}
/** Returns the current instant formatted as an ISO-8601 string. */
function nowIso() {
  const current = new Date();
  return current.toISOString();
}
/**
 * Maps a raw configuration string to a repair mode.
 *
 * The value is trimmed and lower-cased before matching. Anything other than
 * 'check-only' or 'off' (including `undefined` and the empty string) resolves
 * to 'auto'.
 *
 * @param value Raw setting, typically read from an environment variable.
 * @returns The resolved repair mode.
 */
export function resolveFinancialSchemaRepairMode(value: string | undefined): FinancialSchemaRepairMode {
  switch (value?.trim().toLowerCase()) {
    case 'check-only':
      return 'check-only';
    case 'off':
      return 'off';
    default:
      return 'auto';
  }
}
/**
 * Order-sensitive equality of two string arrays.
 *
 * @returns True when both arrays have the same length and hold strictly equal
 *          values at every position.
 */
function arrayEqual(left: string[], right: string[]) {
  if (left.length !== right.length) {
    return false;
  }
  return !left.some((value, index) => value !== right[index]);
}
/**
 * Reads the column names of an index via `PRAGMA index_info`, ordered by
 * their position (`seqno`) within the index.
 *
 * @param client Open database handle.
 * @param indexName Name of the index to describe.
 * @returns Column names in index order; empty when the pragma yields no rows.
 */
function queryIndexColumns(client: Database, indexName: string) {
  const statement = client.query(`PRAGMA index_info(\`${indexName}\`)`);
  const rows = statement.all() as Array<{ seqno: number; name: string }>;
  rows.sort((a, b) => a.seqno - b.seqno);
  return rows.map(({ name }) => name);
}
/**
 * Compares one expected index against what the database actually contains.
 *
 * The index is reported as absent when its table does not exist or when no
 * index with that name is listed for the table. It is healthy only when it is
 * present, its uniqueness flag matches, and its ordered columns match.
 *
 * @param client Open database handle.
 * @param definition The expected index.
 * @returns Status describing presence, actual columns and health.
 */
function inspectIndex(client: Database, definition: CriticalIndexDefinition): FinancialIngestionIndexStatus {
  // Single place that assembles the status so every return path has the same shape.
  const describe = (actualColumns: string[], present: boolean, healthy: boolean): FinancialIngestionIndexStatus => ({
    name: definition.name,
    table: definition.table,
    expectedColumns: [...definition.columns],
    actualColumns,
    unique: definition.unique,
    present,
    healthy
  });

  if (!hasTable(client, definition.table)) {
    return describe([], false, false);
  }

  const listed = client.query(`PRAGMA index_list(\`${definition.table}\`)`).all() as Array<{
    name: string;
    unique: number;
  }>;
  const match = listed.find((row) => row.name === definition.name);
  if (match === undefined) {
    return describe([], false, false);
  }

  const actualColumns = queryIndexColumns(client, definition.name);
  const uniquenessMatches = (match.unique === 1) === definition.unique;
  return describe(actualColumns, true, uniquenessMatches && arrayEqual(actualColumns, definition.columns));
}
/**
 * Counts duplicate key groups in one table.
 *
 * Rows are grouped by the rule's partition columns; every group with more
 * than one row counts as a duplicate group, and each row beyond the first in
 * such a group counts as a duplicate row. A missing table yields zero counts.
 *
 * @param client Open database handle.
 * @param rule Table and key columns to check.
 * @returns Duplicate group and surplus row counts for the table.
 */
function inspectDuplicates(client: Database, rule: DuplicateRule): FinancialIngestionDuplicateStatus {
  const { table, partitionColumns } = rule;
  if (!hasTable(client, table)) {
    return { table, duplicateGroups: 0, duplicateRows: 0 };
  }

  const keyList = partitionColumns.map((column) => `\`${column}\``).join(', ');
  // The statement text is kept flush-left so the SQL sent to SQLite is unchanged.
  const sql = `
SELECT
COUNT(*) AS duplicate_groups,
COALESCE(SUM(cnt - 1), 0) AS duplicate_rows
FROM (
SELECT COUNT(*) AS cnt
FROM \`${table}\`
GROUP BY ${keyList}
HAVING COUNT(*) > 1
)
`;
  const counts = client.query(sql).get() as { duplicate_groups: number | null; duplicate_rows: number | null } | null;

  return {
    table,
    duplicateGroups: Number(counts?.duplicate_groups ?? 0),
    duplicateRows: Number(counts?.duplicate_rows ?? 0)
  };
}
/**
 * Inspects every critical index and every duplicate rule and summarises the result.
 *
 * This function only reads from the database. The report is `ok` when all
 * indexes are healthy and no table contains duplicate key groups.
 *
 * @param client Open database handle.
 * @returns Per-index and per-table details plus aggregated totals.
 */
export function inspectFinancialIngestionSchema(client: Database): FinancialIngestionSchemaReport {
  const indexes: FinancialIngestionIndexStatus[] = [];
  const missingIndexes: string[] = [];
  for (const definition of CRITICAL_INDEX_DEFINITIONS) {
    const indexStatus = inspectIndex(client, definition);
    indexes.push(indexStatus);
    if (!indexStatus.healthy) {
      missingIndexes.push(indexStatus.name);
    }
  }

  const duplicates: FinancialIngestionDuplicateStatus[] = [];
  let duplicateGroups = 0;
  let duplicateRows = 0;
  let duplicateFree = true;
  for (const rule of UNIQUE_DUPLICATE_RULES) {
    const duplicateStatus = inspectDuplicates(client, rule);
    duplicates.push(duplicateStatus);
    duplicateGroups += duplicateStatus.duplicateGroups;
    duplicateRows += duplicateStatus.duplicateRows;
    if (!(duplicateStatus.duplicateGroups === 0)) {
      duplicateFree = false;
    }
  }

  return {
    ok: missingIndexes.length === 0 && duplicateFree,
    checkedAt: nowIso(),
    indexes,
    missingIndexes,
    duplicateGroups,
    duplicateRows,
    duplicates
  };
}
/**
 * Runs `work` inside a `BEGIN IMMEDIATE` transaction.
 *
 * The transaction is committed when `work` returns. If `work` or the commit
 * throws, a rollback is attempted and the original error is rethrown.
 *
 * @param client Open database handle.
 * @param work Synchronous callback executed inside the transaction.
 * @returns Whatever `work` returns.
 * @throws The error raised by `work` or by the commit. An error raised by
 *         `BEGIN IMMEDIATE` itself propagates without a rollback attempt.
 */
function runTransaction<T>(client: Database, work: () => T) {
  client.exec('BEGIN IMMEDIATE;');
  try {
    const result = work();
    client.exec('COMMIT;');
    return result;
  } catch (error) {
    try {
      client.exec('ROLLBACK;');
    } catch {
      // SQLite rolls some failed transactions back on its own, in which case an
      // explicit ROLLBACK fails with "no transaction is active". That secondary
      // failure is harmless and must not replace the error that caused it.
    }
    throw error;
  }
}
/**
 * Deletes surplus rows that share the same key columns, keeping one row per key.
 *
 * Within each key group rows are ranked by `updated_at` descending, then `id`
 * descending; the first-ranked row is kept and the rest are deleted. The
 * statement relies on the table having `id` and `updated_at` columns.
 *
 * @param client Open database handle.
 * @param rule Table and key columns to de-duplicate.
 * @returns Value of SQLite `changes()` after the delete, or 0 when the table does not exist.
 */
function deleteDuplicateRows(client: Database, rule: DuplicateRule) {
  const { table, partitionColumns } = rule;
  if (!hasTable(client, table)) {
    return 0;
  }

  const quotedKeys = partitionColumns.map((column) => `\`${column}\``);
  const partitionBy = quotedKeys.join(', ');
  // The statement text is kept flush-left so the SQL sent to SQLite is unchanged.
  const statement = `
DELETE FROM \`${table}\`
WHERE \`id\` IN (
WITH ranked AS (
SELECT
\`id\`,
ROW_NUMBER() OVER (
PARTITION BY ${partitionBy}
ORDER BY \`updated_at\` DESC, \`id\` DESC
) AS rn
FROM \`${table}\`
)
SELECT \`id\`
FROM ranked
WHERE rn > 1
);
`;
  client.exec(statement);

  const changed = client.query('SELECT changes() AS count').get() as { count: number } | null;
  return Number(changed?.count ?? 0);
}
/**
 * Deletes every row from `company_financial_bundle`.
 *
 * @param client Open database handle.
 * @returns Value of SQLite `changes()` after the delete, or 0 when the table does not exist.
 */
function clearBundleCache(client: Database) {
  const tableExists = hasTable(client, 'company_financial_bundle');
  if (tableExists) {
    client.exec('DELETE FROM `company_financial_bundle`;');
    const changed = client.query('SELECT changes() AS count').get() as { count: number } | null;
    return Number(changed?.count ?? 0);
  }
  return 0;
}
/**
 * Brings one index in line with its definition.
 *
 * An index that exists but is unhealthy is dropped first; the definition's
 * create statement is then executed in every case.
 *
 * @param client Open database handle.
 * @param definition The expected index, including its create statement.
 * @param status Inspection result for that index taken before the repair.
 */
function createOrRecreateIndex(client: Database, definition: CriticalIndexDefinition, status: FinancialIngestionIndexStatus) {
  const mustDropFirst = status.present && !status.healthy;
  const statements = mustDropFirst
    ? [`DROP INDEX IF EXISTS \`${definition.name}\`;`, definition.createSql]
    : [definition.createSql];
  for (const sql of statements) {
    client.exec(sql);
  }
}
/**
 * Repairs duplicate rows and unhealthy indexes found by an inspection.
 *
 * Nothing is changed unless the mode is 'auto' and the inspection reports
 * drift. Otherwise the steps run in this order, each in its own transaction:
 *   1. De-duplicate `filing_taxonomy_snapshot`; if rows were removed, delete
 *      every row of `company_financial_bundle`.
 *   2. De-duplicate `company_financial_bundle`, unless it was just emptied.
 *   3. De-duplicate `filing_taxonomy_metric_validation`.
 *   4. Create or recreate every index that was unhealthy before the repair.
 * A final inspection is stored in `reportAfter`.
 *
 * @param client Open database handle.
 * @param options.mode Requested repair mode; defaults to 'auto'.
 * @returns What was attempted and changed, plus the inspection after the repair.
 * @throws Any error raised by the underlying SQL statements.
 */
function repairFinancialIngestionSchema(
  client: Database,
  options: {
    mode?: FinancialSchemaRepairMode;
  } = {}
): FinancialIngestionSchemaRepairResult {
  const requestedMode = options.mode ?? 'auto';
  const before = inspectFinancialIngestionSchema(client);

  // Starts out describing "nothing attempted"; updated in place as steps run.
  const outcome: FinancialIngestionSchemaRepairResult = {
    attempted: false,
    requestedMode,
    missingIndexesBefore: before.missingIndexes,
    duplicateGroupsBefore: before.duplicateGroups,
    indexesCreated: [],
    indexesRecreated: [],
    duplicateRowsDeleted: 0,
    duplicateGroupsResolved: 0,
    bundleCacheCleared: false,
    snapshotDuplicateRowsDeleted: 0,
    reportAfter: before
  };
  if (requestedMode !== 'auto' || before.ok) {
    return outcome;
  }
  outcome.attempted = true;

  // Duplicate group count recorded for a table by the pre-repair inspection.
  const groupsBefore = (table: DuplicateRule['table']) =>
    before.duplicates.find((entry) => entry.table === table)?.duplicateGroups ?? 0;
  // Removes surplus rows for a table inside its own transaction and returns the count.
  const dedupe = (table: DuplicateRule['table']) =>
    runTransaction(client, () => {
      return deleteDuplicateRows(client, UNIQUE_DUPLICATE_RULES.find((rule) => rule.table === table)!);
    });

  const snapshotGroups = groupsBefore('filing_taxonomy_snapshot');
  if (snapshotGroups > 0) {
    outcome.snapshotDuplicateRowsDeleted = dedupe('filing_taxonomy_snapshot');
    outcome.duplicateRowsDeleted += outcome.snapshotDuplicateRowsDeleted;
    outcome.duplicateGroupsResolved += snapshotGroups;
    if (outcome.snapshotDuplicateRowsDeleted > 0) {
      runTransaction(client, () => {
        clearBundleCache(client);
      });
      outcome.bundleCacheCleared = true;
    }
  }

  const bundleGroups = groupsBefore('company_financial_bundle');
  if (!outcome.bundleCacheCleared && bundleGroups > 0) {
    outcome.duplicateRowsDeleted += dedupe('company_financial_bundle');
    outcome.duplicateGroupsResolved += bundleGroups;
  }

  const metricGroups = groupsBefore('filing_taxonomy_metric_validation');
  if (metricGroups > 0) {
    outcome.duplicateRowsDeleted += dedupe('filing_taxonomy_metric_validation');
    outcome.duplicateGroupsResolved += metricGroups;
  }

  for (const definition of CRITICAL_INDEX_DEFINITIONS) {
    const status = before.indexes.find((entry) => entry.name === definition.name);
    if (status === undefined || status.healthy) {
      continue;
    }
    runTransaction(client, () => {
      createOrRecreateIndex(client, definition, status);
    });
    const bucket = status.present ? outcome.indexesRecreated : outcome.indexesCreated;
    bucket.push(definition.name);
  }

  outcome.reportAfter = inspectFinancialIngestionSchema(client);
  return outcome;
}
/**
 * Stores the given ensure result in the global status slot, replacing any
 * previously stored value.
 */
function cacheEnsureResult(result: FinancialIngestionSchemaEnsureResult): void {
  const scope = globalThis;
  scope.__financialIngestionSchemaStatus = result;
}
/**
 * Returns the ensure result most recently stored in the global status slot,
 * or `null` when nothing has been stored yet.
 */
export function getLatestFinancialIngestionSchemaStatus() {
  const cached = globalThis.__financialIngestionSchemaStatus;
  return cached === undefined ? null : cached;
}
/**
 * Inspects the schema, repairs it when allowed, and records the outcome.
 *
 * - No drift: returns 'healthy'.
 * - Drift with a mode other than 'auto': returns 'drifted' without changes.
 * - Drift with 'auto': runs the repair and returns 'repaired' or 'failed'
 *   depending on the inspection taken after the repair.
 * - Any exception: returns 'failed' with the exception message in `error`.
 *
 * The function does not throw for errors raised by inspection or repair; every
 * outcome is stored as the latest status before being returned.
 *
 * @param client Open database handle.
 * @param options.mode Requested repair mode; defaults to 'auto'.
 * @returns The outcome summary.
 */
export function ensureFinancialIngestionSchemaHealthy(
  client: Database,
  options: {
    mode?: FinancialSchemaRepairMode;
  } = {}
): FinancialIngestionSchemaEnsureResult {
  const requestedMode = options.mode ?? 'auto';
  // Records the outcome as the latest status and hands it back to the caller.
  const publish = (result: FinancialIngestionSchemaEnsureResult) => {
    cacheEnsureResult(result);
    return result;
  };

  try {
    const before = inspectFinancialIngestionSchema(client);

    if (before.ok) {
      return publish({
        ok: true,
        mode: 'healthy',
        requestedMode,
        missingIndexes: [],
        duplicateGroups: 0,
        lastCheckedAt: before.checkedAt,
        repair: null,
        error: null
      });
    }

    if (requestedMode !== 'auto') {
      return publish({
        ok: false,
        mode: 'drifted',
        requestedMode,
        missingIndexes: before.missingIndexes,
        duplicateGroups: before.duplicateGroups,
        lastCheckedAt: before.checkedAt,
        repair: null,
        error: null
      });
    }

    const repair = repairFinancialIngestionSchema(client, { mode: requestedMode });
    const { reportAfter } = repair;
    const repaired = reportAfter.ok;
    return publish({
      ok: repaired,
      mode: repaired ? 'repaired' : 'failed',
      requestedMode,
      missingIndexes: reportAfter.missingIndexes,
      duplicateGroups: reportAfter.duplicateGroups,
      lastCheckedAt: reportAfter.checkedAt,
      repair,
      error: repaired ? null : 'Financial ingestion schema remains drifted after automatic repair.'
    });
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return publish({
      ok: false,
      mode: 'failed',
      requestedMode,
      missingIndexes: [],
      duplicateGroups: 0,
      lastCheckedAt: nowIso(),
      repair: null,
      error: message
    });
  }
}
/**
 * Detects the SQLite error raised when an `ON CONFLICT` clause has no
 * matching primary key or unique constraint.
 *
 * @param error Any thrown value.
 * @returns True only for `Error` instances whose message contains that text
 *          (compared case-insensitively).
 */
function isMissingOnConflictConstraintError(error: unknown) {
  if (!(error instanceof Error)) {
    return false;
  }
  const message = error.message.toLowerCase();
  return message.includes('on conflict clause does not match any primary key or unique constraint');
}
/**
 * Runs an ingestion operation and retries it once after a schema check when it
 * fails because an `ON CONFLICT` target has no matching unique constraint.
 *
 * Any other failure is rethrown untouched. For a matching failure the schema
 * is checked (and repaired, depending on `FINANCIAL_SCHEMA_REPAIR_MODE`) and
 * the operation is run a second time regardless of the check's outcome.
 *
 * @param input.client Database handle used for the schema check.
 * @param input.operation The operation to run; it may be invoked twice.
 * @param input.context Label placed at the start of the error message when the retry fails.
 * @returns The value produced by the operation.
 * @throws The original error when it is not the ON CONFLICT constraint error;
 *         otherwise a new `Error` describing the schema state and the retry failure.
 */
export async function withFinancialIngestionSchemaRetry<T>(input: {
  client: Database;
  operation: () => Promise<T>;
  context: string;
}) {
  let firstFailure: unknown;
  try {
    return await input.operation();
  } catch (error) {
    firstFailure = error;
  }

  if (!isMissingOnConflictConstraintError(firstFailure)) {
    throw firstFailure;
  }

  const ensureResult = ensureFinancialIngestionSchemaHealthy(input.client, {
    mode: resolveFinancialSchemaRepairMode(process.env.FINANCIAL_SCHEMA_REPAIR_MODE)
  });

  try {
    return await input.operation();
  } catch (retryError) {
    let suffix: string;
    if (ensureResult.error) {
      suffix = ` repair_error=${ensureResult.error}`;
    } else {
      suffix = ` missing_indexes=${ensureResult.missingIndexes.join(',') || 'none'} duplicate_groups=${ensureResult.duplicateGroups}`;
    }
    const reason = retryError instanceof Error ? retryError.message : String(retryError);
    throw new Error(`[${input.context}] ingestion schema retry failed:${suffix}; cause=${reason}`);
  }
}
// Groups module-private constants and helpers under a single object.
// NOTE(review): no `export` is visible on this declaration; presumably it is
// meant to expose internals to tests — confirm how it is consumed.
const __financialIngestionSchemaInternals = {
CRITICAL_INDEX_DEFINITIONS,
UNIQUE_DUPLICATE_RULES,
clearBundleCache,
deleteDuplicateRows,
hasTable,
inspectDuplicates,
inspectIndex,
runTransaction
};