455 lines
13 KiB
TypeScript
455 lines
13 KiB
TypeScript
import type {
|
|
Filing,
|
|
FilingExtraction,
|
|
FilingExtractionMeta,
|
|
Holding,
|
|
Task
|
|
} from '@/lib/types';
|
|
import { runAiAnalysis } from '@/lib/server/ai';
|
|
import { buildPortfolioSummary } from '@/lib/server/portfolio';
|
|
import { getQuote } from '@/lib/server/prices';
|
|
import {
|
|
getFilingByAccession,
|
|
saveFilingAnalysis,
|
|
upsertFilingsRecords
|
|
} from '@/lib/server/repos/filings';
|
|
import {
|
|
applyRefreshedPrices,
|
|
listHoldingsForPriceRefresh,
|
|
listUserHoldings
|
|
} from '@/lib/server/repos/holdings';
|
|
import { createPortfolioInsight } from '@/lib/server/repos/insights';
|
|
import {
|
|
fetchFilingMetrics,
|
|
fetchPrimaryFilingText,
|
|
fetchRecentFilings
|
|
} from '@/lib/server/sec';
|
|
|
|
/**
 * Exact set of top-level keys an extraction payload must carry.
 * `parseExtractionPayload` rejects payloads with missing or additional keys.
 */
const EXTRACTION_REQUIRED_KEYS = ['summary', 'keyPoints', 'redFlags', 'followUpQuestions', 'portfolioSignals', 'confidence'] as const;

/** Upper bound on the number of entries kept in each extraction list. */
const EXTRACTION_MAX_ITEMS = 6;

/** Maximum length (in UTF-16 code units) of a single extraction list entry. */
const EXTRACTION_ITEM_MAX_LENGTH = 280;

/** Maximum length (in UTF-16 code units) of the extraction summary. */
const EXTRACTION_SUMMARY_MAX_LENGTH = 900;
|
|
|
|
/**
 * Normalizes a processor's return value into a plain record.
 *
 * @param value Whatever the task processor produced.
 * @returns `value` itself when it is a non-null, non-array object; otherwise
 *   a wrapper record of the form `{ value }`.
 */
function toTaskResult(value: unknown): Record<string, unknown> {
  const isRecordLike = typeof value === 'object' && value !== null && !Array.isArray(value);

  return isRecordLike ? (value as Record<string, unknown>) : { value };
}
|
|
|
|
/**
 * Validates and normalizes a ticker symbol taken from a task payload.
 *
 * @param raw Untrusted payload value.
 * @returns The trimmed, upper-cased ticker.
 * @throws Error with message `Ticker is required` when `raw` is not a string
 *   or contains only whitespace.
 */
function parseTicker(raw: unknown) {
  const symbol = typeof raw === 'string' ? raw.trim() : '';

  if (symbol.length === 0) {
    throw new Error('Ticker is required');
  }

  return symbol.toUpperCase();
}
|
|
|
|
/**
 * Parses an untrusted "limit" value into an integer clamped to `[min, max]`.
 *
 * Only numbers and non-blank strings are treated as supplied values. Anything
 * else (`null`, `undefined`, booleans, objects, blank strings) counts as
 * "not provided" and yields `fallback`. Passing those through `Number()`
 * would coerce `null`, `''` and `false` to `0`, which would then be clamped
 * to `min` instead of using the fallback.
 *
 * @param raw Untrusted payload value.
 * @param fallback Value returned when `raw` is missing or not a finite number.
 * @param min Inclusive lower bound applied to a parsed value.
 * @param max Inclusive upper bound applied to a parsed value.
 * @returns `fallback`, or the truncated value clamped to `[min, max]`.
 */
