Files
Neon-Desk/lib/server/task-processors.ts
francy51 14a7773504 Add consolidated disclosure statement type
Create unified disclosure statement to organize footnote disclosures
separate from primary financial statements. Disclosures are now grouped
by type (tax, debt, securities, derivatives, leases, intangibles, ma,
revenue, cash_flow) in a dedicated statement type for cleaner UI
presentation.
2026-03-16 18:54:23 -04:00

1553 lines
42 KiB
TypeScript

import type {
Filing,
FilingExtraction,
FilingExtractionMeta,
Holding,
Task,
TaskStage,
TaskStageContext,
} from "@/lib/types";
import { runAiAnalysis } from "@/lib/server/ai";
import { buildPortfolioSummary } from "@/lib/server/portfolio";
import { getQuote } from "@/lib/server/prices";
import { indexSearchDocuments } from "@/lib/server/search";
import {
getFilingByAccession,
listFilingsRecords,
saveFilingAnalysis,
updateFilingMetricsById,
upsertFilingsRecords,
} from "@/lib/server/repos/filings";
import { deleteCompanyFinancialBundlesForTicker } from "@/lib/server/repos/company-financial-bundles";
import {
getFilingTaxonomySnapshotByFilingId,
normalizeFilingTaxonomySnapshotPayload,
upsertFilingTaxonomySnapshot,
} from "@/lib/server/repos/filing-taxonomy";
import {
applyRefreshedPrices,
listHoldingsForPriceRefresh,
listUserHoldings,
} from "@/lib/server/repos/holdings";
import { createPortfolioInsight } from "@/lib/server/repos/insights";
import { updateTaskStage } from "@/lib/server/repos/tasks";
import { fetchPrimaryFilingText, fetchRecentFilings } from "@/lib/server/sec";
import { enqueueTask } from "@/lib/server/tasks";
import { hydrateFilingTaxonomySnapshot } from "@/lib/server/taxonomy/engine";
import { validateMetricsWithPdfLlm } from "@/lib/server/taxonomy/pdf-validation";
// --- Extraction and hydration tuning ---------------------------------------

// Exact key set the LLM extraction JSON payload must contain — no extras,
// none missing; enforced by parseExtractionPayload.
const EXTRACTION_REQUIRED_KEYS = [
  "summary",
  "keyPoints",
  "redFlags",
  "followUpQuestions",
  "portfolioSignals",
  "segmentSpecificData",
  "geographicRevenueBreakdown",
  "companySpecificData",
  "secApiCrossChecks",
  "confidence",
] as const;
// Maximum entries kept per extraction list field.
const EXTRACTION_MAX_ITEMS = 6;
// Maximum characters kept per list item (after whitespace collapsing).
const EXTRACTION_ITEM_MAX_LENGTH = 280;
// Maximum characters kept for the extraction summary.
const EXTRACTION_SUMMARY_MAX_LENGTH = 900;
// Pause between per-filing taxonomy hydrations during sync, in ms (throttle).
const STATEMENT_HYDRATION_DELAY_MS = 120;
// Hard cap on filings considered for taxonomy hydration in one sync run.
const STATEMENT_HYDRATION_MAX_FILINGS = 80;
// Line-matching regexes for operating-segment disclosures in filing text.
const SEGMENT_PATTERNS = [
  /\boperating segment\b/i,
  /\bsegment revenue\b/i,
  /\bsegment margin\b/i,
  /\bsegment profit\b/i,
  /\bbusiness segment\b/i,
  /\breportable segment\b/i,
];
// Line-matching regexes for geographic revenue disclosures.
const GEOGRAPHIC_PATTERNS = [
  /\bgeographic\b/i,
  /\bamericas\b/i,
  /\bemea\b/i,
  /\bapac\b/i,
  /\basia pacific\b/i,
  /\bnorth america\b/i,
  /\beurope\b/i,
  /\bchina\b/i,
  /\binternational\b/i,
];
// Line-matching regexes for company-specific operating KPIs
// (same-store sales, bookings, ARPU, churn, occupancy, ...).
const COMPANY_SPECIFIC_PATTERNS = [
  /\bsame[- ]store\b/i,
  /\bcomparable[- ]store\b/i,
  /\bcomp sales\b/i,
  /\borganic sales\b/i,
  /\bbookings\b/i,
  /\bbacklog\b/i,
  /\barpu\b/i,
  /\bmau\b/i,
  /\bdau\b/i,
  /\bsubscriber\b/i,
  /\boccupancy\b/i,
  /\brevpar\b/i,
  /\bretention\b/i,
  /\bchurn\b/i,
];
// Key of a metric on Filing["metrics"] (non-null variant).
type FilingMetricKey = keyof NonNullable<Filing["metrics"]>;
/**
 * Type guard: true when the form type carries the standard financial
 * statements we derive metrics from (annual 10-K or quarterly 10-Q).
 */
function isFinancialMetricsForm(
  filingType: string,
): filingType is "10-K" | "10-Q" {
  const metricForms: readonly string[] = ["10-K", "10-Q"];
  return metricForms.includes(filingType);
}
// Per-metric keyword patterns used by buildSecApiCrossChecks to decide
// whether a SEC API metric value appears referenced in the filing narrative.
// `key` indexes into Filing["metrics"]; `label` is the human-readable name
// used in generated cross-check sentences.
const METRIC_CHECK_PATTERNS: Array<{
  key: FilingMetricKey;
  label: string;
  patterns: RegExp[];
}> = [
  {
    key: "revenue",
    label: "Revenue",
    patterns: [/\brevenue\b/i, /\bsales\b/i],
  },
  {
    key: "netIncome",
    label: "Net income",
    patterns: [/\bnet income\b/i, /\bprofit\b/i],
  },
  {
    key: "totalAssets",
    label: "Total assets",
    patterns: [/\btotal assets\b/i, /\bassets\b/i],
  },
  {
    key: "cash",
    label: "Cash",
    patterns: [/\bcash\b/i, /\bcash equivalents\b/i],
  },
  {
    key: "debt",
    label: "Debt",
    patterns: [/\bdebt\b/i, /\bborrowings\b/i, /\bliabilit(?:y|ies)\b/i],
  },
];
/**
 * Coerce an arbitrary task result into a plain object record. Plain objects
 * pass through unchanged; anything else (null, arrays, primitives) is
 * wrapped as `{ value }`.
 */
function toTaskResult(value: unknown): Record<string, unknown> {
  const isPlainObject =
    typeof value === "object" && value !== null && !Array.isArray(value);
  if (isPlainObject) {
    return value as Record<string, unknown>;
  }
  return { value };
}
export type TaskExecutionOutcome = {
  result: Record<string, unknown>;
  completionDetail: string;
  completionContext?: TaskStageContext | null;
};

/**
 * Assemble a TaskExecutionOutcome, normalizing the raw result value into a
 * plain record via toTaskResult. Context defaults to null when omitted.
 */
