// Neon-Desk/lib/server/task-processors.ts
import type {
Filing,
FilingExtraction,
FilingExtractionMeta,
Holding,
Task,
TaskStage,
TaskStageContext,
} from "@/lib/types";
import { runAiAnalysis } from "@/lib/server/ai";
import { buildPortfolioSummary } from "@/lib/server/portfolio";
import { getQuote } from "@/lib/server/prices";
import { indexSearchDocuments } from "@/lib/server/search";
import {
getFilingByAccession,
listFilingsRecords,
saveFilingAnalysis,
updateFilingMetricsById,
upsertFilingsRecords,
} from "@/lib/server/repos/filings";
import { deleteCompanyFinancialBundlesForTicker } from "@/lib/server/repos/company-financial-bundles";
import {
getFilingTaxonomySnapshotByFilingId,
normalizeFilingTaxonomySnapshotPayload,
upsertFilingTaxonomySnapshot,
} from "@/lib/server/repos/filing-taxonomy";
import {
applyRefreshedPrices,
listHoldingsForPriceRefresh,
listUserHoldings,
} from "@/lib/server/repos/holdings";
import { createPortfolioInsight } from "@/lib/server/repos/insights";
import { updateTaskStage } from "@/lib/server/repos/tasks";
import { fetchPrimaryFilingText, fetchRecentFilings } from "@/lib/server/sec";
import {
generateIssuerOverlayForTicker,
recordIssuerOverlayBuildFailure,
} from "@/lib/server/issuer-overlays";
import {
getActiveIssuerOverlayDefinition,
getIssuerOverlay,
} from "@/lib/server/repos/issuer-overlays";
import { enqueueTask } from "@/lib/server/tasks";
import { hydrateFilingTaxonomySnapshot } from "@/lib/server/taxonomy/engine";
import { validateMetricsWithPdfLlm } from "@/lib/server/taxonomy/pdf-validation";
// Exact key set the LLM extraction JSON must contain — no missing keys and
// no extras (enforced in parseExtractionPayload).
const EXTRACTION_REQUIRED_KEYS = [
  "summary",
  "keyPoints",
  "redFlags",
  "followUpQuestions",
  "portfolioSignals",
  "segmentSpecificData",
  "geographicRevenueBreakdown",
  "companySpecificData",
  "secApiCrossChecks",
  "confidence",
] as const;
// Sanitization caps applied to extraction output.
const EXTRACTION_MAX_ITEMS = 6; // max entries kept per extraction list
const EXTRACTION_ITEM_MAX_LENGTH = 280; // max chars per list entry
const EXTRACTION_SUMMARY_MAX_LENGTH = 900; // max chars for the summary field
// Pause between per-filing taxonomy hydrations, and cap on how many filings
// one sync pass will consider for hydration.
const STATEMENT_HYDRATION_DELAY_MS = 120;
const STATEMENT_HYDRATION_MAX_FILINGS = 80;
// Heuristic line-match patterns for segment-level disclosures in filing text.
const SEGMENT_PATTERNS = [
  /\boperating segment\b/i,
  /\bsegment revenue\b/i,
  /\bsegment margin\b/i,
  /\bsegment profit\b/i,
  /\bbusiness segment\b/i,
  /\breportable segment\b/i,
];
// Heuristic patterns for geographic revenue/mix disclosures.
const GEOGRAPHIC_PATTERNS = [
  /\bgeographic\b/i,
  /\bamericas\b/i,
  /\bemea\b/i,
  /\bapac\b/i,
  /\basia pacific\b/i,
  /\bnorth america\b/i,
  /\beurope\b/i,
  /\bchina\b/i,
  /\binternational\b/i,
];
// Heuristic patterns for company-specific operating KPIs (retail comps,
// subscriber/engagement metrics, hospitality occupancy, retention/churn…).
const COMPANY_SPECIFIC_PATTERNS = [
  /\bsame[- ]store\b/i,
  /\bcomparable[- ]store\b/i,
  /\bcomp sales\b/i,
  /\borganic sales\b/i,
  /\bbookings\b/i,
  /\bbacklog\b/i,
  /\barpu\b/i,
  /\bmau\b/i,
  /\bdau\b/i,
  /\bsubscriber\b/i,
  /\boccupancy\b/i,
  /\brevpar\b/i,
  /\bretention\b/i,
  /\bchurn\b/i,
];
/** Key into the optional `metrics` map carried by a Filing record. */
type FilingMetricKey = keyof NonNullable<Filing["metrics"]>;

/**
 * Type guard: only annual (10-K) and quarterly (10-Q) reports are treated as
 * carriers of baseline financial metrics / taxonomy snapshots.
 */
function isFinancialMetricsForm(
  filingType: string,
): filingType is "10-K" | "10-Q" {
  switch (filingType) {
    case "10-K":
    case "10-Q":
      return true;
    default:
      return false;
  }
}
// Descriptors used to cross-check SEC API baseline metrics against the filing
// narrative: a metric counts as "referenced" when any of its patterns matches
// the sampled filing text (see buildSecApiCrossChecks).
const METRIC_CHECK_PATTERNS: Array<{
  key: FilingMetricKey;
  label: string;
  patterns: RegExp[];
}> = [
  {
    key: "revenue",
    label: "Revenue",
    patterns: [/\brevenue\b/i, /\bsales\b/i],
  },
  {
    key: "netIncome",
    label: "Net income",
    patterns: [/\bnet income\b/i, /\bprofit\b/i],
  },
  {
    key: "totalAssets",
    label: "Total assets",
    patterns: [/\btotal assets\b/i, /\bassets\b/i],
  },
  {
    key: "cash",
    label: "Cash",
    patterns: [/\bcash\b/i, /\bcash equivalents\b/i],
  },
  {
    key: "debt",
    label: "Debt",
    patterns: [/\bdebt\b/i, /\bborrowings\b/i, /\bliabilit(?:y|ies)\b/i],
  },
];
/**
 * Coerce an arbitrary task result into a plain keyed record.
 *
 * Plain objects pass through unchanged; every other value (primitives, null,
 * arrays) is wrapped as `{ value }` so callers always get an object map.
 */
function toTaskResult(value: unknown): Record<string, unknown> {
  const isPlainObject =
    typeof value === "object" && value !== null && !Array.isArray(value);
  if (isPlainObject) {
    return value as Record<string, unknown>;
  }
  return { value };
}
// Uniform return shape for every task processor in this module.
export type TaskExecutionOutcome = {
  // JSON-serializable result persisted with the task record.
  result: Record<string, unknown>;
  // Human-readable completion message shown to users/operators.
  completionDetail: string;
  // Optional final progress/context snapshot for the task's last stage.
  completionContext?: TaskStageContext | null;
};
// Aggregate outcome of one hydrateTaxonomySnapshotsForCandidates pass.
type TaxonomyHydrationRunResult = {
  hydrated: number;
  failed: number;
  // Filing ids actually (re)processed in this pass — used to prioritize a
  // second hydration pass after an overlay revision changes.
  touchedFilingIds: Set<number>;
};
/**
 * Assemble a TaskExecutionOutcome, normalizing the raw result into a keyed
 * record via toTaskResult. Context defaults to null when not supplied.
 */
function buildTaskOutcome(
  result: unknown,
  completionDetail: string,
  completionContext: TaskStageContext | null = null,
): TaskExecutionOutcome {
  const normalizedResult = toTaskResult(result);
  return { result: normalizedResult, completionDetail, completionContext };
}
/**
 * Decide whether a filing's taxonomy snapshot needs (re)hydration.
 *
 * Refresh when: no snapshot exists yet, the snapshot is older than the
 * filing record, or the snapshot was built under a different issuer-overlay
 * revision than the one currently in play.
 */