function parseLimit(raw: unknown, fallback: number, min: number, max: number) {
  let numberValue: number;

  if (typeof raw === 'number') {
    numberValue = raw;
  } else if (typeof raw === 'string' && raw.trim() !== '') {
    numberValue = Number(raw);
  } else {
    return fallback;
  }

  if (!Number.isFinite(numberValue)) {
    return fallback;
  }

  // Drop any fractional part before clamping so the result is always an integer.
  const intValue = Math.trunc(numberValue);
  return Math.min(Math.max(intValue, min), max);
}
|
|
|
|
/**
 * Normalizes a free-text field from an extraction payload.
 *
 * Whitespace runs are collapsed to single spaces and the text is trimmed,
 * then truncated to at most `maxLength` UTF-16 code units. Truncation never
 * leaves a dangling high surrogate (which would make the string ill-formed
 * Unicode) and never leaves trailing whitespace at the cut.
 *
 * @param value Untrusted payload value.
 * @param maxLength Maximum length of the returned string in UTF-16 code units.
 * @returns The cleaned text, or `null` when `value` is not a string or nothing
 *   usable remains after cleaning/truncation.
 */
function sanitizeExtractionText(value: unknown, maxLength: number) {
  if (typeof value !== 'string') {
    return null;
  }

  const collapsed = value.replace(/\s+/g, ' ').trim();
  if (!collapsed) {
    return null;
  }

  if (collapsed.length <= maxLength) {
    return collapsed;
  }

  let truncated = collapsed.slice(0, maxLength);

  // A high surrogate at the end means the cut landed inside a surrogate pair
  // (or on an already-lone surrogate); drop it so the result stays well-formed.
  const lastUnit = truncated.charCodeAt(truncated.length - 1);
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) {
    truncated = truncated.slice(0, -1);
  }

  // The cut may land right after a collapsed space.
  truncated = truncated.trimEnd();

  return truncated.length > 0 ? truncated : null;
}
|
|
|
|
/**
 * Normalizes a list field from an extraction payload.
 *
 * Each entry is passed through `sanitizeExtractionText` with the per-item
 * length cap; entries that come back empty or `null` are skipped. Collection
 * stops once `EXTRACTION_MAX_ITEMS` entries have been kept.
 *
 * @param value Untrusted payload value.
 * @returns The cleaned entries (possibly an empty array), or `null` when
 *   `value` is not an array.
 */
function sanitizeExtractionList(value: unknown) {
  if (!Array.isArray(value)) {
    return null;
  }

  const kept: string[] = [];

  for (let index = 0; index < value.length && kept.length < EXTRACTION_MAX_ITEMS; index += 1) {
    const text = sanitizeExtractionText(value[index], EXTRACTION_ITEM_MAX_LENGTH);
    if (text) {
      kept.push(text);
    }
  }

  return kept;
}
|
|
|
|
/**
 * Parses and validates the raw text returned by the extraction model.
 *
 * The JSON candidate is the body of the first Markdown code fence when one is
 * present; otherwise it is the span from the first `{` to the last `}`. The
 * decoded value must be a non-array object whose keys are exactly
 * `EXTRACTION_REQUIRED_KEYS`. Text and list fields are sanitized, and
 * `confidence` is coerced to a number and clamped to `[0, 1]`.
 *
 * @param raw Raw model output.
 * @returns A sanitized `FilingExtraction`, or `null` when no candidate is
 *   found, the JSON is invalid, the key set does not match, or any field
 *   fails validation.
 */