function buildTaskOutcome(
  result: unknown,
  completionDetail: string,
  completionContext: TaskStageContext | null = null,
): TaskExecutionOutcome {
  const outcome: TaskExecutionOutcome = {
    result: toTaskResult(result),
    completionDetail,
    completionContext,
  };
  return outcome;
}
/**
 * Convenience wrapper: record the task's current pipeline stage (with an
 * optional human-readable detail string and structured context) via
 * updateTaskStage. Called throughout the processors so the UI can show
 * live progress.
 */
async function setProjectionStage(
  task: Task,
  stage: TaskStage,
  detail: string | null = null,
  context: TaskStageContext | null = null,
) {
  await updateTaskStage(task.id, stage, detail, context);
}
/**
 * Build a TaskStageContext carrying progress (current/total expressed in
 * `unit`), plus optional counters and an optional subject.
 */
function buildProgressContext(input: {
  current: number;
  total: number;
  unit: string;
  counters?: Record<string, number>;
  subject?: TaskStageContext["subject"];
}): TaskStageContext {
  const { current, total, unit, counters, subject } = input;
  return {
    progress: { current, total, unit },
    counters,
    subject,
  };
}
/**
 * Validate and normalize a ticker payload value. Must be a string with at
 * least one non-whitespace character; returns it trimmed and uppercased.
 * Throws otherwise.
 */
function parseTicker(raw: unknown) {
  if (typeof raw === "string") {
    const normalized = raw.trim();
    if (normalized.length > 0) {
      return normalized.toUpperCase();
    }
  }
  throw new Error("Ticker is required");
}
/**
 * Parse a numeric limit payload: truncate to an integer and clamp to
 * [min, max]. Non-finite input (NaN, Infinity, unparsable strings,
 * undefined) yields `fallback` without clamping.
 */
function parseLimit(raw: unknown, fallback: number, min: number, max: number) {
  const parsed = typeof raw === "number" ? raw : Number(raw);
  if (Number.isFinite(parsed)) {
    return Math.min(Math.max(Math.trunc(parsed), min), max);
  }
  return fallback;
}
/**
 * Trim an optional string payload value. Returns null for non-strings and
 * for strings that are blank after trimming.
 */
function parseOptionalText(raw: unknown) {
  if (typeof raw !== "string") {
    return null;
  }
  const trimmed = raw.trim();
  return trimmed === "" ? null : trimmed;
}
/**
 * Keep only the non-blank string entries of an array payload, trimmed,
 * preserving order. Returns [] when the payload is not an array.
 */
function parseOptionalStringArray(raw: unknown) {
  if (!Array.isArray(raw)) {
    return [];
  }
  const values: string[] = [];
  for (const entry of raw) {
    if (typeof entry !== "string") {
      continue;
    }
    const trimmed = entry.trim();
    if (trimmed.length > 0) {
      values.push(trimmed);
    }
  }
  return values;
}
/**
 * Normalize a tags payload: keep trimmed, non-empty string entries,
 * de-duplicated exactly (case-sensitive) in first-seen order.
 * Non-array payloads yield [].
 */
function parseTags(raw: unknown) {
  if (!Array.isArray(raw)) {
    return [];
  }
  const cleaned = raw
    .filter((entry): entry is string => typeof entry === "string")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  // Set preserves insertion order, so first occurrence wins.
  return [...new Set(cleaned)];
}
/**
 * Collapse whitespace runs to single spaces, trim, and truncate to
 * `maxLength`. Returns null for non-strings and for strings that collapse
 * to the empty string.
 */
function sanitizeExtractionText(value: unknown, maxLength: number) {
  if (typeof value !== "string") {
    return null;
  }
  const normalized = value.trim().replace(/\s+/g, " ");
  return normalized ? normalized.slice(0, maxLength) : null;
}
/**
 * Sanitize an array payload into a list of collapsed, length-capped strings.
 * Returns null when the value is not an array; otherwise keeps at most
 * EXTRACTION_MAX_ITEMS entries, dropping entries that sanitize to null.
 */
function sanitizeExtractionList(value: unknown) {
  if (!Array.isArray(value)) {
    return null;
  }
  const kept: string[] = [];
  for (const entry of value) {
    if (kept.length >= EXTRACTION_MAX_ITEMS) {
      break;
    }
    const text = sanitizeExtractionText(entry, EXTRACTION_ITEM_MAX_LENGTH);
    if (text) {
      kept.push(text);
    }
  }
  return kept;
}
/**
 * Sanitize, de-duplicate (case-insensitive signature), and cap a list of
 * candidate strings at EXTRACTION_MAX_ITEMS, preserving first-seen order
 * and original casing.
 */
function uniqueExtractionList(items: Array<string | null | undefined>) {
  const seenSignatures = new Set<string>();
  const kept: string[] = [];
  for (const item of items) {
    if (kept.length >= EXTRACTION_MAX_ITEMS) {
      break;
    }
    const normalized = sanitizeExtractionText(item, EXTRACTION_ITEM_MAX_LENGTH);
    if (!normalized) {
      continue;
    }
    const signature = normalized.toLowerCase();
    if (!seenSignatures.has(signature)) {
      seenSignatures.add(signature);
      kept.push(normalized);
    }
  }
  return kept;
}
/**
 * Scan filing text line-by-line for lines matching any of `patterns`.
 * Lines are whitespace-collapsed and must be at least 24 chars long; up to
 * EXTRACTION_MAX_ITEMS * 2 raw matches are gathered before being
 * de-duplicated and capped via uniqueExtractionList.
 */
function collectTextSignals(filingText: string, patterns: RegExp[]) {
  const candidateLines = filingText
    .replace(/\r/g, "\n")
    .split(/\n+/)
    .map((line) => line.replace(/\s+/g, " ").trim())
    .filter((line) => line.length >= 24);
  const matches: string[] = [];
  for (const line of candidateLines) {
    if (patterns.some((pattern) => pattern.test(line))) {
      matches.push(line);
      if (matches.length >= EXTRACTION_MAX_ITEMS * 2) {
        break;
      }
    }
  }
  return uniqueExtractionList(matches);
}
/**
 * Parse the LLM extraction response into a FilingExtraction.
 *
 * Accepts either a fenced JSON code block or, failing that, the widest
 * brace-delimited span of the raw text. The parsed object must contain
 * EXACTLY the keys in EXTRACTION_REQUIRED_KEYS (no extras, none missing).
 * Every field is sanitized (whitespace collapsed, lengths capped) and
 * confidence is clamped to [0, 1]. Returns null on any structural or
 * content failure so callers can fall back to deterministic extraction.
 */