function shouldRefreshTaxonomySnapshot(input: {
  existingSnapshot: Awaited<
    ReturnType<typeof getFilingTaxonomySnapshotByFilingId>
  > | null;
  filingUpdatedAt: string;
  overlayRevisionId: number | null;
}) {
  const snapshot = input.existingSnapshot;
  if (!snapshot) {
    return true;
  }
  const snapshotIsStale =
    Date.parse(snapshot.updated_at) < Date.parse(input.filingUpdatedAt);
  if (snapshotIsStale) {
    return true;
  }
  const snapshotRevision = snapshot.issuer_overlay_revision_id ?? null;
  return snapshotRevision !== input.overlayRevisionId;
}
/**
 * Hydrate (or refresh) XBRL taxonomy snapshots for a batch of 10-K/10-Q
 * filings, emitting per-filing task stage updates along the way.
 *
 * Per candidate filing:
 *   1. skip if the stored snapshot is already current for this filing and
 *      overlay revision (shouldRefreshTaxonomySnapshot);
 *   2. hydrate the snapshot, then attempt PDF+LLM metric validation, falling
 *      back to an "error" validation result if that throws;
 *   3. persist the snapshot, push derived metrics onto the filing row, and
 *      invalidate cached company financial bundles for the ticker.
 * On hydration failure a placeholder "failed" snapshot is persisted so the
 * failure stays visible downstream.
 *
 * Returns hydrated/failed counts plus the set of filing ids touched, so the
 * caller can prioritize those filings in a second pass.
 */
async function hydrateTaxonomySnapshotsForCandidates(input: {
  task: Task;
  ticker: string;
  filingsCount: number;
  saveResult: { inserted: number; updated: number };
  candidates: Array<Filing & { filing_type: "10-K" | "10-Q" }>;
  overlayRevisionId: number | null;
}) {
  // The overlay definition is only fetched when an overlay revision applies.
  const overlayDefinition =
    input.overlayRevisionId === null
      ? null
      : await getActiveIssuerOverlayDefinition(input.ticker);
  let hydrated = 0;
  let failed = 0;
  const touchedFilingIds = new Set<number>();
  for (let index = 0; index < input.candidates.length; index += 1) {
    const filing = input.candidates[index];
    const existingSnapshot = await getFilingTaxonomySnapshotByFilingId(
      filing.id,
    );
    if (
      !shouldRefreshTaxonomySnapshot({
        existingSnapshot,
        filingUpdatedAt: filing.updated_at,
        overlayRevisionId: input.overlayRevisionId,
      })
    ) {
      // Snapshot already current for this filing + overlay revision.
      continue;
    }
    touchedFilingIds.add(filing.id);
    // Progress context reported with every stage transition for this filing;
    // captures live hydrated/failed counters at call time.
    const stageContext = (stage: TaskStage) =>
      buildProgressContext({
        current: index + 1,
        total: input.candidates.length,
        unit: "filings",
        counters: {
          fetched: input.filingsCount,
          inserted: input.saveResult.inserted,
          updated: input.saveResult.updated,
          hydrated,
          failed,
        },
        subject: {
          ticker: input.ticker,
          accessionNumber: filing.accession_number,
          label: stage,
        },
      });
    try {
      await setProjectionStage(
        input.task,
        "sync.extract_taxonomy",
        `Extracting XBRL taxonomy for ${filing.accession_number}`,
        stageContext("sync.extract_taxonomy"),
      );
      const snapshot = await hydrateFilingTaxonomySnapshot({
        filingId: filing.id,
        ticker: filing.ticker,
        cik: filing.cik,
        accessionNumber: filing.accession_number,
        filingDate: filing.filing_date,
        filingType: filing.filing_type,
        filingUrl: filing.filing_url,
        primaryDocument: filing.primary_document ?? null,
        issuerOverlay: overlayDefinition,
      });
      // Start from the hydrator's own validation output; the PDF/LLM pass
      // below replaces it when it succeeds.
      let pdfValidation = {
        validation_result: snapshot.validation_result,
        metric_validations: snapshot.metric_validations,
      };
      try {
        pdfValidation = await validateMetricsWithPdfLlm({
          metrics: snapshot.derived_metrics,
          assets: snapshot.assets,
        });
      } catch (error) {
        // Validation failure is non-fatal: record an "error" result and
        // annotate each existing metric check with the failure message.
        const message =
          error instanceof Error
            ? error.message
            : "PDF metric validation failed";
        pdfValidation = {
          validation_result: {
            status: "error",
            checks: [],
            validatedAt: new Date().toISOString(),
          },
          metric_validations: snapshot.metric_validations.map((check) => ({
            ...check,
            error: check.error ?? message,
          })),
        };
      }
      // NOTE(review): normalizeFilingTaxonomySnapshotPayload(snapshot) is
      // spread LAST, so any validation_result/metric_validations keys it
      // returns would override the PDF validation set just above — confirm
      // the normalizer does not emit those keys.
      const normalizedSnapshot = {
        ...snapshot,
        validation_result: pdfValidation.validation_result,
        metric_validations: pdfValidation.metric_validations,
        issuer_overlay_revision_id: input.overlayRevisionId,
        ...normalizeFilingTaxonomySnapshotPayload(snapshot),
      };
      // The next three stage updates are emitted after the corresponding work
      // has already happened inside hydrateFilingTaxonomySnapshot /
      // validateMetricsWithPdfLlm; they act as progress milestones for
      // observers rather than gating the work itself.
      await setProjectionStage(
        input.task,
        "sync.normalize_taxonomy",
        `Materializing statements for ${filing.accession_number}`,
        stageContext("sync.normalize_taxonomy"),
      );
      await setProjectionStage(
        input.task,
        "sync.derive_metrics",
        `Deriving taxonomy metrics for ${filing.accession_number}`,
        stageContext("sync.derive_metrics"),
      );
      await setProjectionStage(
        input.task,
        "sync.validate_pdf_metrics",
        `Validating metrics via PDF + LLM for ${filing.accession_number}`,
        stageContext("sync.validate_pdf_metrics"),
      );
      await setProjectionStage(
        input.task,
        "sync.persist_taxonomy",
        `Persisting taxonomy snapshot for ${filing.accession_number}`,
        stageContext("sync.persist_taxonomy"),
      );
      await upsertFilingTaxonomySnapshot(normalizedSnapshot);
      await updateFilingMetricsById(
        filing.id,
        normalizedSnapshot.derived_metrics,
      );
      // Invalidate cached bundles so readers rebuild from fresh snapshots.
      await deleteCompanyFinancialBundlesForTicker(filing.ticker);
      hydrated += 1;
    } catch (error) {
      // Hydration failed: persist an empty "failed" snapshot (keeping any
      // previously derived filing metrics) so the failure is queryable.
      const now = new Date().toISOString();
      await upsertFilingTaxonomySnapshot({
        filing_id: filing.id,
        ticker: filing.ticker,
        filing_date: filing.filing_date,
        filing_type: filing.filing_type,
        parse_status: "failed",
        parse_error:
          error instanceof Error ? error.message : "Taxonomy hydration failed",
        source: "legacy_html_fallback",
        parser_engine: "fiscal-xbrl",
        parser_version: "unknown",
        taxonomy_regime: "unknown",
        fiscal_pack: "core",
        periods: [],
        faithful_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        statement_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        surface_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        detail_rows: {
          income: {},
          balance: {},
          cash_flow: {},
          equity: {},
          comprehensive_income: {},
          disclosure: {},
        },
        kpi_rows: [],
        computed_definitions: [],
        contexts: [],
        derived_metrics: filing.metrics ?? null,
        validation_result: {
          status: "error",
          checks: [],
          validatedAt: now,
        },
        normalization_summary: {
          surfaceRowCount: 0,
          detailRowCount: 0,
          kpiRowCount: 0,
          unmappedRowCount: 0,
          materialUnmappedRowCount: 0,
          residualPrimaryCount: 0,
          residualDisclosureCount: 0,
          unsupportedConceptCount: 0,
          issuerOverlayMatchCount: 0,
          warnings: [],
        },
        issuer_overlay_revision_id: input.overlayRevisionId,
        facts_count: 0,
        concepts_count: 0,
        dimensions_count: 0,
        assets: [],
        concepts: [],
        facts: [],
        metric_validations: [],
      });
      await deleteCompanyFinancialBundlesForTicker(filing.ticker);
      failed += 1;
    }
    // Brief pause between filings — presumably to avoid hammering upstream
    // SEC/storage services; confirm before removing.
    await Bun.sleep(STATEMENT_HYDRATION_DELAY_MS);
  }
  return {
    hydrated,
    failed,
    touchedFilingIds,
  } satisfies TaxonomyHydrationRunResult;
}
/**
 * Convenience wrapper: record a task's current pipeline stage (with optional
 * human-readable detail and structured context) via updateTaskStage.
 */
