import type {
  Filing,
  FilingExtraction,
  FilingExtractionMeta,
  Holding,
  Task,
  TaskStage,
  TaskStageContext,
} from "@/lib/types";
import { runAiAnalysis } from "@/lib/server/ai";
import { buildPortfolioSummary } from "@/lib/server/portfolio";
import { getQuote } from "@/lib/server/prices";
import { indexSearchDocuments } from "@/lib/server/search";
import {
  getFilingByAccession,
  listFilingsRecords,
  saveFilingAnalysis,
  updateFilingMetricsById,
  upsertFilingsRecords,
} from "@/lib/server/repos/filings";
import { deleteCompanyFinancialBundlesForTicker } from "@/lib/server/repos/company-financial-bundles";
import {
  getFilingTaxonomySnapshotByFilingId,
  normalizeFilingTaxonomySnapshotPayload,
  upsertFilingTaxonomySnapshot,
} from "@/lib/server/repos/filing-taxonomy";
import {
  applyRefreshedPrices,
  listHoldingsForPriceRefresh,
  listUserHoldings,
} from "@/lib/server/repos/holdings";
import { createPortfolioInsight } from "@/lib/server/repos/insights";
import { updateTaskStage } from "@/lib/server/repos/tasks";
import { fetchPrimaryFilingText, fetchRecentFilings } from "@/lib/server/sec";
import {
  generateIssuerOverlayForTicker,
  recordIssuerOverlayBuildFailure,
} from "@/lib/server/issuer-overlays";
import {
  getActiveIssuerOverlayDefinition,
  getIssuerOverlay,
} from "@/lib/server/repos/issuer-overlays";
import { enqueueTask } from "@/lib/server/tasks";
import { hydrateFilingTaxonomySnapshot } from "@/lib/server/taxonomy/engine";
import { validateMetricsWithPdfLlm } from "@/lib/server/taxonomy/pdf-validation";

// Contract for LLM extraction output: exactly these keys, no extras.
// parseExtractionPayload enforces both directions (no missing, no unknown keys).
const EXTRACTION_REQUIRED_KEYS = [
  "summary",
  "keyPoints",
  "redFlags",
  "followUpQuestions",
  "portfolioSignals",
  "segmentSpecificData",
  "geographicRevenueBreakdown",
  "companySpecificData",
  "secApiCrossChecks",
  "confidence",
] as const;

// Hard caps applied when sanitizing extraction output.
const EXTRACTION_MAX_ITEMS = 6;
const EXTRACTION_ITEM_MAX_LENGTH = 280;
const EXTRACTION_SUMMARY_MAX_LENGTH = 900;
// Delay between per-filing taxonomy hydrations (throttling), and the cap on
// how many candidate filings a single sync pass will consider.
const STATEMENT_HYDRATION_DELAY_MS = 120;
const STATEMENT_HYDRATION_MAX_FILINGS = 80;

// Heuristic line-level patterns used by the rule-based extraction path.
// NOTE(review): the first regex was split across a line break in the mangled
// source (`/\boperating ` + `segment\b/i`); rejoined here — confirm against VCS.
const SEGMENT_PATTERNS = [
  /\boperating segment\b/i,
  /\bsegment revenue\b/i,
  /\bsegment margin\b/i,
  /\bsegment profit\b/i,
  /\bbusiness segment\b/i,
  /\breportable segment\b/i,
];
const GEOGRAPHIC_PATTERNS = [
  /\bgeographic\b/i,
  /\bamericas\b/i,
  /\bemea\b/i,
  /\bapac\b/i,
  /\basia pacific\b/i,
  /\bnorth america\b/i,
  /\beurope\b/i,
  /\bchina\b/i,
  /\binternational\b/i,
];
const COMPANY_SPECIFIC_PATTERNS = [
  /\bsame[- ]store\b/i,
  /\bcomparable[- ]store\b/i,
  /\bcomp sales\b/i,
  /\borganic sales\b/i,
  /\bbookings\b/i,
  /\bbacklog\b/i,
  /\barpu\b/i,
  /\bmau\b/i,
  /\bdau\b/i,
  /\bsubscriber\b/i,
  /\boccupancy\b/i,
  /\brevpar\b/i,
  /\bretention\b/i,
  /\bchurn\b/i,
];

// Key of a numeric filing metric (revenue, netIncome, ...).
// NOTE(review): the generic argument was stripped in the mangled source
// (`keyof NonNullable;`); restored from the METRIC_CHECK_PATTERNS keys and
// `filing.metrics?.[descriptor.key]` usage — confirm against VCS history.
type FilingMetricKey = keyof NonNullable<Filing["metrics"]>;

// Narrows a raw SEC form string to the two forms that carry financial metrics.
function isFinancialMetricsForm(
  filingType: string,
): filingType is "10-K" | "10-Q" {
  return filingType === "10-K" || filingType === "10-Q";
}

// For each baseline metric, the text patterns used to check whether the filing
// narrative appears to mention it (see buildSecApiCrossChecks).
const METRIC_CHECK_PATTERNS: Array<{
  key: FilingMetricKey;
  label: string;
  patterns: RegExp[];
}> = [
  {
    key: "revenue",
    label: "Revenue",
    patterns: [/\brevenue\b/i, /\bsales\b/i],
  },
  {
    key: "netIncome",
    label: "Net income",
    patterns: [/\bnet income\b/i, /\bprofit\b/i],
  },
  {
    key: "totalAssets",
    label: "Total assets",
    patterns: [/\btotal assets\b/i, /\bassets\b/i],
  },
  {
    key: "cash",
    label: "Cash",
    patterns: [/\bcash\b/i, /\bcash equivalents\b/i],
  },
  {
    key: "debt",
    label: "Debt",
    patterns: [/\bdebt\b/i, /\bborrowings\b/i, /\bliabilit(?:y|ies)\b/i],
  },
];

// Coerces an arbitrary task result into a plain object; non-object values are
// wrapped as `{ value }` so the stored result is always a record.
// NOTE(review): `Record` had its type arguments stripped; restored to
// `Record<string, unknown>` from usage.
function toTaskResult(value: unknown): Record<string, unknown> {
  if (!value || typeof value !== "object" || Array.isArray(value)) {
    return { value };
  }
  return value as Record<string, unknown>;
}

// Normalized outcome returned by every task processor in this module.
export type TaskExecutionOutcome = {
  result: Record<string, unknown>;
  completionDetail: string;
  completionContext?: TaskStageContext | null;
};

// Tally of one hydration pass plus the ids it actually touched (used to
// prioritize re-hydration after an issuer overlay is published).
type TaxonomyHydrationRunResult = {
  hydrated: number;
  failed: number;
  touchedFilingIds: Set<Filing["id"]>;
};

// Convenience constructor for TaskExecutionOutcome.
function buildTaskOutcome(
  result: unknown,
  completionDetail: string,
  completionContext: TaskStageContext | null = null,
): TaskExecutionOutcome {
  return {
    result: toTaskResult(result),
    completionDetail,
    completionContext,
  };
}