function parseExtractionPayload(raw: string): FilingExtraction | null {
  // Prefer the contents of a fenced code block, if the model emitted one.
  const fencedJson = raw.match(/```(?:json)?\s*([\s\S]*?)```/i)?.[1];
  const candidate =
    fencedJson ??
    (() => {
      // Otherwise slice from the first "{" to the last "}".
      const start = raw.indexOf("{");
      const end = raw.lastIndexOf("}");
      return start >= 0 && end > start ? raw.slice(start, end + 1) : null;
    })();
  if (!candidate) {
    return null;
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(candidate);
  } catch {
    return null;
  }
  // Must be a plain JSON object (not null, not an array).
  if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
    return null;
  }
  const payload = parsed as Record<string, unknown>;
  const keys = Object.keys(payload);
  // Exact key-count match, every required key present, and no unexpected
  // keys — together these enforce set equality with the required schema.
  if (keys.length !== EXTRACTION_REQUIRED_KEYS.length) {
    return null;
  }
  for (const key of EXTRACTION_REQUIRED_KEYS) {
    if (!(key in payload)) {
      return null;
    }
  }
  for (const key of keys) {
    if (
      !EXTRACTION_REQUIRED_KEYS.includes(
        key as (typeof EXTRACTION_REQUIRED_KEYS)[number],
      )
    ) {
      return null;
    }
  }
  // Sanitize each field; any field that fails sanitization rejects the
  // whole payload below.
  const summary = sanitizeExtractionText(
    payload.summary,
    EXTRACTION_SUMMARY_MAX_LENGTH,
  );
  const keyPoints = sanitizeExtractionList(payload.keyPoints);
  const redFlags = sanitizeExtractionList(payload.redFlags);
  const followUpQuestions = sanitizeExtractionList(payload.followUpQuestions);
  const portfolioSignals = sanitizeExtractionList(payload.portfolioSignals);
  const segmentSpecificData = sanitizeExtractionList(
    payload.segmentSpecificData,
  );
  const geographicRevenueBreakdown = sanitizeExtractionList(
    payload.geographicRevenueBreakdown,
  );
  const companySpecificData = sanitizeExtractionList(
    payload.companySpecificData,
  );
  const secApiCrossChecks = sanitizeExtractionList(payload.secApiCrossChecks);
  // Accept numeric confidence, or a string coercible to a finite number.
  const confidenceRaw =
    typeof payload.confidence === "number"
      ? payload.confidence
      : Number(payload.confidence);
  if (
    !summary ||
    !keyPoints ||
    !redFlags ||
    !followUpQuestions ||
    !portfolioSignals ||
    !segmentSpecificData ||
    !geographicRevenueBreakdown ||
    !companySpecificData ||
    !secApiCrossChecks ||
    !Number.isFinite(confidenceRaw)
  ) {
    return null;
  }
  return {
    summary,
    keyPoints,
    redFlags,
    followUpQuestions,
    portfolioSignals,
    segmentSpecificData,
    geographicRevenueBreakdown,
    companySpecificData,
    secApiCrossChecks,
    // Clamp confidence into [0, 1].
    confidence: Math.min(Math.max(confidenceRaw, 0), 1),
  };
}
/**
 * Render a one-line metric snapshot: "Label: 1,234" (rounded, en-US digit
 * grouping) or "Label: not reported" for null/undefined/non-finite values.
 */
function metricSnapshotLine(label: string, value: number | null | undefined) {
  if (value == null || !Number.isFinite(value)) {
    return `${label}: not reported`;
  }
  return `${label}: ${Math.round(value).toLocaleString("en-US")}`;
}
/**
 * Cross-check each SEC API baseline metric against the filing narrative.
 * For every metric descriptor, report one of: metric unavailable, value
 * appears referenced (a related keyword pattern matches the text), or
 * value not confidently located. Results are de-duplicated/capped.
 */
function buildSecApiCrossChecks(filing: Filing, filingText: string) {
  const haystack = filingText.toLowerCase();
  const checks: string[] = [];
  for (const descriptor of METRIC_CHECK_PATTERNS) {
    const value = filing.metrics?.[descriptor.key];
    if (value === null || value === undefined || !Number.isFinite(value)) {
      checks.push(
        `${descriptor.label}: SEC API metric unavailable for this filing.`,
      );
      continue;
    }
    const formatted = Math.round(value).toLocaleString("en-US");
    const mentioned = descriptor.patterns.some((pattern) =>
      pattern.test(haystack),
    );
    checks.push(
      mentioned
        ? `${descriptor.label}: SEC API value ${formatted} appears referenced in filing narrative.`
        : `${descriptor.label}: SEC API value ${formatted} was not confidently located in sampled filing text.`,
    );
  }
  return uniqueExtractionList(checks);
}
/**
 * Build a low-confidence (0.2) extraction purely from filing metadata and
 * SEC API metrics — used when the filing text could not be fetched/parsed
 * or the LLM output failed validation. Every string explicitly notes the
 * fallback so downstream consumers can tell no filing text was analyzed.
 */
