import type { Filing, FilingExtraction, FilingExtractionMeta, Holding, Task } from '@/lib/types';
import { runAiAnalysis } from '@/lib/server/ai';
import { buildPortfolioSummary } from '@/lib/server/portfolio';
import { getQuote } from '@/lib/server/prices';
import {
  getFilingByAccession,
  listFilingsRecords,
  saveFilingAnalysis,
  upsertFilingsRecords
} from '@/lib/server/repos/filings';
import {
  getFilingStatementSnapshotByFilingId,
  upsertFilingStatementSnapshot
} from '@/lib/server/repos/filing-statements';
import {
  applyRefreshedPrices,
  listHoldingsForPriceRefresh,
  listUserHoldings
} from '@/lib/server/repos/holdings';
import { createPortfolioInsight } from '@/lib/server/repos/insights';
import {
  fetchFilingMetricsForFilings,
  fetchPrimaryFilingText,
  fetchRecentFilings,
  hydrateFilingStatementSnapshot
} from '@/lib/server/sec';

/** Exact key set an AI extraction payload must contain (no more, no fewer). */
const EXTRACTION_REQUIRED_KEYS = [
  'summary',
  'keyPoints',
  'redFlags',
  'followUpQuestions',
  'portfolioSignals',
  'segmentSpecificData',
  'geographicRevenueBreakdown',
  'companySpecificData',
  'secApiCrossChecks',
  'confidence'
] as const;

/** Maximum number of entries kept in any extraction list. */
const EXTRACTION_MAX_ITEMS = 6;
/** Maximum length (in characters) of a single extraction list entry. */
const EXTRACTION_ITEM_MAX_LENGTH = 280;
/** Maximum length (in characters) of the extraction summary. */
const EXTRACTION_SUMMARY_MAX_LENGTH = 900;
/** Pause applied after each statement hydration attempt, in milliseconds. */
const STATEMENT_HYDRATION_DELAY_MS = 120;
/** Upper bound on how many stored filings are considered for statement hydration. */
const STATEMENT_HYDRATION_MAX_FILINGS = 80;

/** Phrases that mark a filing line as a segment-level disclosure. */
const SEGMENT_PATTERNS = [
  /\boperating segment\b/i,
  /\bsegment revenue\b/i,
  /\bsegment margin\b/i,
  /\bsegment profit\b/i,
  /\bbusiness segment\b/i,
  /\breportable segment\b/i
];

/** Phrases that mark a filing line as a geographic disclosure. */
const GEOGRAPHIC_PATTERNS = [
  /\bgeographic\b/i,
  /\bamericas\b/i,
  /\bemea\b/i,
  /\bapac\b/i,
  /\basia pacific\b/i,
  /\bnorth america\b/i,
  /\beurope\b/i,
  /\bchina\b/i,
  /\binternational\b/i
];

/** Phrases that mark a filing line as a company-specific operating KPI. */
const COMPANY_SPECIFIC_PATTERNS = [
  /\bsame[- ]store\b/i,
  /\bcomparable[- ]store\b/i,
  /\bcomp sales\b/i,
  /\borganic sales\b/i,
  /\bbookings\b/i,
  /\bbacklog\b/i,
  /\barpu\b/i,
  /\bmau\b/i,
  /\bdau\b/i,
  /\bsubscriber\b/i,
  /\boccupancy\b/i,
  /\brevpar\b/i,
  /\bretention\b/i,
  /\bchurn\b/i
];

/** Keys of the (non-null) metrics object attached to a filing. */
type FilingMetricKey = keyof NonNullable<Filing['metrics']>;

/** For each SEC API metric: its display label and the words that count as a mention in filing text. */
const METRIC_CHECK_PATTERNS: Array<{
  key: FilingMetricKey;
  label: string;
  patterns: RegExp[];
}> = [
  { key: 'revenue', label: 'Revenue', patterns: [/\brevenue\b/i, /\bsales\b/i] },
  { key: 'netIncome', label: 'Net income', patterns: [/\bnet income\b/i, /\bprofit\b/i] },
  { key: 'totalAssets', label: 'Total assets', patterns: [/\btotal assets\b/i, /\bassets\b/i] },
  { key: 'cash', label: 'Cash', patterns: [/\bcash\b/i, /\bcash equivalents\b/i] },
  { key: 'debt', label: 'Debt', patterns: [/\bdebt\b/i, /\bborrowings\b/i, /\bliabilit(?:y|ies)\b/i] }
];

