Improve job status notifications

This commit is contained in:
2026-03-09 18:53:41 -04:00
parent 1a18ac825d
commit 12a9741eca
22 changed files with 2243 additions and 302 deletions

View File

@@ -4,7 +4,8 @@ import type {
FilingExtractionMeta,
Holding,
Task,
TaskStage
TaskStage,
TaskStageContext
} from '@/lib/types';
import { runAiAnalysis } from '@/lib/server/ai';
import { buildPortfolioSummary } from '@/lib/server/portfolio';
@@ -137,8 +138,49 @@ function toTaskResult(value: unknown): Record<string, unknown> {
return value as Record<string, unknown>;
}
async function setProjectionStage(task: Task, stage: TaskStage, detail: string | null = null) {
await updateTaskStage(task.id, stage, detail);
export type TaskExecutionOutcome = {
result: Record<string, unknown>;
completionDetail: string;
completionContext?: TaskStageContext | null;
};
function buildTaskOutcome(
result: unknown,
completionDetail: string,
completionContext: TaskStageContext | null = null
): TaskExecutionOutcome {
return {
result: toTaskResult(result),
completionDetail,
completionContext
};
}
async function setProjectionStage(
task: Task,
stage: TaskStage,
detail: string | null = null,
context: TaskStageContext | null = null
) {
await updateTaskStage(task.id, stage, detail, context);
}
function buildProgressContext(input: {
current: number;
total: number;
unit: string;
counters?: Record<string, number>;
subject?: TaskStageContext['subject'];
}): TaskStageContext {
return {
progress: {
current: input.current,
total: input.total,
unit: input.unit
},
counters: input.counters,
subject: input.subject
};
}
function parseTicker(raw: unknown) {
@@ -576,15 +618,25 @@ async function processSyncFilings(task: Task) {
.join(' | ');
let searchTaskId: string | null = null;
const tickerSubject = { ticker };
await setProjectionStage(
task,
'sync.fetch_filings',
`Fetching up to ${limit} filings for ${ticker}${scopeLabel ? ` (${scopeLabel})` : ''}`
`Fetching up to ${limit} filings for ${ticker}${scopeLabel ? ` (${scopeLabel})` : ''}`,
{ subject: tickerSubject }
);
const filings = await fetchRecentFilings(ticker, limit);
await setProjectionStage(task, 'sync.persist_filings', 'Persisting filings and links');
await setProjectionStage(
task,
'sync.persist_filings',
`Persisting ${filings.length} filings and source links`,
{
counters: { fetched: filings.length },
subject: tickerSubject
}
);
const saveResult = await upsertFilingsRecords(
filings.map((filing) => ({
ticker: filing.ticker,
@@ -611,8 +663,26 @@ async function processSyncFilings(task: Task) {
return isFinancialMetricsForm(filing.filing_type);
});
await setProjectionStage(task, 'sync.discover_assets', `Discovering taxonomy assets for ${hydrateCandidates.length} candidate filings`);
for (const filing of hydrateCandidates) {
await setProjectionStage(
task,
'sync.discover_assets',
`Discovering taxonomy assets for ${hydrateCandidates.length} candidate filings`,
buildProgressContext({
current: 0,
total: hydrateCandidates.length,
unit: 'filings',
counters: {
fetched: filings.length,
inserted: saveResult.inserted,
updated: saveResult.updated,
hydrated: 0,
failed: 0
},
subject: tickerSubject
})
);
for (let index = 0; index < hydrateCandidates.length; index += 1) {
const filing = hydrateCandidates[index];
const existingSnapshot = await getFilingTaxonomySnapshotByFilingId(filing.id);
const shouldRefresh = !existingSnapshot
|| Date.parse(existingSnapshot.updated_at) < Date.parse(filing.updated_at);
@@ -621,8 +691,31 @@ async function processSyncFilings(task: Task) {
continue;
}
const stageContext = (stage: TaskStage) => buildProgressContext({
current: index + 1,
total: hydrateCandidates.length,
unit: 'filings',
counters: {
fetched: filings.length,
inserted: saveResult.inserted,
updated: saveResult.updated,
hydrated: taxonomySnapshotsHydrated,
failed: taxonomySnapshotsFailed
},
subject: {
ticker,
accessionNumber: filing.accession_number,
label: stage
}
});
try {
await setProjectionStage(task, 'sync.extract_taxonomy', `Extracting XBRL taxonomy for ${filing.accession_number}`);
await setProjectionStage(
task,
'sync.extract_taxonomy',
`Extracting XBRL taxonomy for ${filing.accession_number}`,
stageContext('sync.extract_taxonomy')
);
const snapshot = await hydrateFilingTaxonomySnapshot({
filingId: filing.id,
ticker: filing.ticker,
@@ -634,10 +727,30 @@ async function processSyncFilings(task: Task) {
primaryDocument: filing.primary_document ?? null
});
await setProjectionStage(task, 'sync.normalize_taxonomy', `Materializing statements for ${filing.accession_number}`);
await setProjectionStage(task, 'sync.derive_metrics', `Deriving taxonomy metrics for ${filing.accession_number}`);
await setProjectionStage(task, 'sync.validate_pdf_metrics', `Validating metrics via PDF + LLM for ${filing.accession_number}`);
await setProjectionStage(task, 'sync.persist_taxonomy', `Persisting taxonomy snapshot for ${filing.accession_number}`);
await setProjectionStage(
task,
'sync.normalize_taxonomy',
`Materializing statements for ${filing.accession_number}`,
stageContext('sync.normalize_taxonomy')
);
await setProjectionStage(
task,
'sync.derive_metrics',
`Deriving taxonomy metrics for ${filing.accession_number}`,
stageContext('sync.derive_metrics')
);
await setProjectionStage(
task,
'sync.validate_pdf_metrics',
`Validating metrics via PDF + LLM for ${filing.accession_number}`,
stageContext('sync.validate_pdf_metrics')
);
await setProjectionStage(
task,
'sync.persist_taxonomy',
`Persisting taxonomy snapshot for ${filing.accession_number}`,
stageContext('sync.persist_taxonomy')
);
await upsertFilingTaxonomySnapshot(snapshot);
await updateFilingMetricsById(filing.id, snapshot.derived_metrics);
@@ -698,7 +811,7 @@ async function processSyncFilings(task: Task) {
console.error(`[search-index-sync] failed for ${ticker}:`, error);
}
return {
const result = {
ticker,
category,
tags,
@@ -709,6 +822,24 @@ async function processSyncFilings(task: Task) {
taxonomySnapshotsFailed,
searchTaskId
};
return buildTaskOutcome(
result,
`Synced ${filings.length} filings for ${ticker}, hydrated ${taxonomySnapshotsHydrated} taxonomy snapshots, failed ${taxonomySnapshotsFailed}.`,
buildProgressContext({
current: hydrateCandidates.length,
total: hydrateCandidates.length || 1,
unit: 'filings',
counters: {
fetched: filings.length,
inserted: saveResult.inserted,
updated: saveResult.updated,
hydrated: taxonomySnapshotsHydrated,
failed: taxonomySnapshotsFailed
},
subject: tickerSubject
})
);
}
async function processRefreshPrices(task: Task) {
@@ -721,20 +852,84 @@ async function processRefreshPrices(task: Task) {
const userHoldings = await listHoldingsForPriceRefresh(userId);
const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];
const quotes = new Map<string, number>();
const baseContext = {
counters: {
holdings: userHoldings.length
}
} satisfies TaskStageContext;
await setProjectionStage(task, 'refresh.fetch_quotes', `Fetching quotes for ${tickers.length} tickers`);
for (const ticker of tickers) {
await setProjectionStage(
task,
'refresh.load_holdings',
`Loaded ${userHoldings.length} holdings across ${tickers.length} tickers`,
baseContext
);
await setProjectionStage(
task,
'refresh.fetch_quotes',
`Fetching quotes for ${tickers.length} tickers`,
buildProgressContext({
current: 0,
total: tickers.length,
unit: 'tickers',
counters: {
holdings: userHoldings.length
}
})
);
for (let index = 0; index < tickers.length; index += 1) {
const ticker = tickers[index];
const quote = await getQuote(ticker);
quotes.set(ticker, quote);
await setProjectionStage(
task,
'refresh.fetch_quotes',
`Fetching quotes for ${tickers.length} tickers`,
buildProgressContext({
current: index + 1,
total: tickers.length,
unit: 'tickers',
counters: {
holdings: userHoldings.length
},
subject: { ticker }
})
);
}
await setProjectionStage(task, 'refresh.persist_prices', 'Writing refreshed prices to holdings');
await setProjectionStage(
task,
'refresh.persist_prices',
`Writing refreshed prices for ${tickers.length} tickers across ${userHoldings.length} holdings`,
{
counters: {
holdings: userHoldings.length
}
}
);
const updatedCount = await applyRefreshedPrices(userId, quotes, new Date().toISOString());
return {
const result = {
updatedCount,
totalTickers: tickers.length
};
return buildTaskOutcome(
result,
`Refreshed prices for ${tickers.length} tickers across ${userHoldings.length} holdings.`,
{
progress: {
current: tickers.length,
total: tickers.length || 1,
unit: 'tickers'
},
counters: {
holdings: userHoldings.length,
updatedCount
}
}
);
}
async function processAnalyzeFiling(task: Task) {
@@ -746,13 +941,23 @@ async function processAnalyzeFiling(task: Task) {
throw new Error('accessionNumber is required');
}
await setProjectionStage(task, 'analyze.load_filing', `Loading filing ${accessionNumber}`);
await setProjectionStage(task, 'analyze.load_filing', `Loading filing ${accessionNumber}`, {
subject: {
accessionNumber
}
});
const filing = await getFilingByAccession(accessionNumber);
if (!filing) {
throw new Error(`Filing ${accessionNumber} not found`);
}
const analyzeSubject = {
ticker: filing.ticker,
accessionNumber,
label: filing.filing_type
};
const defaultExtraction = deterministicExtractionFallback(filing);
let extraction = defaultExtraction;
let extractionMeta: FilingExtractionMeta = {
@@ -764,7 +969,9 @@ async function processAnalyzeFiling(task: Task) {
let filingDocument: Awaited<ReturnType<typeof fetchPrimaryFilingText>> | null = null;
try {
await setProjectionStage(task, 'analyze.fetch_document', 'Fetching primary filing document');
await setProjectionStage(task, 'analyze.fetch_document', 'Fetching primary filing document', {
subject: analyzeSubject
});
filingDocument = await fetchPrimaryFilingText({
filingUrl: filing.filing_url,
cik: filing.cik,
@@ -776,7 +983,9 @@ async function processAnalyzeFiling(task: Task) {
}
if (filingDocument?.text) {
await setProjectionStage(task, 'analyze.extract', 'Generating extraction context from filing text');
await setProjectionStage(task, 'analyze.extract', 'Generating extraction context from filing text', {
subject: analyzeSubject
});
const ruleBasedExtraction = buildRuleBasedExtraction(filing, filingDocument.text);
const extractionResult = await runAiAnalysis(
extractionPrompt(filing, filingDocument.text),
@@ -798,14 +1007,18 @@ async function processAnalyzeFiling(task: Task) {
};
}
await setProjectionStage(task, 'analyze.generate_report', 'Generating final filing analysis report');
await setProjectionStage(task, 'analyze.generate_report', 'Generating final filing analysis report', {
subject: analyzeSubject
});
const analysis = await runAiAnalysis(
reportPrompt(filing, extraction, extractionMeta),
'Use concise institutional analyst language.',
{ workload: 'report' }
);
await setProjectionStage(task, 'analyze.persist_report', 'Persisting filing analysis output');
await setProjectionStage(task, 'analyze.persist_report', 'Persisting filing analysis output', {
subject: analyzeSubject
});
await saveFilingAnalysis(accessionNumber, {
provider: analysis.provider,
model: analysis.model,
@@ -831,14 +1044,24 @@ async function processAnalyzeFiling(task: Task) {
console.error(`[search-index-analyze] failed for ${accessionNumber}:`, error);
}
return {
const result = {
ticker: filing.ticker,
accessionNumber,
filingType: filing.filing_type,
provider: analysis.provider,
model: analysis.model,
extractionProvider: extractionMeta.provider,
extractionModel: extractionMeta.model,
searchTaskId
};
return buildTaskOutcome(
result,
`Analysis report generated for ${filing.ticker} ${filing.filing_type} ${accessionNumber}.`,
{
subject: analyzeSubject
}
);
}
async function processIndexSearch(task: Task) {
@@ -890,33 +1113,55 @@ async function processIndexSearch(task: Task) {
scope: entry.scope === 'user' ? 'user' : 'global',
userId: typeof entry.userId === 'string' ? entry.userId : null
})),
onStage: async (stage, detail) => {
onStage: async (stage, detail, context) => {
switch (stage) {
case 'collect':
await setProjectionStage(task, 'search.collect_sources', detail);
await setProjectionStage(task, 'search.collect_sources', detail, context ?? {
subject: ticker ? { ticker } : accessionNumber ? { accessionNumber } : null
});
break;
case 'fetch':
await setProjectionStage(task, 'search.fetch_documents', detail);
await setProjectionStage(task, 'search.fetch_documents', detail, context ?? null);
break;
case 'chunk':
await setProjectionStage(task, 'search.chunk', detail);
await setProjectionStage(task, 'search.chunk', detail, context ?? null);
break;
case 'embed':
await setProjectionStage(task, 'search.embed', detail);
await setProjectionStage(task, 'search.embed', detail, context ?? null);
break;
case 'persist':
await setProjectionStage(task, 'search.persist', detail);
await setProjectionStage(task, 'search.persist', detail, context ?? null);
break;
}
}
});
return {
const taskResult = {
ticker,
accessionNumber,
journalEntryId: validatedJournalEntryId,
...result
};
return buildTaskOutcome(
taskResult,
`Indexed ${result.indexed} sources, embedded ${result.chunksEmbedded} chunks, skipped ${result.skipped}, deleted ${result.deleted} stale documents.`,
{
progress: {
current: result.sourcesCollected,
total: result.sourcesCollected || 1,
unit: 'sources'
},
counters: {
sourcesCollected: result.sourcesCollected,
indexed: result.indexed,
chunksEmbedded: result.chunksEmbedded,
skipped: result.skipped,
deleted: result.deleted
},
subject: ticker ? { ticker } : accessionNumber ? { accessionNumber } : null
}
);
}
function holdingDigest(holdings: Holding[]) {
@@ -940,6 +1185,18 @@ async function processPortfolioInsights(task: Task) {
await setProjectionStage(task, 'insights.load_holdings', 'Loading holdings for portfolio insight generation');
const userHoldings = await listUserHoldings(userId);
const summary = buildPortfolioSummary(userHoldings);
const holdingsContext = {
counters: {
holdings: userHoldings.length
}
} satisfies TaskStageContext;
await setProjectionStage(
task,
'insights.load_holdings',
`Loaded ${userHoldings.length} holdings for portfolio insight generation`,
holdingsContext
);
const prompt = [
'Generate portfolio intelligence with actionable recommendations.',
@@ -948,14 +1205,14 @@ async function processPortfolioInsights(task: Task) {
'Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.'
].join('\n');
await setProjectionStage(task, 'insights.generate', 'Generating portfolio AI insight');
await setProjectionStage(task, 'insights.generate', 'Generating portfolio AI insight', holdingsContext);
const analysis = await runAiAnalysis(
prompt,
'Act as a risk-aware buy-side analyst.',
{ workload: 'report' }
);
await setProjectionStage(task, 'insights.persist', 'Persisting generated portfolio insight');
await setProjectionStage(task, 'insights.persist', 'Persisting generated portfolio insight', holdingsContext);
await createPortfolioInsight({
userId,
provider: analysis.provider,
@@ -963,11 +1220,21 @@ async function processPortfolioInsights(task: Task) {
content: analysis.text
});
return {
const result = {
provider: analysis.provider,
model: analysis.model,
summary
};
return buildTaskOutcome(
result,
`Generated portfolio insight for ${summary.positions} holdings.`,
{
counters: {
holdings: summary.positions
}
}
);
}
export const __taskProcessorInternals = {
@@ -979,15 +1246,15 @@ export const __taskProcessorInternals = {
export async function runTaskProcessor(task: Task) {
switch (task.task_type) {
case 'sync_filings':
return toTaskResult(await processSyncFilings(task));
return await processSyncFilings(task);
case 'refresh_prices':
return toTaskResult(await processRefreshPrices(task));
return await processRefreshPrices(task);
case 'analyze_filing':
return toTaskResult(await processAnalyzeFiling(task));
return await processAnalyzeFiling(task);
case 'portfolio_insights':
return toTaskResult(await processPortfolioInsights(task));
return await processPortfolioInsights(task);
case 'index_search':
return toTaskResult(await processIndexSearch(task));
return await processIndexSearch(task);
default:
throw new Error(`Unsupported task type: ${task.task_type}`);
}