function deterministicExtractionFallback(filing: Filing): FilingExtraction {
  const metrics = filing.metrics;
  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. Deterministic extraction fallback was used because filing text parsing was unavailable or invalid.`,
    keyPoints: [
      `${filing.filing_type} filing recorded for ${filing.ticker}.`,
      metricSnapshotLine("Revenue", metrics?.revenue),
      metricSnapshotLine("Net income", metrics?.netIncome),
      metricSnapshotLine("Total assets", metrics?.totalAssets),
    ],
    redFlags: [
      metricSnapshotLine("Cash", metrics?.cash),
      metricSnapshotLine("Debt", metrics?.debt),
      filing.primary_document
        ? "Primary document is indexed and available for review."
        : "Primary document reference is unavailable in current filing metadata.",
    ],
    followUpQuestions: [
      "What changed versus the prior filing in guidance, margins, or liquidity?",
      "Are any material risks under-emphasized relative to historical filings?",
      "Should portfolio exposure be adjusted before the next reporting cycle?",
    ],
    portfolioSignals: [
      "Validate trend direction using at least two prior filings.",
      "Cross-check leverage and liquidity metrics against position sizing rules.",
      "Track language shifts around guidance or demand assumptions.",
    ],
    segmentSpecificData: [
      "Segment-level disclosures were not parsed in deterministic fallback mode.",
    ],
    geographicRevenueBreakdown: [
      "Geographic revenue disclosures were not parsed in deterministic fallback mode.",
    ],
    companySpecificData: [
      "Company-specific operating KPIs (for example same-store sales) were not parsed in deterministic fallback mode.",
    ],
    secApiCrossChecks: [
      `${metricSnapshotLine("Revenue", metrics?.revenue)} (SEC API baseline; text verification unavailable).`,
      `${metricSnapshotLine("Net income", metrics?.netIncome)} (SEC API baseline; text verification unavailable).`,
    ],
    confidence: 0.2,
  };
}
/**
 * Rule-based (no-LLM) extraction: start from the deterministic fallback,
 * then scan filing text with keyword regexes for segment, geographic, and
 * company-specific disclosures plus SEC-metric cross-checks. Confidence is
 * 0.4 when any text signal was found, else 0.3.
 */
function buildRuleBasedExtraction(
  filing: Filing,
  filingText: string,
): FilingExtraction {
  const baseline = deterministicExtractionFallback(filing);
  const segmentSpecificData = collectTextSignals(filingText, SEGMENT_PATTERNS);
  const geographicRevenueBreakdown = collectTextSignals(
    filingText,
    GEOGRAPHIC_PATTERNS,
  );
  const companySpecificData = collectTextSignals(
    filingText,
    COMPANY_SPECIFIC_PATTERNS,
  );
  const secApiCrossChecks = buildSecApiCrossChecks(filing, filingText);
  // Lead lines: surface the first hit of each signal category as a key point.
  const segmentLead = segmentSpecificData[0]
    ? `Segment detail: ${segmentSpecificData[0]}`
    : null;
  const geographicLead = geographicRevenueBreakdown[0]
    ? `Geographic detail: ${geographicRevenueBreakdown[0]}`
    : null;
  const companyLead = companySpecificData[0]
    ? `Company-specific KPI: ${companySpecificData[0]}`
    : null;
  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. SEC API metrics were retained as the baseline and filing text was scanned for segment and company-specific disclosures.`,
    keyPoints: uniqueExtractionList([
      ...baseline.keyPoints,
      segmentLead,
      geographicLead,
      companyLead,
    ]),
    redFlags: uniqueExtractionList([
      ...baseline.redFlags,
      // Surface the first metric that could not be located in the text.
      secApiCrossChecks.find((line) => /not confidently located/i.test(line)),
    ]),
    followUpQuestions: uniqueExtractionList([
      ...baseline.followUpQuestions,
      segmentSpecificData.length > 0
        ? "How do segment trends change the consolidated margin outlook?"
        : "Does management provide segment-level KPIs in supplemental exhibits?",
    ]),
    portfolioSignals: uniqueExtractionList([
      ...baseline.portfolioSignals,
      companySpecificData.length > 0
        ? "Incorporate company-specific KPI direction into near-term position sizing."
        : "Track future filings for explicit operating KPI disclosures.",
    ]),
    // For each list: use the scanned signals when any were found, otherwise
    // keep the fallback's placeholder text.
    segmentSpecificData:
      segmentSpecificData.length > 0
        ? segmentSpecificData
        : baseline.segmentSpecificData,
    geographicRevenueBreakdown:
      geographicRevenueBreakdown.length > 0
        ? geographicRevenueBreakdown
        : baseline.geographicRevenueBreakdown,
    companySpecificData:
      companySpecificData.length > 0
        ? companySpecificData
        : baseline.companySpecificData,
    secApiCrossChecks:
      secApiCrossChecks.length > 0
        ? secApiCrossChecks
        : baseline.secApiCrossChecks,
    confidence:
      segmentSpecificData.length +
        geographicRevenueBreakdown.length +
        companySpecificData.length >
      0
        ? 0.4
        : 0.3,
  };
}
/** Return `primary` unless it is empty, in which case return `fallback`. */
function preferExtractionList(primary: string[], fallback: string[]) {
  if (primary.length === 0) {
    return fallback;
  }
  return primary;
}
/**
 * Combine an LLM extraction with a deterministic fallback: each list field
 * keeps the primary version unless it is empty, the summary likewise, and
 * confidence is the primary's value clamped to [0, 1].
 */
function mergeExtractionWithFallback(
  primary: FilingExtraction,
  fallback: FilingExtraction,
): FilingExtraction {
  // Pick the primary list for a field unless it is empty.
  const pick = (
    field:
      | "keyPoints"
      | "redFlags"
      | "followUpQuestions"
      | "portfolioSignals"
      | "segmentSpecificData"
      | "geographicRevenueBreakdown"
      | "companySpecificData"
      | "secApiCrossChecks",
  ) => preferExtractionList(primary[field], fallback[field]);
  return {
    summary: primary.summary || fallback.summary,
    keyPoints: pick("keyPoints"),
    redFlags: pick("redFlags"),
    followUpQuestions: pick("followUpQuestions"),
    portfolioSignals: pick("portfolioSignals"),
    segmentSpecificData: pick("segmentSpecificData"),
    geographicRevenueBreakdown: pick("geographicRevenueBreakdown"),
    companySpecificData: pick("companySpecificData"),
    secApiCrossChecks: pick("secApiCrossChecks"),
    confidence: Math.min(Math.max(primary.confidence, 0), 1),
  };
}
/**
 * Build the strict-JSON extraction prompt for the LLM: filing identity,
 * SEC API baseline metrics, instructions, the exact required JSON schema,
 * size limits, and finally the full filing text. Sections are joined with
 * blank lines.
 */
function extractionPrompt(filing: Filing, filingText: string) {
  return [
    "Extract structured signals from the SEC filing text.",
    `Company: ${filing.company_name} (${filing.ticker})`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    "Use SEC API metrics as canonical numeric values and validate whether each appears consistent with filing text context.",
    "Prioritize company-specific and segment-specific disclosures not covered by SEC endpoint fields (for example same-store sales, geographic mix, segment margin).",
    "Return ONLY valid JSON with exactly these keys and no extra keys:",
    '{"summary":"string","keyPoints":["string"],"redFlags":["string"],"followUpQuestions":["string"],"portfolioSignals":["string"],"segmentSpecificData":["string"],"geographicRevenueBreakdown":["string"],"companySpecificData":["string"],"secApiCrossChecks":["string"],"confidence":0}',
    `Rules: every array max ${EXTRACTION_MAX_ITEMS} items; each item <= ${EXTRACTION_ITEM_MAX_LENGTH} chars; summary <= ${EXTRACTION_SUMMARY_MAX_LENGTH} chars; confidence between 0 and 1.`,
    "Filing text follows:",
    filingText,
  ].join("\n\n");
}
/**
 * Build the final analysis-report prompt: filing identity, SEC API baseline
 * metrics, the structured extraction context (tagged with its source), and
 * the required output sections. Lines are newline-joined.
 */
function reportPrompt(
  filing: Filing,
  extraction: FilingExtraction,
  extractionMeta: FilingExtractionMeta,
) {
  const promptLines = [
    "You are a fiscal research assistant focused on regulatory signals.",
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    `Structured extraction context (${extractionMeta.source}): ${JSON.stringify(extraction)}`,
    "Use SEC API values as the baseline financials and explicitly reference segment/company-specific details from extraction.",
    "Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.",
  ];
  return promptLines.join("\n");
}
/**
 * Build the link records persisted alongside a filing: the primary document
 * URL and/or the submission index URL, in that order, when present.
 */
function filingLinks(filing: {
  filingUrl: string | null;
  submissionUrl: string | null;
}) {
  const { filingUrl, submissionUrl } = filing;
  const links: Array<{ link_type: string; url: string }> = [];
  if (filingUrl) {
    links.push({ link_type: "primary_document", url: filingUrl });
  }
  if (submissionUrl) {
    links.push({ link_type: "submission_index", url: submissionUrl });
  }
  return links;
}
/**
 * Task processor: sync recent SEC filings for a ticker.
 *
 * Flow: fetch recent filings from SEC -> upsert filing records + links ->
 * hydrate XBRL taxonomy snapshots (with PDF/LLM metric validation) for
 * 10-K/10-Q candidates -> enqueue a follow-up search-indexing task
 * (best-effort). Stage updates with progress counters are emitted
 * throughout so the UI can render live status.
 */