/** Returns true for the form types that get financial metrics fetched (10-K and 10-Q). */
function isFinancialMetricsForm(form: Filing['filing_type']) {
  return form === '10-K' || form === '10-Q';
}

/**
 * Normalizes a processor return value into a plain record.
 * Non-objects, arrays and nullish values are wrapped as `{ value }`.
 */
function toTaskResult(value: unknown): Record<string, unknown> {
  if (!value || typeof value !== 'object' || Array.isArray(value)) {
    return { value };
  }

  return value as Record<string, unknown>;
}

/**
 * Validates and normalizes a ticker from a task payload.
 *
 * @returns The trimmed, upper-cased ticker.
 * @throws Error when the value is not a string or is blank.
 */
function parseTicker(raw: unknown) {
  if (typeof raw !== 'string' || raw.trim().length < 1) {
    throw new Error('Ticker is required');
  }

  return raw.trim().toUpperCase();
}

/**
 * Strictly converts loose input to a finite number.
 *
 * Only real numbers and non-blank numeric strings are accepted. `Number()` alone would
 * coerce `null`, `''`, `[]` and `false` to `0` (and `true` to `1`), which silently turns
 * "missing" into a valid-looking value.
 *
 * @returns The finite number, or `null` when the input is not numeric.
 */
function toFiniteNumber(raw: unknown): number | null {
  if (typeof raw === 'number') {
    return Number.isFinite(raw) ? raw : null;
  }

  if (typeof raw === 'string' && raw.trim().length > 0) {
    const parsed = Number(raw);
    return Number.isFinite(parsed) ? parsed : null;
  }

  return null;
}

/**
 * Parses a numeric limit from a task payload.
 *
 * @param raw - Untrusted payload value.
 * @param fallback - Returned as-is when `raw` is missing or not numeric.
 * @param min - Inclusive lower clamp applied to numeric input.
 * @param max - Inclusive upper clamp applied to numeric input.
 * @returns The truncated, clamped limit, or `fallback`.
 */
function parseLimit(raw: unknown, fallback: number, min: number, max: number) {
  const numberValue = toFiniteNumber(raw);
  if (numberValue === null) {
    return fallback;
  }

  const intValue = Math.trunc(numberValue);
  return Math.min(Math.max(intValue, min), max);
}

/**
 * Collapses whitespace runs to single spaces, trims, and truncates to `maxLength`.
 *
 * @returns The cleaned text, or `null` for non-strings and strings that are empty after trimming.
 */
function sanitizeExtractionText(value: unknown, maxLength: number) {
  if (typeof value !== 'string') {
    return null;
  }

  const collapsed = value.replace(/\s+/g, ' ').trim();
  if (!collapsed) {
    return null;
  }

  return collapsed.slice(0, maxLength);
}

/**
 * Sanitizes a list of extraction entries, dropping unusable ones and keeping at most
 * `EXTRACTION_MAX_ITEMS`.
 *
 * @returns The cleaned list (possibly empty), or `null` when `value` is not an array.
 */
function sanitizeExtractionList(value: unknown) {
  if (!Array.isArray(value)) {
    return null;
  }

  const cleaned: string[] = [];

  for (const entry of value) {
    const normalized = sanitizeExtractionText(entry, EXTRACTION_ITEM_MAX_LENGTH);
    if (!normalized) {
      continue;
    }

    cleaned.push(normalized);
    if (cleaned.length >= EXTRACTION_MAX_ITEMS) {
      break;
    }
  }

  return cleaned;
}

/**
 * Sanitizes entries and removes case-insensitive duplicates, preserving first-seen order
 * and keeping at most `EXTRACTION_MAX_ITEMS`. Nullish entries are skipped.
 */
function uniqueExtractionList(items: Array<string | null | undefined>) {
  const seen = new Set<string>();
  const unique: string[] = [];

  for (const item of items) {
    const normalized = sanitizeExtractionText(item, EXTRACTION_ITEM_MAX_LENGTH);
    if (!normalized) {
      continue;
    }

    const signature = normalized.toLowerCase();
    if (seen.has(signature)) {
      continue;
    }

    seen.add(signature);
    unique.push(normalized);

    if (unique.length >= EXTRACTION_MAX_ITEMS) {
      break;
    }
  }

  return unique;
}

