Add search and RAG workspace flows
This commit is contained in:
@@ -9,6 +9,7 @@ import type {
|
||||
import { runAiAnalysis } from '@/lib/server/ai';
|
||||
import { buildPortfolioSummary } from '@/lib/server/portfolio';
|
||||
import { getQuote } from '@/lib/server/prices';
|
||||
import { indexSearchDocuments } from '@/lib/server/search';
|
||||
import {
|
||||
getFilingByAccession,
|
||||
listFilingsRecords,
|
||||
@@ -34,6 +35,7 @@ import {
|
||||
fetchPrimaryFilingText,
|
||||
fetchRecentFilings
|
||||
} from '@/lib/server/sec';
|
||||
import { enqueueTask } from '@/lib/server/tasks';
|
||||
import { hydrateFilingTaxonomySnapshot } from '@/lib/server/taxonomy/engine';
|
||||
|
||||
const EXTRACTION_REQUIRED_KEYS = [
|
||||
@@ -167,6 +169,17 @@ function parseOptionalText(raw: unknown) {
|
||||
return normalized.length > 0 ? normalized : null;
|
||||
}
|
||||
|
||||
function parseOptionalStringArray(raw: unknown) {
|
||||
if (!Array.isArray(raw)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return raw
|
||||
.filter((entry): entry is string => typeof entry === 'string')
|
||||
.map((entry) => entry.trim())
|
||||
.filter((entry) => entry.length > 0);
|
||||
}
|
||||
|
||||
function parseTags(raw: unknown) {
|
||||
if (!Array.isArray(raw)) {
|
||||
return [];
|
||||
@@ -562,6 +575,8 @@ async function processSyncFilings(task: Task) {
|
||||
.filter((entry): entry is string => Boolean(entry))
|
||||
.join(' | ');
|
||||
|
||||
let searchTaskId: string | null = null;
|
||||
|
||||
await setProjectionStage(
|
||||
task,
|
||||
'sync.fetch_filings',
|
||||
@@ -667,6 +682,22 @@ async function processSyncFilings(task: Task) {
|
||||
await Bun.sleep(STATEMENT_HYDRATION_DELAY_MS);
|
||||
}
|
||||
|
||||
try {
|
||||
const searchTask = await enqueueTask({
|
||||
userId: task.user_id,
|
||||
taskType: 'index_search',
|
||||
payload: {
|
||||
ticker,
|
||||
sourceKinds: ['filing_document', 'filing_brief']
|
||||
},
|
||||
priority: 55,
|
||||
resourceKey: `index_search:ticker:${ticker}`
|
||||
});
|
||||
searchTaskId = searchTask.id;
|
||||
} catch (error) {
|
||||
console.error(`[search-index-sync] failed for ${ticker}:`, error);
|
||||
}
|
||||
|
||||
return {
|
||||
ticker,
|
||||
category,
|
||||
@@ -675,7 +706,8 @@ async function processSyncFilings(task: Task) {
|
||||
inserted: saveResult.inserted,
|
||||
updated: saveResult.updated,
|
||||
taxonomySnapshotsHydrated,
|
||||
taxonomySnapshotsFailed
|
||||
taxonomySnapshotsFailed,
|
||||
searchTaskId
|
||||
};
|
||||
}
|
||||
|
||||
@@ -782,12 +814,108 @@ async function processAnalyzeFiling(task: Task) {
|
||||
extractionMeta
|
||||
});
|
||||
|
||||
let searchTaskId: string | null = null;
|
||||
try {
|
||||
const searchTask = await enqueueTask({
|
||||
userId: task.user_id,
|
||||
taskType: 'index_search',
|
||||
payload: {
|
||||
accessionNumber,
|
||||
sourceKinds: ['filing_brief']
|
||||
},
|
||||
priority: 58,
|
||||
resourceKey: `index_search:filing_brief:${accessionNumber}`
|
||||
});
|
||||
searchTaskId = searchTask.id;
|
||||
} catch (error) {
|
||||
console.error(`[search-index-analyze] failed for ${accessionNumber}:`, error);
|
||||
}
|
||||
|
||||
return {
|
||||
accessionNumber,
|
||||
provider: analysis.provider,
|
||||
model: analysis.model,
|
||||
extractionProvider: extractionMeta.provider,
|
||||
extractionModel: extractionMeta.model
|
||||
extractionModel: extractionMeta.model,
|
||||
searchTaskId
|
||||
};
|
||||
}
|
||||
|
||||
async function processIndexSearch(task: Task) {
|
||||
await setProjectionStage(task, 'search.collect_sources', 'Collecting source records for search indexing');
|
||||
|
||||
const ticker = parseOptionalText(task.payload.ticker);
|
||||
const accessionNumber = parseOptionalText(task.payload.accessionNumber);
|
||||
const journalEntryId = task.payload.journalEntryId === undefined
|
||||
? null
|
||||
: Number(task.payload.journalEntryId);
|
||||
const deleteSourceRefs = Array.isArray(task.payload.deleteSourceRefs)
|
||||
? task.payload.deleteSourceRefs
|
||||
.filter((entry): entry is {
|
||||
sourceKind: string;
|
||||
sourceRef: string;
|
||||
scope: string;
|
||||
userId?: string | null;
|
||||
} => {
|
||||
return Boolean(
|
||||
entry
|
||||
&& typeof entry === 'object'
|
||||
&& typeof (entry as { sourceKind?: unknown }).sourceKind === 'string'
|
||||
&& typeof (entry as { sourceRef?: unknown }).sourceRef === 'string'
|
||||
&& typeof (entry as { scope?: unknown }).scope === 'string'
|
||||
);
|
||||
})
|
||||
: [];
|
||||
const sourceKinds = parseOptionalStringArray(task.payload.sourceKinds)
|
||||
.filter((sourceKind): sourceKind is 'filing_document' | 'filing_brief' | 'research_note' => {
|
||||
return sourceKind === 'filing_document'
|
||||
|| sourceKind === 'filing_brief'
|
||||
|| sourceKind === 'research_note';
|
||||
});
|
||||
const validatedJournalEntryId = typeof journalEntryId === 'number'
|
||||
&& Number.isInteger(journalEntryId)
|
||||
&& journalEntryId > 0
|
||||
? journalEntryId
|
||||
: null;
|
||||
|
||||
const result = await indexSearchDocuments({
|
||||
userId: task.user_id,
|
||||
ticker,
|
||||
accessionNumber,
|
||||
journalEntryId: validatedJournalEntryId,
|
||||
sourceKinds: sourceKinds.length > 0 ? sourceKinds : undefined,
|
||||
deleteSourceRefs: deleteSourceRefs.map((entry) => ({
|
||||
sourceKind: entry.sourceKind as 'filing_document' | 'filing_brief' | 'research_note',
|
||||
sourceRef: entry.sourceRef,
|
||||
scope: entry.scope === 'user' ? 'user' : 'global',
|
||||
userId: typeof entry.userId === 'string' ? entry.userId : null
|
||||
})),
|
||||
onStage: async (stage, detail) => {
|
||||
switch (stage) {
|
||||
case 'collect':
|
||||
await setProjectionStage(task, 'search.collect_sources', detail);
|
||||
break;
|
||||
case 'fetch':
|
||||
await setProjectionStage(task, 'search.fetch_documents', detail);
|
||||
break;
|
||||
case 'chunk':
|
||||
await setProjectionStage(task, 'search.chunk', detail);
|
||||
break;
|
||||
case 'embed':
|
||||
await setProjectionStage(task, 'search.embed', detail);
|
||||
break;
|
||||
case 'persist':
|
||||
await setProjectionStage(task, 'search.persist', detail);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
ticker,
|
||||
accessionNumber,
|
||||
journalEntryId: validatedJournalEntryId,
|
||||
...result
|
||||
};
|
||||
}
|
||||
|
||||
@@ -858,6 +986,8 @@ export async function runTaskProcessor(task: Task) {
|
||||
return toTaskResult(await processAnalyzeFiling(task));
|
||||
case 'portfolio_insights':
|
||||
return toTaskResult(await processPortfolioInsights(task));
|
||||
case 'index_search':
|
||||
return toTaskResult(await processIndexSearch(task));
|
||||
default:
|
||||
throw new Error(`Unsupported task type: ${task.task_type}`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user