async function processSyncFilings(task: Task) {
  const ticker = parseTicker(task.payload.ticker);
  // Requested filing count clamped to 1..50, default 20.
  const limit = parseLimit(task.payload.limit, 20, 1, 50);
  const category = parseOptionalText(task.payload.category);
  const tags = parseTags(task.payload.tags);
  // Human-readable scope suffix for stage messages (category and/or tags).
  const scopeLabel = [
    category,
    tags.length > 0 ? `tags: ${tags.join(", ")}` : null,
  ]
    .filter((entry): entry is string => Boolean(entry))
    .join(" | ");
  let searchTaskId: string | null = null;
  const tickerSubject = { ticker };
  await setProjectionStage(
    task,
    "sync.fetch_filings",
    `Fetching up to ${limit} filings for ${ticker}${scopeLabel ? ` (${scopeLabel})` : ""}`,
    { subject: tickerSubject },
  );
  const filings = await fetchRecentFilings(ticker, limit);
  await setProjectionStage(
    task,
    "sync.persist_filings",
    `Persisting ${filings.length} filings and source links`,
    {
      counters: { fetched: filings.length },
      subject: tickerSubject,
    },
  );
  // Map SEC camelCase fields onto the snake_case repo schema; metrics are
  // filled in later by taxonomy hydration.
  const saveResult = await upsertFilingsRecords(
    filings.map((filing) => ({
      ticker: filing.ticker,
      filing_type: filing.filingType,
      filing_date: filing.filingDate,
      accession_number: filing.accessionNumber,
      cik: filing.cik,
      company_name: filing.companyName,
      filing_url: filing.filingUrl,
      submission_url: filing.submissionUrl,
      primary_document: filing.primaryDocument,
      metrics: null,
      links: filingLinks(filing),
    })),
  );
  let taxonomySnapshotsHydrated = 0;
  let taxonomySnapshotsFailed = 0;
  // Hydration candidates: stored 10-K/10-Q filings for this ticker, scanning
  // up to 3x the sync limit (floored at 40, capped at the global max).
  const hydrateCandidates = (
    await listFilingsRecords({
      ticker,
      limit: Math.min(Math.max(limit * 3, 40), STATEMENT_HYDRATION_MAX_FILINGS),
    })
  ).filter((filing): filing is Filing & { filing_type: "10-K" | "10-Q" } => {
    return isFinancialMetricsForm(filing.filing_type);
  });
  await setProjectionStage(
    task,
    "sync.discover_assets",
    `Discovering taxonomy assets for ${hydrateCandidates.length} candidate filings`,
    buildProgressContext({
      current: 0,
      total: hydrateCandidates.length,
      unit: "filings",
      counters: {
        fetched: filings.length,
        inserted: saveResult.inserted,
        updated: saveResult.updated,
        hydrated: 0,
        failed: 0,
      },
      subject: tickerSubject,
    }),
  );
  for (let index = 0; index < hydrateCandidates.length; index += 1) {
    const filing = hydrateCandidates[index];
    const existingSnapshot = await getFilingTaxonomySnapshotByFilingId(
      filing.id,
    );
    // Refresh only when no snapshot exists or the filing row has been
    // updated since the snapshot was written (skipped filings also skip
    // the throttle sleep below).
    const shouldRefresh =
      !existingSnapshot ||
      Date.parse(existingSnapshot.updated_at) < Date.parse(filing.updated_at);
    if (!shouldRefresh) {
      continue;
    }
    // Shared progress-context builder for every stage of this filing.
    const stageContext = (stage: TaskStage) =>
      buildProgressContext({
        current: index + 1,
        total: hydrateCandidates.length,
        unit: "filings",
        counters: {
          fetched: filings.length,
          inserted: saveResult.inserted,
          updated: saveResult.updated,
          hydrated: taxonomySnapshotsHydrated,
          failed: taxonomySnapshotsFailed,
        },
        subject: {
          ticker,
          accessionNumber: filing.accession_number,
          label: stage,
        },
      });
    try {
      await setProjectionStage(
        task,
        "sync.extract_taxonomy",
        `Extracting XBRL taxonomy for ${filing.accession_number}`,
        stageContext("sync.extract_taxonomy"),
      );
      const snapshot = await hydrateFilingTaxonomySnapshot({
        filingId: filing.id,
        ticker: filing.ticker,
        cik: filing.cik,
        accessionNumber: filing.accession_number,
        filingDate: filing.filing_date,
        filingType: filing.filing_type,
        filingUrl: filing.filing_url,
        primaryDocument: filing.primary_document ?? null,
      });
      // PDF/LLM validation is best-effort: on failure, keep the snapshot's
      // own validation result marked as an error with per-check messages.
      let pdfValidation = {
        validation_result: snapshot.validation_result,
        metric_validations: snapshot.metric_validations,
      };
      try {
        pdfValidation = await validateMetricsWithPdfLlm({
          metrics: snapshot.derived_metrics,
          assets: snapshot.assets,
        });
      } catch (error) {
        const message =
          error instanceof Error
            ? error.message
            : "PDF metric validation failed";
        pdfValidation = {
          validation_result: {
            status: "error",
            checks: [],
            validatedAt: new Date().toISOString(),
          },
          metric_validations: snapshot.metric_validations.map((check) => ({
            ...check,
            error: check.error ?? message,
          })),
        };
      }
      // NOTE(review): the normalize payload is spread LAST, so if
      // normalizeFilingTaxonomySnapshotPayload returns validation_result /
      // metric_validations it would overwrite the pdfValidation fields set
      // just above — confirm the payload shape excludes them.
      const normalizedSnapshot = {
        ...snapshot,
        validation_result: pdfValidation.validation_result,
        metric_validations: pdfValidation.metric_validations,
        ...normalizeFilingTaxonomySnapshotPayload(snapshot),
      };
      // NOTE(review): these stage updates are emitted back-to-back after the
      // work above has already completed — presumably for UI narration;
      // confirm ordering is intentional.
      await setProjectionStage(
        task,
        "sync.normalize_taxonomy",
        `Materializing statements for ${filing.accession_number}`,
        stageContext("sync.normalize_taxonomy"),
      );
      await setProjectionStage(
        task,
        "sync.derive_metrics",
        `Deriving taxonomy metrics for ${filing.accession_number}`,
        stageContext("sync.derive_metrics"),
      );
      await setProjectionStage(
        task,
        "sync.validate_pdf_metrics",
        `Validating metrics via PDF + LLM for ${filing.accession_number}`,
        stageContext("sync.validate_pdf_metrics"),
      );
      await setProjectionStage(
        task,
        "sync.persist_taxonomy",
        `Persisting taxonomy snapshot for ${filing.accession_number}`,
        stageContext("sync.persist_taxonomy"),
      );
      await upsertFilingTaxonomySnapshot(normalizedSnapshot);
      await updateFilingMetricsById(
        filing.id,
        normalizedSnapshot.derived_metrics,
      );
      // Invalidate the cached financial bundle so it is rebuilt on demand.
      await deleteCompanyFinancialBundlesForTicker(filing.ticker);
      taxonomySnapshotsHydrated += 1;
    } catch (error) {
      // Hydration failed: persist a placeholder snapshot with empty rows so
      // the failure (and its error message) is visible and not retried on
      // every sync.
      const now = new Date().toISOString();
      await upsertFilingTaxonomySnapshot({
        filing_id: filing.id,
        ticker: filing.ticker,
        filing_date: filing.filing_date,
        filing_type: filing.filing_type,
        parse_status: "failed",
        parse_error:
          error instanceof Error ? error.message : "Taxonomy hydration failed",
        source: "legacy_html_fallback",
        parser_engine: "fiscal-xbrl",
        parser_version: "unknown",
        taxonomy_regime: "unknown",
        fiscal_pack: "core",
        periods: [],
        faithful_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        statement_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        surface_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        detail_rows: {
          income: {},
          balance: {},
          cash_flow: {},
          equity: {},
          comprehensive_income: {},
          disclosure: {},
        },
        kpi_rows: [],
        computed_definitions: [],
        contexts: [],
        // Keep any previously derived metrics rather than wiping them.
        derived_metrics: filing.metrics ?? null,
        validation_result: {
          status: "error",
          checks: [],
          validatedAt: now,
        },
        normalization_summary: {
          surfaceRowCount: 0,
          detailRowCount: 0,
          kpiRowCount: 0,
          unmappedRowCount: 0,
          materialUnmappedRowCount: 0,
          warnings: [],
        },
        facts_count: 0,
        concepts_count: 0,
        dimensions_count: 0,
        assets: [],
        concepts: [],
        facts: [],
        metric_validations: [],
      });
      await deleteCompanyFinancialBundlesForTicker(filing.ticker);
      taxonomySnapshotsFailed += 1;
    }
    // Throttle between filings to avoid hammering upstream services.
    await Bun.sleep(STATEMENT_HYDRATION_DELAY_MS);
  }
  // Enqueue search indexing as a follow-up; failure here is logged but does
  // not fail the sync task.
  try {
    const searchTask = await enqueueTask({
      userId: task.user_id,
      taskType: "index_search",
      payload: {
        ticker,
        sourceKinds: ["filing_document", "filing_brief"],
      },
      priority: 55,
      resourceKey: `index_search:ticker:${ticker}`,
    });
    searchTaskId = searchTask.id;
  } catch (error) {
    console.error(`[search-index-sync] failed for ${ticker}:`, error);
  }
  const result = {
    ticker,
    category,
    tags,
    fetched: filings.length,
    inserted: saveResult.inserted,
    updated: saveResult.updated,
    taxonomySnapshotsHydrated,
    taxonomySnapshotsFailed,
    searchTaskId,
  };
  return buildTaskOutcome(
    result,
    `Synced ${filings.length} filings for ${ticker}, hydrated ${taxonomySnapshotsHydrated} taxonomy snapshots, failed ${taxonomySnapshotsFailed}.`,
    buildProgressContext({
      current: hydrateCandidates.length,
      // `|| 1` avoids a 0/0 progress ratio when there were no candidates.
      total: hydrateCandidates.length || 1,
      unit: "filings",
      counters: {
        fetched: filings.length,
        inserted: saveResult.inserted,
        updated: saveResult.updated,
        hydrated: taxonomySnapshotsHydrated,
        failed: taxonomySnapshotsFailed,
      },
      subject: tickerSubject,
    }),
  );
}
/**
 * Task processor: refresh quotes for all of a user's holdings.
 *
 * Loads holdings, de-duplicates tickers, fetches quotes sequentially
 * (recording unavailable and stale tickers), persists the refreshed prices
 * in one batch, and reports per-ticker progress throughout.
 */