/**
 * Scans filing text line by line and returns the lines matching any of `patterns`.
 *
 * Lines shorter than 24 characters (after whitespace collapsing) are ignored. Scanning stops
 * after twice `EXTRACTION_MAX_ITEMS` raw matches; the result is then de-duplicated and capped.
 */
function collectTextSignals(filingText: string, patterns: RegExp[]) {
  const lines = filingText
    .replace(/\r/g, '\n')
    .split(/\n+/)
    .map((line) => line.replace(/\s+/g, ' ').trim())
    .filter((line) => line.length >= 24);

  const matches: string[] = [];

  for (const line of lines) {
    if (!patterns.some((pattern) => pattern.test(line))) {
      continue;
    }

    matches.push(line);
    // Over-collect so de-duplication still has a chance of filling the final list.
    if (matches.length >= EXTRACTION_MAX_ITEMS * 2) {
      break;
    }
  }

  return uniqueExtractionList(matches);
}

/**
 * Parses and validates the model's extraction response.
 *
 * JSON is taken from the first fenced code block if one exists, otherwise from the outermost
 * `{ ... }` span. The object must contain exactly `EXTRACTION_REQUIRED_KEYS`; text fields are
 * sanitized and `confidence` is clamped to [0, 1].
 *
 * @returns The validated extraction, or `null` when the payload is missing, malformed, has an
 *   unexpected key set, has a blank summary, a non-array list field, or a non-numeric confidence.
 */
function parseExtractionPayload(raw: string): FilingExtraction | null {
  const fencedJson = raw.match(/```(?:json)?\s*([\s\S]*?)```/i)?.[1];
  const candidate =
    fencedJson ??
    (() => {
      const start = raw.indexOf('{');
      const end = raw.lastIndexOf('}');
      return start >= 0 && end > start ? raw.slice(start, end + 1) : null;
    })();

  if (!candidate) {
    return null;
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(candidate);
  } catch {
    return null;
  }

  if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
    return null;
  }

  const payload = parsed as Record<string, unknown>;
  const keys = Object.keys(payload);

  if (keys.length !== EXTRACTION_REQUIRED_KEYS.length) {
    return null;
  }

  for (const key of EXTRACTION_REQUIRED_KEYS) {
    if (!(key in payload)) {
      return null;
    }
  }

  for (const key of keys) {
    if (!EXTRACTION_REQUIRED_KEYS.includes(key as (typeof EXTRACTION_REQUIRED_KEYS)[number])) {
      return null;
    }
  }

  const summary = sanitizeExtractionText(payload.summary, EXTRACTION_SUMMARY_MAX_LENGTH);
  const keyPoints = sanitizeExtractionList(payload.keyPoints);
  const redFlags = sanitizeExtractionList(payload.redFlags);
  const followUpQuestions = sanitizeExtractionList(payload.followUpQuestions);
  const portfolioSignals = sanitizeExtractionList(payload.portfolioSignals);
  const segmentSpecificData = sanitizeExtractionList(payload.segmentSpecificData);
  const geographicRevenueBreakdown = sanitizeExtractionList(payload.geographicRevenueBreakdown);
  const companySpecificData = sanitizeExtractionList(payload.companySpecificData);
  const secApiCrossChecks = sanitizeExtractionList(payload.secApiCrossChecks);
  // Strict conversion: a null/empty/boolean confidence is a malformed payload, not a score of 0 or 1.
  const confidenceRaw = toFiniteNumber(payload.confidence);

  if (
    !summary ||
    !keyPoints ||
    !redFlags ||
    !followUpQuestions ||
    !portfolioSignals ||
    !segmentSpecificData ||
    !geographicRevenueBreakdown ||
    !companySpecificData ||
    !secApiCrossChecks ||
    confidenceRaw === null
  ) {
    return null;
  }

  return {
    summary,
    keyPoints,
    redFlags,
    followUpQuestions,
    portfolioSignals,
    segmentSpecificData,
    geographicRevenueBreakdown,
    companySpecificData,
    secApiCrossChecks,
    confidence: Math.min(Math.max(confidenceRaw, 0), 1)
  };
}

/** Formats a metric as `"<label>: <rounded en-US number>"`, or `"<label>: not reported"` when absent. */
function metricSnapshotLine(label: string, value: number | null | undefined) {
  if (value === null || value === undefined || !Number.isFinite(value)) {
    return `${label}: not reported`;
  }

  return `${label}: ${Math.round(value).toLocaleString('en-US')}`;
}