async function setProjectionStage(
  task: Task,
  stage: TaskStage,
  detail: string | null = null,
  context: TaskStageContext | null = null,
) {
  const { id } = task;
  await updateTaskStage(id, stage, detail, context);
}
/**
 * Build a TaskStageContext carrying progress (current/total/unit) plus
 * optional counters and subject metadata.
 */
function buildProgressContext(input: {
  current: number;
  total: number;
  unit: string;
  counters?: Record<string, number>;
  subject?: TaskStageContext["subject"];
}): TaskStageContext {
  const { current, total, unit, counters, subject } = input;
  return {
    progress: { current, total, unit },
    counters,
    subject,
  };
}
/**
 * Validate and normalize a ticker from an untyped task payload.
 * Returns the trimmed, upper-cased symbol; throws when absent or blank.
 */
function parseTicker(raw: unknown) {
  if (typeof raw === "string") {
    const normalized = raw.trim();
    if (normalized.length > 0) {
      return normalized.toUpperCase();
    }
  }
  throw new Error("Ticker is required");
}
/**
 * Clamp a raw limit from an untyped task payload into [min, max].
 *
 * Accepts numbers and non-blank numeric strings; anything else returns
 * `fallback`. Previously `Number(raw)` coerced `null`, `false`, and blank
 * strings to 0 — a finite value — so a missing limit silently clamped to
 * `min` instead of using the intended default.
 *
 * @param raw      untrusted payload value
 * @param fallback value used when `raw` is not a usable number
 * @param min      inclusive lower bound
 * @param max      inclusive upper bound
 */
function parseLimit(raw: unknown, fallback: number, min: number, max: number) {
  let numberValue: number;
  if (typeof raw === "number") {
    numberValue = raw;
  } else if (typeof raw === "string" && raw.trim().length > 0) {
    numberValue = Number(raw);
  } else {
    // null/undefined/booleans/blank strings: caller supplied no real limit.
    return fallback;
  }
  if (!Number.isFinite(numberValue)) {
    return fallback;
  }
  // Truncate toward zero, then clamp into the allowed range.
  return Math.min(Math.max(Math.trunc(numberValue), min), max);
}
/**
 * Normalize an optional free-text payload value: trimmed string when
 * non-blank, otherwise null (including for non-string inputs).
 */
function parseOptionalText(raw: unknown) {
  if (typeof raw !== "string") {
    return null;
  }
  const trimmed = raw.trim();
  return trimmed.length === 0 ? null : trimmed;
}
/**
 * Extract the non-blank string entries from an untyped payload array,
 * trimming each. Non-array input yields an empty array.
 */
function parseOptionalStringArray(raw: unknown) {
  if (!Array.isArray(raw)) {
    return [];
  }
  const values: string[] = [];
  for (const entry of raw) {
    if (typeof entry !== "string") {
      continue;
    }
    const trimmed = entry.trim();
    if (trimmed.length > 0) {
      values.push(trimmed);
    }
  }
  return values;
}
/**
 * Parse a tags payload into a deduplicated list of trimmed, non-blank
 * strings (first-seen order preserved). Non-array input yields [].
 */
function parseTags(raw: unknown) {
  if (!Array.isArray(raw)) {
    return [];
  }
  const normalized = raw
    .filter((entry): entry is string => typeof entry === "string")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  return [...new Set(normalized)];
}
/**
 * Normalize one extraction text field: collapse all whitespace runs to a
 * single space, trim, and cap at maxLength. Returns null for non-strings
 * and strings that are blank after normalization.
 */
function sanitizeExtractionText(value: unknown, maxLength: number) {
  if (typeof value !== "string") {
    return null;
  }
  const normalized = value.trim().replace(/\s+/g, " ");
  if (normalized.length === 0) {
    return null;
  }
  return normalized.slice(0, maxLength);
}
/**
 * Sanitize an extraction list field: each entry is normalized via
 * sanitizeExtractionText (item-length cap applied), blanks are dropped, and
 * at most EXTRACTION_MAX_ITEMS entries are kept. Non-array input → null
 * (distinguishing "missing field" from "empty list").
 */
function sanitizeExtractionList(value: unknown) {
  if (!Array.isArray(value)) {
    return null;
  }
  const cleaned: string[] = [];
  for (const entry of value) {
    if (cleaned.length >= EXTRACTION_MAX_ITEMS) {
      break;
    }
    const normalized = sanitizeExtractionText(
      entry,
      EXTRACTION_ITEM_MAX_LENGTH,
    );
    if (normalized) {
      cleaned.push(normalized);
    }
  }
  return cleaned;
}
/**
 * Sanitize and deduplicate a list of candidate strings. Duplicates are
 * detected case-insensitively, first occurrence wins, and the result is
 * capped at EXTRACTION_MAX_ITEMS entries.
 */
function uniqueExtractionList(items: Array<string | null | undefined>) {
  const seen = new Set<string>();
  const unique: string[] = [];
  for (const item of items) {
    if (unique.length >= EXTRACTION_MAX_ITEMS) {
      break;
    }
    const normalized = sanitizeExtractionText(item, EXTRACTION_ITEM_MAX_LENGTH);
    if (!normalized) {
      continue;
    }
    const signature = normalized.toLowerCase();
    if (!seen.has(signature)) {
      seen.add(signature);
      unique.push(normalized);
    }
  }
  return unique;
}
/**
 * Scan filing text for lines matching any of the given patterns.
 *
 * Lines are whitespace-normalized; very short lines (< 24 chars) are skipped
 * as noise. Collects up to 2x the extraction cap of raw matches, then dedupes
 * and trims via uniqueExtractionList.
 */
function collectTextSignals(filingText: string, patterns: RegExp[]) {
  const candidateLines = filingText
    .replace(/\r/g, "\n")
    .split(/\n+/)
    .map((line) => line.replace(/\s+/g, " ").trim())
    .filter((line) => line.length >= 24);
  const rawMatchCap = EXTRACTION_MAX_ITEMS * 2;
  const matches: string[] = [];
  for (const line of candidateLines) {
    if (matches.length >= rawMatchCap) {
      break;
    }
    if (patterns.some((pattern) => pattern.test(line))) {
      matches.push(line);
    }
  }
  return uniqueExtractionList(matches);
}
/**
 * Parse the LLM extraction response text into a FilingExtraction.
 *
 * Accepts either a ```json fenced block or, failing that, the widest {...}
 * span in the raw text. The JSON must contain exactly
 * EXTRACTION_REQUIRED_KEYS (no missing keys, no extras) and every field must
 * survive sanitization; any violation returns null so the caller can fall
 * back to rule-based extraction.
 */