function parseExtractionPayload(raw: string): FilingExtraction | null {
  const fencedBody = /```(?:json)?\s*([\s\S]*?)```/i.exec(raw)?.[1];

  let jsonText: string | null;
  if (fencedBody != null) {
    // A fence takes precedence even when its body is empty.
    jsonText = fencedBody;
  } else {
    const open = raw.indexOf('{');
    const close = raw.lastIndexOf('}');
    jsonText = open >= 0 && close > open ? raw.slice(open, close + 1) : null;
  }

  if (!jsonText) {
    return null;
  }

  let decoded: unknown;
  try {
    decoded = JSON.parse(jsonText);
  } catch {
    return null;
  }

  if (typeof decoded !== 'object' || decoded === null || Array.isArray(decoded)) {
    return null;
  }

  const payload = decoded as Record<string, unknown>;
  const presentKeys = Object.keys(payload);
  const allowedKeys: readonly string[] = EXTRACTION_REQUIRED_KEYS;

  // The key set must match exactly: same count, nothing missing, nothing extra.
  const hasExactKeySet =
    presentKeys.length === EXTRACTION_REQUIRED_KEYS.length &&
    EXTRACTION_REQUIRED_KEYS.every((required) => required in payload) &&
    presentKeys.every((present) => allowedKeys.includes(present));

  if (!hasExactKeySet) {
    return null;
  }

  const summary = sanitizeExtractionText(payload.summary, EXTRACTION_SUMMARY_MAX_LENGTH);
  const keyPoints = sanitizeExtractionList(payload.keyPoints);
  const redFlags = sanitizeExtractionList(payload.redFlags);
  const followUpQuestions = sanitizeExtractionList(payload.followUpQuestions);
  const portfolioSignals = sanitizeExtractionList(payload.portfolioSignals);

  const rawConfidence = payload.confidence;
  const confidenceValue = typeof rawConfidence === 'number' ? rawConfidence : Number(rawConfidence);

  if (!(summary && keyPoints && redFlags && followUpQuestions && portfolioSignals && Number.isFinite(confidenceValue))) {
    return null;
  }

  return {
    summary,
    keyPoints,
    redFlags,
    followUpQuestions,
    portfolioSignals,
    confidence: Math.min(Math.max(confidenceValue, 0), 1)
  };
}
|
|
|
|
/**
 * Formats one metric as a `label: value` line for the fallback extraction.
 *
 * @param label Human-readable metric name.
 * @param value Metric value; may be absent.
 * @returns `"<label>: <rounded value with en-US grouping>"` for finite numbers,
 *   otherwise `"<label>: not reported"`.
 */
function metricSnapshotLine(label: string, value: number | null | undefined) {
  if (typeof value === 'number' && Number.isFinite(value)) {
    const rounded = Math.round(value);
    return `${label}: ${rounded.toLocaleString('en-US')}`;
  }

  return `${label}: not reported`;
}
|
|
|
|
/**
 * Builds a canned extraction from filing metadata alone, with no AI call.
 *
 * Key points and red flags are derived from the filing's type, ticker,
 * metrics and primary-document presence; follow-up questions and portfolio
 * signals are fixed text. Confidence is fixed at `0.2`.
 *
 * @param filing Stored filing record.
 * @returns A `FilingExtraction` populated from metadata.
 */
function deterministicExtractionFallback(filing: Filing): FilingExtraction {
  const {
    company_name: companyName,
    filing_type: filingType,
    filing_date: filingDate,
    ticker,
    metrics
  } = filing;

  const documentNote = filing.primary_document
    ? 'Primary document is indexed and available for review.'
    : 'Primary document reference is unavailable in current filing metadata.';

  const keyPoints = [
    `${filingType} filing recorded for ${ticker}.`,
    metricSnapshotLine('Revenue', metrics?.revenue),
    metricSnapshotLine('Net income', metrics?.netIncome),
    metricSnapshotLine('Total assets', metrics?.totalAssets)
  ];

  const redFlags = [
    metricSnapshotLine('Cash', metrics?.cash),
    metricSnapshotLine('Debt', metrics?.debt),
    documentNote
  ];

  const followUpQuestions = [
    'What changed versus the prior filing in guidance, margins, or liquidity?',
    'Are any material risks under-emphasized relative to historical filings?',
    'Should portfolio exposure be adjusted before the next reporting cycle?'
  ];

  const portfolioSignals = [
    'Validate trend direction using at least two prior filings.',
    'Cross-check leverage and liquidity metrics against position sizing rules.',
    'Track language shifts around guidance or demand assumptions.'
  ];

  return {
    summary: `${companyName} ${filingType} filed on ${filingDate}. Deterministic extraction fallback used due unavailable or invalid local parsing output.`,
    keyPoints,
    redFlags,
    followUpQuestions,
    portfolioSignals,
    confidence: 0.2
  };
}
|
|
|
|
/**
 * Assembles the prompt asking the model for a strict-JSON extraction.
 *
 * Sections are joined with blank lines: instruction, filing identification,
 * the required JSON shape, the size rules (taken from the `EXTRACTION_*`
 * constants), and finally the filing text itself.
 *
 * @param filing Stored filing record supplying company, ticker, form and date.
 * @param filingText Text of the filing document to analyze.
 * @returns The complete prompt string.
 */