async function processRefreshPrices(task: Task) {
  const userId = task.user_id;
  if (!userId) {
    throw new Error("Task is missing user scope");
  }
  await setProjectionStage(
    task,
    "refresh.load_holdings",
    "Loading holdings for price refresh",
  );
  const userHoldings = await listHoldingsForPriceRefresh(userId);
  // Multiple holdings can share a ticker; quote each ticker once.
  const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];
  const quotes = new Map<string, number>();
  const failedTickers: string[] = [];
  const staleTickers: string[] = [];
  const baseContext = {
    counters: {
      holdings: userHoldings.length,
    },
  } satisfies TaskStageContext;
  await setProjectionStage(
    task,
    "refresh.load_holdings",
    `Loaded ${userHoldings.length} holdings across ${tickers.length} tickers`,
    baseContext,
  );
  await setProjectionStage(
    task,
    "refresh.fetch_quotes",
    `Fetching quotes for ${tickers.length} tickers`,
    buildProgressContext({
      current: 0,
      total: tickers.length,
      unit: "tickers",
      counters: {
        holdings: userHoldings.length,
      },
    }),
  );
  // Sequential fetch with a stage update after every ticker.
  for (let index = 0; index < tickers.length; index += 1) {
    const ticker = tickers[index];
    const quoteResult = await getQuote(ticker);
    if (quoteResult.value !== null) {
      quotes.set(ticker, quoteResult.value);
      // A stale quote is still used, but tracked for the summary.
      if (quoteResult.stale) {
        staleTickers.push(ticker);
      }
    } else {
      failedTickers.push(ticker);
    }
    await setProjectionStage(
      task,
      "refresh.fetch_quotes",
      `Fetching quotes for ${tickers.length} tickers`,
      buildProgressContext({
        current: index + 1,
        total: tickers.length,
        unit: "tickers",
        counters: {
          holdings: userHoldings.length,
          failed: failedTickers.length,
          stale: staleTickers.length,
        },
        subject: { ticker },
      }),
    );
  }
  await setProjectionStage(
    task,
    "refresh.persist_prices",
    `Writing refreshed prices for ${quotes.size} tickers across ${userHoldings.length} holdings`,
    {
      counters: {
        holdings: userHoldings.length,
        failed: failedTickers.length,
        stale: staleTickers.length,
      },
    },
  );
  // Single timestamp for the whole batch.
  const updatedCount = await applyRefreshedPrices(
    userId,
    quotes,
    new Date().toISOString(),
  );
  const result = {
    updatedCount,
    totalTickers: tickers.length,
    failedTickers,
    staleTickers,
  };
  const messageParts = [
    `Refreshed prices for ${quotes.size}/${tickers.length} tickers`,
  ];
  if (failedTickers.length > 0) {
    messageParts.push(`(${failedTickers.length} unavailable)`);
  }
  if (staleTickers.length > 0) {
    messageParts.push(`(${staleTickers.length} stale)`);
  }
  return buildTaskOutcome(
    result,
    `${messageParts.join(" ")} across ${userHoldings.length} holdings.`,
    {
      progress: {
        current: tickers.length,
        // `|| 1` avoids a 0/0 ratio when the user has no holdings.
        total: tickers.length || 1,
        unit: "tickers",
      },
      counters: {
        holdings: userHoldings.length,
        updatedCount,
        failed: failedTickers.length,
        stale: staleTickers.length,
      },
    },
  );
}
/**
 * Task processor: generate an AI analysis report for a single filing.
 *
 * Loads the filing by accession number and tries to fetch the primary
 * document text. When text is available, both a rule-based extraction and a
 * strict-JSON LLM extraction are built and merged; otherwise a deterministic
 * metadata-only fallback is used. The merged extraction feeds the final
 * report prompt; the analysis is persisted and a search-indexing follow-up
 * task is enqueued best-effort.
 */