function parseExtractionPayload(raw: string): FilingExtraction | null {
  // Prefer the contents of a fenced code block; otherwise take the outermost
  // brace-delimited span of the response.
  const fencedJson = raw.match(/```(?:json)?\s*([\s\S]*?)```/i)?.[1];
  const candidate =
    fencedJson ??
    (() => {
      const start = raw.indexOf("{");
      const end = raw.lastIndexOf("}");
      return start >= 0 && end > start ? raw.slice(start, end + 1) : null;
    })();
  if (!candidate) {
    return null;
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(candidate);
  } catch {
    return null;
  }
  // Must be a plain JSON object (not an array or primitive).
  if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
    return null;
  }
  const payload = parsed as Record<string, unknown>;
  const keys = Object.keys(payload);
  // Exact key-set check: same count as required keys…
  if (keys.length !== EXTRACTION_REQUIRED_KEYS.length) {
    return null;
  }
  // …every required key present…
  for (const key of EXTRACTION_REQUIRED_KEYS) {
    if (!(key in payload)) {
      return null;
    }
  }
  // …and no foreign keys (belt-and-braces given the two checks above).
  for (const key of keys) {
    if (
      !EXTRACTION_REQUIRED_KEYS.includes(
        key as (typeof EXTRACTION_REQUIRED_KEYS)[number],
      )
    ) {
      return null;
    }
  }
  // Sanitize each field; list sanitizers return null for non-arrays, which
  // fails the combined validity check below.
  const summary = sanitizeExtractionText(
    payload.summary,
    EXTRACTION_SUMMARY_MAX_LENGTH,
  );
  const keyPoints = sanitizeExtractionList(payload.keyPoints);
  const redFlags = sanitizeExtractionList(payload.redFlags);
  const followUpQuestions = sanitizeExtractionList(payload.followUpQuestions);
  const portfolioSignals = sanitizeExtractionList(payload.portfolioSignals);
  const segmentSpecificData = sanitizeExtractionList(
    payload.segmentSpecificData,
  );
  const geographicRevenueBreakdown = sanitizeExtractionList(
    payload.geographicRevenueBreakdown,
  );
  const companySpecificData = sanitizeExtractionList(
    payload.companySpecificData,
  );
  const secApiCrossChecks = sanitizeExtractionList(payload.secApiCrossChecks);
  // Accept numeric strings for confidence too.
  const confidenceRaw =
    typeof payload.confidence === "number"
      ? payload.confidence
      : Number(payload.confidence);
  if (
    !summary ||
    !keyPoints ||
    !redFlags ||
    !followUpQuestions ||
    !portfolioSignals ||
    !segmentSpecificData ||
    !geographicRevenueBreakdown ||
    !companySpecificData ||
    !secApiCrossChecks ||
    !Number.isFinite(confidenceRaw)
  ) {
    return null;
  }
  return {
    summary,
    keyPoints,
    redFlags,
    followUpQuestions,
    portfolioSignals,
    segmentSpecificData,
    geographicRevenueBreakdown,
    companySpecificData,
    secApiCrossChecks,
    // Clamp confidence into [0, 1].
    confidence: Math.min(Math.max(confidenceRaw, 0), 1),
  };
}
/**
 * Format one metric as a "Label: value" line. Finite numbers are rounded and
 * formatted with US thousands separators; null/undefined/NaN render as
 * "not reported".
 */
function metricSnapshotLine(label: string, value: number | null | undefined) {
  if (typeof value === "number" && Number.isFinite(value)) {
    return `${label}: ${Math.round(value).toLocaleString("en-US")}`;
  }
  return `${label}: not reported`;
}
/**
 * Cross-check each SEC API baseline metric against the filing narrative.
 *
 * For every descriptor in METRIC_CHECK_PATTERNS, reports whether the metric
 * is unavailable, appears referenced in the text (any pattern matches), or
 * could not be located. Results are deduped/capped via uniqueExtractionList.
 */
function buildSecApiCrossChecks(filing: Filing, filingText: string) {
  const normalizedText = filingText.toLowerCase();
  const checks: string[] = [];
  for (const descriptor of METRIC_CHECK_PATTERNS) {
    const value = filing.metrics?.[descriptor.key];
    if (value === null || value === undefined || !Number.isFinite(value)) {
      checks.push(
        `${descriptor.label}: SEC API metric unavailable for this filing.`,
      );
      continue;
    }
    const formatted = Math.round(value).toLocaleString("en-US");
    const mentioned = descriptor.patterns.some((pattern) =>
      pattern.test(normalizedText),
    );
    checks.push(
      mentioned
        ? `${descriptor.label}: SEC API value ${formatted} appears referenced in filing narrative.`
        : `${descriptor.label}: SEC API value ${formatted} was not confidently located in sampled filing text.`,
    );
  }
  return uniqueExtractionList(checks);
}
/**
 * Build a fully deterministic extraction from filing metadata + SEC API
 * metrics alone — used when filing text could not be fetched or the LLM
 * extraction failed. Confidence is pinned low (0.2) to signal the weaker
 * provenance.
 */
function deterministicExtractionFallback(filing: Filing): FilingExtraction {
  const metrics = filing.metrics;
  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. Deterministic extraction fallback was used because filing text parsing was unavailable or invalid.`,
    keyPoints: [
      `${filing.filing_type} filing recorded for ${filing.ticker}.`,
      metricSnapshotLine("Revenue", metrics?.revenue),
      metricSnapshotLine("Net income", metrics?.netIncome),
      metricSnapshotLine("Total assets", metrics?.totalAssets),
    ],
    redFlags: [
      metricSnapshotLine("Cash", metrics?.cash),
      metricSnapshotLine("Debt", metrics?.debt),
      filing.primary_document
        ? "Primary document is indexed and available for review."
        : "Primary document reference is unavailable in current filing metadata.",
    ],
    followUpQuestions: [
      "What changed versus the prior filing in guidance, margins, or liquidity?",
      "Are any material risks under-emphasized relative to historical filings?",
      "Should portfolio exposure be adjusted before the next reporting cycle?",
    ],
    portfolioSignals: [
      "Validate trend direction using at least two prior filings.",
      "Cross-check leverage and liquidity metrics against position sizing rules.",
      "Track language shifts around guidance or demand assumptions.",
    ],
    // The three sections below explicitly flag that no text parsing happened.
    segmentSpecificData: [
      "Segment-level disclosures were not parsed in deterministic fallback mode.",
    ],
    geographicRevenueBreakdown: [
      "Geographic revenue disclosures were not parsed in deterministic fallback mode.",
    ],
    companySpecificData: [
      "Company-specific operating KPIs (for example same-store sales) were not parsed in deterministic fallback mode.",
    ],
    secApiCrossChecks: [
      `${metricSnapshotLine("Revenue", metrics?.revenue)} (SEC API baseline; text verification unavailable).`,
      `${metricSnapshotLine("Net income", metrics?.netIncome)} (SEC API baseline; text verification unavailable).`,
    ],
    confidence: 0.2,
  };
}
/**
 * Build a rule-based (non-LLM) extraction by enriching the deterministic
 * fallback with pattern-matched lines from the actual filing text: segment,
 * geographic, and company-KPI disclosures plus SEC-API cross-checks.
 *
 * Confidence is 0.4 when any text signal was found, otherwise 0.3 — both
 * above the 0.2 metadata-only fallback.
 */
function buildRuleBasedExtraction(
  filing: Filing,
  filingText: string,
): FilingExtraction {
  const baseline = deterministicExtractionFallback(filing);
  const segmentSpecificData = collectTextSignals(filingText, SEGMENT_PATTERNS);
  const geographicRevenueBreakdown = collectTextSignals(
    filingText,
    GEOGRAPHIC_PATTERNS,
  );
  const companySpecificData = collectTextSignals(
    filingText,
    COMPANY_SPECIFIC_PATTERNS,
  );
  const secApiCrossChecks = buildSecApiCrossChecks(filing, filingText);
  // Promote the first hit from each signal category into the key points.
  const segmentLead = segmentSpecificData[0]
    ? `Segment detail: ${segmentSpecificData[0]}`
    : null;
  const geographicLead = geographicRevenueBreakdown[0]
    ? `Geographic detail: ${geographicRevenueBreakdown[0]}`
    : null;
  const companyLead = companySpecificData[0]
    ? `Company-specific KPI: ${companySpecificData[0]}`
    : null;
  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. SEC API metrics were retained as the baseline and filing text was scanned for segment and company-specific disclosures.`,
    keyPoints: uniqueExtractionList([
      ...baseline.keyPoints,
      segmentLead,
      geographicLead,
      companyLead,
    ]),
    redFlags: uniqueExtractionList([
      ...baseline.redFlags,
      // Surface the first metric that could not be located in the text.
      secApiCrossChecks.find((line) => /not confidently located/i.test(line)),
    ]),
    followUpQuestions: uniqueExtractionList([
      ...baseline.followUpQuestions,
      segmentSpecificData.length > 0
        ? "How do segment trends change the consolidated margin outlook?"
        : "Does management provide segment-level KPIs in supplemental exhibits?",
    ]),
    portfolioSignals: uniqueExtractionList([
      ...baseline.portfolioSignals,
      companySpecificData.length > 0
        ? "Incorporate company-specific KPI direction into near-term position sizing."
        : "Track future filings for explicit operating KPI disclosures.",
    ]),
    // For each signal section, fall back to the baseline placeholder when the
    // text scan found nothing.
    segmentSpecificData:
      segmentSpecificData.length > 0
        ? segmentSpecificData
        : baseline.segmentSpecificData,
    geographicRevenueBreakdown:
      geographicRevenueBreakdown.length > 0
        ? geographicRevenueBreakdown
        : baseline.geographicRevenueBreakdown,
    companySpecificData:
      companySpecificData.length > 0
        ? companySpecificData
        : baseline.companySpecificData,
    secApiCrossChecks:
      secApiCrossChecks.length > 0
        ? secApiCrossChecks
        : baseline.secApiCrossChecks,
    confidence:
      segmentSpecificData.length +
        geographicRevenueBreakdown.length +
        companySpecificData.length >
      0
        ? 0.4
        : 0.3,
  };
}
/** Return the primary list when it has entries, otherwise the fallback. */
function preferExtractionList(primary: string[], fallback: string[]) {
  if (primary.length > 0) {
    return primary;
  }
  return fallback;
}
/**
 * Merge the LLM extraction with the rule-based fallback, field by field:
 * each list/summary uses the primary value when non-empty, otherwise the
 * fallback. Confidence is taken from the primary only (clamped to [0, 1]);
 * the fallback's confidence is intentionally not blended in.
 */