function extractionPrompt(filing: Filing, filingText: string) {
  const sections: string[] = [];

  sections.push('Extract structured signals from the SEC filing text.');
  sections.push(`Company: ${filing.company_name} (${filing.ticker})`);
  sections.push(`Form: ${filing.filing_type}`);
  sections.push(`Filed: ${filing.filing_date}`);
  sections.push('Return ONLY valid JSON with exactly these keys and no extra keys:');
  sections.push('{"summary":"string","keyPoints":["string"],"redFlags":["string"],"followUpQuestions":["string"],"portfolioSignals":["string"],"confidence":0}');
  sections.push(
    `Rules: keyPoints/redFlags/followUpQuestions/portfolioSignals arrays max ${EXTRACTION_MAX_ITEMS} items; each item <= ${EXTRACTION_ITEM_MAX_LENGTH} chars; summary <= ${EXTRACTION_SUMMARY_MAX_LENGTH} chars; confidence between 0 and 1.`
  );
  sections.push('Filing text follows:');
  sections.push(filingText);

  return sections.join('\n\n');
}
|
|
|
|
/**
 * Assembles the prompt for the narrative filing report.
 *
 * Lines are joined with single newlines and include the filing
 * identification, its metrics as JSON (`{}` when absent), and the structured
 * extraction as JSON, labelled with the extraction's source.
 *
 * @param filing Stored filing record.
 * @param extraction Structured extraction to give the model as context.
 * @param extractionMeta Provenance of `extraction`; only `source` is used here.
 * @returns The complete prompt string.
 */
function reportPrompt(
  filing: Filing,
  extraction: FilingExtraction,
  extractionMeta: FilingExtractionMeta
) {
  const metricsJson = JSON.stringify(filing.metrics ?? {});
  const extractionJson = JSON.stringify(extraction);

  const lines = [
    'You are a fiscal research assistant focused on regulatory signals.',
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `Metrics: ${metricsJson}`,
    `Structured extraction context (${extractionMeta.source}): ${extractionJson}`,
    'Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.'
  ];

  return lines.join('\n');
}
|
|
|
|
/**
 * Builds the link records stored alongside a filing.
 *
 * @param filing Object carrying the optional document and submission URLs.
 * @returns Link entries in fixed order (`primary_document`, then
 *   `submission_index`), omitting any whose URL is null or empty.
 */
function filingLinks(filing: {
  filingUrl: string | null;
  submissionUrl: string | null;
}) {
  const candidates: Array<[string, string | null]> = [
    ['primary_document', filing.filingUrl],
    ['submission_index', filing.submissionUrl]
  ];

  const links: Array<{ link_type: string; url: string }> = [];

  for (const [link_type, url] of candidates) {
    if (url) {
      links.push({ link_type, url });
    }
  }

  return links;
}
|
|
|
|
/**
 * Handles a `sync_filings` task: fetches recent filings for a ticker,
 * attaches per-company metrics, and upserts them.
 *
 * The payload's `ticker` is required; `limit` defaults to 20 and is clamped
 * to 1..50. Metrics are fetched once per distinct CIK, one request at a time.
 *
 * @param task Task whose payload supplies `ticker` and optionally `limit`.
 * @returns The ticker plus fetched/inserted/updated counts.
 * @throws Error when the ticker is missing (via `parseTicker`); errors from
 *   the fetch or upsert calls propagate.
 */