/**
 * Produces one status line per entry in `METRIC_CHECK_PATTERNS`.
 *
 * Note that the check is keyword-based: it reports whether any of the metric's words occur in
 * the filing text, not whether the numeric value itself appears there.
 */
function buildSecApiCrossChecks(filing: Filing, filingText: string) {
  const normalizedText = filingText.toLowerCase();
  const checks: string[] = [];

  for (const descriptor of METRIC_CHECK_PATTERNS) {
    const value = filing.metrics?.[descriptor.key];
    if (value === null || value === undefined || !Number.isFinite(value)) {
      checks.push(`${descriptor.label}: SEC API metric unavailable for this filing.`);
      continue;
    }

    const formattedValue = Math.round(value).toLocaleString('en-US');
    const hasMention = descriptor.patterns.some((pattern) => pattern.test(normalizedText));

    if (hasMention) {
      checks.push(
        `${descriptor.label}: SEC API value ${formattedValue} appears referenced in filing narrative.`
      );
    } else {
      checks.push(
        `${descriptor.label}: SEC API value ${formattedValue} was not confidently located in sampled filing text.`
      );
    }
  }

  return uniqueExtractionList(checks);
}

/**
 * Builds an extraction purely from filing metadata and SEC API metrics.
 * Used when no filing text could be obtained; confidence is fixed at 0.2.
 */
function deterministicExtractionFallback(filing: Filing): FilingExtraction {
  const metrics = filing.metrics;

  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. Deterministic extraction fallback was used because filing text parsing was unavailable or invalid.`,
    keyPoints: [
      `${filing.filing_type} filing recorded for ${filing.ticker}.`,
      metricSnapshotLine('Revenue', metrics?.revenue),
      metricSnapshotLine('Net income', metrics?.netIncome),
      metricSnapshotLine('Total assets', metrics?.totalAssets)
    ],
    redFlags: [
      metricSnapshotLine('Cash', metrics?.cash),
      metricSnapshotLine('Debt', metrics?.debt),
      filing.primary_document
        ? 'Primary document is indexed and available for review.'
        : 'Primary document reference is unavailable in current filing metadata.'
    ],
    followUpQuestions: [
      'What changed versus the prior filing in guidance, margins, or liquidity?',
      'Are any material risks under-emphasized relative to historical filings?',
      'Should portfolio exposure be adjusted before the next reporting cycle?'
    ],
    portfolioSignals: [
      'Validate trend direction using at least two prior filings.',
      'Cross-check leverage and liquidity metrics against position sizing rules.',
      'Track language shifts around guidance or demand assumptions.'
    ],
    segmentSpecificData: [
      'Segment-level disclosures were not parsed in deterministic fallback mode.'
    ],
    geographicRevenueBreakdown: [
      'Geographic revenue disclosures were not parsed in deterministic fallback mode.'
    ],
    companySpecificData: [
      'Company-specific operating KPIs (for example same-store sales) were not parsed in deterministic fallback mode.'
    ],
    secApiCrossChecks: [
      `${metricSnapshotLine('Revenue', metrics?.revenue)} (SEC API baseline; text verification unavailable).`,
      `${metricSnapshotLine('Net income', metrics?.netIncome)} (SEC API baseline; text verification unavailable).`
    ],
    confidence: 0.2
  };
}

/**
 * Builds an extraction from pattern matches over the filing text, layered on top of the
 * metadata baseline. Confidence is 0.4 when any segment/geographic/company line matched,
 * otherwise 0.3.
 */
function buildRuleBasedExtraction(filing: Filing, filingText: string): FilingExtraction {
  const baseline = deterministicExtractionFallback(filing);
  const segmentSpecificData = collectTextSignals(filingText, SEGMENT_PATTERNS);
  const geographicRevenueBreakdown = collectTextSignals(filingText, GEOGRAPHIC_PATTERNS);
  const companySpecificData = collectTextSignals(filingText, COMPANY_SPECIFIC_PATTERNS);
  const secApiCrossChecks = buildSecApiCrossChecks(filing, filingText);

  // Surface the first match of each category as a headline key point.
  const segmentLead = segmentSpecificData[0]
    ? `Segment detail: ${segmentSpecificData[0]}`
    : null;
  const geographicLead = geographicRevenueBreakdown[0]
    ? `Geographic detail: ${geographicRevenueBreakdown[0]}`
    : null;
  const companyLead = companySpecificData[0]
    ? `Company-specific KPI: ${companySpecificData[0]}`
    : null;

  const matchedSignalCount =
    segmentSpecificData.length + geographicRevenueBreakdown.length + companySpecificData.length;

  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. SEC API metrics were retained as the baseline and filing text was scanned for segment and company-specific disclosures.`,
    keyPoints: uniqueExtractionList([
      ...baseline.keyPoints,
      segmentLead,
      geographicLead,
      companyLead
    ]),
    redFlags: uniqueExtractionList([
      ...baseline.redFlags,
      secApiCrossChecks.find((line) => /not confidently located/i.test(line))
    ]),
    followUpQuestions: uniqueExtractionList([
      ...baseline.followUpQuestions,
      segmentSpecificData.length > 0
        ? 'How do segment trends change the consolidated margin outlook?'
        : 'Does management provide segment-level KPIs in supplemental exhibits?'
    ]),
    portfolioSignals: uniqueExtractionList([
      ...baseline.portfolioSignals,
      companySpecificData.length > 0
        ? 'Incorporate company-specific KPI direction into near-term position sizing.'
        : 'Track future filings for explicit operating KPI disclosures.'
    ]),
    segmentSpecificData: segmentSpecificData.length > 0
      ? segmentSpecificData
      : baseline.segmentSpecificData,
    geographicRevenueBreakdown: geographicRevenueBreakdown.length > 0
      ? geographicRevenueBreakdown
      : baseline.geographicRevenueBreakdown,
    companySpecificData: companySpecificData.length > 0
      ? companySpecificData
      : baseline.companySpecificData,
    secApiCrossChecks: secApiCrossChecks.length > 0
      ? secApiCrossChecks
      : baseline.secApiCrossChecks,
    confidence: matchedSignalCount > 0 ? 0.4 : 0.3
  };
}