function mergeExtractionWithFallback(
  primary: FilingExtraction,
  fallback: FilingExtraction,
): FilingExtraction {
  const clampedConfidence = Math.min(Math.max(primary.confidence, 0), 1);
  const summary = primary.summary ? primary.summary : fallback.summary;
  return {
    summary,
    keyPoints: preferExtractionList(primary.keyPoints, fallback.keyPoints),
    redFlags: preferExtractionList(primary.redFlags, fallback.redFlags),
    followUpQuestions: preferExtractionList(
      primary.followUpQuestions,
      fallback.followUpQuestions,
    ),
    portfolioSignals: preferExtractionList(
      primary.portfolioSignals,
      fallback.portfolioSignals,
    ),
    segmentSpecificData: preferExtractionList(
      primary.segmentSpecificData,
      fallback.segmentSpecificData,
    ),
    geographicRevenueBreakdown: preferExtractionList(
      primary.geographicRevenueBreakdown,
      fallback.geographicRevenueBreakdown,
    ),
    companySpecificData: preferExtractionList(
      primary.companySpecificData,
      fallback.companySpecificData,
    ),
    secApiCrossChecks: preferExtractionList(
      primary.secApiCrossChecks,
      fallback.secApiCrossChecks,
    ),
    confidence: clampedConfidence,
  };
}
/**
 * Build the LLM prompt for structured extraction from filing text. Demands a
 * strict-JSON response matching EXTRACTION_REQUIRED_KEYS exactly (validated
 * afterwards by parseExtractionPayload). The full filing text is appended
 * last; sections are separated by blank lines.
 */
function extractionPrompt(filing: Filing, filingText: string) {
  return [
    "Extract structured signals from the SEC filing text.",
    `Company: ${filing.company_name} (${filing.ticker})`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    "Use SEC API metrics as canonical numeric values and validate whether each appears consistent with filing text context.",
    "Prioritize company-specific and segment-specific disclosures not covered by SEC endpoint fields (for example same-store sales, geographic mix, segment margin).",
    "Return ONLY valid JSON with exactly these keys and no extra keys:",
    '{"summary":"string","keyPoints":["string"],"redFlags":["string"],"followUpQuestions":["string"],"portfolioSignals":["string"],"segmentSpecificData":["string"],"geographicRevenueBreakdown":["string"],"companySpecificData":["string"],"secApiCrossChecks":["string"],"confidence":0}',
    `Rules: every array max ${EXTRACTION_MAX_ITEMS} items; each item <= ${EXTRACTION_ITEM_MAX_LENGTH} chars; summary <= ${EXTRACTION_SUMMARY_MAX_LENGTH} chars; confidence between 0 and 1.`,
    "Filing text follows:",
    filingText,
  ].join("\n\n");
}
/**
 * Build the LLM prompt for the final narrative filing report, seeding it
 * with the SEC API baseline metrics and the structured extraction context
 * (tagged with its provenance). Sections are newline-separated.
 */
