Add untracked chart and schema files
This commit is contained in:
541
lib/server/db/financial-ingestion-schema.ts
Normal file
541
lib/server/db/financial-ingestion-schema.ts
Normal file
@@ -0,0 +1,541 @@
|
||||
import type { Database } from 'bun:sqlite';
|
||||
|
||||
/**
 * Requested handling of schema drift.
 *
 * Within this module only `'auto'` lets `repairFinancialIngestionSchema` modify the
 * database; `'check-only'` and `'off'` both result in inspection without repair.
 */
export type FinancialSchemaRepairMode =
  | 'auto'
  | 'check-only'
  | 'off';
|
||||
/**
 * Outcome classification reported by `ensureFinancialIngestionSchemaHealthy`:
 * - `'healthy'`  – the first inspection found no drift;
 * - `'repaired'` – drift was found and the automatic repair left the schema clean;
 * - `'drifted'`  – drift was found but the requested mode was not `'auto'`;
 * - `'failed'`   – drift remains after repair, or an error was thrown while checking/repairing.
 */
export type FinancialIngestionHealthMode =
  | 'healthy'
  | 'repaired'
  | 'drifted'
  | 'failed';
|
||||
|
||||
/**
 * Declarative description of one index that the financial ingestion tables are
 * expected to carry.
 */
type CriticalIndexDefinition = {
  /** Index name, matched against the `name` column of `PRAGMA index_list`. */
  name: string;
  /** Table the index is defined on. */
  table: 'company_financial_bundle' | 'filing_taxonomy_snapshot' | 'filing_taxonomy_metric_validation';
  /** Whether the index is expected to be a UNIQUE index. */
  unique: boolean;
  /** Expected indexed columns, in index order. */
  columns: string[];
  /** SQL statement executed to create the index during repair. */
  createSql: string;
};
|
||||
|
||||
/**
 * Describes which column combination must be unique within a table; rows that
 * share the same values for `partitionColumns` are treated as duplicates.
 */