function
shouldRefreshTaxonomySnapshot(input: {
  // NOTE(review): `Awaited< ReturnType >` had its type argument stripped in the
  // mangled source; restored from the call site, which passes the result of
  // getFilingTaxonomySnapshotByFilingId — confirm against VCS history.
  existingSnapshot: Awaited<
    ReturnType<typeof getFilingTaxonomySnapshotByFilingId>
  > | null;
  filingUpdatedAt: string;
  overlayRevisionId: number | null;
}) {
  // Refresh when: no snapshot yet, the filing row changed after the snapshot
  // was built, or the snapshot was built under a different overlay revision.
  if (!input.existingSnapshot) {
    return true;
  }
  if (
    Date.parse(input.existingSnapshot.updated_at) <
    Date.parse(input.filingUpdatedAt)
  ) {
    return true;
  }
  return (
    (input.existingSnapshot.issuer_overlay_revision_id ?? null) !==
    input.overlayRevisionId
  );
}

/**
 * Hydrates XBRL taxonomy snapshots for each candidate filing that is missing
 * or stale, persisting either the hydrated snapshot or a "failed" placeholder
 * row. Emits per-filing progress stages on the owning task and invalidates the
 * company financial-bundle cache after each write.
 */
async function hydrateTaxonomySnapshotsForCandidates(input: {
  task: Task;
  ticker: string;
  filingsCount: number;
  saveResult: { inserted: number; updated: number };
  // NOTE(review): `Array` type argument stripped; restored to Filing based on
  // the fields accessed below (id, updated_at, cik, metrics, ...).
  candidates: Array<Filing>;
  overlayRevisionId: number | null;
}) {
  // Only load the overlay definition when a revision is actually active.
  const overlayDefinition =
    input.overlayRevisionId === null
      ? null
      : await getActiveIssuerOverlayDefinition(input.ticker);
  let hydrated = 0;
  let failed = 0;
  const touchedFilingIds = new Set<Filing["id"]>();
  for (let index = 0; index < input.candidates.length; index += 1) {
    const filing = input.candidates[index];
    const existingSnapshot = await getFilingTaxonomySnapshotByFilingId(
      filing.id,
    );
    if (
      !shouldRefreshTaxonomySnapshot({
        existingSnapshot,
        filingUpdatedAt: filing.updated_at,
        overlayRevisionId: input.overlayRevisionId,
      })
    ) {
      continue;
    }
    touchedFilingIds.add(filing.id);
    // Builds a progress context for the current filing; counters capture the
    // live hydrated/failed tallies at the moment a stage is reported.
    const stageContext = (stage: TaskStage) =>
      buildProgressContext({
        current: index + 1,
        total: input.candidates.length,
        unit: "filings",
        counters: {
          fetched: input.filingsCount,
          inserted: input.saveResult.inserted,
          updated: input.saveResult.updated,
          hydrated,
          failed,
        },
        subject: {
          ticker: input.ticker,
          accessionNumber: filing.accession_number,
          label: stage,
        },
      });
    try {
      await setProjectionStage(
        input.task,
        "sync.extract_taxonomy",
        `Extracting XBRL taxonomy for ${filing.accession_number}`,
        stageContext("sync.extract_taxonomy"),
      );
      const snapshot = await hydrateFilingTaxonomySnapshot({
        filingId: filing.id,
        ticker: filing.ticker,
        cik: filing.cik,
        accessionNumber: filing.accession_number,
        filingDate: filing.filing_date,
        filingType: filing.filing_type,
        filingUrl: filing.filing_url,
        primaryDocument: filing.primary_document ?? null,
        issuerOverlay: overlayDefinition,
      });
      // Start from the engine's own validation output; replace it with the
      // PDF+LLM validation when that succeeds, or with an explicit error
      // payload (annotating each metric check) when it throws.
      let pdfValidation = {
        validation_result: snapshot.validation_result,
        metric_validations: snapshot.metric_validations,
      };
      try {
        pdfValidation = await validateMetricsWithPdfLlm({
          metrics: snapshot.derived_metrics,
          assets: snapshot.assets,
        });
      } catch (error) {
        const message =
          error instanceof Error
            ? error.message
            : "PDF metric validation failed";
        pdfValidation = {
          validation_result: {
            status: "error",
            checks: [],
            validatedAt: new Date().toISOString(),
          },
          metric_validations: snapshot.metric_validations.map((check) => ({
            ...check,
            error: check.error ?? message,
          })),
        };
      }
      // NOTE(review): normalizeFilingTaxonomySnapshotPayload(snapshot) is
      // spread LAST, so if it returns validation_result/metric_validations/
      // issuer_overlay_revision_id keys it silently overrides the values set
      // just above. Behavior preserved here — confirm the intended precedence.
      const normalizedSnapshot = {
        ...snapshot,
        validation_result: pdfValidation.validation_result,
        metric_validations: pdfValidation.metric_validations,
        issuer_overlay_revision_id: input.overlayRevisionId,
        ...normalizeFilingTaxonomySnapshotPayload(snapshot),
      };
      // NOTE(review): these stages are reported after the corresponding work
      // (normalization/derivation/validation) has already completed above;
      // they act as progress markers only. Ordering preserved as-is.
      await setProjectionStage(
        input.task,
        "sync.normalize_taxonomy",
        `Materializing statements for ${filing.accession_number}`,
        stageContext("sync.normalize_taxonomy"),
      );
      await setProjectionStage(
        input.task,
        "sync.derive_metrics",
        `Deriving taxonomy metrics for ${filing.accession_number}`,
        stageContext("sync.derive_metrics"),
      );
      await setProjectionStage(
        input.task,
        "sync.validate_pdf_metrics",
        `Validating metrics via PDF + LLM for ${filing.accession_number}`,
        stageContext("sync.validate_pdf_metrics"),
      );
      await setProjectionStage(
        input.task,
        "sync.persist_taxonomy",
        `Persisting taxonomy snapshot for ${filing.accession_number}`,
        stageContext("sync.persist_taxonomy"),
      );
      await upsertFilingTaxonomySnapshot(normalizedSnapshot);
      await updateFilingMetricsById(
        filing.id,
        normalizedSnapshot.derived_metrics,
      );
      // Bundles are derived from snapshots; invalidate after every write.
      await deleteCompanyFinancialBundlesForTicker(filing.ticker);
      hydrated += 1;
    } catch (error) {
      // Persist an explicit "failed" placeholder so the failure is visible and
      // shouldRefreshTaxonomySnapshot can retry it on a later pass.
      const now = new Date().toISOString();
      await upsertFilingTaxonomySnapshot({
        filing_id: filing.id,
        ticker: filing.ticker,
        filing_date: filing.filing_date,
        filing_type: filing.filing_type,
        parse_status: "failed",
        parse_error:
          error instanceof Error ? error.message : "Taxonomy hydration failed",
        source: "legacy_html_fallback",
        parser_engine: "fiscal-xbrl",
        parser_version: "unknown",
        taxonomy_regime: "unknown",
        fiscal_pack: "core",
        periods: [],
        faithful_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        statement_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        surface_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        detail_rows: {
          income: {},
          balance: {},
          cash_flow: {},
          equity: {},
          comprehensive_income: {},
          disclosure: {},
        },
        kpi_rows: [],
        computed_definitions: [],
        contexts: [],
        // Keep whatever metrics the filing row already had.
        derived_metrics: filing.metrics ?? null,
        validation_result: {
          status: "error",
          checks: [],
          validatedAt: now,
        },
        normalization_summary: {
          surfaceRowCount: 0,
          detailRowCount: 0,
          kpiRowCount: 0,
          unmappedRowCount: 0,
          materialUnmappedRowCount: 0,
          residualPrimaryCount: 0,
          residualDisclosureCount: 0,
          unsupportedConceptCount: 0,
          issuerOverlayMatchCount: 0,
          warnings: [],
        },
        issuer_overlay_revision_id: input.overlayRevisionId,
        facts_count: 0,
        concepts_count: 0,
        dimensions_count: 0,
        assets: [],
        concepts: [],
        facts: [],
        metric_validations: [],
      });
      await deleteCompanyFinancialBundlesForTicker(filing.ticker);
      failed += 1;
    }
    // Throttle between filings (Bun runtime global).
    await Bun.sleep(STATEMENT_HYDRATION_DELAY_MS);
  }
  return {
    hydrated,
    failed,
    touchedFilingIds,
  } satisfies TaxonomyHydrationRunResult;
}