async function processAnalyzeFiling(task: Task) {
  const accessionNumber =
    typeof task.payload.accessionNumber === "string"
      ? task.payload.accessionNumber
      : "";
  if (!accessionNumber) {
    throw new Error("accessionNumber is required");
  }
  await setProjectionStage(
    task,
    "analyze.load_filing",
    `Loading filing ${accessionNumber}`,
    {
      subject: {
        accessionNumber,
      },
    },
  );
  const filing = await getFilingByAccession(accessionNumber);
  if (!filing) {
    throw new Error(`Filing ${accessionNumber} not found`);
  }
  const analyzeSubject = {
    ticker: filing.ticker,
    accessionNumber,
    label: filing.filing_type,
  };
  // Default: metadata-only extraction, replaced below if filing text is
  // fetched successfully.
  const defaultExtraction = deterministicExtractionFallback(filing);
  let extraction = defaultExtraction;
  let extractionMeta: FilingExtractionMeta = {
    provider: "deterministic-fallback",
    model: "metadata-fallback",
    source: "metadata_fallback",
    generatedAt: new Date().toISOString(),
  };
  let filingDocument: Awaited<
    ReturnType<typeof fetchPrimaryFilingText>
  > | null = null;
  // Document fetch is best-effort; any failure falls through to the
  // deterministic fallback extraction.
  try {
    await setProjectionStage(
      task,
      "analyze.fetch_document",
      "Fetching primary filing document",
      {
        subject: analyzeSubject,
      },
    );
    filingDocument = await fetchPrimaryFilingText({
      filingUrl: filing.filing_url,
      cik: filing.cik,
      accessionNumber: filing.accession_number,
      primaryDocument: filing.primary_document ?? null,
    });
  } catch {
    filingDocument = null;
  }
  if (filingDocument?.text) {
    await setProjectionStage(
      task,
      "analyze.extract",
      "Generating extraction context from filing text",
      {
        subject: analyzeSubject,
      },
    );
    const ruleBasedExtraction = buildRuleBasedExtraction(
      filing,
      filingDocument.text,
    );
    const extractionResult = await runAiAnalysis(
      extractionPrompt(filing, filingDocument.text),
      "Return strict JSON only.",
      { workload: "extraction" },
    );
    const parsed = parseExtractionPayload(extractionResult.text);
    // NOTE(review): an invalid LLM payload fails the task rather than
    // falling back to ruleBasedExtraction — presumably so the task can be
    // retried; confirm this is intentional.
    if (!parsed) {
      throw new Error("Extraction output invalid JSON schema");
    }
    // LLM output is primary; rule-based extraction fills any empty fields.
    extraction = mergeExtractionWithFallback(parsed, ruleBasedExtraction);
    extractionMeta = {
      provider: "zhipu",
      model: extractionResult.model,
      source: filingDocument.source,
      generatedAt: new Date().toISOString(),
    };
  }
  await setProjectionStage(
    task,
    "analyze.generate_report",
    "Generating final filing analysis report",
    {
      subject: analyzeSubject,
    },
  );
  const analysis = await runAiAnalysis(
    reportPrompt(filing, extraction, extractionMeta),
    "Use concise institutional analyst language.",
    { workload: "report" },
  );
  await setProjectionStage(
    task,
    "analyze.persist_report",
    "Persisting filing analysis output",
    {
      subject: analyzeSubject,
    },
  );
  await saveFilingAnalysis(accessionNumber, {
    provider: analysis.provider,
    model: analysis.model,
    text: analysis.text,
    extraction,
    extractionMeta,
  });
  // Enqueue search indexing of the new brief; failure is logged but does
  // not fail the analysis task.
  let searchTaskId: string | null = null;
  try {
    const searchTask = await enqueueTask({
      userId: task.user_id,
      taskType: "index_search",
      payload: {
        accessionNumber,
        sourceKinds: ["filing_brief"],
      },
      priority: 58,
      resourceKey: `index_search:filing_brief:${accessionNumber}`,
    });
    searchTaskId = searchTask.id;
  } catch (error) {
    console.error(
      `[search-index-analyze] failed for ${accessionNumber}:`,
      error,
    );
  }
  const result = {
    ticker: filing.ticker,
    accessionNumber,
    filingType: filing.filing_type,
    provider: analysis.provider,
    model: analysis.model,
    extractionProvider: extractionMeta.provider,
    extractionModel: extractionMeta.model,
    searchTaskId,
  };
  return buildTaskOutcome(
    result,
    `Analysis report generated for ${filing.ticker} ${filing.filing_type} ${accessionNumber}.`,
    {
      subject: analyzeSubject,
    },
  );
}
/**
 * Processes an `index_search` task: collects the requested source records
 * (filings, briefs, research notes), chunks and embeds them, and persists
 * them to the search index via `indexSearchDocuments`.
 *
 * Payload fields (all optional): `ticker`, `accessionNumber`,
 * `journalEntryId`, `sourceKinds`, `deleteSourceRefs`.
 * Stage progress is projected onto the task via `setProjectionStage`.
 */
