import type { Filing, Holding, Task } from '@/lib/types';
import { runOpenClawAnalysis } from '@/lib/server/openclaw';
import { buildPortfolioSummary } from '@/lib/server/portfolio';
import { getQuote } from '@/lib/server/prices';
import { getFilingByAccession, saveFilingAnalysis, upsertFilingsRecords } from '@/lib/server/repos/filings';
import { applyRefreshedPrices, listHoldingsForPriceRefresh, listUserHoldings } from '@/lib/server/repos/holdings';
import { createPortfolioInsight } from '@/lib/server/repos/insights';
import { fetchFilingMetrics, fetchRecentFilings } from '@/lib/server/sec';

/** One element of the list resolved by `fetchRecentFilings`. */
type RecentFiling = Awaited<ReturnType<typeof fetchRecentFilings>>[number];

/** Whatever `fetchFilingMetrics` resolves to for a single company. */
type FilingMetrics = Awaited<ReturnType<typeof fetchFilingMetrics>>;

/** Link descriptor attached to each filing record that is upserted. */
type FilingLink = { link_type: string; url: string };

/**
 * Normalizes a processor's return value into a plain object.
 *
 * @param value - Any value returned by a task processor.
 * @returns `value` itself when it is a non-null, non-array object; otherwise
 *   `{ value }` so that primitives, `null`, `undefined` and arrays are wrapped.
 */
function toTaskResult(value: unknown): Record<string, unknown> {
  if (!value || typeof value !== 'object' || Array.isArray(value)) {
    return { value };
  }

  // Safe after the guard above: value is a non-null, non-array object.
  return value as Record<string, unknown>;
}

/**
 * Validates and normalizes a ticker symbol taken from a task payload.
 *
 * @param raw - Untrusted payload value.
 * @returns The ticker trimmed and upper-cased.
 * @throws Error when `raw` is not a string or is blank after trimming.
 */
function parseTicker(raw: unknown): string {
  if (typeof raw !== 'string' || raw.trim().length < 1) {
    throw new Error('Ticker is required');
  }

  return raw.trim().toUpperCase();
}

/**
 * Parses an integer limit from a task payload and clamps it to `[min, max]`.
 *
 * @param raw - Untrusted payload value (number, numeric string, or anything else).
 * @param fallback - Returned when `raw` is missing or not a finite number.
 * @param min - Lower bound applied to a parsed value.
 * @param max - Upper bound applied to a parsed value.
 * @returns The truncated, clamped limit, or `fallback`.
 */
function parseLimit(raw: unknown, fallback: number, min: number, max: number): number {
  // `Number(null)` and `Number('')` are both 0, which would be clamped to `min`
  // rather than treated as "not provided". Route those cases to the fallback.
  if (raw == null || (typeof raw === 'string' && raw.trim() === '')) {
    return fallback;
  }

  const numberValue = typeof raw === 'number' ? raw : Number(raw);
  if (!Number.isFinite(numberValue)) {
    return fallback;
  }

  const intValue = Math.trunc(numberValue);
  return Math.min(Math.max(intValue, min), max);
}

/**
 * Builds the list of link descriptors for a filing.
 *
 * @param filing - Object carrying the optional primary-document and submission URLs.
 * @returns Zero, one or two links; a link is emitted only for a truthy URL,
 *   with the primary document first.
 */
function filingLinks(filing: { filingUrl: string | null; submissionUrl: string | null }): FilingLink[] {
  const links: FilingLink[] = [];

  if (filing.filingUrl) {
    links.push({ link_type: 'primary_document', url: filing.filingUrl });
  }

  if (filing.submissionUrl) {
    links.push({ link_type: 'submission_index', url: filing.submissionUrl });
  }

  return links;
}

/**
 * Handles a `sync_filings` task: fetches recent filings for the payload's
 * ticker, fetches metrics once per distinct CIK, and upserts the records.
 *
 * Reads `task.payload.ticker` (required) and `task.payload.limit`
 * (default 20, clamped to 1..50).
 *
 * @returns The normalized ticker, the number of filings fetched, and the
 *   inserted/updated counts reported by `upsertFilingsRecords`.
 * @throws Error when the ticker is missing or blank.
 */
async function processSyncFilings(task: Task) {
  const ticker = parseTicker(task.payload.ticker);
  const limit = parseLimit(task.payload.limit, 20, 1, 50);
  const filings = await fetchRecentFilings(ticker, limit);

  // Metrics are looked up per CIK, so fetch them only for the first filing seen
  // with each CIK. Requests are issued one at a time, in filing order.
  const metricsByCik = new Map<RecentFiling['cik'], FilingMetrics>();
  for (const filing of filings) {
    if (!metricsByCik.has(filing.cik)) {
      const metrics = await fetchFilingMetrics(filing.cik, filing.ticker);
      metricsByCik.set(filing.cik, metrics);
    }
  }

  const saveResult = await upsertFilingsRecords(
    filings.map((filing) => ({
      ticker: filing.ticker,
      filing_type: filing.filingType,
      filing_date: filing.filingDate,
      accession_number: filing.accessionNumber,
      cik: filing.cik,
      company_name: filing.companyName,
      filing_url: filing.filingUrl,
      submission_url: filing.submissionUrl,
      primary_document: filing.primaryDocument,
      metrics: metricsByCik.get(filing.cik) ?? null,
      links: filingLinks(filing)
    }))
  );

  return {
    ticker,
    fetched: filings.length,
    inserted: saveResult.inserted,
    updated: saveResult.updated
  };
}