// Thin wrapper that records a task's current stage (with optional detail and
// progress context) via the tasks repo.
async function setProjectionStage(
  task: Task,
  stage: TaskStage,
  detail: string | null = null,
  context: TaskStageContext | null = null,
) {
  await updateTaskStage(task.id, stage, detail, context);
}

// Assembles a TaskStageContext from progress numbers, optional counters, and
// an optional subject descriptor.
function buildProgressContext(input: {
  current: number;
  total: number;
  unit: string;
  // NOTE(review): `Record` type arguments stripped; every visible caller
  // passes numeric counter values, so restored to Record<string, number> —
  // confirm against TaskStageContext in @/lib/types.
  counters?: Record<string, number>;
  subject?: TaskStageContext["subject"];
}): TaskStageContext {
  return {
    progress: {
      current: input.current,
      total: input.total,
      unit: input.unit,
    },
    counters: input.counters,
subject: input.subject,
  };
}

// Validates and normalizes a ticker from an untrusted task payload.
// Throws when missing/blank; returns the trimmed, upper-cased symbol.
function parseTicker(raw: unknown) {
  if (typeof raw !== "string" || raw.trim().length < 1) {
    throw new Error("Ticker is required");
  }
  return raw.trim().toUpperCase();
}

// Coerces a payload value to an integer clamped to [min, max]; returns
// `fallback` when the value is not a finite number.
function parseLimit(raw: unknown, fallback: number, min: number, max: number) {
  const numberValue = typeof raw === "number" ? raw : Number(raw);
  if (!Number.isFinite(numberValue)) {
    return fallback;
  }
  const intValue = Math.trunc(numberValue);
  return Math.min(Math.max(intValue, min), max);
}

// Returns the trimmed string, or null for non-strings / empty strings.
function parseOptionalText(raw: unknown) {
  if (typeof raw !== "string") {
    return null;
  }
  const normalized = raw.trim();
  return normalized.length > 0 ? normalized : null;
}

// Keeps only non-empty trimmed strings from an untyped array payload.
function parseOptionalStringArray(raw: unknown) {
  if (!Array.isArray(raw)) {
    return [];
  }
  return raw
    .filter((entry): entry is string => typeof entry === "string")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
}

// Like parseOptionalStringArray, but also de-duplicates (order-preserving).
function parseTags(raw: unknown) {
  if (!Array.isArray(raw)) {
    return [];
  }
  // NOTE(review): `new Set()` had its type argument stripped; restored.
  const unique = new Set<string>();
  for (const entry of raw) {
    if (typeof entry !== "string") {
      continue;
    }
    const tag = entry.trim();
    if (!tag) {
      continue;
    }
    unique.add(tag);
  }
  return [...unique];
}

// Collapses whitespace, trims, and truncates to maxLength; null when the
// value is not a non-empty string.
function sanitizeExtractionText(value: unknown, maxLength: number) {
  if (typeof value !== "string") {
    return null;
  }
  const collapsed = value.replace(/\s+/g, " ").trim();
  if (!collapsed) {
    return null;
  }
  return collapsed.slice(0, maxLength);
}

// Sanitizes each entry of an extraction list, dropping empties and capping at
// EXTRACTION_MAX_ITEMS; null when the value is not an array (schema failure).
function sanitizeExtractionList(value: unknown) {
  if (!Array.isArray(value)) {
    return null;
  }
  const cleaned: string[] = [];
  for (const entry of value) {
    const normalized = sanitizeExtractionText(
      entry,
      EXTRACTION_ITEM_MAX_LENGTH,
    );
    if (!normalized) {
      continue;
    }
    cleaned.push(normalized);
    if (cleaned.length >= EXTRACTION_MAX_ITEMS) {
      break;
    }
  }
  return cleaned;
}

// Sanitizes, de-duplicates (case-insensitively), and caps a list of candidate
// lines; accepts null/undefined entries so callers can pass optional leads.
// NOTE(review): `Array` type argument stripped; restored from call sites that
// pass string | null literals and `Array.prototype.find` results.
function uniqueExtractionList(items: Array<string | null | undefined>) {
  const seen = new Set<string>();
  const unique: string[] = [];
  for (const item of items) {
    const normalized = sanitizeExtractionText(item, EXTRACTION_ITEM_MAX_LENGTH);
    if (!normalized) {
continue;
    }
    // Case-insensitive de-duplication signature.
    const signature = normalized.toLowerCase();
    if (seen.has(signature)) {
      continue;
    }
    seen.add(signature);
    unique.push(normalized);
    if (unique.length >= EXTRACTION_MAX_ITEMS) {
      break;
    }
  }
  return unique;
}

// Scans filing text line-by-line for lines matching any of the given patterns,
// ignoring short fragments (< 24 chars); returns a deduped, capped list.
function collectTextSignals(filingText: string, patterns: RegExp[]) {
  const lines = filingText
    .replace(/\r/g, "\n")
    .split(/\n+/)
    .map((line) => line.replace(/\s+/g, " ").trim())
    .filter((line) => line.length >= 24);
  const matches: string[] = [];
  for (const line of lines) {
    if (!patterns.some((pattern) => pattern.test(line))) {
      continue;
    }
    matches.push(line);
    // Collect twice the cap pre-dedupe so duplicates don't starve the list.
    if (matches.length >= EXTRACTION_MAX_ITEMS * 2) {
      break;
    }
  }
  return uniqueExtractionList(matches);
}

/**
 * Parses the LLM extraction response into a FilingExtraction.
 * Accepts either a ```json fenced block or the outermost {...} span, then
 * enforces the exact EXTRACTION_REQUIRED_KEYS schema (no missing keys, no
 * extras) and sanitizes every field. Returns null on any violation.
 */