type DuplicateRule = {
  /** Table to scan for duplicates. */
  table: 'company_financial_bundle' | 'filing_taxonomy_snapshot' | 'filing_taxonomy_metric_validation';
  /** Columns used for GROUP BY / PARTITION BY when detecting and deleting duplicates. */
  partitionColumns: string[];
};
|
||||
|
||||
/** Inspection result for a single critical index. */
export type FinancialIngestionIndexStatus = {
  /** Name of the inspected index. */
  name: string;
  /** Table the index is expected on. */
  table: string;
  /** Columns the index definition requires, in order. */
  expectedColumns: string[];
  /** Columns actually reported for the index; empty when the index (or its table) is absent. */
  actualColumns: string[];
  /** Expected uniqueness copied from the definition (not the observed value). */
  unique: boolean;
  /** True when an index with this name exists on the table. */
  present: boolean;
  /** True when the index exists and matches the expected uniqueness and column list. */
  healthy: boolean;
};
|
||||
|
||||
/** Duplicate-row statistics for one table. */
export type FinancialIngestionDuplicateStatus = {
  /** Table that was scanned. */
  table: string;
  /** Number of key combinations that occur more than once. */
  duplicateGroups: number;
  /** Number of surplus rows, i.e. the sum over all groups of (group size - 1). */
  duplicateRows: number;
};
|
||||
|
||||
/** Aggregate result of inspecting all critical indexes and duplicate rules. */
export type FinancialIngestionSchemaReport = {
  /** True when no index is unhealthy and no table has duplicate groups. */
  ok: boolean;
  /** ISO-8601 timestamp taken when the report was assembled. */
  checkedAt: string;
  /** Per-index inspection results. */
  indexes: FinancialIngestionIndexStatus[];
  /** Names of indexes that are not healthy (absent or mismatching). */
  missingIndexes: string[];
  /** Total duplicate groups across all inspected tables. */
  duplicateGroups: number;
  /** Total surplus rows across all inspected tables. */
  duplicateRows: number;
  /** Per-table duplicate statistics. */
  duplicates: FinancialIngestionDuplicateStatus[];
};
|
||||
|
||||
/** Summary of what `repairFinancialIngestionSchema` did (or declined to do). */
export type FinancialIngestionSchemaRepairResult = {
  /** False when the mode was not `'auto'` or the schema was already healthy. */
  attempted: boolean;
  /** Mode the caller asked for. */
  requestedMode: FinancialSchemaRepairMode;
  /** Unhealthy index names found by the inspection taken before repair. */
  missingIndexesBefore: string[];
  /** Total duplicate groups found by the inspection taken before repair. */
  duplicateGroupsBefore: number;
  /** Indexes that did not exist and were created. */
  indexesCreated: string[];
  /** Indexes that existed but mismatched and were dropped and created again. */
  indexesRecreated: string[];
  /** Rows removed by duplicate cleanup across all tables. */
  duplicateRowsDeleted: number;
  /** Duplicate groups (as counted before repair) of the tables that were deduplicated. */
  duplicateGroupsResolved: number;
  /** True when every row of `company_financial_bundle` was deleted as part of the repair. */
  bundleCacheCleared: boolean;
  /** Rows removed from `filing_taxonomy_snapshot` by duplicate cleanup. */
  snapshotDuplicateRowsDeleted: number;
  /** Inspection taken after repair, or the pre-repair inspection when nothing was attempted. */
  reportAfter: FinancialIngestionSchemaReport;
};
|
||||
|
||||
/** Result of `ensureFinancialIngestionSchemaHealthy`; also the shape cached on `globalThis`. */
export type FinancialIngestionSchemaEnsureResult = {
  /** True when the schema is healthy at the end of the call. */
  ok: boolean;
  /** Outcome classification. */
  mode: FinancialIngestionHealthMode;
  /** Mode the caller asked for. */
  requestedMode: FinancialSchemaRepairMode;
  /** Unhealthy index names from the most recent inspection (empty when an error was thrown). */
  missingIndexes: string[];
  /** Duplicate groups from the most recent inspection (0 when an error was thrown). */
  duplicateGroups: number;
  /** ISO-8601 timestamp of the most recent inspection, or of the failure. */
  lastCheckedAt: string;
  /** Repair summary when a repair ran; otherwise `null`. */
  repair: FinancialIngestionSchemaRepairResult | null;
  /** Error message when the outcome is `'failed'`; otherwise `null`. */
  error: string | null;
};
|
||||
|
||||
declare global {
  // Slot on `globalThis` holding the most recent ensure result; written by
  // `cacheEnsureResult` and read by `getLatestFinancialIngestionSchemaStatus`.
  // Global augmentations must be declared with `var`, hence the lint suppression.
  // eslint-disable-next-line no-var
  var __financialIngestionSchemaStatus: FinancialIngestionSchemaEnsureResult | undefined;
}
|
||||
|
||||
/**
 * Builds one critical index definition, deriving `createSql` from the other
 * fields so the statement can never disagree with the name, table, uniqueness
 * or column list that the inspector compares against.
 */
function defineCriticalIndex(
  table: CriticalIndexDefinition['table'],
  name: string,
  unique: boolean,
  columns: string[]
): CriticalIndexDefinition {
  const keyword = unique ? 'CREATE UNIQUE INDEX' : 'CREATE INDEX';
  const columnList = columns.map((column) => `\`${column}\``).join(',');

  return {
    table,
    name,
    unique,
    columns,
    createSql: `${keyword} IF NOT EXISTS \`${name}\` ON \`${table}\` (${columnList});`
  };
}

/**
 * Indexes that must exist on the financial ingestion tables. Entries are
 * inspected and, in `'auto'` mode, created or recreated in this order.
 */
const CRITICAL_INDEX_DEFINITIONS: CriticalIndexDefinition[] = [
  defineCriticalIndex('company_financial_bundle', 'company_financial_bundle_uidx', true, [
    'ticker',
    'surface_kind',
    'cadence'
  ]),
  defineCriticalIndex('company_financial_bundle', 'company_financial_bundle_ticker_idx', false, [
    'ticker',
    'updated_at'
  ]),
  defineCriticalIndex('filing_taxonomy_snapshot', 'filing_taxonomy_snapshot_filing_uidx', true, ['filing_id']),
  defineCriticalIndex('filing_taxonomy_snapshot', 'filing_taxonomy_snapshot_ticker_date_idx', false, [
    'ticker',
    'filing_date'
  ]),
  defineCriticalIndex('filing_taxonomy_snapshot', 'filing_taxonomy_snapshot_status_idx', false, ['parse_status']),
  defineCriticalIndex('filing_taxonomy_metric_validation', 'filing_taxonomy_metric_validation_uidx', true, [
    'snapshot_id',
    'metric_key'
  ]),
  defineCriticalIndex('filing_taxonomy_metric_validation', 'filing_taxonomy_metric_validation_snapshot_idx', false, [
    'snapshot_id'
  ]),
  defineCriticalIndex('filing_taxonomy_metric_validation', 'filing_taxonomy_metric_validation_status_idx', false, [
    'snapshot_id',
    'status'
  ])
];
|
||||
|
||||
/**
 * Column combinations that must be unique per table. Each entry mirrors the
 * column list of the UNIQUE index declared for the same table in
 * `CRITICAL_INDEX_DEFINITIONS`.
 */