/** Returns `primary` unless it is empty, in which case `fallback` is returned. */
function preferExtractionList(primary: string[], fallback: string[]) {
  return primary.length > 0 ? primary : fallback;
}

/**
 * Fills any empty field of `primary` from `fallback`.
 * Confidence always comes from `primary`, clamped to [0, 1].
 */
function mergeExtractionWithFallback(
  primary: FilingExtraction,
  fallback: FilingExtraction
): FilingExtraction {
  return {
    summary: primary.summary || fallback.summary,
    keyPoints: preferExtractionList(primary.keyPoints, fallback.keyPoints),
    redFlags: preferExtractionList(primary.redFlags, fallback.redFlags),
    followUpQuestions: preferExtractionList(primary.followUpQuestions, fallback.followUpQuestions),
    portfolioSignals: preferExtractionList(primary.portfolioSignals, fallback.portfolioSignals),
    segmentSpecificData: preferExtractionList(primary.segmentSpecificData, fallback.segmentSpecificData),
    geographicRevenueBreakdown: preferExtractionList(
      primary.geographicRevenueBreakdown,
      fallback.geographicRevenueBreakdown
    ),
    companySpecificData: preferExtractionList(primary.companySpecificData, fallback.companySpecificData),
    secApiCrossChecks: preferExtractionList(primary.secApiCrossChecks, fallback.secApiCrossChecks),
    confidence: Math.min(Math.max(primary.confidence, 0), 1)
  };
}

/** Builds the prompt asking the model for a strict-JSON structured extraction of the filing text. */
function extractionPrompt(filing: Filing, filingText: string) {
  return [
    'Extract structured signals from the SEC filing text.',
    `Company: ${filing.company_name} (${filing.ticker})`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    'Use SEC API metrics as canonical numeric values and validate whether each appears consistent with filing text context.',
    'Prioritize company-specific and segment-specific disclosures not covered by SEC endpoint fields (for example same-store sales, geographic mix, segment margin).',
    'Return ONLY valid JSON with exactly these keys and no extra keys:',
    '{"summary":"string","keyPoints":["string"],"redFlags":["string"],"followUpQuestions":["string"],"portfolioSignals":["string"],"segmentSpecificData":["string"],"geographicRevenueBreakdown":["string"],"companySpecificData":["string"],"secApiCrossChecks":["string"],"confidence":0}',
    `Rules: every array max ${EXTRACTION_MAX_ITEMS} items; each item <= ${EXTRACTION_ITEM_MAX_LENGTH} chars; summary <= ${EXTRACTION_SUMMARY_MAX_LENGTH} chars; confidence between 0 and 1.`,
    'Filing text follows:',
    filingText
  ].join('\n\n');
}