async function processIndexSearch(task: Task) {
  // The only source kinds the search indexer understands.
  const allowedSourceKinds = [
    "filing_document",
    "filing_brief",
    "research_note",
  ] as const;
  type IndexSourceKind = (typeof allowedSourceKinds)[number];
  const isIndexSourceKind = (value: unknown): value is IndexSourceKind =>
    typeof value === "string" &&
    (allowedSourceKinds as readonly string[]).includes(value);
  await setProjectionStage(
    task,
    "search.collect_sources",
    "Collecting source records for search indexing",
  );
  const ticker = parseOptionalText(task.payload.ticker);
  const accessionNumber = parseOptionalText(task.payload.accessionNumber);
  // Distinguish "absent" (null) from a provided value; Number() coercion of
  // an invalid value yields NaN, which the integer check below rejects.
  const journalEntryId =
    task.payload.journalEntryId === undefined
      ? null
      : Number(task.payload.journalEntryId);
  const deleteSourceRefs = Array.isArray(task.payload.deleteSourceRefs)
    ? task.payload.deleteSourceRefs.filter(
        (
          entry,
        ): entry is {
          sourceKind: IndexSourceKind;
          sourceRef: string;
          scope: string;
          userId?: string | null;
        } => {
          if (!entry || typeof entry !== "object") {
            return false;
          }
          const candidate = entry as {
            sourceKind?: unknown;
            sourceRef?: unknown;
            scope?: unknown;
          };
          // Validate sourceKind against the allowed union here instead of
          // force-casting it downstream: previously any string passed the
          // filter and was blindly asserted to be a valid kind.
          return (
            isIndexSourceKind(candidate.sourceKind) &&
            typeof candidate.sourceRef === "string" &&
            typeof candidate.scope === "string"
          );
        },
      )
    : [];
  // Same validation as deleteSourceRefs: keep only recognized kinds.
  const sourceKinds = parseOptionalStringArray(task.payload.sourceKinds).filter(
    isIndexSourceKind,
  );
  // Only positive integers are valid journal entry ids; everything else
  // (NaN, zero, negatives, floats) is treated as "not provided".
  const validatedJournalEntryId =
    typeof journalEntryId === "number" &&
    Number.isInteger(journalEntryId) &&
    journalEntryId > 0
      ? journalEntryId
      : null;
  const result = await indexSearchDocuments({
    userId: task.user_id,
    ticker,
    accessionNumber,
    journalEntryId: validatedJournalEntryId,
    sourceKinds: sourceKinds.length > 0 ? sourceKinds : undefined,
    deleteSourceRefs: deleteSourceRefs.map((entry) => ({
      // sourceKind was validated by the filter above; no cast needed.
      sourceKind: entry.sourceKind,
      sourceRef: entry.sourceRef,
      // Any scope other than the literal "user" is treated as global.
      scope: entry.scope === "user" ? "user" : "global",
      userId: typeof entry.userId === "string" ? entry.userId : null,
    })),
    // Mirror the indexer's pipeline stages onto the task projection so the
    // UI can show fine-grained progress.
    onStage: async (stage, detail, context) => {
      switch (stage) {
        case "collect":
          await setProjectionStage(
            task,
            "search.collect_sources",
            detail,
            context ?? {
              subject: ticker
                ? { ticker }
                : accessionNumber
                  ? { accessionNumber }
                  : null,
            },
          );
          break;
        case "fetch":
          await setProjectionStage(
            task,
            "search.fetch_documents",
            detail,
            context ?? null,
          );
          break;
        case "chunk":
          await setProjectionStage(
            task,
            "search.chunk",
            detail,
            context ?? null,
          );
          break;
        case "embed":
          await setProjectionStage(
            task,
            "search.embed",
            detail,
            context ?? null,
          );
          break;
        case "persist":
          await setProjectionStage(
            task,
            "search.persist",
            detail,
            context ?? null,
          );
          break;
      }
    },
  });
  const taskResult = {
    ticker,
    accessionNumber,
    journalEntryId: validatedJournalEntryId,
    ...result,
  };
  return buildTaskOutcome(
    taskResult,
    `Indexed ${result.indexed} sources, embedded ${result.chunksEmbedded} chunks, skipped ${result.skipped}, deleted ${result.deleted} stale documents.`,
    {
      progress: {
        current: result.sourcesCollected,
        total: result.sourcesCollected || 1,
        unit: "sources",
      },
      counters: {
        sourcesCollected: result.sourcesCollected,
        indexed: result.indexed,
        chunksEmbedded: result.chunksEmbedded,
        skipped: result.skipped,
        deleted: result.deleted,
      },
      subject: ticker
        ? { ticker }
        : accessionNumber
          ? { accessionNumber }
          : null,
    },
  );
}
/**
 * Projects holdings into a compact camelCase digest suitable for embedding
 * in AI prompts (drops any fields beyond the core position metrics).
 */
function holdingDigest(holdings: Holding[]) {
  return holdings.map(
    ({
      ticker,
      shares,
      avg_cost,
      current_price,
      market_value,
      gain_loss,
      gain_loss_pct,
    }) => ({
      ticker,
      shares,
      avgCost: avg_cost,
      currentPrice: current_price,
      marketValue: market_value,
      gainLoss: gain_loss,
      gainLossPct: gain_loss_pct,
    }),
  );
}
async function processPortfolioInsights(task: Task) {
const userId = task.user_id;
if (!userId) {
throw new Error("Task is missing user scope");
}
await setProjectionStage(
task,
"insights.load_holdings",
"Loading holdings for portfolio insight generation",
);
const userHoldings = await listUserHoldings(userId);
const summary = buildPortfolioSummary(userHoldings);
const holdingsContext = {
counters: {
holdings: userHoldings.length,
},
} satisfies TaskStageContext;
await setProjectionStage(
task,
"insights.load_holdings",
`Loaded ${userHoldings.length} holdings for portfolio insight generation`,
holdingsContext,
);
const prompt = [
"Generate portfolio intelligence with actionable recommendations.",
`Portfolio summary: ${JSON.stringify(summary)}`,
`Holdings: ${JSON.stringify(holdingDigest(userHoldings))}`,
"Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.",
].join("\n");
await setProjectionStage(
task,
"insights.generate",
"Generating portfolio AI insight",
holdingsContext,
);
const analysis = await runAiAnalysis(
prompt,
"Act as a risk-aware buy-side analyst.",
{ workload: "report" },
);
await setProjectionStage(
task,
"insights.persist",
"Persisting generated portfolio insight",
holdingsContext,
);
await createPortfolioInsight({
userId,
provider: analysis.provider,
model: analysis.model,
content: analysis.text,
});
const result = {
provider: analysis.provider,
model: analysis.model,
summary,
};
return buildTaskOutcome(
result,
`Generated portfolio insight for ${summary.positions} holdings.`,
{
counters: {
holdings: summary.positions,
},
},
);
}
// Bundle of private helpers exposed under a double-underscore name —
// presumably for unit tests rather than production callers; confirm
// against consumers before treating as public API.
export const __taskProcessorInternals = {
  parseExtractionPayload,
  deterministicExtractionFallback,
  isFinancialMetricsForm,
};
/**
 * Dispatches a queued task to the processor matching its `task_type`.
 * Throws for task types this worker does not recognize.
 */
export async function runTaskProcessor(task: Task) {
  const taskType = task.task_type;
  if (taskType === "sync_filings") {
    return await processSyncFilings(task);
  }
  if (taskType === "refresh_prices") {
    return await processRefreshPrices(task);
  }
  if (taskType === "analyze_filing") {
    return await processAnalyzeFiling(task);
  }
  if (taskType === "portfolio_insights") {
    return await processPortfolioInsights(task);
  }
  if (taskType === "index_search") {
    return await processIndexSearch(task);
  }
  throw new Error(`Unsupported task type: ${taskType}`);
}