const UNIQUE_DUPLICATE_RULES: DuplicateRule[] = [
  { table: 'company_financial_bundle', partitionColumns: ['ticker', 'surface_kind', 'cadence'] },
  { table: 'filing_taxonomy_snapshot', partitionColumns: ['filing_id'] },
  { table: 'filing_taxonomy_metric_validation', partitionColumns: ['snapshot_id', 'metric_key'] }
];
|
||||
|
||||
/**
 * Reports whether a table with the given name exists in the database.
 *
 * @param client Open database connection.
 * @param tableName Exact table name to look up in `sqlite_master`.
 * @returns True when a matching `type = 'table'` row exists.
 */
function hasTable(client: Database, tableName: string) {
  const row = client
    .query('SELECT name FROM sqlite_master WHERE type = ? AND name = ? LIMIT 1')
    .get('table', tableName) as { name: string } | null | undefined;

  // Loose comparison on purpose: treat both `null` and `undefined` as "no row",
  // so a driver that yields `undefined` for an empty result is not read as "present".
  return row != null;
}
|
||||
|
||||
/** Returns the current time as an ISO-8601 string (UTC, as produced by `Date#toISOString`). */
function nowIso() {
  const now = new Date();
  return now.toISOString();
}
|
||||
|
||||
/**
 * Parses a raw configuration value (for example an environment variable) into a repair mode.
 *
 * Matching ignores surrounding whitespace and letter case. Anything other than
 * `check-only` or `off` — including `undefined` and the empty string — resolves to `'auto'`.
 *
 * @param value Raw setting, possibly undefined.
 * @returns The resolved repair mode.
 */
export function resolveFinancialSchemaRepairMode(value: string | undefined): FinancialSchemaRepairMode {
  switch (value?.trim().toLowerCase()) {
    case 'check-only':
      return 'check-only';
    case 'off':
      return 'off';
    default:
      return 'auto';
  }
}
|
||||
|
||||
/** Returns true when both arrays have the same length and identical elements at every position. */
function arrayEqual(left: string[], right: string[]) {
  if (left.length !== right.length) {
    return false;
  }

  return left.every((value, index) => value === right[index]);
}
|
||||
|
||||
/**
 * Reads the column names of an index via `PRAGMA index_info`, ordered by `seqno`.
 *
 * The index name is interpolated into the statement inside backticks, so it
 * must be a trusted identifier (callers in this module pass names from
 * `CRITICAL_INDEX_DEFINITIONS`).
 *
 * @param client Open database connection.
 * @param indexName Name of the index to describe.
 * @returns Column names in index order; empty when the pragma returns no rows.
 */
function queryIndexColumns(client: Database, indexName: string) {
  const rows = client.query(`PRAGMA index_info(\`${indexName}\`)`).all() as Array<{ seqno: number; name: string }>;

  // Sort a copy so the array handed back by the driver is left untouched.
  return [...rows]
    .sort((left, right) => left.seqno - right.seqno)
    .map((row) => row.name);
}
|
||||
|
||||
/**
 * Compares one expected index against what the database actually contains.
 *
 * An index is healthy when it exists under the expected name, its uniqueness
 * flag matches, and its columns match the expected list in order. A missing
 * table is reported as an absent, unhealthy index.
 *
 * @param client Open database connection.
 * @param definition Expected index.
 * @returns Status describing presence, actual columns and health.
 */