/** Builds the prompt for the narrative report, embedding the metrics and the structured extraction. */
function reportPrompt(
  filing: Filing,
  extraction: FilingExtraction,
  extractionMeta: FilingExtractionMeta
) {
  return [
    'You are a fiscal research assistant focused on regulatory signals.',
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    `Structured extraction context (${extractionMeta.source}): ${JSON.stringify(extraction)}`,
    'Use SEC API values as the baseline financials and explicitly reference segment/company-specific details from extraction.',
    'Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.'
  ].join('\n');
}

/** Converts a filing's optional URLs into link records; absent URLs produce no record. */
function filingLinks(filing: {
  filingUrl: string | null;
  submissionUrl: string | null;
}) {
  const links: Array<{ link_type: string; url: string }> = [];

  if (filing.filingUrl) {
    links.push({ link_type: 'primary_document', url: filing.filingUrl });
  }

  if (filing.submissionUrl) {
    links.push({ link_type: 'submission_index', url: filing.submissionUrl });
  }

  return links;
}

/**
 * Handles a `sync_filings` task.
 *
 * Fetches recent filings for the payload ticker, fetches metrics for the 10-K/10-Q filings
 * (grouped by CIK), upserts the filing records, then hydrates statement snapshots for stored
 * 10-K/10-Q filings whose snapshot is missing or older than the filing record.
 *
 * @returns Counts of fetched/inserted/updated filings and hydrated/failed snapshots.
 * @throws Error when the payload ticker is missing or blank.
 */
async function processSyncFilings(task: Task) {
  const ticker = parseTicker(task.payload.ticker);
  const limit = parseLimit(task.payload.limit, 20, 1, 50);
  const filings = await fetchRecentFilings(ticker, limit);

  const metricsByAccession = new Map<string, Filing['metrics']>();
  const filingsByCik = new Map<(typeof filings)[number]['cik'], typeof filings>();

  for (const filing of filings) {
    const group = filingsByCik.get(filing.cik);
    if (group) {
      group.push(filing);
      continue;
    }

    filingsByCik.set(filing.cik, [filing]);
  }

  for (const [cik, filingsForCik] of filingsByCik) {
    const filingsForFinancialMetrics = filingsForCik.filter((filing) =>
      isFinancialMetricsForm(filing.filingType)
    );
    if (filingsForFinancialMetrics.length === 0) {
      continue;
    }

    const metricsMap = await fetchFilingMetricsForFilings(
      cik,
      filingsForCik[0]?.ticker ?? ticker,
      filingsForFinancialMetrics.map((filing) => ({
        accessionNumber: filing.accessionNumber,
        filingDate: filing.filingDate,
        filingType: filing.filingType
      }))
    );

    for (const [accessionNumber, metrics] of metricsMap.entries()) {
      metricsByAccession.set(accessionNumber, metrics);
    }
  }

  const saveResult = await upsertFilingsRecords(
    filings.map((filing) => ({
      ticker: filing.ticker,
      filing_type: filing.filingType,
      filing_date: filing.filingDate,
      accession_number: filing.accessionNumber,
      cik: filing.cik,
      company_name: filing.companyName,
      filing_url: filing.filingUrl,
      submission_url: filing.submissionUrl,
      primary_document: filing.primaryDocument,
      metrics: metricsByAccession.get(filing.accessionNumber) ?? null,
      links: filingLinks(filing)
    }))
  );

  let statementSnapshotsHydrated = 0;
  let statementSnapshotsFailed = 0;

  // Look further back than this sync's limit so previously stored filings also get snapshots.
  const hydrateCandidates = (
    await listFilingsRecords({
      ticker,
      limit: Math.min(Math.max(limit * 3, 40), STATEMENT_HYDRATION_MAX_FILINGS)
    })
  ).filter((filing): filing is Filing & { filing_type: '10-K' | '10-Q' } => {
    return filing.filing_type === '10-K' || filing.filing_type === '10-Q';
  });

  for (const filing of hydrateCandidates) {
    const existingSnapshot = await getFilingStatementSnapshotByFilingId(filing.id);
    const shouldRefresh =
      !existingSnapshot ||
      Date.parse(existingSnapshot.updated_at) < Date.parse(filing.updated_at);

    if (!shouldRefresh) {
      continue;
    }

    try {
      const snapshot = await hydrateFilingStatementSnapshot({
        filingId: filing.id,
        ticker: filing.ticker,
        cik: filing.cik,
        accessionNumber: filing.accession_number,
        filingDate: filing.filing_date,
        filingType: filing.filing_type,
        filingUrl: filing.filing_url,
        primaryDocument: filing.primary_document ?? null,
        metrics: filing.metrics
      });

      await upsertFilingStatementSnapshot(snapshot);
      statementSnapshotsHydrated += 1;
    } catch (error) {
      // Persist a 'failed' placeholder so the failure is recorded instead of aborting the sync.
      await upsertFilingStatementSnapshot({
        filing_id: filing.id,
        ticker: filing.ticker,
        filing_date: filing.filing_date,
        filing_type: filing.filing_type,
        period_end: filing.filing_date,
        statement_bundle: null,
        standardized_bundle: null,
        dimension_bundle: null,
        parse_status: 'failed',
        parse_error: error instanceof Error ? error.message : 'Statement hydration failed',
        source: 'companyfacts_fallback'
      });
      statementSnapshotsFailed += 1;
    }

    // Pause between hydration attempts (presumably to throttle upstream requests).
    await Bun.sleep(STATEMENT_HYDRATION_DELAY_MS);
  }

  return {
    ticker,
    fetched: filings.length,
    inserted: saveResult.inserted,
    updated: saveResult.updated,
    statementSnapshotsHydrated,
    statementSnapshotsFailed
  };
}