async function processSyncFilings(task: Task) {
  const ticker = parseTicker(task.payload.ticker);
  const limit = parseLimit(task.payload.limit, 20, 1, 50);
  const filings = await fetchRecentFilings(ticker, limit);

  const metricsByCik = new Map<string, Filing['metrics']>();

  for (const { cik, ticker: filingTicker } of filings) {
    if (metricsByCik.has(cik)) {
      continue;
    }

    metricsByCik.set(cik, await fetchFilingMetrics(cik, filingTicker));
  }

  const { inserted, updated } = await upsertFilingsRecords(
    filings.map((entry) => ({
      ticker: entry.ticker,
      filing_type: entry.filingType,
      filing_date: entry.filingDate,
      accession_number: entry.accessionNumber,
      cik: entry.cik,
      company_name: entry.companyName,
      filing_url: entry.filingUrl,
      submission_url: entry.submissionUrl,
      primary_document: entry.primaryDocument,
      metrics: metricsByCik.get(entry.cik) ?? null,
      links: filingLinks(entry)
    }))
  );

  return {
    ticker,
    fetched: filings.length,
    inserted,
    updated
  };
}
|
|
|
|
/**
 * Handles a `refresh_prices` task: fetches a quote for every distinct ticker
 * in the user's holdings and applies the new prices.
 *
 * Quotes are requested one ticker at a time. The timestamp passed to
 * `applyRefreshedPrices` is taken after all quotes have been fetched.
 *
 * @param task Task that must carry a `user_id`.
 * @returns The number of updated holdings and the number of distinct tickers.
 * @throws Error with message `Task is missing user scope` when `user_id` is
 *   absent; errors from the quote or repository calls propagate.
 */
async function processRefreshPrices(task: Task) {
  const { user_id: userId } = task;
  if (!userId) {
    throw new Error('Task is missing user scope');
  }

  const holdings = await listHoldingsForPriceRefresh(userId);
  const uniqueTickers = Array.from(new Set(holdings.map(({ ticker }) => ticker)));

  const quotes = new Map<string, number>();
  for (const symbol of uniqueTickers) {
    quotes.set(symbol, await getQuote(symbol));
  }

  const refreshedAt = new Date().toISOString();
  const updatedCount = await applyRefreshedPrices(userId, quotes, refreshedAt);

  return {
    updatedCount,
    totalTickers: uniqueTickers.length
  };
}
|
|
|
|
/**
 * Handles an `analyze_filing` task: produces a structured extraction and a
 * narrative AI report for one filing, then persists both.
 *
 * Extraction is best-effort. It starts from the deterministic metadata
 * fallback and is replaced only when the primary document text can be fetched
 * and the model's output passes `parseExtractionPayload`. Any error during
 * that attempt is swallowed and the fallback is used. The report call and the
 * save are not guarded; their errors propagate.
 *
 * @param task Task whose payload supplies a non-empty string `accessionNumber`.
 * @returns Accession number plus provider/model for the report and extraction.
 * @throws Error when `accessionNumber` is missing or the filing is not found.
 */
async function processAnalyzeFiling(task: Task) {
  const requestedAccession = task.payload.accessionNumber;
  if (typeof requestedAccession !== 'string' || requestedAccession.length === 0) {
    throw new Error('accessionNumber is required');
  }
  const accessionNumber = requestedAccession;

  const filing = await getFilingByAccession(accessionNumber);
  if (!filing) {
    throw new Error(`Filing ${accessionNumber} not found`);
  }

  const fallbackExtraction = deterministicExtractionFallback(filing);
  let extraction = fallbackExtraction;
  let extractionMeta: FilingExtractionMeta = {
    provider: 'deterministic-fallback',
    model: 'metadata-fallback',
    source: 'metadata_fallback',
    generatedAt: new Date().toISOString()
  };

  try {
    const primaryDocument = await fetchPrimaryFilingText({
      filingUrl: filing.filing_url,
      cik: filing.cik,
      accessionNumber: filing.accession_number,
      primaryDocument: filing.primary_document ?? null
    });

    if (primaryDocument?.text) {
      const aiExtraction = await runAiAnalysis(
        extractionPrompt(filing, primaryDocument.text),
        'Return strict JSON only.',
        { workload: 'extraction' }
      );

      const validated = parseExtractionPayload(aiExtraction.text);
      if (validated) {
        // Extraction and its provenance are replaced together, as the last step
        // of the attempt, so a failure never leaves them out of sync.
        extraction = validated;
        extractionMeta = {
          provider: aiExtraction.provider === 'local-fallback' ? 'deterministic-fallback' : 'ollama',
          model: aiExtraction.model,
          source: primaryDocument.source,
          generatedAt: new Date().toISOString()
        };
      }
    }
  } catch {
    // Best-effort: any failure while fetching or extracting falls back to metadata.
    extraction = fallbackExtraction;
  }

  const report = await runAiAnalysis(
    reportPrompt(filing, extraction, extractionMeta),
    'Use concise institutional analyst language.',
    { workload: 'report' }
  );

  await saveFilingAnalysis(accessionNumber, {
    provider: report.provider,
    model: report.model,
    text: report.text,
    extraction,
    extractionMeta
  });

  return {
    accessionNumber,
    provider: report.provider,
    model: report.model,
    extractionProvider: extractionMeta.provider,
    extractionModel: extractionMeta.model
  };
}
|
|
|
|
/**
 * Projects holdings onto the compact camelCase shape embedded in the
 * portfolio-insights prompt.
 *
 * @param holdings Holdings to summarize.
 * @returns One object per holding with ticker, shares, average cost, current
 *   price, market value, and gain/loss (absolute and percentage).
 */