function inspectIndex(client: Database, definition: CriticalIndexDefinition): FinancialIngestionIndexStatus {
  // Start from the "absent" answer and upgrade it as evidence is found.
  const status: FinancialIngestionIndexStatus = {
    name: definition.name,
    table: definition.table,
    expectedColumns: [...definition.columns],
    actualColumns: [],
    unique: definition.unique,
    present: false,
    healthy: false
  };

  if (!hasTable(client, definition.table)) {
    return status;
  }

  const indexList = client.query(`PRAGMA index_list(\`${definition.table}\`)`).all() as Array<{
    name: string;
    unique: number;
  }>;
  const existing = indexList.find((row) => row.name === definition.name);
  if (!existing) {
    return status;
  }

  const actualColumns = queryIndexColumns(client, definition.name);
  const isUnique = existing.unique === 1;

  status.present = true;
  status.actualColumns = actualColumns;
  status.healthy = isUnique === definition.unique && arrayEqual(actualColumns, definition.columns);
  return status;
}
|
||||
|
||||
/**
 * Counts rows that violate a uniqueness rule.
 *
 * @param client Open database connection.
 * @param rule Table and columns that must be unique together.
 * @returns Number of duplicated key groups and number of surplus rows; both 0
 *          when the table does not exist.
 */
function inspectDuplicates(client: Database, rule: DuplicateRule): FinancialIngestionDuplicateStatus {
  if (!hasTable(client, rule.table)) {
    return { table: rule.table, duplicateGroups: 0, duplicateRows: 0 };
  }

  // NOTE(review): GROUP BY places NULL values in the same group, whereas a SQLite
  // UNIQUE index treats NULLs as distinct — confirm the partition columns are NOT NULL.
  const groupBy = rule.partitionColumns.map((column) => `\`${column}\``).join(', ');
  const row = client.query(`
    SELECT
      COUNT(*) AS duplicate_groups,
      COALESCE(SUM(cnt - 1), 0) AS duplicate_rows
    FROM (
      SELECT COUNT(*) AS cnt
      FROM \`${rule.table}\`
      GROUP BY ${groupBy}
      HAVING COUNT(*) > 1
    )
  `).get() as { duplicate_groups: number | null; duplicate_rows: number | null } | null;

  const duplicateGroups = Number(row?.duplicate_groups ?? 0);
  const duplicateRows = Number(row?.duplicate_rows ?? 0);

  return { table: rule.table, duplicateGroups, duplicateRows };
}
|
||||
|
||||
/**
 * Inspects every critical index and every uniqueness rule and summarises the result.
 *
 * This function only reads from the database.
 *
 * @param client Open database connection.
 * @returns Report whose `ok` flag is true only when all indexes are healthy and
 *          no table contains duplicate groups.
 */
export function inspectFinancialIngestionSchema(client: Database): FinancialIngestionSchemaReport {
  const indexes = CRITICAL_INDEX_DEFINITIONS.map((definition) => inspectIndex(client, definition));
  const duplicates = UNIQUE_DUPLICATE_RULES.map((rule) => inspectDuplicates(client, rule));

  // "Missing" covers both absent indexes and present-but-mismatching ones.
  const missingIndexes = indexes.filter((index) => !index.healthy).map((index) => index.name);

  let duplicateGroups = 0;
  let duplicateRows = 0;
  for (const entry of duplicates) {
    duplicateGroups += entry.duplicateGroups;
    duplicateRows += entry.duplicateRows;
  }

  const hasDuplicates = duplicates.some((entry) => entry.duplicateGroups !== 0);

  return {
    ok: missingIndexes.length === 0 && !hasDuplicates,
    checkedAt: nowIso(),
    indexes,
    missingIndexes,
    duplicateGroups,
    duplicateRows,
    duplicates
  };
}
|
||||
|
||||
/**
 * Runs `work` inside a `BEGIN IMMEDIATE` … `COMMIT` transaction.
 *
 * If `work` or the commit throws, a rollback is attempted and the original
 * error is rethrown.
 *
 * @param client Open database connection.
 * @param work Synchronous callback executed inside the transaction.
 * @returns Whatever `work` returns.
 * @throws The error raised by `work` or by `COMMIT`.
 */