/**
 * Handles a `refresh_prices` task: fetches one quote per distinct held ticker (sequentially)
 * and applies them to the user's holdings.
 *
 * @throws Error when the task has no `user_id`.
 */
async function processRefreshPrices(task: Task) {
  const userId = task.user_id;
  if (!userId) {
    throw new Error('Task is missing user scope');
  }

  const userHoldings = await listHoldingsForPriceRefresh(userId);
  const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];
  const quotes = new Map<string, Awaited<ReturnType<typeof getQuote>>>();

  for (const ticker of tickers) {
    const quote = await getQuote(ticker);
    quotes.set(ticker, quote);
  }

  const updatedCount = await applyRefreshedPrices(userId, quotes, new Date().toISOString());

  return {
    updatedCount,
    totalTickers: tickers.length
  };
}

/** Extraction metadata describing the metadata-only fallback, stamped with the current time. */
function metadataFallbackMeta(): FilingExtractionMeta {
  return {
    provider: 'deterministic-fallback',
    model: 'metadata-fallback',
    source: 'metadata_fallback',
    generatedAt: new Date().toISOString()
  };
}

/**
 * Handles an `analyze_filing` task.
 *
 * Extraction is chosen from the best source that succeeds, in increasing order of preference:
 * metadata-only fallback, rule-based scan of the filing text, AI extraction merged over the
 * rule-based result. A narrative report is then generated and saved with the extraction.
 *
 * @throws Error when `accessionNumber` is missing or the filing is not found. Errors from the
 *   report generation or from saving the analysis also propagate.
 */