function holdingDigest(holdings: Holding[]) {
  return holdings.map(
    ({ ticker, shares, avg_cost, current_price, market_value, gain_loss, gain_loss_pct }) => ({
      ticker,
      shares,
      avgCost: avg_cost,
      currentPrice: current_price,
      marketValue: market_value,
      gainLoss: gain_loss,
      gainLossPct: gain_loss_pct
    })
  );
}
|
|
|
|
/**
 * Handles a `portfolio_insights` task: summarizes the user's holdings, asks
 * the AI for a report, and stores the resulting insight.
 *
 * @param task Task that must carry a `user_id`.
 * @returns The AI provider and model used, plus the computed portfolio summary.
 * @throws Error with message `Task is missing user scope` when `user_id` is
 *   absent; errors from the repository or AI calls propagate.
 */
async function processPortfolioInsights(task: Task) {
  const { user_id: userId } = task;
  if (!userId) {
    throw new Error('Task is missing user scope');
  }

  const holdings = await listUserHoldings(userId);
  const summary = buildPortfolioSummary(holdings);

  const summaryJson = JSON.stringify(summary);
  const holdingsJson = JSON.stringify(holdingDigest(holdings));

  const promptLines = [
    'Generate portfolio intelligence with actionable recommendations.',
    `Portfolio summary: ${summaryJson}`,
    `Holdings: ${holdingsJson}`,
    'Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.'
  ];

  const { provider, model, text } = await runAiAnalysis(
    promptLines.join('\n'),
    'Act as a risk-aware buy-side analyst.',
    { workload: 'report' }
  );

  await createPortfolioInsight({
    userId,
    provider,
    model,
    content: text
  });

  return {
    provider,
    model,
    summary
  };
}
|
|
|
|
/**
 * Module-private helpers re-exported under a single object.
 * NOTE(review): presumably exposed so unit tests can call them directly — confirm.
 */
export const __taskProcessorInternals = { parseExtractionPayload, deterministicExtractionFallback };
|
|
|
|
/**
 * Dispatches a task to the processor matching its `task_type` and normalizes
 * the processor's return value with `toTaskResult`.
 *
 * @param task Task to execute.
 * @returns The processor's result as a plain record.
 * @throws Error with message `Unsupported task type: <type>` for unknown task
 *   types; errors thrown by the selected processor propagate.
 */
export async function runTaskProcessor(task: Task) {
  let outcome: unknown;

  if (task.task_type === 'sync_filings') {
    outcome = await processSyncFilings(task);
  } else if (task.task_type === 'refresh_prices') {
    outcome = await processRefreshPrices(task);
  } else if (task.task_type === 'analyze_filing') {
    outcome = await processAnalyzeFiling(task);
  } else if (task.task_type === 'portfolio_insights') {
    outcome = await processPortfolioInsights(task);
  } else {
    throw new Error(`Unsupported task type: ${task.task_type}`);
  }

  return toTaskResult(outcome);
}
|