import { randomUUID } from 'node:crypto';
import type { Filing, Holding, PortfolioInsight, Task, TaskStatus, TaskType } from '@/lib/types';
import { runOpenClawAnalysis } from '@/lib/server/openclaw';
import { buildPortfolioSummary, recalculateHolding } from '@/lib/server/portfolio';
import { getQuote } from '@/lib/server/prices';
import { fetchFilingMetrics, fetchRecentFilings } from '@/lib/server/sec';
import { getStoreSnapshot, withStore } from '@/lib/server/store';

/** Arguments accepted by {@link enqueueTask}. */
type EnqueueTaskInput = {
  taskType: TaskType;
  /** Processor-specific arguments; defaults to `{}`. */
  payload?: Record<string, unknown>;
  /** Higher values sort first in the stored task list; defaults to 50. */
  priority?: number;
  /** Total number of runs allowed before the task is marked failed; defaults to 3. */
  maxAttempts?: number;
};

/** Ids of tasks currently being processed, used to avoid running the same task twice at once. */
const activeTaskRuns = new Set<string>();

/** Returns the current time as an ISO-8601 string. */
function nowIso() {
  return new Date().toISOString();
}

/**
 * Normalises a processor's return value into a plain record.
 * Non-objects, `null` and arrays are wrapped as `{ value }`.
 */
function toTaskResult(value: unknown): Record<string, unknown> {
  if (!value || typeof value !== 'object' || Array.isArray(value)) {
    return { value };
  }

  return value as Record<string, unknown>;
}

/**
 * Validates a ticker taken from a task payload and returns it trimmed and upper-cased.
 *
 * @throws Error when `raw` is not a string or is blank.
 */
function parseTicker(raw: unknown) {
  if (typeof raw !== 'string' || raw.trim().length < 1) {
    throw new Error('Ticker is required');
  }

  return raw.trim().toUpperCase();
}

/**
 * Coerces an untrusted value to an integer clamped to `[min, max]`.
 *
 * @returns `fallback` when `raw` is missing, blank, or not a finite number.
 */
function parseLimit(raw: unknown, fallback: number, min: number, max: number) {
  // Number(null) and Number('') are both 0, which would silently clamp to `min`
  // instead of using the fallback, so treat them as "not provided".
  if (raw == null || (typeof raw === 'string' && raw.trim() === '')) {
    return fallback;
  }

  const numberValue = typeof raw === 'number' ? raw : Number(raw);
  if (!Number.isFinite(numberValue)) {
    return fallback;
  }

  const intValue = Math.trunc(numberValue);
  return Math.min(Math.max(intValue, min), max);
}

/**
 * Schedules `processTask` for the given task after `delayMs` milliseconds.
 * The run is fire-and-forget; the caller is not notified of its outcome.
 */
function queueTaskRun(taskId: string, delayMs = 40) {
  setTimeout(() => {
    // processTask can still reject when the store itself fails (claiming the task or
    // recording its outcome). Without a handler that becomes an unhandled rejection.
    processTask(taskId).catch((error: unknown) => {
      console.error(`Task ${taskId} run failed outside the processor`, error);
    });
  }, delayMs);
}

/**
 * Applies `mutator` to the stored task with the given id and refreshes its `updated_at`.
 * Does nothing when no such task exists.
 */
async function markTask(taskId: string, mutator: (task: Task) => void) {
  await withStore((store) => {
    const index = store.tasks.findIndex((task) => task.id === taskId);
    if (index < 0) {
      return;
    }

    const task = store.tasks[index];
    mutator(task);
    task.updated_at = nowIso();
  });
}

/**
 * `sync_filings` processor: fetches recent filings for `payload.ticker` and upserts them
 * into the store, keyed by accession number.
 *
 * `payload.limit` defaults to 20 and is clamped to 1..50. Metrics are fetched once per
 * distinct CIK. An existing filing keeps the fields not listed in the update (for example
 * its `analysis`).
 *
 * @returns Counts of fetched, inserted and updated filings.
 * @throws Error when the ticker is missing, or when a fetch rejects.
 */