async function processAnalyzeFiling(task: Task) {
  const accessionNumber =
    typeof task.payload.accessionNumber === 'string' ? task.payload.accessionNumber : '';

  if (!accessionNumber) {
    throw new Error('accessionNumber is required');
  }

  const filing = await getFilingByAccession(accessionNumber);
  if (!filing) {
    throw new Error(`Filing ${accessionNumber} not found`);
  }

  const defaultExtraction = deterministicExtractionFallback(filing);
  let extraction = defaultExtraction;
  let extractionMeta = metadataFallbackMeta();

  try {
    const filingDocument = await fetchPrimaryFilingText({
      filingUrl: filing.filing_url,
      cik: filing.cik,
      accessionNumber: filing.accession_number,
      primaryDocument: filing.primary_document ?? null
    });

    if (filingDocument?.text) {
      const ruleBasedExtraction = buildRuleBasedExtraction(filing, filingDocument.text);
      const ruleBasedMeta: FilingExtractionMeta = {
        provider: 'deterministic-fallback',
        model: 'filing-rule-based',
        source: filingDocument.source,
        generatedAt: new Date().toISOString()
      };

      extraction = ruleBasedExtraction;
      extractionMeta = ruleBasedMeta;

      try {
        const extractionResult = await runAiAnalysis(
          extractionPrompt(filing, filingDocument.text),
          'Return strict JSON only.',
          { workload: 'extraction' }
        );

        const parsed = parseExtractionPayload(extractionResult.text);
        if (parsed) {
          extraction = mergeExtractionWithFallback(parsed, ruleBasedExtraction);
          extractionMeta = {
            provider:
              extractionResult.provider === 'local-fallback' ? 'deterministic-fallback' : 'ollama',
            model: extractionResult.model,
            source: filingDocument.source,
            generatedAt: new Date().toISOString()
          };
        }
      } catch {
        // The AI step failed, but the filing text WAS read: keep the rule-based result rather
        // than discarding it for the metadata fallback (whose summary would claim the text
        // was unavailable).
        extraction = ruleBasedExtraction;
        extractionMeta = ruleBasedMeta;
      }
    }
  } catch {
    // Filing text could not be fetched or scanned: best-effort metadata-only extraction.
    extraction = defaultExtraction;
    extractionMeta = metadataFallbackMeta();
  }

  const analysis = await runAiAnalysis(
    reportPrompt(filing, extraction, extractionMeta),
    'Use concise institutional analyst language.',
    { workload: 'report' }
  );

  await saveFilingAnalysis(accessionNumber, {
    provider: analysis.provider,
    model: analysis.model,
    text: analysis.text,
    extraction,
    extractionMeta
  });

  return {
    accessionNumber,
    provider: analysis.provider,
    model: analysis.model,
    extractionProvider: extractionMeta.provider,
    extractionModel: extractionMeta.model
  };
}

/** Projects holdings onto the camelCase fields embedded in the portfolio prompt. */
function holdingDigest(holdings: Holding[]) {
  return holdings.map((holding) => ({
    ticker: holding.ticker,
    shares: holding.shares,
    avgCost: holding.avg_cost,
    currentPrice: holding.current_price,
    marketValue: holding.market_value,
    gainLoss: holding.gain_loss,
    gainLossPct: holding.gain_loss_pct
  }));
}

/**
 * Handles a `portfolio_insights` task: summarizes the user's holdings, asks the model for
 * portfolio guidance, and stores the response as a portfolio insight.
 *
 * @throws Error when the task has no `user_id`.
 */
async function processPortfolioInsights(task: Task) {
  const userId = task.user_id;
  if (!userId) {
    throw new Error('Task is missing user scope');
  }

  const userHoldings = await listUserHoldings(userId);
  const summary = buildPortfolioSummary(userHoldings);

  const prompt = [
    'Generate portfolio intelligence with actionable recommendations.',
    `Portfolio summary: ${JSON.stringify(summary)}`,
    `Holdings: ${JSON.stringify(holdingDigest(userHoldings))}`,
    'Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.'
  ].join('\n');

  const analysis = await runAiAnalysis(
    prompt,
    'Act as a risk-aware buy-side analyst.',
    { workload: 'report' }
  );

  await createPortfolioInsight({
    userId,
    provider: analysis.provider,
    model: analysis.model,
    content: analysis.text
  });

  return {
    provider: analysis.provider,
    model: analysis.model,
    summary
  };
}

/** Internal helpers exposed for unit tests; not part of the public task API. */
export const __taskProcessorInternals = {
  parseExtractionPayload,
  deterministicExtractionFallback,
  isFinancialMetricsForm
};

/**
 * Dispatches a task to the processor matching its `task_type` and returns the result as a
 * plain record.
 *
 * @throws Error for an unsupported task type, plus anything the selected processor throws.
 */
export async function runTaskProcessor(task: Task) {
  switch (task.task_type) {
    case 'sync_filings':
      return toTaskResult(await processSyncFilings(task));
    case 'refresh_prices':
      return toTaskResult(await processRefreshPrices(task));
    case 'analyze_filing':
      return toTaskResult(await processAnalyzeFiling(task));
    case 'portfolio_insights':
      return toTaskResult(await processPortfolioInsights(task));
    default:
      throw new Error(`Unsupported task type: ${task.task_type}`);
  }
}