function runTransaction<T>(client: Database, work: () => T) {
  client.exec('BEGIN IMMEDIATE;');

  let result: T;
  try {
    result = work();
    client.exec('COMMIT;');
  } catch (error) {
    try {
      client.exec('ROLLBACK;');
    } catch {
      // SQLite can roll a transaction back on its own for some failures, in which
      // case ROLLBACK itself throws. The root cause is `error`, so never let a
      // failed rollback replace it.
    }
    throw error;
  }

  return result;
}
|
||||
|
||||
/**
 * Deletes all but one row from every group of rows sharing the rule's key columns.
 *
 * Within each group the survivor is the row with the greatest `updated_at`,
 * ties broken by the greatest `id`. The statement therefore requires the table
 * to have `id` and `updated_at` columns.
 *
 * @param client Open database connection.
 * @param rule Table and key columns defining what counts as a duplicate.
 * @returns Number of rows deleted as reported by `changes()`; 0 when the table does not exist.
 */
function deleteDuplicateRows(client: Database, rule: DuplicateRule) {
  if (!hasTable(client, rule.table)) {
    return 0;
  }

  const partitionBy = rule.partitionColumns.map((column) => `\`${column}\``).join(', ');

  // Rank rows per key group (newest first) and delete everything ranked after the first.
  client.exec(`
    DELETE FROM \`${rule.table}\`
    WHERE \`id\` IN (
      WITH ranked AS (
        SELECT
          \`id\`,
          ROW_NUMBER() OVER (
            PARTITION BY ${partitionBy}
            ORDER BY \`updated_at\` DESC, \`id\` DESC
          ) AS rn
        FROM \`${rule.table}\`
      )
      SELECT \`id\`
      FROM ranked
      WHERE rn > 1
    );
  `);

  // `changes()` reports the rows affected by the DELETE that just ran on this connection.
  const affected = client.query('SELECT changes() AS count').get() as { count: number } | null;
  return Number(affected?.count ?? 0);
}
|
||||
|
||||
/**
 * Deletes every row from `company_financial_bundle`.
 *
 * @param client Open database connection.
 * @returns Number of rows deleted as reported by `changes()`; 0 when the table does not exist.
 */
function clearBundleCache(client: Database) {
  if (!hasTable(client, 'company_financial_bundle')) {
    return 0;
  }

  client.exec('DELETE FROM `company_financial_bundle`;');

  const affected = client.query('SELECT changes() AS count').get() as { count: number } | null;
  return Number(affected?.count ?? 0);
}
|
||||
|
||||
/**
 * Brings one index in line with its definition.
 *
 * A present-but-unhealthy index is dropped first, because the definition's
 * `CREATE … IF NOT EXISTS` statement would otherwise leave the mismatching
 * index in place. The create statement is then always executed.
 *
 * @param client Open database connection.
 * @param definition Expected index, including the SQL that creates it.
 * @param status Inspection result for that index taken before the repair.
 */
function createOrRecreateIndex(client: Database, definition: CriticalIndexDefinition, status: FinancialIngestionIndexStatus) {
  const mustDropFirst = status.present && !status.healthy;
  if (mustDropFirst) {
    client.exec(`DROP INDEX IF EXISTS \`${definition.name}\`;`);
  }

  client.exec(definition.createSql);
}
|
||||
|
||||
/**
 * Repairs duplicate rows and missing/mismatching indexes on the financial ingestion tables.
 *
 * Nothing is modified unless the mode is `'auto'` and the initial inspection
 * reports drift. When a repair runs, the steps are, each in its own transaction:
 *   1. deduplicate `filing_taxonomy_snapshot`; if any row was deleted, delete
 *      every row of `company_financial_bundle`;
 *   2. otherwise deduplicate `company_financial_bundle`;
 *   3. deduplicate `filing_taxonomy_metric_validation`;
 *   4. create (or drop and recreate) every index that was unhealthy before the repair.
 *
 * @param client Open database connection.
 * @param options.mode Requested repair mode; defaults to `'auto'`.
 * @returns Summary of the actions taken plus a fresh inspection report.
 * @throws Any error raised by the underlying SQL statements (for example when an
 *         index cannot be created); completed steps are not undone.
 */