async function processSyncFilings(task: Task) {
  const ticker = parseTicker(task.payload.ticker);
  const limit = parseLimit(task.payload.limit, 20, 1, 50);
  const filings = await fetchRecentFilings(ticker, limit);

  const metricsByCik = new Map<string, Awaited<ReturnType<typeof fetchFilingMetrics>>>();
  for (const filing of filings) {
    if (!metricsByCik.has(filing.cik)) {
      const metrics = await fetchFilingMetrics(filing.cik, filing.ticker);
      metricsByCik.set(filing.cik, metrics);
    }
  }

  let insertedCount = 0;
  let updatedCount = 0;

  await withStore((store) => {
    for (const filing of filings) {
      const existingIndex = store.filings.findIndex(
        (entry) => entry.accession_number === filing.accessionNumber
      );
      const timestamp = nowIso();
      const metrics = metricsByCik.get(filing.cik) ?? null;

      if (existingIndex >= 0) {
        const existing = store.filings[existingIndex];
        store.filings[existingIndex] = {
          ...existing,
          ticker: filing.ticker,
          cik: filing.cik,
          filing_type: filing.filingType,
          filing_date: filing.filingDate,
          company_name: filing.companyName,
          filing_url: filing.filingUrl,
          metrics,
          updated_at: timestamp
        };
        updatedCount += 1;
      } else {
        store.counters.filings += 1;
        store.filings.unshift({
          id: store.counters.filings,
          ticker: filing.ticker,
          filing_type: filing.filingType,
          filing_date: filing.filingDate,
          accession_number: filing.accessionNumber,
          cik: filing.cik,
          company_name: filing.companyName,
          filing_url: filing.filingUrl,
          metrics,
          analysis: null,
          created_at: timestamp,
          updated_at: timestamp
        });
        insertedCount += 1;
      }
    }

    // Newest filing date first; equal or unparseable dates fall back to most recently updated.
    store.filings.sort((a, b) => {
      const byDate = Date.parse(b.filing_date) - Date.parse(a.filing_date);
      return Number.isFinite(byDate) && byDate !== 0
        ? byDate
        : Date.parse(b.updated_at) - Date.parse(a.updated_at);
    });
  });

  return {
    ticker,
    fetched: filings.length,
    inserted: insertedCount,
    updated: updatedCount
  };
}

/**
 * `refresh_prices` processor: fetches a quote for every distinct ticker held in the store
 * snapshot, then recalculates each holding that has a quote.
 *
 * @returns The number of holdings updated and the number of distinct tickers quoted.
 */
async function processRefreshPrices() {
  const snapshot = await getStoreSnapshot();
  const tickers = [...new Set(snapshot.holdings.map((holding) => holding.ticker))];
  const quotes = new Map<string, Awaited<ReturnType<typeof getQuote>>>();

  for (const ticker of tickers) {
    const quote = await getQuote(ticker);
    quotes.set(ticker, quote);
  }

  let updatedCount = 0;
  const updateTime = nowIso();

  await withStore((store) => {
    store.holdings = store.holdings.map((holding) => {
      const quote = quotes.get(holding.ticker);
      // No usable quote: the ticker was not in the snapshot taken above (holdings are
      // re-read here), or the lookup produced null/undefined. Leave the holding as is.
      if (quote == null) {
        return holding;
      }

      updatedCount += 1;
      return recalculateHolding({
        ...holding,
        current_price: quote.toFixed(6),
        last_price_at: updateTime,
        updated_at: updateTime
      });
    });
  });

  return { updatedCount, totalTickers: tickers.length };
}

/**
 * `analyze_filing` processor: builds a prompt from the stored filing identified by
 * `payload.accessionNumber`, runs the analysis, and saves the result on that filing.
 *
 * If the filing is no longer in the store when the analysis finishes, the result is not
 * saved, but the task still returns normally.
 *
 * @returns The accession number plus the provider and model that produced the analysis.
 * @throws Error when `accessionNumber` is missing or no such filing is stored.
 */