function parseExtractionPayload(raw: string): FilingExtraction | null {
  const fencedJson = raw.match(/```(?:json)?\s*([\s\S]*?)```/i)?.[1];
  const candidate =
    fencedJson ??
    (() => {
      const start = raw.indexOf("{");
      const end = raw.lastIndexOf("}");
      return start >= 0 && end > start ? raw.slice(start, end + 1) : null;
    })();
  if (!candidate) {
    return null;
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(candidate);
  } catch {
    return null;
  }
  if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
    return null;
  }
  // NOTE(review): `as Record` had its type arguments stripped; restored.
  const payload = parsed as Record<string, unknown>;
  const keys = Object.keys(payload);
  if (keys.length !== EXTRACTION_REQUIRED_KEYS.length) {
    return null;
  }
  for (const key of EXTRACTION_REQUIRED_KEYS) {
    if (!(key in payload)) {
      return null;
    }
  }
  for (const key of keys) {
    if (
      !EXTRACTION_REQUIRED_KEYS.includes(
        key as (typeof EXTRACTION_REQUIRED_KEYS)[number],
      )
    ) {
      return null;
    }
  }
  const summary = sanitizeExtractionText(
    payload.summary,
    EXTRACTION_SUMMARY_MAX_LENGTH,
  );
  const keyPoints = sanitizeExtractionList(payload.keyPoints);
  const redFlags = sanitizeExtractionList(payload.redFlags);
  const followUpQuestions = sanitizeExtractionList(payload.followUpQuestions);
  const portfolioSignals = sanitizeExtractionList(payload.portfolioSignals);
  const segmentSpecificData = sanitizeExtractionList(
    payload.segmentSpecificData,
  );
  const geographicRevenueBreakdown = sanitizeExtractionList(
    payload.geographicRevenueBreakdown,
  );
  const companySpecificData = sanitizeExtractionList(
    payload.companySpecificData,
  );
  const secApiCrossChecks = sanitizeExtractionList(payload.secApiCrossChecks);
  const confidenceRaw =
    typeof payload.confidence === "number"
      ? payload.confidence
      : Number(payload.confidence);
  if (
    !summary ||
    !keyPoints ||
    !redFlags ||
    !followUpQuestions ||
    !portfolioSignals ||
    !segmentSpecificData ||
    !geographicRevenueBreakdown ||
    !companySpecificData ||
    !secApiCrossChecks ||
    !Number.isFinite(confidenceRaw)
  ) {
    return null;
  }
  return {
    summary,
    keyPoints,
    redFlags,
    followUpQuestions,
    portfolioSignals,
    segmentSpecificData,
    geographicRevenueBreakdown,
    companySpecificData,
    secApiCrossChecks,
    // Clamp to the documented [0, 1] range.
    confidence: Math.min(Math.max(confidenceRaw, 0), 1),
  };
}

// Human-readable "<label>: <value>" line; "not reported" for absent/non-finite.
function metricSnapshotLine(label: string, value: number | null | undefined) {
  if (value === null || value === undefined || !Number.isFinite(value)) {
    return `${label}: not reported`;
  }
  return `${label}: ${Math.round(value).toLocaleString("en-US")}`;
}

// For each baseline metric, reports whether the filing narrative appears to
// mention it (pattern match against the lowercased text) or not.
function buildSecApiCrossChecks(filing: Filing, filingText: string) {
  const normalizedText = filingText.toLowerCase();
  const checks: string[] = [];
  for (const descriptor of METRIC_CHECK_PATTERNS) {
    const value = filing.metrics?.[descriptor.key];
    if (value === null || value === undefined || !Number.isFinite(value)) {
      checks.push(
        `${descriptor.label}: SEC API metric unavailable for this filing.`,
      );
      continue;
    }
    const hasMention = descriptor.patterns.some((pattern) =>
      pattern.test(normalizedText),
    );
    if (hasMention) {
      checks.push(
        `${descriptor.label}: SEC API value ${Math.round(value).toLocaleString("en-US")} appears referenced in filing narrative.`,
      );
    } else {
      checks.push(
        `${descriptor.label}: SEC API value ${Math.round(value).toLocaleString("en-US")} was not confidently located in sampled filing text.`,
      );
    }
  }
  return uniqueExtractionList(checks);
}

function
deterministicExtractionFallback(filing: Filing): FilingExtraction {
  // Lowest-confidence extraction built purely from filing metadata and SEC API
  // metrics — used when the filing text could not be fetched or parsed.
  const metrics = filing.metrics;
  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. Deterministic extraction fallback was used because filing text parsing was unavailable or invalid.`,
    keyPoints: [
      `${filing.filing_type} filing recorded for ${filing.ticker}.`,
      metricSnapshotLine("Revenue", metrics?.revenue),
      metricSnapshotLine("Net income", metrics?.netIncome),
      metricSnapshotLine("Total assets", metrics?.totalAssets),
    ],
    redFlags: [
      metricSnapshotLine("Cash", metrics?.cash),
      metricSnapshotLine("Debt", metrics?.debt),
      filing.primary_document
        ? "Primary document is indexed and available for review."
        : "Primary document reference is unavailable in current filing metadata.",
    ],
    followUpQuestions: [
      "What changed versus the prior filing in guidance, margins, or liquidity?",
      "Are any material risks under-emphasized relative to historical filings?",
      "Should portfolio exposure be adjusted before the next reporting cycle?",
    ],
    portfolioSignals: [
      "Validate trend direction using at least two prior filings.",
      "Cross-check leverage and liquidity metrics against position sizing rules.",
      "Track language shifts around guidance or demand assumptions.",
    ],
    segmentSpecificData: [
      "Segment-level disclosures were not parsed in deterministic fallback mode.",
    ],
    geographicRevenueBreakdown: [
      "Geographic revenue disclosures were not parsed in deterministic fallback mode.",
    ],
    companySpecificData: [
      "Company-specific operating KPIs (for example same-store sales) were not parsed in deterministic fallback mode.",
    ],
    secApiCrossChecks: [
      `${metricSnapshotLine("Revenue", metrics?.revenue)} (SEC API baseline; text verification unavailable).`,
      `${metricSnapshotLine("Net income", metrics?.netIncome)} (SEC API baseline; text verification unavailable).`,
    ],
    confidence: 0.2,
  };
}

// Mid-confidence extraction: starts from the deterministic fallback and
// enriches it with pattern-mined lines from the actual filing text.
function buildRuleBasedExtraction(
  filing: Filing,
  filingText: string,
): FilingExtraction {
  const baseline = deterministicExtractionFallback(filing);
  const segmentSpecificData = collectTextSignals(filingText, SEGMENT_PATTERNS);
  const geographicRevenueBreakdown = collectTextSignals(
    filingText,
    GEOGRAPHIC_PATTERNS,
  );
  const companySpecificData = collectTextSignals(
    filingText,
    COMPANY_SPECIFIC_PATTERNS,
  );
  const secApiCrossChecks = buildSecApiCrossChecks(filing, filingText);
  // Promote the first hit from each signal category into the key points.
  const segmentLead = segmentSpecificData[0]
    ? `Segment detail: ${segmentSpecificData[0]}`
    : null;
  const geographicLead = geographicRevenueBreakdown[0]
    ? `Geographic detail: ${geographicRevenueBreakdown[0]}`
    : null;
  const companyLead = companySpecificData[0]
    ? `Company-specific KPI: ${companySpecificData[0]}`
    : null;
  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. SEC API metrics were retained as the baseline and filing text was scanned for segment and company-specific disclosures.`,
    keyPoints: uniqueExtractionList([
      ...baseline.keyPoints,
      segmentLead,
      geographicLead,
      companyLead,
    ]),
    redFlags: uniqueExtractionList([
      ...baseline.redFlags,
      // `.find` may yield undefined; uniqueExtractionList drops it.
      secApiCrossChecks.find((line) => /not confidently located/i.test(line)),
    ]),
    followUpQuestions: uniqueExtractionList([
      ...baseline.followUpQuestions,
      segmentSpecificData.length > 0
        ? "How do segment trends change the consolidated margin outlook?"
        : "Does management provide segment-level KPIs in supplemental exhibits?",
    ]),
    portfolioSignals: uniqueExtractionList([
      ...baseline.portfolioSignals,
      companySpecificData.length > 0
        ? "Incorporate company-specific KPI direction into near-term position sizing."
        : "Track future filings for explicit operating KPI disclosures.",
    ]),
    // Prefer mined text signals; fall back to the baseline placeholders.
    segmentSpecificData:
      segmentSpecificData.length > 0
        ? segmentSpecificData
        : baseline.segmentSpecificData,
    geographicRevenueBreakdown:
      geographicRevenueBreakdown.length > 0
        ? geographicRevenueBreakdown
        : baseline.geographicRevenueBreakdown,
    companySpecificData:
      companySpecificData.length > 0
        ? companySpecificData
        : baseline.companySpecificData,
    secApiCrossChecks:
      secApiCrossChecks.length > 0
        ? secApiCrossChecks
        : baseline.secApiCrossChecks,
    // Slightly higher confidence when any text signal was actually found.
    confidence:
      segmentSpecificData.length +
        geographicRevenueBreakdown.length +
        companySpecificData.length >
      0
        ? 0.4
        : 0.3,
  };
}