export function repairFinancialIngestionSchema(
  client: Database,
  options: {
    mode?: FinancialSchemaRepairMode;
  } = {}
): FinancialIngestionSchemaRepairResult {
  const requestedMode = options.mode ?? 'auto';
  const before = inspectFinancialIngestionSchema(client);

  // Starts as the "nothing attempted" answer and is filled in as the repair progresses.
  const outcome: FinancialIngestionSchemaRepairResult = {
    attempted: false,
    requestedMode,
    missingIndexesBefore: before.missingIndexes,
    duplicateGroupsBefore: before.duplicateGroups,
    indexesCreated: [],
    indexesRecreated: [],
    duplicateRowsDeleted: 0,
    duplicateGroupsResolved: 0,
    bundleCacheCleared: false,
    snapshotDuplicateRowsDeleted: 0,
    reportAfter: before
  };

  if (requestedMode !== 'auto' || before.ok) {
    return outcome;
  }

  outcome.attempted = true;

  // Deduplicates one table if the pre-repair inspection found duplicate groups in it,
  // updates the running totals, and returns the number of rows deleted.
  const dedupeTable = (table: DuplicateRule['table']): number => {
    const groupsBefore = before.duplicates.find((entry) => entry.table === table)?.duplicateGroups ?? 0;
    if (groupsBefore <= 0) {
      return 0;
    }

    const rule = UNIQUE_DUPLICATE_RULES.find((candidate) => candidate.table === table);
    if (!rule) {
      // Unreachable in practice: duplicate statuses are derived from the same rule list.
      return 0;
    }

    const deleted = runTransaction(client, () => deleteDuplicateRows(client, rule));
    outcome.duplicateRowsDeleted += deleted;
    outcome.duplicateGroupsResolved += groupsBefore;
    return deleted;
  };

  outcome.snapshotDuplicateRowsDeleted = dedupeTable('filing_taxonomy_snapshot');
  if (outcome.snapshotDuplicateRowsDeleted > 0) {
    // Snapshot rows were removed, so the whole bundle table is emptied rather than deduplicated.
    runTransaction(client, () => {
      clearBundleCache(client);
    });
    outcome.bundleCacheCleared = true;
  }

  // Once the bundle table has been emptied there is nothing left to deduplicate in it.
  // Note: rows removed by that wipe are not added to the duplicate counters.
  if (!outcome.bundleCacheCleared) {
    dedupeTable('company_financial_bundle');
  }

  dedupeTable('filing_taxonomy_metric_validation');

  // Indexes are handled after deduplication so UNIQUE indexes can be built.
  for (const definition of CRITICAL_INDEX_DEFINITIONS) {
    const status = before.indexes.find((entry) => entry.name === definition.name);
    if (!status || status.healthy) {
      continue;
    }

    runTransaction(client, () => {
      createOrRecreateIndex(client, definition, status);
    });

    const bucket = status.present ? outcome.indexesRecreated : outcome.indexesCreated;
    bucket.push(definition.name);
  }

  outcome.reportAfter = inspectFinancialIngestionSchema(client);
  return outcome;
}
|
||||
|
||||
/** Stores the given ensure result on `globalThis`, replacing any previously stored one. */
function cacheEnsureResult(result: FinancialIngestionSchemaEnsureResult) {
  globalThis.__financialIngestionSchemaStatus = result;
}
|
||||
|
||||
/**
 * Returns the ensure result most recently stored on `globalThis`.
 *
 * @returns The cached result, or `null` when none has been stored yet.
 */
export function getLatestFinancialIngestionSchemaStatus() {
  const cached = globalThis.__financialIngestionSchemaStatus;
  return cached ?? null;
}
|
||||
|
||||
/**
 * Inspects the ingestion schema and, in `'auto'` mode, repairs it when drift is found.
 *
 * The function does not throw for inspection or repair failures: any error is
 * converted into a `'failed'` result carrying the error message. Every result
 * is also stored on `globalThis` so it can be read back later via
 * `getLatestFinancialIngestionSchemaStatus`.
 *
 * @param client Open database connection.
 * @param options.mode Requested repair mode; defaults to `'auto'`.
 * @returns Health summary (`healthy`, `repaired`, `drifted` or `failed`).
 */