async function processAnalyzeFiling(task: Task) {
  const accessionNumber =
    typeof task.payload.accessionNumber === 'string' ? task.payload.accessionNumber : '';

  if (!accessionNumber) {
    throw new Error('accessionNumber is required');
  }

  const snapshot = await getStoreSnapshot();
  const filing = snapshot.filings.find((entry) => entry.accession_number === accessionNumber);

  if (!filing) {
    throw new Error(`Filing ${accessionNumber} not found`);
  }

  const prompt = [
    'You are a fiscal research assistant focused on regulatory signals.',
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `Metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    'Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.'
  ].join('\n');

  const analysis = await runOpenClawAnalysis(prompt, 'Use concise institutional analyst language.');

  await withStore((store) => {
    const index = store.filings.findIndex((entry) => entry.accession_number === accessionNumber);
    if (index < 0) {
      return;
    }

    store.filings[index] = {
      ...store.filings[index],
      analysis: {
        provider: analysis.provider,
        model: analysis.model,
        text: analysis.text
      },
      updated_at: nowIso()
    };
  });

  return {
    accessionNumber,
    provider: analysis.provider,
    model: analysis.model
  };
}

/** Projects holdings onto the camelCase fields embedded in the portfolio-insights prompt. */
function holdingDigest(holdings: Holding[]) {
  return holdings.map((holding) => ({
    ticker: holding.ticker,
    shares: holding.shares,
    avgCost: holding.avg_cost,
    currentPrice: holding.current_price,
    marketValue: holding.market_value,
    gainLoss: holding.gain_loss,
    gainLossPct: holding.gain_loss_pct
  }));
}

/**
 * `portfolio_insights` processor: summarises the holdings in the store snapshot, runs the
 * analysis, and stores the answer as the newest insight.
 *
 * @returns The provider and model used, plus the portfolio summary sent in the prompt.
 */
async function processPortfolioInsights() {
  const snapshot = await getStoreSnapshot();
  const summary = buildPortfolioSummary(snapshot.holdings);

  const prompt = [
    'Generate portfolio intelligence with actionable recommendations.',
    `Portfolio summary: ${JSON.stringify(summary)}`,
    `Holdings: ${JSON.stringify(holdingDigest(snapshot.holdings))}`,
    'Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.'
  ].join('\n');

  const analysis = await runOpenClawAnalysis(prompt, 'Act as a risk-aware buy-side analyst.');
  const createdAt = nowIso();

  await withStore((store) => {
    store.counters.insights += 1;

    const insight: PortfolioInsight = {
      id: store.counters.insights,
      user_id: 1,
      provider: analysis.provider,
      model: analysis.model,
      content: analysis.text,
      created_at: createdAt
    };

    store.insights.unshift(insight);
  });

  return {
    provider: analysis.provider,
    model: analysis.model,
    summary
  };
}

/**
 * Dispatches a task to the processor registered for its `task_type`.
 *
 * @throws Error for an unrecognised task type, or whatever the processor throws.
 */
async function runTaskProcessor(task: Task) {
  switch (task.task_type) {
    case 'sync_filings':
      return await processSyncFilings(task);
    case 'refresh_prices':
      return await processRefreshPrices();
    case 'analyze_filing':
      return await processAnalyzeFiling(task);
    case 'portfolio_insights':
      return await processPortfolioInsights();
    default:
      throw new Error(`Unsupported task type: ${task.task_type}`);
  }
}

/**
 * Runs a single queued task to completion.
 *
 * The task is claimed by flipping it from `queued` to `running` and incrementing
 * `attempts`; a task that is missing or in any other status is left alone. On success the
 * result is stored and the task is marked `completed`. On failure the task is re-queued
 * with a 1200 ms delay while `attempts < max_attempts`, and otherwise marked `failed`.
 *
 * Calls for a task id that is already being processed return immediately.
 */
async function processTask(taskId: string) {
  if (activeTaskRuns.has(taskId)) {
    return;
  }

  activeTaskRuns.add(taskId);

  try {
    const task = await withStore((store) => {
      const index = store.tasks.findIndex((entry) => entry.id === taskId);
      if (index < 0) {
        return null;
      }

      const target = store.tasks[index];
      if (target.status !== 'queued') {
        return null;
      }

      target.status = 'running';
      target.attempts += 1;
      target.updated_at = nowIso();

      // Work on a copy so the processor never holds a reference into the store.
      return { ...target };
    });

    if (!task) {
      return;
    }

    try {
      const result = toTaskResult(await runTaskProcessor(task));

      await markTask(taskId, (target) => {
        target.status = 'completed';
        target.result = result;
        target.error = null;
        target.finished_at = nowIso();
      });
    } catch (error) {
      const reason = error instanceof Error ? error.message : 'Task failed unexpectedly';
      // `task.attempts` already includes the run that just failed.
      const shouldRetry = task.attempts < task.max_attempts;

      if (shouldRetry) {
        await markTask(taskId, (target) => {
          target.status = 'queued';
          target.error = reason;
        });

        queueTaskRun(taskId, 1200);
      } else {
        await markTask(taskId, (target) => {
          target.status = 'failed';
          target.error = reason;
          target.finished_at = nowIso();
        });
      }
    }
  } finally {
    activeTaskRuns.delete(taskId);
  }
}

/**
 * Creates a task in the `queued` state, stores it, and schedules it to run shortly after.
 *
 * The stored task list is re-sorted by priority (highest first), then by creation time
 * (newest first).
 *
 * @param input - Task type plus optional payload, priority (default 50) and
 *   maxAttempts (default 3).
 * @returns The newly created task as it was stored.
 */
export async function enqueueTask(input: EnqueueTaskInput) {
  const createdAt = nowIso();

  const task: Task = {
    id: randomUUID(),
    task_type: input.taskType,
    status: 'queued',
    priority: input.priority ?? 50,
    payload: input.payload ?? {},
    result: null,
    error: null,
    attempts: 0,
    max_attempts: input.maxAttempts ?? 3,
    created_at: createdAt,
    updated_at: createdAt,
    finished_at: null
  };

  await withStore((store) => {
    store.tasks.unshift(task);
    store.tasks.sort((a, b) => {
      if (a.priority !== b.priority) {
        return b.priority - a.priority;
      }

      return Date.parse(b.created_at) - Date.parse(a.created_at);
    });
  });

  queueTaskRun(task.id);
  return task;
}

/**
 * Looks up a task by id in the current store snapshot.
 *
 * @returns The task, or `null` when no task has that id.
 */
export async function getTaskById(taskId: string) {
  const snapshot = await getStoreSnapshot();
  return snapshot.tasks.find((task) => task.id === taskId) ?? null;
}

/**
 * Lists tasks from the current store snapshot, newest first.
 *
 * @param limit - Maximum number of tasks to return; truncated to an integer and clamped
 *   to 1..200. `NaN` falls back to the default of 20.
 * @param statuses - When non-empty, only tasks in one of these statuses are returned.
 */
export async function listRecentTasks(limit = 20, statuses?: TaskStatus[]) {
  // A NaN limit would otherwise survive the clamp and make slice(0, NaN) return nothing.
  const safeLimit = Number.isNaN(limit) ? 20 : Math.min(Math.max(Math.trunc(limit), 1), 200);
  const snapshot = await getStoreSnapshot();

  const filtered =
    statuses && statuses.length > 0
      ? snapshot.tasks.filter((task) => statuses.includes(task.status))
      : snapshot.tasks;

  return filtered
    .slice()
    .sort((a, b) => Date.parse(b.created_at) - Date.parse(a.created_at))
    .slice(0, safeLimit);
}