// Returns `primary` unless it is empty, else `fallback`.
function preferExtractionList(primary: string[], fallback: string[]) {
  return primary.length > 0 ? primary : fallback;
}

// Field-wise merge: LLM output wins wherever it is non-empty; otherwise the
// rule-based fallback fills the gap. Confidence is clamped to [0, 1].
function mergeExtractionWithFallback(
  primary: FilingExtraction,
  fallback: FilingExtraction,
): FilingExtraction {
  return {
    summary: primary.summary || fallback.summary,
    keyPoints: preferExtractionList(primary.keyPoints, fallback.keyPoints),
    redFlags: preferExtractionList(primary.redFlags, fallback.redFlags),
    followUpQuestions: preferExtractionList(
      primary.followUpQuestions,
      fallback.followUpQuestions,
    ),
    portfolioSignals: preferExtractionList(
      primary.portfolioSignals,
      fallback.portfolioSignals,
    ),
    segmentSpecificData: preferExtractionList(
      primary.segmentSpecificData,
      fallback.segmentSpecificData,
    ),
    geographicRevenueBreakdown: preferExtractionList(
      primary.geographicRevenueBreakdown,
      fallback.geographicRevenueBreakdown,
    ),
    companySpecificData: preferExtractionList(
      primary.companySpecificData,
      fallback.companySpecificData,
    ),
    secApiCrossChecks: preferExtractionList(
      primary.secApiCrossChecks,
      fallback.secApiCrossChecks,
    ),
    confidence: Math.min(Math.max(primary.confidence, 0), 1),
  };
}