function reportPrompt(
  filing: Filing,
  extraction: FilingExtraction,
  extractionMeta: FilingExtractionMeta,
) {
  const baselineMetrics = JSON.stringify(filing.metrics ?? {});
  const extractionContext = JSON.stringify(extraction);
  const promptLines = [
    "You are a fiscal research assistant focused on regulatory signals.",
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${baselineMetrics}`,
    `Structured extraction context (${extractionMeta.source}): ${extractionContext}`,
    "Use SEC API values as the baseline financials and explicitly reference segment/company-specific details from extraction.",
    "Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.",
  ];
  return promptLines.join("\n");
}
/**
 * Build the source-link rows for a filing: the primary document first (when
 * present), then the submission index (when present).
 */
function filingLinks(filing: {
  filingUrl: string | null;
  submissionUrl: string | null;
}) {
  const { filingUrl, submissionUrl } = filing;
  const links: Array<{ link_type: string; url: string }> = [];
  if (filingUrl) {
    links.push({ link_type: "primary_document", url: filingUrl });
  }
  if (submissionUrl) {
    links.push({ link_type: "submission_index", url: submissionUrl });
  }
  return links;
}
/**
 * Task processor: sync recent SEC filings for one ticker.
 *
 * Pipeline: fetch recent filings → upsert filing records + source links →
 * hydrate taxonomy snapshots for 10-K/10-Q candidates → build/refresh the
 * issuer overlay → re-hydrate a prioritized subset if a new overlay revision
 * was published → enqueue a search-index task (best effort).
 *
 * Payload: ticker (required), limit (1–50, default 20), optional category
 * and tags (recorded in the result only).
 */
async function processSyncFilings(task: Task) {
  const ticker = parseTicker(task.payload.ticker);
  const limit = parseLimit(task.payload.limit, 20, 1, 50);
  const category = parseOptionalText(task.payload.category);
  const tags = parseTags(task.payload.tags);
  // Human-readable scope suffix for the initial stage message.
  const scopeLabel = [
    category,
    tags.length > 0 ? `tags: ${tags.join(", ")}` : null,
  ]
    .filter((entry): entry is string => Boolean(entry))
    .join(" | ");
  let searchTaskId: string | null = null;
  const tickerSubject = { ticker };
  await setProjectionStage(
    task,
    "sync.fetch_filings",
    `Fetching up to ${limit} filings for ${ticker}${scopeLabel ? ` (${scopeLabel})` : ""}`,
    { subject: tickerSubject },
  );
  const filings = await fetchRecentFilings(ticker, limit);
  await setProjectionStage(
    task,
    "sync.persist_filings",
    `Persisting ${filings.length} filings and source links`,
    {
      counters: { fetched: filings.length },
      subject: tickerSubject,
    },
  );
  // Upsert the fetched filings; metrics start null and are populated later by
  // taxonomy hydration.
  const saveResult = await upsertFilingsRecords(
    filings.map((filing) => ({
      ticker: filing.ticker,
      filing_type: filing.filingType,
      filing_date: filing.filingDate,
      accession_number: filing.accessionNumber,
      cik: filing.cik,
      company_name: filing.companyName,
      filing_url: filing.filingUrl,
      submission_url: filing.submissionUrl,
      primary_document: filing.primaryDocument,
      metrics: null,
      links: filingLinks(filing),
    })),
  );
  let taxonomySnapshotsHydrated = 0;
  let taxonomySnapshotsFailed = 0;
  // Hydration candidates: stored 10-K/10-Q filings for the ticker, looking
  // beyond this fetch (3x the limit, floored at 40, capped at the global
  // hydration maximum) so older filings can catch up too.
  const hydrateCandidates = (
    await listFilingsRecords({
      ticker,
      limit: Math.min(Math.max(limit * 3, 40), STATEMENT_HYDRATION_MAX_FILINGS),
    })
  ).filter((filing): filing is Filing & { filing_type: "10-K" | "10-Q" } => {
    return isFinancialMetricsForm(filing.filing_type);
  });
  await setProjectionStage(
    task,
    "sync.discover_assets",
    `Discovering taxonomy assets for ${hydrateCandidates.length} candidate filings`,
    buildProgressContext({
      current: 0,
      total: hydrateCandidates.length,
      unit: "filings",
      counters: {
        fetched: filings.length,
        inserted: saveResult.inserted,
        updated: saveResult.updated,
        hydrated: 0,
        failed: 0,
      },
      subject: tickerSubject,
    }),
  );
  // First hydration pass runs under the currently active overlay revision
  // (or none).
  const currentOverlay = await getIssuerOverlay(ticker);
  const firstPass = await hydrateTaxonomySnapshotsForCandidates({
    task,
    ticker,
    filingsCount: filings.length,
    saveResult,
    candidates: hydrateCandidates,
    overlayRevisionId: currentOverlay?.active_revision_id ?? null,
  });
  taxonomySnapshotsHydrated += firstPass.hydrated;
  taxonomySnapshotsFailed += firstPass.failed;
  // Rebuild the issuer overlay from the freshly hydrated snapshots. Failures
  // are recorded and logged but never fail the sync task.
  let overlayPublished = false;
  let activeOverlayRevisionId = currentOverlay?.active_revision_id ?? null;
  try {
    await setProjectionStage(
      task,
      "sync.persist_taxonomy",
      `Building issuer overlay for ${ticker}`,
      {
        counters: {
          sampledFilings: Math.min(hydrateCandidates.length, 12),
          hydrated: taxonomySnapshotsHydrated,
          failed: taxonomySnapshotsFailed,
        },
        subject: tickerSubject,
      },
    );
    const overlayResult = await generateIssuerOverlayForTicker(ticker);
    overlayPublished = overlayResult.published;
    activeOverlayRevisionId = overlayResult.activeRevisionId;
  } catch (error) {
    await recordIssuerOverlayBuildFailure(ticker, error);
    console.error(`[issuer-overlay] failed for ${ticker}:`, error);
  }
  // If a NEW overlay revision was published, re-hydrate under it: filings
  // touched in the first pass go first, then the rest, deduped by id and
  // capped at max(touched-count, 8).
  if (
    overlayPublished &&
    activeOverlayRevisionId !== null &&
    activeOverlayRevisionId !== currentOverlay?.active_revision_id
  ) {
    const prioritizedCandidates = [
      ...hydrateCandidates.filter((filing) =>
        firstPass.touchedFilingIds.has(filing.id),
      ),
      ...hydrateCandidates.filter(
        (filing) => !firstPass.touchedFilingIds.has(filing.id),
      ),
    ];
    const uniqueCandidates = prioritizedCandidates.filter(
      (filing, index, rows) => {
        return (
          rows.findIndex((candidate) => candidate.id === filing.id) === index
        );
      },
    );
    const rehydrateCandidates = uniqueCandidates.slice(
      0,
      Math.max(firstPass.touchedFilingIds.size, 8),
    );
    const secondPass = await hydrateTaxonomySnapshotsForCandidates({
      task,
      ticker,
      filingsCount: filings.length,
      saveResult,
      candidates: rehydrateCandidates,
      overlayRevisionId: activeOverlayRevisionId,
    });
    taxonomySnapshotsHydrated += secondPass.hydrated;
    taxonomySnapshotsFailed += secondPass.failed;
  }
  // Best-effort: enqueue a follow-up search-index task for this ticker.
  try {
    const searchTask = await enqueueTask({
      userId: task.user_id,
      taskType: "index_search",
      payload: {
        ticker,
        sourceKinds: ["filing_document", "filing_brief"],
      },
      priority: 55,
      resourceKey: `index_search:ticker:${ticker}`,
    });
    searchTaskId = searchTask.id;
  } catch (error) {
    console.error(`[search-index-sync] failed for ${ticker}:`, error);
  }
  const result = {
    ticker,
    category,
    tags,
    fetched: filings.length,
    inserted: saveResult.inserted,
    updated: saveResult.updated,
    taxonomySnapshotsHydrated,
    taxonomySnapshotsFailed,
    overlayPublished,
    activeOverlayRevisionId,
    searchTaskId,
  };
  return buildTaskOutcome(
    result,
    `Synced ${filings.length} filings for ${ticker}, hydrated ${taxonomySnapshotsHydrated} taxonomy snapshots, failed ${taxonomySnapshotsFailed}.`,
    buildProgressContext({
      current: hydrateCandidates.length,
      total: hydrateCandidates.length || 1,
      unit: "filings",
      counters: {
        fetched: filings.length,
        inserted: saveResult.inserted,
        updated: saveResult.updated,
        hydrated: taxonomySnapshotsHydrated,
        failed: taxonomySnapshotsFailed,
      },
      subject: tickerSubject,
    }),
  );
}
/**
 * Task processor: refresh cached quote prices for all of a user's holdings.
 *
 * Loads the user's holdings, fetches one quote per distinct ticker
 * (sequentially — presumably to respect upstream rate limits; confirm before
 * parallelizing), tracks failed and stale tickers, then writes all refreshed
 * prices in a single batch with one shared timestamp.
 */
async function processRefreshPrices(task: Task) {
  const userId = task.user_id;
  if (!userId) {
    throw new Error("Task is missing user scope");
  }
  await setProjectionStage(
    task,
    "refresh.load_holdings",
    "Loading holdings for price refresh",
  );
  const userHoldings = await listHoldingsForPriceRefresh(userId);
  // One quote request per distinct ticker, regardless of holding count.
  const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];
  const quotes = new Map<string, number>();
  const failedTickers: string[] = [];
  const staleTickers: string[] = [];
  const baseContext = {
    counters: {
      holdings: userHoldings.length,
    },
  } satisfies TaskStageContext;
  await setProjectionStage(
    task,
    "refresh.load_holdings",
    `Loaded ${userHoldings.length} holdings across ${tickers.length} tickers`,
    baseContext,
  );
  await setProjectionStage(
    task,
    "refresh.fetch_quotes",
    `Fetching quotes for ${tickers.length} tickers`,
    buildProgressContext({
      current: 0,
      total: tickers.length,
      unit: "tickers",
      counters: {
        holdings: userHoldings.length,
      },
    }),
  );
  for (let index = 0; index < tickers.length; index += 1) {
    const ticker = tickers[index];
    const quoteResult = await getQuote(ticker);
    if (quoteResult.value !== null) {
      quotes.set(ticker, quoteResult.value);
      // Stale quotes are still applied, but reported separately.
      if (quoteResult.stale) {
        staleTickers.push(ticker);
      }
    } else {
      failedTickers.push(ticker);
    }
    // Per-ticker progress update after each quote attempt.
    await setProjectionStage(
      task,
      "refresh.fetch_quotes",
      `Fetching quotes for ${tickers.length} tickers`,
      buildProgressContext({
        current: index + 1,
        total: tickers.length,
        unit: "tickers",
        counters: {
          holdings: userHoldings.length,
          failed: failedTickers.length,
          stale: staleTickers.length,
        },
        subject: { ticker },
      }),
    );
  }
  await setProjectionStage(
    task,
    "refresh.persist_prices",
    `Writing refreshed prices for ${quotes.size} tickers across ${userHoldings.length} holdings`,
    {
      counters: {
        holdings: userHoldings.length,
        failed: failedTickers.length,
        stale: staleTickers.length,
      },
    },
  );
  // Single batched write; all holdings share one refresh timestamp.
  const updatedCount = await applyRefreshedPrices(
    userId,
    quotes,
    new Date().toISOString(),
  );
  const result = {
    updatedCount,
    totalTickers: tickers.length,
    failedTickers,
    staleTickers,
  };
  const messageParts = [
    `Refreshed prices for ${quotes.size}/${tickers.length} tickers`,
  ];
  if (failedTickers.length > 0) {
    messageParts.push(`(${failedTickers.length} unavailable)`);
  }
  if (staleTickers.length > 0) {
    messageParts.push(`(${staleTickers.length} stale)`);
  }
  return buildTaskOutcome(
    result,
    `${messageParts.join(" ")} across ${userHoldings.length} holdings.`,
    {
      progress: {
        current: tickers.length,
        total: tickers.length || 1,
        unit: "tickers",
      },
      counters: {
        holdings: userHoldings.length,
        updatedCount,
        failed: failedTickers.length,
        stale: staleTickers.length,
      },
    },
  );
}
/**
 * Processes an "analyze_filing" task: loads the filing record by accession
 * number, best-effort fetches the primary document text, derives an
 * extraction (AI-based when text is available, deterministic metadata
 * fallback otherwise), generates the final analyst report, persists both,
 * and enqueues a follow-up search-indexing task.
 *
 * Throws when the payload lacks a string accessionNumber, when no filing
 * record matches it, or when the AI extraction output fails JSON parsing.
 */
async function processAnalyzeFiling(task: Task) {
  const accessionNumber =
    typeof task.payload.accessionNumber === "string"
      ? task.payload.accessionNumber
      : "";
  if (!accessionNumber) {
    throw new Error("accessionNumber is required");
  }
  await setProjectionStage(
    task,
    "analyze.load_filing",
    `Loading filing ${accessionNumber}`,
    {
      subject: {
        accessionNumber,
      },
    },
  );
  const filing = await getFilingByAccession(accessionNumber);
  if (!filing) {
    throw new Error(`Filing ${accessionNumber} not found`);
  }
  // Shared stage-context subject reused by every later projection update.
  const analyzeSubject = {
    ticker: filing.ticker,
    accessionNumber,
    label: filing.filing_type,
  };
  // Start from the deterministic metadata-based extraction; it is replaced
  // only if document text is fetched AND the AI extraction parses below.
  const defaultExtraction = deterministicExtractionFallback(filing);
  let extraction = defaultExtraction;
  let extractionMeta: FilingExtractionMeta = {
    provider: "deterministic-fallback",
    model: "metadata-fallback",
    source: "metadata_fallback",
    generatedAt: new Date().toISOString(),
  };
  let filingDocument: Awaited<
    ReturnType<typeof fetchPrimaryFilingText>
  > | null = null;
  // Best-effort document fetch: any failure leaves filingDocument null and
  // keeps the deterministic fallback extraction/meta above.
  try {
    await setProjectionStage(
      task,
      "analyze.fetch_document",
      "Fetching primary filing document",
      {
        subject: analyzeSubject,
      },
    );
    filingDocument = await fetchPrimaryFilingText({
      filingUrl: filing.filing_url,
      cik: filing.cik,
      accessionNumber: filing.accession_number,
      primaryDocument: filing.primary_document ?? null,
    });
  } catch {
    filingDocument = null;
  }
  if (filingDocument?.text) {
    await setProjectionStage(
      task,
      "analyze.extract",
      "Generating extraction context from filing text",
      {
        subject: analyzeSubject,
      },
    );
    const ruleBasedExtraction = buildRuleBasedExtraction(
      filing,
      filingDocument.text,
    );
    const extractionResult = await runAiAnalysis(
      extractionPrompt(filing, filingDocument.text),
      "Return strict JSON only.",
      { workload: "extraction" },
    );
    const parsed = parseExtractionPayload(extractionResult.text);
    // NOTE(review): an unparseable AI extraction fails the whole task here
    // rather than falling back to ruleBasedExtraction/defaultExtraction —
    // confirm this strictness is intended.
    if (!parsed) {
      throw new Error("Extraction output invalid JSON schema");
    }
    // presumably fills gaps in the AI extraction from the rule-based one;
    // helper defined elsewhere in this file — verify exact merge semantics.
    extraction = mergeExtractionWithFallback(parsed, ruleBasedExtraction);
    extractionMeta = {
      provider: "zhipu",
      model: extractionResult.model,
      source: filingDocument.source,
      generatedAt: new Date().toISOString(),
    };
  }
  await setProjectionStage(
    task,
    "analyze.generate_report",
    "Generating final filing analysis report",
    {
      subject: analyzeSubject,
    },
  );
  const analysis = await runAiAnalysis(
    reportPrompt(filing, extraction, extractionMeta),
    "Use concise institutional analyst language.",
    { workload: "report" },
  );
  await setProjectionStage(
    task,
    "analyze.persist_report",
    "Persisting filing analysis output",
    {
      subject: analyzeSubject,
    },
  );
  await saveFilingAnalysis(accessionNumber, {
    provider: analysis.provider,
    model: analysis.model,
    text: analysis.text,
    extraction,
    extractionMeta,
  });
  // Follow-up search indexing is best-effort: enqueue failures are logged
  // and the analysis task still completes successfully.
  let searchTaskId: string | null = null;
  try {
    const searchTask = await enqueueTask({
      userId: task.user_id,
      taskType: "index_search",
      payload: {
        accessionNumber,
        sourceKinds: ["filing_brief"],
      },
      priority: 58,
      resourceKey: `index_search:filing_brief:${accessionNumber}`,
    });
    searchTaskId = searchTask.id;
  } catch (error) {
    console.error(
      `[search-index-analyze] failed for ${accessionNumber}:`,
      error,
    );
  }
  const result = {
    ticker: filing.ticker,
    accessionNumber,
    filingType: filing.filing_type,
    provider: analysis.provider,
    model: analysis.model,
    extractionProvider: extractionMeta.provider,
    extractionModel: extractionMeta.model,
    searchTaskId,
  };
  return buildTaskOutcome(
    result,
    `Analysis report generated for ${filing.ticker} ${filing.filing_type} ${accessionNumber}.`,
    {
      subject: analyzeSubject,
    },
  );
}
/** Source kinds the search indexer can (re)index or delete. */
type SearchIndexSourceKind =
  | "filing_document"
  | "filing_brief"
  | "research_note";

/**
 * Runtime guard for SearchIndexSourceKind. Replaces the previous unchecked
 * `as` cast on deleteSourceRefs entries and the duplicated literal
 * comparisons in the sourceKinds filter.
 */
function isSearchIndexSourceKind(
  value: unknown,
): value is SearchIndexSourceKind {
  return (
    value === "filing_document" ||
    value === "filing_brief" ||
    value === "research_note"
  );
}

/**
 * Processes an "index_search" task: validates the payload (ticker, accession
 * number, journal entry id, source kinds, stale-document deletion refs),
 * runs indexSearchDocuments, and reports progress by mapping the indexer's
 * stage callbacks onto this task's projection stages.
 */
async function processIndexSearch(task: Task) {
  await setProjectionStage(
    task,
    "search.collect_sources",
    "Collecting source records for search indexing",
  );
  const ticker = parseOptionalText(task.payload.ticker);
  const accessionNumber = parseOptionalText(task.payload.accessionNumber);
  // Distinguish "absent" (null) from a present value; Number() turns
  // non-numeric input into NaN, which the integer validation below rejects.
  const journalEntryId =
    task.payload.journalEntryId === undefined
      ? null
      : Number(task.payload.journalEntryId);
  // Keep only fully-formed deletion refs. sourceKind is now validated
  // against the actual allowed union at runtime instead of merely being
  // checked as "a string" and force-cast later.
  const deleteSourceRefs = Array.isArray(task.payload.deleteSourceRefs)
    ? task.payload.deleteSourceRefs.filter(
        (
          entry,
        ): entry is {
          sourceKind: SearchIndexSourceKind;
          sourceRef: string;
          scope: string;
          userId?: string | null;
        } => {
          return Boolean(
            entry &&
              typeof entry === "object" &&
              isSearchIndexSourceKind(
                (entry as { sourceKind?: unknown }).sourceKind,
              ) &&
              typeof (entry as { sourceRef?: unknown }).sourceRef ===
                "string" &&
              typeof (entry as { scope?: unknown }).scope === "string",
          );
        },
      )
    : [];
  const sourceKinds = parseOptionalStringArray(task.payload.sourceKinds).filter(
    isSearchIndexSourceKind,
  );
  const validatedJournalEntryId =
    typeof journalEntryId === "number" &&
    Number.isInteger(journalEntryId) &&
    journalEntryId > 0
      ? journalEntryId
      : null;
  const result = await indexSearchDocuments({
    userId: task.user_id,
    ticker,
    accessionNumber,
    journalEntryId: validatedJournalEntryId,
    sourceKinds: sourceKinds.length > 0 ? sourceKinds : undefined,
    // No cast needed: the filter above already narrowed sourceKind.
    deleteSourceRefs: deleteSourceRefs.map((entry) => ({
      sourceKind: entry.sourceKind,
      sourceRef: entry.sourceRef,
      scope: entry.scope === "user" ? "user" : "global",
      userId: typeof entry.userId === "string" ? entry.userId : null,
    })),
    // Translate indexer stage callbacks into projection-stage updates.
    onStage: async (stage, detail, context) => {
      switch (stage) {
        case "collect":
          await setProjectionStage(
            task,
            "search.collect_sources",
            detail,
            context ?? {
              subject: ticker
                ? { ticker }
                : accessionNumber
                  ? { accessionNumber }
                  : null,
            },
          );
          break;
        case "fetch":
          await setProjectionStage(
            task,
            "search.fetch_documents",
            detail,
            context ?? null,
          );
          break;
        case "chunk":
          await setProjectionStage(
            task,
            "search.chunk",
            detail,
            context ?? null,
          );
          break;
        case "embed":
          await setProjectionStage(
            task,
            "search.embed",
            detail,
            context ?? null,
          );
          break;
        case "persist":
          await setProjectionStage(
            task,
            "search.persist",
            detail,
            context ?? null,
          );
          break;
      }
    },
  });
  const taskResult = {
    ticker,
    accessionNumber,
    journalEntryId: validatedJournalEntryId,
    ...result,
  };
  return buildTaskOutcome(
    taskResult,
    `Indexed ${result.indexed} sources, embedded ${result.chunksEmbedded} chunks, skipped ${result.skipped}, deleted ${result.deleted} stale documents.`,
    {
      progress: {
        current: result.sourcesCollected,
        total: result.sourcesCollected || 1,
        unit: "sources",
      },
      counters: {
        sourcesCollected: result.sourcesCollected,
        indexed: result.indexed,
        chunksEmbedded: result.chunksEmbedded,
        skipped: result.skipped,
        deleted: result.deleted,
      },
      subject: ticker
        ? { ticker }
        : accessionNumber
          ? { accessionNumber }
          : null,
    },
  );
}
/**
 * Projects holdings into a compact camelCase digest suitable for embedding
 * in AI prompts (see the portfolio-insights prompt construction).
 */
function holdingDigest(holdings: Holding[]) {
  const digest = [];
  for (const position of holdings) {
    digest.push({
      ticker: position.ticker,
      shares: position.shares,
      avgCost: position.avg_cost,
      currentPrice: position.current_price,
      marketValue: position.market_value,
      gainLoss: position.gain_loss,
      gainLossPct: position.gain_loss_pct,
    });
  }
  return digest;
}
async function processPortfolioInsights(task: Task) {
const userId = task.user_id;
if (!userId) {
throw new Error("Task is missing user scope");
}
await setProjectionStage(
task,
"insights.load_holdings",
"Loading holdings for portfolio insight generation",
);
const userHoldings = await listUserHoldings(userId);
const summary = buildPortfolioSummary(userHoldings);
const holdingsContext = {
counters: {
holdings: userHoldings.length,
},
} satisfies TaskStageContext;
await setProjectionStage(
task,
"insights.load_holdings",
`Loaded ${userHoldings.length} holdings for portfolio insight generation`,
holdingsContext,
);
const prompt = [
"Generate portfolio intelligence with actionable recommendations.",
`Portfolio summary: ${JSON.stringify(summary)}`,
`Holdings: ${JSON.stringify(holdingDigest(userHoldings))}`,
"Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.",
].join("\n");
await setProjectionStage(
task,
"insights.generate",
"Generating portfolio AI insight",
holdingsContext,
);
const analysis = await runAiAnalysis(
prompt,
"Act as a risk-aware buy-side analyst.",
{ workload: "report" },
);
await setProjectionStage(
task,
"insights.persist",
"Persisting generated portfolio insight",
holdingsContext,
);
await createPortfolioInsight({
userId,
provider: analysis.provider,
model: analysis.model,
content: analysis.text,
});
const result = {
provider: analysis.provider,
model: analysis.model,
summary,
};
return buildTaskOutcome(
result,
`Generated portfolio insight for ${summary.positions} holdings.`,
{
counters: {
holdings: summary.positions,
},
},
);
}
// Selected private helpers re-exported under a double-underscore name —
// presumably for unit tests; confirm before relying on this from
// application code.
export const __taskProcessorInternals = {
  parseExtractionPayload,
  deterministicExtractionFallback,
  isFinancialMetricsForm,
};
/**
 * Dispatches a queued task to the processor matching its task_type.
 * Throws for task types this module does not handle.
 */
export async function runTaskProcessor(task: Task) {
  const kind = task.task_type;
  if (kind === "sync_filings") {
    return await processSyncFilings(task);
  }
  if (kind === "refresh_prices") {
    return await processRefreshPrices(task);
  }
  if (kind === "analyze_filing") {
    return await processAnalyzeFiling(task);
  }
  if (kind === "portfolio_insights") {
    return await processPortfolioInsights(task);
  }
  if (kind === "index_search") {
    return await processIndexSearch(task);
  }
  throw new Error(`Unsupported task type: ${task.task_type}`);
}