export function ensureFinancialIngestionSchemaHealthy(
  client: Database,
  options: {
    mode?: FinancialSchemaRepairMode;
  } = {}
): FinancialIngestionSchemaEnsureResult {
  const requestedMode = options.mode ?? 'auto';

  // Every exit path caches its result before returning it.
  const publish = (result: FinancialIngestionSchemaEnsureResult): FinancialIngestionSchemaEnsureResult => {
    cacheEnsureResult(result);
    return result;
  };

  try {
    const before = inspectFinancialIngestionSchema(client);

    if (before.ok) {
      return publish({
        ok: true,
        mode: 'healthy',
        requestedMode,
        missingIndexes: [],
        duplicateGroups: 0,
        lastCheckedAt: before.checkedAt,
        repair: null,
        error: null
      });
    }

    // Drift detected, but the caller did not allow automatic repair.
    if (requestedMode !== 'auto') {
      return publish({
        ok: false,
        mode: 'drifted',
        requestedMode,
        missingIndexes: before.missingIndexes,
        duplicateGroups: before.duplicateGroups,
        lastCheckedAt: before.checkedAt,
        repair: null,
        error: null
      });
    }

    const repair = repairFinancialIngestionSchema(client, { mode: requestedMode });
    const after = repair.reportAfter;

    return publish({
      ok: after.ok,
      mode: after.ok ? 'repaired' : 'failed',
      requestedMode,
      missingIndexes: after.missingIndexes,
      duplicateGroups: after.duplicateGroups,
      lastCheckedAt: after.checkedAt,
      repair,
      error: after.ok ? null : 'Financial ingestion schema remains drifted after automatic repair.'
    });
  } catch (error) {
    // No trustworthy inspection is available here, so the drift fields are left empty.
    return publish({
      ok: false,
      mode: 'failed',
      requestedMode,
      missingIndexes: [],
      duplicateGroups: 0,
      lastCheckedAt: nowIso(),
      repair: null,
      error: error instanceof Error ? error.message : String(error)
    });
  }
}
|
||||
|
||||
/**
 * Detects the SQLite error raised when an `ON CONFLICT` clause targets columns
 * that have no matching PRIMARY KEY or UNIQUE constraint.
 *
 * @param error Value caught from a failed operation.
 * @returns True only for `Error` instances whose message contains that text (case-insensitive).
 */
export function isMissingOnConflictConstraintError(error: unknown) {
  if (!(error instanceof Error)) {
    return false;
  }

  const message = error.message.toLowerCase();
  return message.includes('on conflict clause does not match any primary key or unique constraint');
}
|
||||
|
||||
/**
 * Runs an ingestion operation and retries it once after a schema check/repair
 * when it fails because an `ON CONFLICT` target has no backing unique constraint.
 *
 * Any other failure of the first attempt is rethrown unchanged. The repair mode
 * for the intermediate schema check is read from `FINANCIAL_SCHEMA_REPAIR_MODE`.
 *
 * @param input.client Database connection used for the schema check/repair.
 * @param input.operation Operation to run; it is invoked at most twice.
 * @param input.context Label prefixed to the error message when the retry fails.
 * @returns The operation's result.
 * @throws The first error when it is not a missing-constraint error; otherwise a
 *         new `Error` describing the schema state, with the retry failure
 *         attached as `cause`.
 */
export async function withFinancialIngestionSchemaRetry<T>(input: {
  client: Database;
  operation: () => Promise<T>;
  context: string;
}): Promise<T> {
  try {
    return await input.operation();
  } catch (error) {
    if (!isMissingOnConflictConstraintError(error)) {
      throw error;
    }
  }

  // The first attempt hit a missing unique constraint: check/repair the schema, then retry once.
  const ensureResult = ensureFinancialIngestionSchemaHealthy(input.client, {
    mode: resolveFinancialSchemaRepairMode(process.env.FINANCIAL_SCHEMA_REPAIR_MODE)
  });

  try {
    return await input.operation();
  } catch (retryError) {
    const suffix = ensureResult.error
      ? ` repair_error=${ensureResult.error}`
      : ` missing_indexes=${ensureResult.missingIndexes.join(',') || 'none'} duplicate_groups=${ensureResult.duplicateGroups}`;
    const reason = retryError instanceof Error ? retryError.message : String(retryError);

    // Keep the underlying failure (and its stack) reachable from the wrapper.
    const wrapped: Error & { cause?: unknown } = new Error(
      `[${input.context}] ingestion schema retry failed:${suffix}; cause=${reason}`
    );
    wrapped.cause = retryError;
    throw wrapped;
  }
}
|
||||
|
||||
/**
 * Bundle of module-private constants and helpers re-exported under one name
 * (presumably so tests can reach them — confirm against callers).
 */
export const __financialIngestionSchemaInternals = {
  CRITICAL_INDEX_DEFINITIONS,
  UNIQUE_DUPLICATE_RULES,
  clearBundleCache,
  deleteDuplicateRows,
  hasTable,
  inspectDuplicates,
  inspectIndex,
  runTransaction
};
|
||||
Reference in New Issue
Block a user