/**
 * Handles a `refresh_prices` task: fetches a quote for every distinct ticker
 * among the user's holdings and hands the quotes to `applyRefreshedPrices`
 * together with the current ISO-8601 timestamp.
 *
 * @returns The count returned by `applyRefreshedPrices` and the number of
 *   distinct tickers that were quoted.
 * @throws Error when the task has no `user_id`.
 */
async function processRefreshPrices(task: Task) {
  const userId = task.user_id;
  if (!userId) {
    throw new Error('Task is missing user scope');
  }

  const userHoldings = await listHoldingsForPriceRefresh(userId);
  const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];

  // Quotes are requested one at a time, in ticker order.
  // NOTE(review): presumably to stay gentle on the quote provider — confirm
  // before parallelizing.
  const quotes = new Map<(typeof tickers)[number], Awaited<ReturnType<typeof getQuote>>>();
  for (const ticker of tickers) {
    const quote = await getQuote(ticker);
    quotes.set(ticker, quote);
  }

  const updatedCount = await applyRefreshedPrices(userId, quotes, new Date().toISOString());

  return { updatedCount, totalTickers: tickers.length };
}

/**
 * Handles an `analyze_filing` task: loads the filing identified by
 * `task.payload.accessionNumber`, asks OpenClaw for an analysis, and stores
 * the analysis against that accession number.
 *
 * @returns The accession number plus the provider and model that produced
 *   the analysis (the analysis text itself is persisted, not returned).
 * @throws Error when `accessionNumber` is missing, not a string, or empty, or
 *   when no filing exists for it.
 */
async function processAnalyzeFiling(task: Task) {
  const accessionNumber = typeof task.payload.accessionNumber === 'string' ? task.payload.accessionNumber : '';
  if (!accessionNumber) {
    throw new Error('accessionNumber is required');
  }

  const filing = await getFilingByAccession(accessionNumber);
  if (!filing) {
    throw new Error(`Filing ${accessionNumber} not found`);
  }

  const prompt = [
    'You are a fiscal research assistant focused on regulatory signals.',
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `Metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    'Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.'
  ].join('\n');

  const analysis = await runOpenClawAnalysis(prompt, 'Use concise institutional analyst language.');

  await saveFilingAnalysis(accessionNumber, {
    provider: analysis.provider,
    model: analysis.model,
    text: analysis.text
  });

  return {
    accessionNumber,
    provider: analysis.provider,
    model: analysis.model
  };
}

/**
 * Projects holdings onto the camelCase subset of fields that is embedded in
 * the portfolio-insights prompt.
 *
 * @param holdings - Holdings to summarize; the input is not mutated.
 * @returns One digest object per holding, in the same order.
 */
function holdingDigest(holdings: Holding[]) {
  return holdings.map((holding) => ({
    ticker: holding.ticker,
    shares: holding.shares,
    avgCost: holding.avg_cost,
    currentPrice: holding.current_price,
    marketValue: holding.market_value,
    gainLoss: holding.gain_loss,
    gainLossPct: holding.gain_loss_pct
  }));
}

/**
 * Handles a `portfolio_insights` task: summarizes the user's holdings, asks
 * OpenClaw for portfolio intelligence, and stores the resulting insight.
 *
 * @returns The provider and model that produced the insight, plus the
 *   portfolio summary that was sent in the prompt.
 * @throws Error when the task has no `user_id`.
 */
async function processPortfolioInsights(task: Task) {
  const userId = task.user_id;
  if (!userId) {
    throw new Error('Task is missing user scope');
  }

  const userHoldings = await listUserHoldings(userId);
  const summary = buildPortfolioSummary(userHoldings);

  const prompt = [
    'Generate portfolio intelligence with actionable recommendations.',
    `Portfolio summary: ${JSON.stringify(summary)}`,
    `Holdings: ${JSON.stringify(holdingDigest(userHoldings))}`,
    'Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.'
  ].join('\n');

  const analysis = await runOpenClawAnalysis(prompt, 'Act as a risk-aware buy-side analyst.');

  await createPortfolioInsight({
    userId,
    provider: analysis.provider,
    model: analysis.model,
    content: analysis.text
  });

  return {
    provider: analysis.provider,
    model: analysis.model,
    summary
  };
}

/**
 * Dispatches a task to the processor matching its `task_type` and returns the
 * processor's result as a plain object.
 *
 * @param task - The task to execute.
 * @returns The processor's result, normalized by `toTaskResult`.
 * @throws Error when `task.task_type` is not one of the supported types, and
 *   any error thrown by the selected processor.
 */
export async function runTaskProcessor(task: Task): Promise<Record<string, unknown>> {
  switch (task.task_type) {
    case 'sync_filings':
      return toTaskResult(await processSyncFilings(task));
    case 'refresh_prices':
      return toTaskResult(await processRefreshPrices(task));
    case 'analyze_filing':
      return toTaskResult(await processAnalyzeFiling(task));
    case 'portfolio_insights':
      return toTaskResult(await processPortfolioInsights(task));
    default:
      throw new Error(`Unsupported task type: ${task.task_type}`);
  }
}