// Builds the strict-JSON extraction prompt sent to the LLM, with the filing
// text appended last.
function extractionPrompt(filing: Filing, filingText: string) {
  return [
    "Extract structured signals from the SEC filing text.",
    `Company: ${filing.company_name} (${filing.ticker})`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ??
{})}`,
    "Use SEC API metrics as canonical numeric values and validate whether each appears consistent with filing text context.",
    "Prioritize company-specific and segment-specific disclosures not covered by SEC endpoint fields (for example same-store sales, geographic mix, segment margin).",
    "Return ONLY valid JSON with exactly these keys and no extra keys:",
    '{"summary":"string","keyPoints":["string"],"redFlags":["string"],"followUpQuestions":["string"],"portfolioSignals":["string"],"segmentSpecificData":["string"],"geographicRevenueBreakdown":["string"],"companySpecificData":["string"],"secApiCrossChecks":["string"],"confidence":0}',
    `Rules: every array max ${EXTRACTION_MAX_ITEMS} items; each item <= ${EXTRACTION_ITEM_MAX_LENGTH} chars; summary <= ${EXTRACTION_SUMMARY_MAX_LENGTH} chars; confidence between 0 and 1.`,
    "Filing text follows:",
    filingText,
  ].join("\n\n");
}

// Builds the final analyst-report prompt, embedding the structured extraction
// (and its provenance) as context for the LLM.
function reportPrompt(
  filing: Filing,
  extraction: FilingExtraction,
  extractionMeta: FilingExtractionMeta,
) {
  return [
    "You are a fiscal research assistant focused on regulatory signals.",
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    `Structured extraction context (${extractionMeta.source}): ${JSON.stringify(extraction)}`,
    "Use SEC API values as the baseline financials and explicitly reference segment/company-specific details from extraction.",
    "Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.",
  ].join("\n");
}

// Converts the optional document/index URLs of a fetched filing into the
// link rows persisted alongside the filing record.
function filingLinks(filing: {
  filingUrl: string | null;
  submissionUrl: string | null;
}) {
  const links: Array<{ link_type: string; url: string }> = [];
  if (filing.filingUrl) {
    links.push({ link_type: "primary_document", url: filing.filingUrl });
  }
  if (filing.submissionUrl) {
    links.push({ link_type: "submission_index", url: filing.submissionUrl });
  }
  return links;
}

/**
 * Task processor: syncs recent SEC filings for a ticker, persists them,
 * hydrates taxonomy snapshots (optionally twice if a new issuer overlay is
 * published mid-run), and enqueues a follow-up search-indexing task.
 */
async function processSyncFilings(task: Task) {
  // Validate/normalize untrusted task payload fields.
  const ticker = parseTicker(task.payload.ticker);
  const limit = parseLimit(task.payload.limit, 20, 1, 50);
  const category = parseOptionalText(task.payload.category);
  const tags = parseTags(task.payload.tags);
  // Human-readable scope suffix for the progress message, e.g. "news | tags: a, b".
  const scopeLabel = [
    category,
    tags.length > 0 ? `tags: ${tags.join(", ")}` : null,
  ]
    .filter((entry): entry is string => Boolean(entry))
    .join(" | ");
  let searchTaskId: string | null = null;
  const tickerSubject = { ticker };
  await setProjectionStage(
    task,
    "sync.fetch_filings",
    `Fetching up to ${limit} filings for ${ticker}${scopeLabel ?
` (${scopeLabel})` : ""}`,
    { subject: tickerSubject },
  );
  const filings = await fetchRecentFilings(ticker, limit);
  await setProjectionStage(
    task,
    "sync.persist_filings",
    `Persisting ${filings.length} filings and source links`,
    {
      counters: { fetched: filings.length },
      subject: tickerSubject,
    },
  );
  // Upsert filing rows; metrics are filled later by taxonomy hydration.
  const saveResult = await upsertFilingsRecords(
    filings.map((filing) => ({
      ticker: filing.ticker,
      filing_type: filing.filingType,
      filing_date: filing.filingDate,
      accession_number: filing.accessionNumber,
      cik: filing.cik,
      company_name: filing.companyName,
      filing_url: filing.filingUrl,
      submission_url: filing.submissionUrl,
      primary_document: filing.primaryDocument,
      metrics: null,
      links: filingLinks(filing),
    })),
  );
  let taxonomySnapshotsHydrated = 0;
  let taxonomySnapshotsFailed = 0;
  // Hydration candidates: stored 10-K/10-Q filings for the ticker, pulled with
  // a wider window than this sync (3x the limit, min 40, capped).
  const hydrateCandidates = (
    await listFilingsRecords({
      ticker,
      limit: Math.min(Math.max(limit * 3, 40), STATEMENT_HYDRATION_MAX_FILINGS),
    })
  ).filter((filing): filing is Filing & { filing_type: "10-K" | "10-Q" } => {
    return isFinancialMetricsForm(filing.filing_type);
  });
  await setProjectionStage(
    task,
    "sync.discover_assets",
    `Discovering taxonomy assets for ${hydrateCandidates.length} candidate filings`,
    buildProgressContext({
      current: 0,
      total: hydrateCandidates.length,
      unit: "filings",
      counters: {
        fetched: filings.length,
        inserted: saveResult.inserted,
        updated: saveResult.updated,
        hydrated: 0,
        failed: 0,
      },
      subject: tickerSubject,
    }),
  );
  // First hydration pass under the currently-active overlay revision (if any).
  const currentOverlay = await getIssuerOverlay(ticker);
  const firstPass = await hydrateTaxonomySnapshotsForCandidates({
    task,
    ticker,
    filingsCount: filings.length,
    saveResult,
    candidates: hydrateCandidates,
    overlayRevisionId: currentOverlay?.active_revision_id ?? null,
  });
  taxonomySnapshotsHydrated += firstPass.hydrated;
  taxonomySnapshotsFailed += firstPass.failed;
  // Attempt to (re)build the issuer overlay; failures are recorded but do not
  // fail the sync task.
  let overlayPublished = false;
  let activeOverlayRevisionId = currentOverlay?.active_revision_id ?? null;
  try {
    await setProjectionStage(
      task,
      "sync.persist_taxonomy",
      `Building issuer overlay for ${ticker}`,
      {
        counters: {
          sampledFilings: Math.min(hydrateCandidates.length, 12),
          hydrated: taxonomySnapshotsHydrated,
          failed: taxonomySnapshotsFailed,
        },
        subject: tickerSubject,
      },
    );
    const overlayResult = await generateIssuerOverlayForTicker(ticker);
    overlayPublished = overlayResult.published;
    activeOverlayRevisionId = overlayResult.activeRevisionId;
  } catch (error) {
    await recordIssuerOverlayBuildFailure(ticker, error);
    console.error(`[issuer-overlay] failed for ${ticker}:`, error);
  }
  // If a genuinely new overlay revision was published, re-hydrate a prioritized
  // subset (filings touched in the first pass go first, then the rest, deduped).
  if (
    overlayPublished &&
    activeOverlayRevisionId !== null &&
    activeOverlayRevisionId !== currentOverlay?.active_revision_id
  ) {
    const prioritizedCandidates = [
      ...hydrateCandidates.filter((filing) =>
        firstPass.touchedFilingIds.has(filing.id),
      ),
      ...hydrateCandidates.filter(
        (filing) => !firstPass.touchedFilingIds.has(filing.id),
      ),
    ];
    const uniqueCandidates = prioritizedCandidates.filter(
      (filing, index, rows) => {
        return (
          rows.findIndex((candidate) => candidate.id === filing.id) === index
        );
      },
    );
    // Re-hydrate at least 8 filings, or as many as the first pass touched.
    const rehydrateCandidates = uniqueCandidates.slice(
      0,
      Math.max(firstPass.touchedFilingIds.size, 8),
    );
    const secondPass = await hydrateTaxonomySnapshotsForCandidates({
      task,
      ticker,
      filingsCount: filings.length,
      saveResult,
      candidates: rehydrateCandidates,
      overlayRevisionId: activeOverlayRevisionId,
    });
    taxonomySnapshotsHydrated += secondPass.hydrated;
    taxonomySnapshotsFailed += secondPass.failed;
  }
  // Best-effort: chain a search-indexing task; sync still succeeds if this fails.
  try {
    const searchTask = await enqueueTask({
      userId: task.user_id,
      taskType: "index_search",
      payload: {
        ticker,
        sourceKinds: ["filing_document", "filing_brief"],
      },
      priority: 55,
      resourceKey: `index_search:ticker:${ticker}`,
    });
    searchTaskId = searchTask.id;
  } catch (error) {
    console.error(`[search-index-sync] failed for ${ticker}:`, error);
  }
  const result = {
    ticker,
    category,
    tags,
    fetched: filings.length,
    inserted: saveResult.inserted,
    updated: saveResult.updated,
    taxonomySnapshotsHydrated,
    taxonomySnapshotsFailed,
    overlayPublished,
    activeOverlayRevisionId,
    searchTaskId,
  };
  return buildTaskOutcome(
    result,
    `Synced ${filings.length} filings for ${ticker}, hydrated ${taxonomySnapshotsHydrated} taxonomy snapshots, failed ${taxonomySnapshotsFailed}.`,
    buildProgressContext({
      current: hydrateCandidates.length,
      total: hydrateCandidates.length || 1,
      unit: "filings",
      counters: {
        fetched: filings.length,
        inserted: saveResult.inserted,
        updated: saveResult.updated,
        hydrated: taxonomySnapshotsHydrated,
        failed: taxonomySnapshotsFailed,
      },
      subject: tickerSubject,
    }),
  );
}

/**
 * Task processor: refreshes quotes for every ticker held by the task's user,
 * tracking failed and stale tickers, then writes refreshed prices back to the
 * user's holdings.
 */
async function processRefreshPrices(task: Task) {
  const userId = task.user_id;
  if (!userId) {
    throw new Error("Task is missing user scope");
  }
  await setProjectionStage(
    task,
    "refresh.load_holdings",
    "Loading holdings for price refresh",
  );
  const userHoldings = await listHoldingsForPriceRefresh(userId);
  // One quote fetch per distinct ticker, even across multiple holdings.
  const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];
  // NOTE(review): `new Map()` had its type arguments stripped; restored to
  // Map<string, number> from `quotes.set(ticker, quoteResult.value)` — confirm
  // against getQuote's return type.
  const quotes = new Map<string, number>();
  const failedTickers: string[] = [];
  const staleTickers: string[] = [];
  const baseContext = {
    counters: {
      holdings: userHoldings.length,
    },
  } satisfies TaskStageContext;
  await setProjectionStage(
    task,
    "refresh.load_holdings",
    `Loaded ${userHoldings.length} holdings across ${tickers.length} tickers`,
    baseContext,
  );
  await setProjectionStage(
    task,
    "refresh.fetch_quotes",
    `Fetching quotes for ${tickers.length} tickers`,
    buildProgressContext({
      current: 0,
      total: tickers.length,
      unit: "tickers",
      counters: {
        holdings: userHoldings.length,
      },
    }),
  );
  // Sequential fetch with per-ticker progress reporting.
  for (let index = 0; index < tickers.length; index += 1) {
    const ticker = tickers[index];
    const quoteResult = await getQuote(ticker);
    if (quoteResult.value !== null) {
      quotes.set(ticker, quoteResult.value);
      if (quoteResult.stale) {
        staleTickers.push(ticker);
      }
    } else {
      failedTickers.push(ticker);
    }
    await setProjectionStage(
      task,
      "refresh.fetch_quotes",
      `Fetching quotes for ${tickers.length} tickers`,
      buildProgressContext({
        current: index + 1,
        total:
tickers.length,
        unit: "tickers",
        counters: {
          holdings: userHoldings.length,
          failed: failedTickers.length,
          stale: staleTickers.length,
        },
        subject: { ticker },
      }),
    );
  }
  await setProjectionStage(
    task,
    "refresh.persist_prices",
    `Writing refreshed prices for ${quotes.size} tickers across ${userHoldings.length} holdings`,
    {
      counters: {
        holdings: userHoldings.length,
        failed: failedTickers.length,
        stale: staleTickers.length,
      },
    },
  );
  const updatedCount = await applyRefreshedPrices(
    userId,
    quotes,
    new Date().toISOString(),
  );
  const result = {
    updatedCount,
    totalTickers: tickers.length,
    failedTickers,
    staleTickers,
  };
  // Summary message, e.g. "Refreshed prices for 8/10 tickers (2 unavailable)".
  const messageParts = [
    `Refreshed prices for ${quotes.size}/${tickers.length} tickers`,
  ];
  if (failedTickers.length > 0) {
    messageParts.push(`(${failedTickers.length} unavailable)`);
  }
  if (staleTickers.length > 0) {
    messageParts.push(`(${staleTickers.length} stale)`);
  }
  return buildTaskOutcome(
    result,
    `${messageParts.join(" ")} across ${userHoldings.length} holdings.`,
    {
      progress: {
        current: tickers.length,
        total: tickers.length || 1,
        unit: "tickers",
      },
      counters: {
        holdings: userHoldings.length,
        updatedCount,
        failed: failedTickers.length,
        stale: staleTickers.length,
      },
    },
  );
}

/**
 * Task processor: generates an AI analysis report for one filing identified by
 * accession number. Tries a text-based LLM extraction first; when the filing
 * text is unavailable it falls back to deterministic metadata extraction.
 */
async function processAnalyzeFiling(task: Task) {
  const accessionNumber =
    typeof task.payload.accessionNumber === "string"
      ? task.payload.accessionNumber
      : "";
  if (!accessionNumber) {
    throw new Error("accessionNumber is required");
  }
  await setProjectionStage(
    task,
    "analyze.load_filing",
    `Loading filing ${accessionNumber}`,
    {
      subject: {
        accessionNumber,
      },
    },
  );
  const filing = await getFilingByAccession(accessionNumber);
  if (!filing) {
    throw new Error(`Filing ${accessionNumber} not found`);
  }
  const analyzeSubject = {
    ticker: filing.ticker,
    accessionNumber,
    label: filing.filing_type,
  };
  // Default to the metadata-only extraction; replaced below when filing text
  // is available and the LLM extraction parses.
  const defaultExtraction = deterministicExtractionFallback(filing);
  let extraction = defaultExtraction;
  let extractionMeta: FilingExtractionMeta = {
    provider: "deterministic-fallback",
    model: "metadata-fallback",
    source: "metadata_fallback",
    generatedAt: new Date().toISOString(),
  };
  // NOTE(review): `Awaited< ReturnType >` had its type argument stripped in
  // the mangled source; restored from the assignment below — confirm via VCS.
  let filingDocument: Awaited<
    ReturnType<typeof fetchPrimaryFilingText>
  > | null = null;
  try {
    await setProjectionStage(
      task,
      "analyze.fetch_document",
      "Fetching primary filing document",
      {
        subject: analyzeSubject,
      },
    );
    filingDocument = await fetchPrimaryFilingText({
      filingUrl: filing.filing_url,
      cik: filing.cik,
      accessionNumber: filing.accession_number,
      primaryDocument: filing.primary_document ?? null,
    });
  } catch {
    // Document fetch is best-effort; fall through to metadata-only extraction.
    filingDocument = null;
  }
  if (filingDocument?.text) {
    await setProjectionStage(
      task,
      "analyze.extract",
      "Generating extraction context from filing text",
      {
        subject: analyzeSubject,
      },
    );
    const ruleBasedExtraction = buildRuleBasedExtraction(
      filing,
      filingDocument.text,
    );
    const extractionResult = await runAiAnalysis(
      extractionPrompt(filing, filingDocument.text),
      "Return strict JSON only.",
      { workload: "extraction" },
    );
    const parsed = parseExtractionPayload(extractionResult.text);
    // NOTE(review): a schema-invalid LLM response fails the whole task here
    // rather than degrading to ruleBasedExtraction — presumably so the task
    // queue retries; confirm this is intentional.
    if (!parsed) {
      throw new Error("Extraction output invalid JSON schema");
    }
    extraction = mergeExtractionWithFallback(parsed, ruleBasedExtraction);
    extractionMeta = {
      provider: "zhipu",
      model: extractionResult.model,
      source: filingDocument.source,
      generatedAt: new Date().toISOString(),
    };
  }
  await setProjectionStage(
    task,
    "analyze.generate_report",
    "Generating final filing analysis report",
    {
      subject: analyzeSubject,
    },
  );
  const analysis = await runAiAnalysis(
    reportPrompt(filing, extraction, extractionMeta),
    "Use concise institutional analyst language.",
    { workload: "report" },
  );
  await setProjectionStage(
    task,
    "analyze.persist_report",
    "Persisting filing analysis output",
    {
      subject: analyzeSubject,
    },
  );
  await saveFilingAnalysis(accessionNumber, {
    provider: analysis.provider,
    model: analysis.model,
    text: analysis.text,
    extraction,
    extractionMeta,
  });
  // Best-effort follow-up: re-index the filing brief for search.
  let searchTaskId: string | null = null;
  try {
    const searchTask = await enqueueTask({
      userId: task.user_id,
      taskType: "index_search",
      payload: {
        accessionNumber,
        sourceKinds: ["filing_brief"],
      },
      priority: 58,
      resourceKey: `index_search:filing_brief:${accessionNumber}`,
    });
    searchTaskId = searchTask.id;
  } catch (error) {
    console.error(
      `[search-index-analyze] failed for ${accessionNumber}:`,
      error,
    );
  }
  const result = {
    ticker: filing.ticker,
    accessionNumber,
    filingType: filing.filing_type,
    provider: analysis.provider,
    model: analysis.model,
    extractionProvider: extractionMeta.provider,
    extractionModel:
extractionMeta.model,
    searchTaskId,
  };
  return buildTaskOutcome(
    result,
    `Analysis report generated for ${filing.ticker} ${filing.filing_type} ${accessionNumber}.`,
    {
      subject: analyzeSubject,
    },
  );
}

/**
 * Executes an "index_search" task: validates and narrows the payload
 * (ticker, accession number, journal entry id, stale-document deletions,
 * source kinds), then delegates to indexSearchDocuments while projecting
 * per-phase stage updates.
 */
async function processIndexSearch(task: Task) {
  await setProjectionStage(
    task,
    "search.collect_sources",
    "Collecting source records for search indexing",
  );
  const ticker = parseOptionalText(task.payload.ticker);
  const accessionNumber = parseOptionalText(task.payload.accessionNumber);
  // undefined means "not supplied"; any other value is coerced and validated below.
  const journalEntryId =
    task.payload.journalEntryId === undefined
      ? null
      : Number(task.payload.journalEntryId);
  // Keep only well-formed deletion requests (narrowed via the type predicate).
  const deleteSourceRefs = Array.isArray(task.payload.deleteSourceRefs)
    ? task.payload.deleteSourceRefs.filter(
        (
          entry,
        ): entry is {
          sourceKind: string;
          sourceRef: string;
          scope: string;
          userId?: string | null;
        } => {
          return Boolean(
            entry &&
              typeof entry === "object" &&
              typeof (entry as { sourceKind?: unknown }).sourceKind ===
                "string" &&
              typeof (entry as { sourceRef?: unknown }).sourceRef ===
                "string" &&
              typeof (entry as { scope?: unknown }).scope === "string",
          );
        },
      )
    : [];
  // Only the three known source kinds survive filtering.
  const sourceKinds = parseOptionalStringArray(task.payload.sourceKinds).filter(
    (
      sourceKind,
    ): sourceKind is "filing_document" | "filing_brief" | "research_note" => {
      return (
        sourceKind === "filing_document" ||
        sourceKind === "filing_brief" ||
        sourceKind === "research_note"
      );
    },
  );
  // Accept only positive integers; NaN, zero, negatives, and floats become null.
  const validatedJournalEntryId =
    typeof journalEntryId === "number" &&
    Number.isInteger(journalEntryId) &&
    journalEntryId > 0
      ? journalEntryId
      : null;
  const result = await indexSearchDocuments({
    userId: task.user_id,
    ticker,
    accessionNumber,
    journalEntryId: validatedJournalEntryId,
    sourceKinds: sourceKinds.length > 0 ? sourceKinds : undefined,
    deleteSourceRefs: deleteSourceRefs.map((entry) => ({
      sourceKind: entry.sourceKind as
        | "filing_document"
        | "filing_brief"
        | "research_note",
      sourceRef: entry.sourceRef,
      // Any scope other than "user" collapses to "global".
      scope: entry.scope === "user" ? "user" : "global",
      userId: typeof entry.userId === "string" ?
entry.userId : null,
    })),
    // Map each indexing phase reported by indexSearchDocuments onto the
    // corresponding projected task stage.
    onStage: async (stage, detail, context) => {
      switch (stage) {
        case "collect":
          await setProjectionStage(
            task,
            "search.collect_sources",
            detail,
            context ?? {
              subject: ticker
                ? { ticker }
                : accessionNumber
                  ? { accessionNumber }
                  : null,
            },
          );
          break;
        case "fetch":
          await setProjectionStage(
            task,
            "search.fetch_documents",
            detail,
            context ?? null,
          );
          break;
        case "chunk":
          await setProjectionStage(
            task,
            "search.chunk",
            detail,
            context ?? null,
          );
          break;
        case "embed":
          await setProjectionStage(
            task,
            "search.embed",
            detail,
            context ?? null,
          );
          break;
        case "persist":
          await setProjectionStage(
            task,
            "search.persist",
            detail,
            context ?? null,
          );
          break;
      }
    },
  });
  const taskResult = {
    ticker,
    accessionNumber,
    journalEntryId: validatedJournalEntryId,
    ...result,
  };
  return buildTaskOutcome(
    taskResult,
    `Indexed ${result.indexed} sources, embedded ${result.chunksEmbedded} chunks, skipped ${result.skipped}, deleted ${result.deleted} stale documents.`,
    {
      progress: {
        current: result.sourcesCollected,
        // `|| 1` keeps the progress denominator non-zero when nothing was collected.
        total: result.sourcesCollected || 1,
        unit: "sources",
      },
      counters: {
        sourcesCollected: result.sourcesCollected,
        indexed: result.indexed,
        chunksEmbedded: result.chunksEmbedded,
        skipped: result.skipped,
        deleted: result.deleted,
      },
      subject: ticker
        ? { ticker }
        : accessionNumber ?
{ accessionNumber } : null,
    },
  );
}

// Projects Holding rows down to just the fields the insight prompt needs.
function holdingDigest(holdings: Holding[]) {
  return holdings.map((holding) => ({
    ticker: holding.ticker,
    shares: holding.shares,
    avgCost: holding.avg_cost,
    currentPrice: holding.current_price,
    marketValue: holding.market_value,
    gainLoss: holding.gain_loss,
    gainLossPct: holding.gain_loss_pct,
  }));
}

/**
 * Executes a "portfolio_insights" task: loads the user's holdings, builds a
 * portfolio summary, asks the AI model for a health score, risks,
 * opportunities, and next actions, and persists the generated insight.
 */
async function processPortfolioInsights(task: Task) {
  const userId = task.user_id;
  if (!userId) {
    throw new Error("Task is missing user scope");
  }
  await setProjectionStage(
    task,
    "insights.load_holdings",
    "Loading holdings for portfolio insight generation",
  );
  const userHoldings = await listUserHoldings(userId);
  const summary = buildPortfolioSummary(userHoldings);
  const holdingsContext = {
    counters: {
      holdings: userHoldings.length,
    },
  } satisfies TaskStageContext;
  // Re-project the same stage with a detail message now that counts are known.
  await setProjectionStage(
    task,
    "insights.load_holdings",
    `Loaded ${userHoldings.length} holdings for portfolio insight generation`,
    holdingsContext,
  );
  const prompt = [
    "Generate portfolio intelligence with actionable recommendations.",
    `Portfolio summary: ${JSON.stringify(summary)}`,
    `Holdings: ${JSON.stringify(holdingDigest(userHoldings))}`,
    "Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.",
  ].join("\n");
  await setProjectionStage(
    task,
    "insights.generate",
    "Generating portfolio AI insight",
    holdingsContext,
  );
  const analysis = await runAiAnalysis(
    prompt,
    "Act as a risk-aware buy-side analyst.",
    { workload: "report" },
  );
  await setProjectionStage(
    task,
    "insights.persist",
    "Persisting generated portfolio insight",
    holdingsContext,
  );
  await createPortfolioInsight({
    userId,
    provider: analysis.provider,
    model: analysis.model,
    content: analysis.text,
  });
  const result = {
    provider: analysis.provider,
    model: analysis.model,
    summary,
  };
  return buildTaskOutcome(
    result,
    `Generated portfolio insight for ${summary.positions} holdings.`,
    {
      counters: {
        holdings: summary.positions,
      },
    },
  );
}

// Internal helpers re-exported for unit tests only.
export const
__taskProcessorInternals = {
  parseExtractionPayload,
  deterministicExtractionFallback,
  isFinancialMetricsForm,
};

/**
 * Worker entry point: routes a task to the handler matching its task_type.
 * Throws for any task type without a registered handler.
 */
export async function runTaskProcessor(task: Task) {
  // Guard-clause dispatch: each known task type returns immediately.
  const kind = task.task_type;
  if (kind === "sync_filings") {
    return await processSyncFilings(task);
  }
  if (kind === "refresh_prices") {
    return await processRefreshPrices(task);
  }
  if (kind === "analyze_filing") {
    return await processAnalyzeFiling(task);
  }
  if (kind === "portfolio_insights") {
    return await processPortfolioInsights(task);
  }
  if (kind === "index_search") {
    return await processIndexSearch(task);
  }
  throw new Error(`Unsupported task type: ${task.task_type}`);
}