1699 lines
47 KiB
TypeScript
import type {
|
|
Filing,
|
|
FilingExtraction,
|
|
FilingExtractionMeta,
|
|
Holding,
|
|
Task,
|
|
TaskStage,
|
|
TaskStageContext,
|
|
} from "@/lib/types";
|
|
import { runAiAnalysis } from "@/lib/server/ai";
|
|
import { buildPortfolioSummary } from "@/lib/server/portfolio";
|
|
import { getQuote } from "@/lib/server/prices";
|
|
import { indexSearchDocuments } from "@/lib/server/search";
|
|
import {
|
|
getFilingByAccession,
|
|
listFilingsRecords,
|
|
saveFilingAnalysis,
|
|
updateFilingMetricsById,
|
|
upsertFilingsRecords,
|
|
} from "@/lib/server/repos/filings";
|
|
import { deleteCompanyFinancialBundlesForTicker } from "@/lib/server/repos/company-financial-bundles";
|
|
import {
|
|
getFilingTaxonomySnapshotByFilingId,
|
|
normalizeFilingTaxonomySnapshotPayload,
|
|
upsertFilingTaxonomySnapshot,
|
|
} from "@/lib/server/repos/filing-taxonomy";
|
|
import {
|
|
applyRefreshedPrices,
|
|
listHoldingsForPriceRefresh,
|
|
listUserHoldings,
|
|
} from "@/lib/server/repos/holdings";
|
|
import { createPortfolioInsight } from "@/lib/server/repos/insights";
|
|
import { updateTaskStage } from "@/lib/server/repos/tasks";
|
|
import { fetchPrimaryFilingText, fetchRecentFilings } from "@/lib/server/sec";
|
|
import {
|
|
generateIssuerOverlayForTicker,
|
|
recordIssuerOverlayBuildFailure,
|
|
} from "@/lib/server/issuer-overlays";
|
|
import {
|
|
getActiveIssuerOverlayDefinition,
|
|
getIssuerOverlay,
|
|
} from "@/lib/server/repos/issuer-overlays";
|
|
import { enqueueTask } from "@/lib/server/tasks";
|
|
import { hydrateFilingTaxonomySnapshot } from "@/lib/server/taxonomy/engine";
|
|
import { validateMetricsWithPdfLlm } from "@/lib/server/taxonomy/pdf-validation";
|
|
|
|
// Exact key set an LLM extraction payload must contain — no more, no less.
// Enforced by parseExtractionPayload below.
const EXTRACTION_REQUIRED_KEYS = [
  "summary",
  "keyPoints",
  "redFlags",
  "followUpQuestions",
  "portfolioSignals",
  "segmentSpecificData",
  "geographicRevenueBreakdown",
  "companySpecificData",
  "secApiCrossChecks",
  "confidence",
] as const;
// Size caps applied by the extraction sanitizers.
const EXTRACTION_MAX_ITEMS = 6; // max entries kept per list field
const EXTRACTION_ITEM_MAX_LENGTH = 280; // max characters per list entry
const EXTRACTION_SUMMARY_MAX_LENGTH = 900; // max characters for the summary field
// Pacing for taxonomy snapshot hydration (see hydrateTaxonomySnapshotsForCandidates).
const STATEMENT_HYDRATION_DELAY_MS = 120; // Bun.sleep between filings
const STATEMENT_HYDRATION_MAX_FILINGS = 80; // cap on candidate filings per sync
|
|
// Heuristic regexes fed to collectTextSignals to pull operating-segment
// disclosure lines out of raw filing text.
const SEGMENT_PATTERNS = [
  /\boperating segment\b/i,
  /\bsegment revenue\b/i,
  /\bsegment margin\b/i,
  /\bsegment profit\b/i,
  /\bbusiness segment\b/i,
  /\breportable segment\b/i,
];
// Heuristic regexes for geographic revenue-mix disclosures.
const GEOGRAPHIC_PATTERNS = [
  /\bgeographic\b/i,
  /\bamericas\b/i,
  /\bemea\b/i,
  /\bapac\b/i,
  /\basia pacific\b/i,
  /\bnorth america\b/i,
  /\beurope\b/i,
  /\bchina\b/i,
  /\binternational\b/i,
];
// Heuristic regexes for company-specific operating KPIs
// (same-store sales, subscribers, churn, occupancy, ...).
const COMPANY_SPECIFIC_PATTERNS = [
  /\bsame[- ]store\b/i,
  /\bcomparable[- ]store\b/i,
  /\bcomp sales\b/i,
  /\borganic sales\b/i,
  /\bbookings\b/i,
  /\bbacklog\b/i,
  /\barpu\b/i,
  /\bmau\b/i,
  /\bdau\b/i,
  /\bsubscriber\b/i,
  /\boccupancy\b/i,
  /\brevpar\b/i,
  /\bretention\b/i,
  /\bchurn\b/i,
];
|
|
|
|
// A key of the optional Filing.metrics record (e.g. "revenue", "netIncome"
// as used by METRIC_CHECK_PATTERNS below).
type FilingMetricKey = keyof NonNullable<Filing["metrics"]>;
|
|
|
|
function isFinancialMetricsForm(
|
|
filingType: string,
|
|
): filingType is "10-K" | "10-Q" {
|
|
return filingType === "10-K" || filingType === "10-Q";
|
|
}
|
|
|
|
// Descriptors used by buildSecApiCrossChecks: each SEC API metric value is
// cross-checked against the filing narrative via these keyword regexes.
const METRIC_CHECK_PATTERNS: Array<{
  key: FilingMetricKey;
  label: string;
  patterns: RegExp[];
}> = [
  {
    key: "revenue",
    label: "Revenue",
    patterns: [/\brevenue\b/i, /\bsales\b/i],
  },
  {
    key: "netIncome",
    label: "Net income",
    patterns: [/\bnet income\b/i, /\bprofit\b/i],
  },
  {
    key: "totalAssets",
    label: "Total assets",
    patterns: [/\btotal assets\b/i, /\bassets\b/i],
  },
  {
    key: "cash",
    label: "Cash",
    patterns: [/\bcash\b/i, /\bcash equivalents\b/i],
  },
  {
    key: "debt",
    label: "Debt",
    patterns: [/\bdebt\b/i, /\bborrowings\b/i, /\bliabilit(?:y|ies)\b/i],
  },
];
|
|
|
|
function toTaskResult(value: unknown): Record<string, unknown> {
|
|
if (!value || typeof value !== "object" || Array.isArray(value)) {
|
|
return { value };
|
|
}
|
|
|
|
return value as Record<string, unknown>;
|
|
}
|
|
|
|
/**
 * Shape returned by task processors in this module: a JSON-safe result
 * record, a human-readable completion message, and an optional final stage
 * context for progress reporting.
 */
export type TaskExecutionOutcome = {
  result: Record<string, unknown>;
  completionDetail: string;
  completionContext?: TaskStageContext | null;
};
|
|
|
|
/**
 * Summary of one hydration pass over taxonomy snapshot candidates:
 * counts of successes/failures plus the filing ids that were (re)processed.
 */
type TaxonomyHydrationRunResult = {
  hydrated: number;
  failed: number;
  touchedFilingIds: Set<number>;
};
|
|
|
|
function buildTaskOutcome(
|
|
result: unknown,
|
|
completionDetail: string,
|
|
completionContext: TaskStageContext | null = null,
|
|
): TaskExecutionOutcome {
|
|
return {
|
|
result: toTaskResult(result),
|
|
completionDetail,
|
|
completionContext,
|
|
};
|
|
}
|
|
|
|
function shouldRefreshTaxonomySnapshot(input: {
|
|
existingSnapshot: Awaited<
|
|
ReturnType<typeof getFilingTaxonomySnapshotByFilingId>
|
|
> | null;
|
|
filingUpdatedAt: string;
|
|
overlayRevisionId: number | null;
|
|
}) {
|
|
if (!input.existingSnapshot) {
|
|
return true;
|
|
}
|
|
|
|
if (
|
|
Date.parse(input.existingSnapshot.updated_at) <
|
|
Date.parse(input.filingUpdatedAt)
|
|
) {
|
|
return true;
|
|
}
|
|
|
|
return (
|
|
(input.existingSnapshot.issuer_overlay_revision_id ?? null) !==
|
|
input.overlayRevisionId
|
|
);
|
|
}
|
|
|
|
/**
 * Hydrates XBRL taxonomy snapshots for a batch of 10-K/10-Q filing
 * candidates, one filing at a time.
 *
 * For each candidate needing a refresh (per shouldRefreshTaxonomySnapshot):
 * extract the taxonomy snapshot, run PDF+LLM metric validation (falling back
 * to an "error" validation result on failure), persist the normalized
 * snapshot and derived metrics, and invalidate the ticker's cached financial
 * bundles. A hydration failure is recorded as a "failed" placeholder snapshot
 * instead of aborting the batch. Progress is surfaced via task stage updates.
 *
 * Returns hydrated/failed counts plus the set of filing ids touched.
 */
async function hydrateTaxonomySnapshotsForCandidates(input: {
  task: Task;
  ticker: string;
  filingsCount: number;
  saveResult: { inserted: number; updated: number };
  candidates: Array<Filing & { filing_type: "10-K" | "10-Q" }>;
  overlayRevisionId: number | null;
}) {
  // The overlay definition is only fetched when a revision is pinned.
  const overlayDefinition =
    input.overlayRevisionId === null
      ? null
      : await getActiveIssuerOverlayDefinition(input.ticker);
  let hydrated = 0;
  let failed = 0;
  const touchedFilingIds = new Set<number>();

  for (let index = 0; index < input.candidates.length; index += 1) {
    const filing = input.candidates[index];
    const existingSnapshot = await getFilingTaxonomySnapshotByFilingId(
      filing.id,
    );
    // Skip filings whose snapshot is already current for this overlay revision.
    if (
      !shouldRefreshTaxonomySnapshot({
        existingSnapshot,
        filingUpdatedAt: filing.updated_at,
        overlayRevisionId: input.overlayRevisionId,
      })
    ) {
      continue;
    }

    touchedFilingIds.add(filing.id);
    // Builds per-stage progress context; hydrated/failed are captured live
    // so later stages report up-to-date counters.
    const stageContext = (stage: TaskStage) =>
      buildProgressContext({
        current: index + 1,
        total: input.candidates.length,
        unit: "filings",
        counters: {
          fetched: input.filingsCount,
          inserted: input.saveResult.inserted,
          updated: input.saveResult.updated,
          hydrated,
          failed,
        },
        subject: {
          ticker: input.ticker,
          accessionNumber: filing.accession_number,
          label: stage,
        },
      });

    try {
      await setProjectionStage(
        input.task,
        "sync.extract_taxonomy",
        `Extracting XBRL taxonomy for ${filing.accession_number}`,
        stageContext("sync.extract_taxonomy"),
      );
      const snapshot = await hydrateFilingTaxonomySnapshot({
        filingId: filing.id,
        ticker: filing.ticker,
        cik: filing.cik,
        accessionNumber: filing.accession_number,
        filingDate: filing.filing_date,
        filingType: filing.filing_type,
        filingUrl: filing.filing_url,
        primaryDocument: filing.primary_document ?? null,
        issuerOverlay: overlayDefinition,
      });
      // Default to the snapshot's own validation; overwritten below when the
      // PDF+LLM pass succeeds (or replaced with an error result on failure).
      let pdfValidation = {
        validation_result: snapshot.validation_result,
        metric_validations: snapshot.metric_validations,
      };

      try {
        pdfValidation = await validateMetricsWithPdfLlm({
          metrics: snapshot.derived_metrics,
          assets: snapshot.assets,
        });
      } catch (error) {
        const message =
          error instanceof Error
            ? error.message
            : "PDF metric validation failed";
        // Validation failure is non-fatal: record an "error" result and
        // annotate each per-metric check with the failure message.
        pdfValidation = {
          validation_result: {
            status: "error",
            checks: [],
            validatedAt: new Date().toISOString(),
          },
          metric_validations: snapshot.metric_validations.map((check) => ({
            ...check,
            error: check.error ?? message,
          })),
        };
      }

      // NOTE(review): normalizeFilingTaxonomySnapshotPayload(snapshot) is
      // spread AFTER the pdfValidation overrides; if its payload contains
      // validation_result / metric_validations keys they would clobber the
      // PDF validation results just computed — confirm intended spread order.
      const normalizedSnapshot = {
        ...snapshot,
        validation_result: pdfValidation.validation_result,
        metric_validations: pdfValidation.metric_validations,
        issuer_overlay_revision_id: input.overlayRevisionId,
        ...normalizeFilingTaxonomySnapshotPayload(snapshot),
      };

      // NOTE(review): the next three stage updates fire back-to-back after
      // the corresponding work already ran above — they appear to drive UI
      // stage progression rather than bracket the work; confirm.
      await setProjectionStage(
        input.task,
        "sync.normalize_taxonomy",
        `Materializing statements for ${filing.accession_number}`,
        stageContext("sync.normalize_taxonomy"),
      );
      await setProjectionStage(
        input.task,
        "sync.derive_metrics",
        `Deriving taxonomy metrics for ${filing.accession_number}`,
        stageContext("sync.derive_metrics"),
      );
      await setProjectionStage(
        input.task,
        "sync.validate_pdf_metrics",
        `Validating metrics via PDF + LLM for ${filing.accession_number}`,
        stageContext("sync.validate_pdf_metrics"),
      );
      await setProjectionStage(
        input.task,
        "sync.persist_taxonomy",
        `Persisting taxonomy snapshot for ${filing.accession_number}`,
        stageContext("sync.persist_taxonomy"),
      );

      await upsertFilingTaxonomySnapshot(normalizedSnapshot);
      await updateFilingMetricsById(
        filing.id,
        normalizedSnapshot.derived_metrics,
      );
      // Invalidate cached bundles so readers pick up the new snapshot.
      await deleteCompanyFinancialBundlesForTicker(filing.ticker);
      hydrated += 1;
    } catch (error) {
      const now = new Date().toISOString();
      // Persist an explicit "failed" placeholder snapshot (empty statement
      // structures, parse_error populated) so the failure is visible;
      // existing filing metrics are preserved in derived_metrics.
      await upsertFilingTaxonomySnapshot({
        filing_id: filing.id,
        ticker: filing.ticker,
        filing_date: filing.filing_date,
        filing_type: filing.filing_type,
        parse_status: "failed",
        parse_error:
          error instanceof Error ? error.message : "Taxonomy hydration failed",
        source: "legacy_html_fallback",
        parser_engine: "fiscal-xbrl",
        parser_version: "unknown",
        taxonomy_regime: "unknown",
        fiscal_pack: "core",
        periods: [],
        faithful_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        statement_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        surface_rows: {
          income: [],
          balance: [],
          cash_flow: [],
          equity: [],
          comprehensive_income: [],
          disclosure: [],
        },
        detail_rows: {
          income: {},
          balance: {},
          cash_flow: {},
          equity: {},
          comprehensive_income: {},
          disclosure: {},
        },
        kpi_rows: [],
        computed_definitions: [],
        contexts: [],
        derived_metrics: filing.metrics ?? null,
        validation_result: {
          status: "error",
          checks: [],
          validatedAt: now,
        },
        normalization_summary: {
          surfaceRowCount: 0,
          detailRowCount: 0,
          kpiRowCount: 0,
          unmappedRowCount: 0,
          materialUnmappedRowCount: 0,
          residualPrimaryCount: 0,
          residualDisclosureCount: 0,
          unsupportedConceptCount: 0,
          issuerOverlayMatchCount: 0,
          warnings: [],
        },
        issuer_overlay_revision_id: input.overlayRevisionId,
        facts_count: 0,
        concepts_count: 0,
        dimensions_count: 0,
        assets: [],
        concepts: [],
        facts: [],
        metric_validations: [],
      });
      // Bundles are invalidated on failure too, so stale data isn't served.
      await deleteCompanyFinancialBundlesForTicker(filing.ticker);
      failed += 1;
    }

    // Brief pause between filings (rate pacing).
    await Bun.sleep(STATEMENT_HYDRATION_DELAY_MS);
  }

  return {
    hydrated,
    failed,
    touchedFilingIds,
  } satisfies TaxonomyHydrationRunResult;
}
|
|
|
|
/**
 * Persists a task's current pipeline stage (plus optional detail text and
 * structured context) via the tasks repository.
 */
async function setProjectionStage(
  task: Task,
  stage: TaskStage,
  detail: string | null = null,
  context: TaskStageContext | null = null,
) {
  await updateTaskStage(task.id, stage, detail, context);
}
|
|
|
|
function buildProgressContext(input: {
|
|
current: number;
|
|
total: number;
|
|
unit: string;
|
|
counters?: Record<string, number>;
|
|
subject?: TaskStageContext["subject"];
|
|
}): TaskStageContext {
|
|
return {
|
|
progress: {
|
|
current: input.current,
|
|
total: input.total,
|
|
unit: input.unit,
|
|
},
|
|
counters: input.counters,
|
|
subject: input.subject,
|
|
};
|
|
}
|
|
|
|
function parseTicker(raw: unknown) {
|
|
if (typeof raw !== "string" || raw.trim().length < 1) {
|
|
throw new Error("Ticker is required");
|
|
}
|
|
|
|
return raw.trim().toUpperCase();
|
|
}
|
|
|
|
function parseLimit(raw: unknown, fallback: number, min: number, max: number) {
|
|
const numberValue = typeof raw === "number" ? raw : Number(raw);
|
|
|
|
if (!Number.isFinite(numberValue)) {
|
|
return fallback;
|
|
}
|
|
|
|
const intValue = Math.trunc(numberValue);
|
|
return Math.min(Math.max(intValue, min), max);
|
|
}
|
|
|
|
function parseOptionalText(raw: unknown) {
|
|
if (typeof raw !== "string") {
|
|
return null;
|
|
}
|
|
|
|
const normalized = raw.trim();
|
|
return normalized.length > 0 ? normalized : null;
|
|
}
|
|
|
|
function parseOptionalStringArray(raw: unknown) {
|
|
if (!Array.isArray(raw)) {
|
|
return [];
|
|
}
|
|
|
|
return raw
|
|
.filter((entry): entry is string => typeof entry === "string")
|
|
.map((entry) => entry.trim())
|
|
.filter((entry) => entry.length > 0);
|
|
}
|
|
|
|
function parseTags(raw: unknown) {
|
|
if (!Array.isArray(raw)) {
|
|
return [];
|
|
}
|
|
|
|
const unique = new Set<string>();
|
|
for (const entry of raw) {
|
|
if (typeof entry !== "string") {
|
|
continue;
|
|
}
|
|
|
|
const tag = entry.trim();
|
|
if (!tag) {
|
|
continue;
|
|
}
|
|
|
|
unique.add(tag);
|
|
}
|
|
|
|
return [...unique];
|
|
}
|
|
|
|
function sanitizeExtractionText(value: unknown, maxLength: number) {
|
|
if (typeof value !== "string") {
|
|
return null;
|
|
}
|
|
|
|
const collapsed = value.replace(/\s+/g, " ").trim();
|
|
if (!collapsed) {
|
|
return null;
|
|
}
|
|
|
|
return collapsed.slice(0, maxLength);
|
|
}
|
|
|
|
function sanitizeExtractionList(value: unknown) {
|
|
if (!Array.isArray(value)) {
|
|
return null;
|
|
}
|
|
|
|
const cleaned: string[] = [];
|
|
|
|
for (const entry of value) {
|
|
const normalized = sanitizeExtractionText(
|
|
entry,
|
|
EXTRACTION_ITEM_MAX_LENGTH,
|
|
);
|
|
if (!normalized) {
|
|
continue;
|
|
}
|
|
|
|
cleaned.push(normalized);
|
|
if (cleaned.length >= EXTRACTION_MAX_ITEMS) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
return cleaned;
|
|
}
|
|
|
|
function uniqueExtractionList(items: Array<string | null | undefined>) {
|
|
const seen = new Set<string>();
|
|
const unique: string[] = [];
|
|
|
|
for (const item of items) {
|
|
const normalized = sanitizeExtractionText(item, EXTRACTION_ITEM_MAX_LENGTH);
|
|
if (!normalized) {
|
|
continue;
|
|
}
|
|
|
|
const signature = normalized.toLowerCase();
|
|
if (seen.has(signature)) {
|
|
continue;
|
|
}
|
|
|
|
seen.add(signature);
|
|
unique.push(normalized);
|
|
|
|
if (unique.length >= EXTRACTION_MAX_ITEMS) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
return unique;
|
|
}
|
|
|
|
function collectTextSignals(filingText: string, patterns: RegExp[]) {
|
|
const lines = filingText
|
|
.replace(/\r/g, "\n")
|
|
.split(/\n+/)
|
|
.map((line) => line.replace(/\s+/g, " ").trim())
|
|
.filter((line) => line.length >= 24);
|
|
|
|
const matches: string[] = [];
|
|
|
|
for (const line of lines) {
|
|
if (!patterns.some((pattern) => pattern.test(line))) {
|
|
continue;
|
|
}
|
|
|
|
matches.push(line);
|
|
if (matches.length >= EXTRACTION_MAX_ITEMS * 2) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
return uniqueExtractionList(matches);
|
|
}
|
|
|
|
/**
 * Parses an LLM response into a FilingExtraction. Returns null for any
 * malformed payload: no JSON found, invalid JSON, non-object root, wrong
 * key set, non-string summary / non-array list fields, or a non-numeric
 * confidence value.
 */
function parseExtractionPayload(raw: string): FilingExtraction | null {
  // Prefer a ```json fenced block; otherwise fall back to the outermost
  // brace-delimited span of the raw response.
  const fencedJson = raw.match(/```(?:json)?\s*([\s\S]*?)```/i)?.[1];
  const candidate =
    fencedJson ??
    (() => {
      const start = raw.indexOf("{");
      const end = raw.lastIndexOf("}");
      return start >= 0 && end > start ? raw.slice(start, end + 1) : null;
    })();

  if (!candidate) {
    return null;
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(candidate);
  } catch {
    return null;
  }

  // Reject non-object roots (arrays included).
  if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
    return null;
  }

  const payload = parsed as Record<string, unknown>;
  const keys = Object.keys(payload);
  // Enforce the exact key set: same count and every required key present.
  // (The third loop below is implied by the first two checks, but kept as a
  // defensive guard against unknown keys.)
  if (keys.length !== EXTRACTION_REQUIRED_KEYS.length) {
    return null;
  }

  for (const key of EXTRACTION_REQUIRED_KEYS) {
    if (!(key in payload)) {
      return null;
    }
  }

  for (const key of keys) {
    if (
      !EXTRACTION_REQUIRED_KEYS.includes(
        key as (typeof EXTRACTION_REQUIRED_KEYS)[number],
      )
    ) {
      return null;
    }
  }

  // Sanitizers return null for wrong-typed input; those nulls fail the
  // combined guard below. Note: empty ARRAYS pass (only a null/blank
  // summary or a non-array list rejects the payload).
  const summary = sanitizeExtractionText(
    payload.summary,
    EXTRACTION_SUMMARY_MAX_LENGTH,
  );
  const keyPoints = sanitizeExtractionList(payload.keyPoints);
  const redFlags = sanitizeExtractionList(payload.redFlags);
  const followUpQuestions = sanitizeExtractionList(payload.followUpQuestions);
  const portfolioSignals = sanitizeExtractionList(payload.portfolioSignals);
  const segmentSpecificData = sanitizeExtractionList(
    payload.segmentSpecificData,
  );
  const geographicRevenueBreakdown = sanitizeExtractionList(
    payload.geographicRevenueBreakdown,
  );
  const companySpecificData = sanitizeExtractionList(
    payload.companySpecificData,
  );
  const secApiCrossChecks = sanitizeExtractionList(payload.secApiCrossChecks);
  // Accept numeric strings too; Number(undefined) is NaN and fails below.
  const confidenceRaw =
    typeof payload.confidence === "number"
      ? payload.confidence
      : Number(payload.confidence);

  if (
    !summary ||
    !keyPoints ||
    !redFlags ||
    !followUpQuestions ||
    !portfolioSignals ||
    !segmentSpecificData ||
    !geographicRevenueBreakdown ||
    !companySpecificData ||
    !secApiCrossChecks ||
    !Number.isFinite(confidenceRaw)
  ) {
    return null;
  }

  return {
    summary,
    keyPoints,
    redFlags,
    followUpQuestions,
    portfolioSignals,
    segmentSpecificData,
    geographicRevenueBreakdown,
    companySpecificData,
    secApiCrossChecks,
    // Clamp confidence into [0, 1].
    confidence: Math.min(Math.max(confidenceRaw, 0), 1),
  };
}
|
|
|
|
function metricSnapshotLine(label: string, value: number | null | undefined) {
|
|
if (value === null || value === undefined || !Number.isFinite(value)) {
|
|
return `${label}: not reported`;
|
|
}
|
|
|
|
return `${label}: ${Math.round(value).toLocaleString("en-US")}`;
|
|
}
|
|
|
|
function buildSecApiCrossChecks(filing: Filing, filingText: string) {
|
|
const normalizedText = filingText.toLowerCase();
|
|
const checks: string[] = [];
|
|
|
|
for (const descriptor of METRIC_CHECK_PATTERNS) {
|
|
const value = filing.metrics?.[descriptor.key];
|
|
if (value === null || value === undefined || !Number.isFinite(value)) {
|
|
checks.push(
|
|
`${descriptor.label}: SEC API metric unavailable for this filing.`,
|
|
);
|
|
continue;
|
|
}
|
|
|
|
const hasMention = descriptor.patterns.some((pattern) =>
|
|
pattern.test(normalizedText),
|
|
);
|
|
if (hasMention) {
|
|
checks.push(
|
|
`${descriptor.label}: SEC API value ${Math.round(value).toLocaleString("en-US")} appears referenced in filing narrative.`,
|
|
);
|
|
} else {
|
|
checks.push(
|
|
`${descriptor.label}: SEC API value ${Math.round(value).toLocaleString("en-US")} was not confidently located in sampled filing text.`,
|
|
);
|
|
}
|
|
}
|
|
|
|
return uniqueExtractionList(checks);
|
|
}
|
|
|
|
/**
 * Builds a fully deterministic FilingExtraction from filing metadata and SEC
 * API metrics alone — used when filing text parsing is unavailable or the
 * LLM response is invalid. Confidence is pinned low (0.2).
 */
function deterministicExtractionFallback(filing: Filing): FilingExtraction {
  const metrics = filing.metrics;

  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. Deterministic extraction fallback was used because filing text parsing was unavailable or invalid.`,
    keyPoints: [
      `${filing.filing_type} filing recorded for ${filing.ticker}.`,
      metricSnapshotLine("Revenue", metrics?.revenue),
      metricSnapshotLine("Net income", metrics?.netIncome),
      metricSnapshotLine("Total assets", metrics?.totalAssets),
    ],
    // NOTE(review): cash/debt snapshots and document availability are listed
    // under redFlags even when unremarkable — confirm this framing is intended.
    redFlags: [
      metricSnapshotLine("Cash", metrics?.cash),
      metricSnapshotLine("Debt", metrics?.debt),
      filing.primary_document
        ? "Primary document is indexed and available for review."
        : "Primary document reference is unavailable in current filing metadata.",
    ],
    followUpQuestions: [
      "What changed versus the prior filing in guidance, margins, or liquidity?",
      "Are any material risks under-emphasized relative to historical filings?",
      "Should portfolio exposure be adjusted before the next reporting cycle?",
    ],
    portfolioSignals: [
      "Validate trend direction using at least two prior filings.",
      "Cross-check leverage and liquidity metrics against position sizing rules.",
      "Track language shifts around guidance or demand assumptions.",
    ],
    segmentSpecificData: [
      "Segment-level disclosures were not parsed in deterministic fallback mode.",
    ],
    geographicRevenueBreakdown: [
      "Geographic revenue disclosures were not parsed in deterministic fallback mode.",
    ],
    companySpecificData: [
      "Company-specific operating KPIs (for example same-store sales) were not parsed in deterministic fallback mode.",
    ],
    secApiCrossChecks: [
      `${metricSnapshotLine("Revenue", metrics?.revenue)} (SEC API baseline; text verification unavailable).`,
      `${metricSnapshotLine("Net income", metrics?.netIncome)} (SEC API baseline; text verification unavailable).`,
    ],
    confidence: 0.2,
  };
}
|
|
|
|
/**
 * Rule-based (non-LLM) extraction: starts from the deterministic fallback,
 * then enriches it with regex-harvested segment / geographic / KPI lines
 * from the filing text and SEC-metric cross-checks. Confidence rises to 0.4
 * when any text signal was found, else 0.3.
 */
function buildRuleBasedExtraction(
  filing: Filing,
  filingText: string,
): FilingExtraction {
  const baseline = deterministicExtractionFallback(filing);
  const segmentSpecificData = collectTextSignals(filingText, SEGMENT_PATTERNS);
  const geographicRevenueBreakdown = collectTextSignals(
    filingText,
    GEOGRAPHIC_PATTERNS,
  );
  const companySpecificData = collectTextSignals(
    filingText,
    COMPANY_SPECIFIC_PATTERNS,
  );
  const secApiCrossChecks = buildSecApiCrossChecks(filing, filingText);

  // Promote the first hit of each signal category into keyPoints.
  const segmentLead = segmentSpecificData[0]
    ? `Segment detail: ${segmentSpecificData[0]}`
    : null;
  const geographicLead = geographicRevenueBreakdown[0]
    ? `Geographic detail: ${geographicRevenueBreakdown[0]}`
    : null;
  const companyLead = companySpecificData[0]
    ? `Company-specific KPI: ${companySpecificData[0]}`
    : null;

  return {
    summary: `${filing.company_name} ${filing.filing_type} filed on ${filing.filing_date}. SEC API metrics were retained as the baseline and filing text was scanned for segment and company-specific disclosures.`,
    keyPoints: uniqueExtractionList([
      ...baseline.keyPoints,
      segmentLead,
      geographicLead,
      companyLead,
    ]),
    redFlags: uniqueExtractionList([
      ...baseline.redFlags,
      // Surface the first metric that text verification could not confirm.
      secApiCrossChecks.find((line) => /not confidently located/i.test(line)),
    ]),
    followUpQuestions: uniqueExtractionList([
      ...baseline.followUpQuestions,
      segmentSpecificData.length > 0
        ? "How do segment trends change the consolidated margin outlook?"
        : "Does management provide segment-level KPIs in supplemental exhibits?",
    ]),
    portfolioSignals: uniqueExtractionList([
      ...baseline.portfolioSignals,
      companySpecificData.length > 0
        ? "Incorporate company-specific KPI direction into near-term position sizing."
        : "Track future filings for explicit operating KPI disclosures.",
    ]),
    // Each list field keeps the harvested signals when non-empty, otherwise
    // falls back to the deterministic placeholder text.
    segmentSpecificData:
      segmentSpecificData.length > 0
        ? segmentSpecificData
        : baseline.segmentSpecificData,
    geographicRevenueBreakdown:
      geographicRevenueBreakdown.length > 0
        ? geographicRevenueBreakdown
        : baseline.geographicRevenueBreakdown,
    companySpecificData:
      companySpecificData.length > 0
        ? companySpecificData
        : baseline.companySpecificData,
    secApiCrossChecks:
      secApiCrossChecks.length > 0
        ? secApiCrossChecks
        : baseline.secApiCrossChecks,
    confidence:
      segmentSpecificData.length +
        geographicRevenueBreakdown.length +
        companySpecificData.length >
      0
        ? 0.4
        : 0.3,
  };
}
|
|
|
|
function preferExtractionList(primary: string[], fallback: string[]) {
|
|
return primary.length > 0 ? primary : fallback;
|
|
}
|
|
|
|
function mergeExtractionWithFallback(
|
|
primary: FilingExtraction,
|
|
fallback: FilingExtraction,
|
|
): FilingExtraction {
|
|
return {
|
|
summary: primary.summary || fallback.summary,
|
|
keyPoints: preferExtractionList(primary.keyPoints, fallback.keyPoints),
|
|
redFlags: preferExtractionList(primary.redFlags, fallback.redFlags),
|
|
followUpQuestions: preferExtractionList(
|
|
primary.followUpQuestions,
|
|
fallback.followUpQuestions,
|
|
),
|
|
portfolioSignals: preferExtractionList(
|
|
primary.portfolioSignals,
|
|
fallback.portfolioSignals,
|
|
),
|
|
segmentSpecificData: preferExtractionList(
|
|
primary.segmentSpecificData,
|
|
fallback.segmentSpecificData,
|
|
),
|
|
geographicRevenueBreakdown: preferExtractionList(
|
|
primary.geographicRevenueBreakdown,
|
|
fallback.geographicRevenueBreakdown,
|
|
),
|
|
companySpecificData: preferExtractionList(
|
|
primary.companySpecificData,
|
|
fallback.companySpecificData,
|
|
),
|
|
secApiCrossChecks: preferExtractionList(
|
|
primary.secApiCrossChecks,
|
|
fallback.secApiCrossChecks,
|
|
),
|
|
confidence: Math.min(Math.max(primary.confidence, 0), 1),
|
|
};
|
|
}
|
|
|
|
/**
 * Builds the LLM prompt requesting a strict-JSON structured extraction from
 * filing text. The required key set and size caps quoted in the prompt
 * mirror EXTRACTION_REQUIRED_KEYS and the EXTRACTION_* constants enforced
 * by parseExtractionPayload.
 */
function extractionPrompt(filing: Filing, filingText: string) {
  return [
    "Extract structured signals from the SEC filing text.",
    `Company: ${filing.company_name} (${filing.ticker})`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    "Use SEC API metrics as canonical numeric values and validate whether each appears consistent with filing text context.",
    "Prioritize company-specific and segment-specific disclosures not covered by SEC endpoint fields (for example same-store sales, geographic mix, segment margin).",
    "Return ONLY valid JSON with exactly these keys and no extra keys:",
    '{"summary":"string","keyPoints":["string"],"redFlags":["string"],"followUpQuestions":["string"],"portfolioSignals":["string"],"segmentSpecificData":["string"],"geographicRevenueBreakdown":["string"],"companySpecificData":["string"],"secApiCrossChecks":["string"],"confidence":0}',
    `Rules: every array max ${EXTRACTION_MAX_ITEMS} items; each item <= ${EXTRACTION_ITEM_MAX_LENGTH} chars; summary <= ${EXTRACTION_SUMMARY_MAX_LENGTH} chars; confidence between 0 and 1.`,
    "Filing text follows:",
    filingText,
  ].join("\n\n"); // blank line between prompt sections
}
|
|
|
|
/**
 * Builds the LLM prompt for the narrative analysis report, embedding the
 * structured extraction (and its source tag) as context alongside SEC API
 * baseline metrics.
 */
function reportPrompt(
  filing: Filing,
  extraction: FilingExtraction,
  extractionMeta: FilingExtractionMeta,
) {
  return [
    "You are a fiscal research assistant focused on regulatory signals.",
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `SEC API baseline metrics: ${JSON.stringify(filing.metrics ?? {})}`,
    `Structured extraction context (${extractionMeta.source}): ${JSON.stringify(extraction)}`,
    "Use SEC API values as the baseline financials and explicitly reference segment/company-specific details from extraction.",
    "Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.",
  ].join("\n"); // single newline, unlike extractionPrompt's "\n\n"
}
|
|
|
|
function filingLinks(filing: {
|
|
filingUrl: string | null;
|
|
submissionUrl: string | null;
|
|
}) {
|
|
const links: Array<{ link_type: string; url: string }> = [];
|
|
|
|
if (filing.filingUrl) {
|
|
links.push({ link_type: "primary_document", url: filing.filingUrl });
|
|
}
|
|
|
|
if (filing.submissionUrl) {
|
|
links.push({ link_type: "submission_index", url: filing.submissionUrl });
|
|
}
|
|
|
|
return links;
|
|
}
|
|
|
|
/**
 * Task processor: syncs recent SEC filings for one ticker.
 *
 * Pipeline: fetch recent filings → upsert filing rows/links → hydrate
 * taxonomy snapshots for 10-K/10-Q candidates → rebuild the issuer overlay
 * and re-hydrate affected filings when a new overlay revision was published
 * → enqueue a search-index task (best-effort) → return a summary outcome.
 */
async function processSyncFilings(task: Task) {
  const ticker = parseTicker(task.payload.ticker);
  const limit = parseLimit(task.payload.limit, 20, 1, 50);
  const category = parseOptionalText(task.payload.category);
  const tags = parseTags(task.payload.tags);
  // Human-readable scope suffix for stage messages, e.g. "news | tags: a, b".
  const scopeLabel = [
    category,
    tags.length > 0 ? `tags: ${tags.join(", ")}` : null,
  ]
    .filter((entry): entry is string => Boolean(entry))
    .join(" | ");

  let searchTaskId: string | null = null;
  const tickerSubject = { ticker };

  await setProjectionStage(
    task,
    "sync.fetch_filings",
    `Fetching up to ${limit} filings for ${ticker}${scopeLabel ? ` (${scopeLabel})` : ""}`,
    { subject: tickerSubject },
  );
  const filings = await fetchRecentFilings(ticker, limit);

  await setProjectionStage(
    task,
    "sync.persist_filings",
    `Persisting ${filings.length} filings and source links`,
    {
      counters: { fetched: filings.length },
      subject: tickerSubject,
    },
  );
  // Map the SEC fetch shape (camelCase) onto the repo row shape (snake_case).
  // Metrics start null; they are filled during taxonomy hydration.
  const saveResult = await upsertFilingsRecords(
    filings.map((filing) => ({
      ticker: filing.ticker,
      filing_type: filing.filingType,
      filing_date: filing.filingDate,
      accession_number: filing.accessionNumber,
      cik: filing.cik,
      company_name: filing.companyName,
      filing_url: filing.filingUrl,
      submission_url: filing.submissionUrl,
      primary_document: filing.primaryDocument,
      metrics: null,
      links: filingLinks(filing),
    })),
  );

  let taxonomySnapshotsHydrated = 0;
  let taxonomySnapshotsFailed = 0;
  // Hydration candidates come from stored records (not just this fetch):
  // up to 3x the fetch limit, floored at 40 and capped at
  // STATEMENT_HYDRATION_MAX_FILINGS, restricted to 10-K/10-Q forms.
  const hydrateCandidates = (
    await listFilingsRecords({
      ticker,
      limit: Math.min(Math.max(limit * 3, 40), STATEMENT_HYDRATION_MAX_FILINGS),
    })
  ).filter((filing): filing is Filing & { filing_type: "10-K" | "10-Q" } => {
    return isFinancialMetricsForm(filing.filing_type);
  });

  await setProjectionStage(
    task,
    "sync.discover_assets",
    `Discovering taxonomy assets for ${hydrateCandidates.length} candidate filings`,
    buildProgressContext({
      current: 0,
      total: hydrateCandidates.length,
      unit: "filings",
      counters: {
        fetched: filings.length,
        inserted: saveResult.inserted,
        updated: saveResult.updated,
        hydrated: 0,
        failed: 0,
      },
      subject: tickerSubject,
    }),
  );
  // First hydration pass runs against the overlay revision active right now.
  const currentOverlay = await getIssuerOverlay(ticker);
  const firstPass = await hydrateTaxonomySnapshotsForCandidates({
    task,
    ticker,
    filingsCount: filings.length,
    saveResult,
    candidates: hydrateCandidates,
    overlayRevisionId: currentOverlay?.active_revision_id ?? null,
  });
  taxonomySnapshotsHydrated += firstPass.hydrated;
  taxonomySnapshotsFailed += firstPass.failed;

  // Overlay (re)build is best-effort; failure is recorded but does not
  // abort the sync.
  let overlayPublished = false;
  let activeOverlayRevisionId = currentOverlay?.active_revision_id ?? null;
  try {
    await setProjectionStage(
      task,
      "sync.persist_taxonomy",
      `Building issuer overlay for ${ticker}`,
      {
        counters: {
          sampledFilings: Math.min(hydrateCandidates.length, 12),
          hydrated: taxonomySnapshotsHydrated,
          failed: taxonomySnapshotsFailed,
        },
        subject: tickerSubject,
      },
    );
    const overlayResult = await generateIssuerOverlayForTicker(ticker);
    overlayPublished = overlayResult.published;
    activeOverlayRevisionId = overlayResult.activeRevisionId;
  } catch (error) {
    await recordIssuerOverlayBuildFailure(ticker, error);
    console.error(`[issuer-overlay] failed for ${ticker}:`, error);
  }

  // If a NEW overlay revision was published, re-hydrate: filings touched in
  // the first pass come first, then the rest, deduped by id, truncated to
  // max(touched count, 8).
  if (
    overlayPublished &&
    activeOverlayRevisionId !== null &&
    activeOverlayRevisionId !== currentOverlay?.active_revision_id
  ) {
    const prioritizedCandidates = [
      ...hydrateCandidates.filter((filing) =>
        firstPass.touchedFilingIds.has(filing.id),
      ),
      ...hydrateCandidates.filter(
        (filing) => !firstPass.touchedFilingIds.has(filing.id),
      ),
    ];
    const uniqueCandidates = prioritizedCandidates.filter(
      (filing, index, rows) => {
        return (
          rows.findIndex((candidate) => candidate.id === filing.id) === index
        );
      },
    );
    const rehydrateCandidates = uniqueCandidates.slice(
      0,
      Math.max(firstPass.touchedFilingIds.size, 8),
    );

    const secondPass = await hydrateTaxonomySnapshotsForCandidates({
      task,
      ticker,
      filingsCount: filings.length,
      saveResult,
      candidates: rehydrateCandidates,
      overlayRevisionId: activeOverlayRevisionId,
    });
    taxonomySnapshotsHydrated += secondPass.hydrated;
    taxonomySnapshotsFailed += secondPass.failed;
  }

  // Best-effort: queue a follow-up search-index task for this ticker.
  try {
    const searchTask = await enqueueTask({
      userId: task.user_id,
      taskType: "index_search",
      payload: {
        ticker,
        sourceKinds: ["filing_document", "filing_brief"],
      },
      priority: 55,
      resourceKey: `index_search:ticker:${ticker}`,
    });
    searchTaskId = searchTask.id;
  } catch (error) {
    console.error(`[search-index-sync] failed for ${ticker}:`, error);
  }

  const result = {
    ticker,
    category,
    tags,
    fetched: filings.length,
    inserted: saveResult.inserted,
    updated: saveResult.updated,
    taxonomySnapshotsHydrated,
    taxonomySnapshotsFailed,
    overlayPublished,
    activeOverlayRevisionId,
    searchTaskId,
  };

  return buildTaskOutcome(
    result,
    `Synced ${filings.length} filings for ${ticker}, hydrated ${taxonomySnapshotsHydrated} taxonomy snapshots, failed ${taxonomySnapshotsFailed}.`,
    buildProgressContext({
      current: hydrateCandidates.length,
      // `|| 1` keeps progress well-formed when there were no candidates.
      total: hydrateCandidates.length || 1,
      unit: "filings",
      counters: {
        fetched: filings.length,
        inserted: saveResult.inserted,
        updated: saveResult.updated,
        hydrated: taxonomySnapshotsHydrated,
        failed: taxonomySnapshotsFailed,
      },
      subject: tickerSubject,
    }),
  );
}
|
|
|
|
async function processRefreshPrices(task: Task) {
|
|
const userId = task.user_id;
|
|
if (!userId) {
|
|
throw new Error("Task is missing user scope");
|
|
}
|
|
|
|
await setProjectionStage(
|
|
task,
|
|
"refresh.load_holdings",
|
|
"Loading holdings for price refresh",
|
|
);
|
|
const userHoldings = await listHoldingsForPriceRefresh(userId);
|
|
const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];
|
|
const quotes = new Map<string, number>();
|
|
const failedTickers: string[] = [];
|
|
const staleTickers: string[] = [];
|
|
const baseContext = {
|
|
counters: {
|
|
holdings: userHoldings.length,
|
|
},
|
|
} satisfies TaskStageContext;
|
|
|
|
await setProjectionStage(
|
|
task,
|
|
"refresh.load_holdings",
|
|
`Loaded ${userHoldings.length} holdings across ${tickers.length} tickers`,
|
|
baseContext,
|
|
);
|
|
|
|
await setProjectionStage(
|
|
task,
|
|
"refresh.fetch_quotes",
|
|
`Fetching quotes for ${tickers.length} tickers`,
|
|
buildProgressContext({
|
|
current: 0,
|
|
total: tickers.length,
|
|
unit: "tickers",
|
|
counters: {
|
|
holdings: userHoldings.length,
|
|
},
|
|
}),
|
|
);
|
|
for (let index = 0; index < tickers.length; index += 1) {
|
|
const ticker = tickers[index];
|
|
const quoteResult = await getQuote(ticker);
|
|
if (quoteResult.value !== null) {
|
|
quotes.set(ticker, quoteResult.value);
|
|
if (quoteResult.stale) {
|
|
staleTickers.push(ticker);
|
|
}
|
|
} else {
|
|
failedTickers.push(ticker);
|
|
}
|
|
await setProjectionStage(
|
|
task,
|
|
"refresh.fetch_quotes",
|
|
`Fetching quotes for ${tickers.length} tickers`,
|
|
buildProgressContext({
|
|
current: index + 1,
|
|
total: tickers.length,
|
|
unit: "tickers",
|
|
counters: {
|
|
holdings: userHoldings.length,
|
|
failed: failedTickers.length,
|
|
stale: staleTickers.length,
|
|
},
|
|
subject: { ticker },
|
|
}),
|
|
);
|
|
}
|
|
|
|
await setProjectionStage(
|
|
task,
|
|
"refresh.persist_prices",
|
|
`Writing refreshed prices for ${quotes.size} tickers across ${userHoldings.length} holdings`,
|
|
{
|
|
counters: {
|
|
holdings: userHoldings.length,
|
|
failed: failedTickers.length,
|
|
stale: staleTickers.length,
|
|
},
|
|
},
|
|
);
|
|
const updatedCount = await applyRefreshedPrices(
|
|
userId,
|
|
quotes,
|
|
new Date().toISOString(),
|
|
);
|
|
|
|
const result = {
|
|
updatedCount,
|
|
totalTickers: tickers.length,
|
|
failedTickers,
|
|
staleTickers,
|
|
};
|
|
|
|
const messageParts = [
|
|
`Refreshed prices for ${quotes.size}/${tickers.length} tickers`,
|
|
];
|
|
if (failedTickers.length > 0) {
|
|
messageParts.push(`(${failedTickers.length} unavailable)`);
|
|
}
|
|
if (staleTickers.length > 0) {
|
|
messageParts.push(`(${staleTickers.length} stale)`);
|
|
}
|
|
|
|
return buildTaskOutcome(
|
|
result,
|
|
`${messageParts.join(" ")} across ${userHoldings.length} holdings.`,
|
|
{
|
|
progress: {
|
|
current: tickers.length,
|
|
total: tickers.length || 1,
|
|
unit: "tickers",
|
|
},
|
|
counters: {
|
|
holdings: userHoldings.length,
|
|
updatedCount,
|
|
failed: failedTickers.length,
|
|
stale: staleTickers.length,
|
|
},
|
|
},
|
|
);
|
|
}
|
|
|
|
/**
 * Handles an `analyze_filing` task for the filing identified by
 * `payload.accessionNumber`.
 *
 * Pipeline: load the filing row, best-effort fetch of the primary filing
 * text, build an extraction (AI-backed when text is available, deterministic
 * metadata fallback otherwise), generate the final analysis report, persist
 * it, and enqueue a `filing_brief` search-index task.
 *
 * @throws Error when the accession number is missing, the filing is not
 *   found, or the AI extraction payload fails JSON-schema parsing.
 */
async function processAnalyzeFiling(task: Task) {
  const accessionNumber =
    typeof task.payload.accessionNumber === "string"
      ? task.payload.accessionNumber
      : "";

  if (!accessionNumber) {
    throw new Error("accessionNumber is required");
  }

  await setProjectionStage(
    task,
    "analyze.load_filing",
    `Loading filing ${accessionNumber}`,
    {
      subject: {
        accessionNumber,
      },
    },
  );
  const filing = await getFilingByAccession(accessionNumber);

  if (!filing) {
    throw new Error(`Filing ${accessionNumber} not found`);
  }

  // Shared subject attached to every stage update and the final outcome.
  const analyzeSubject = {
    ticker: filing.ticker,
    accessionNumber,
    label: filing.filing_type,
  };

  // Start from the deterministic, metadata-only extraction. It is replaced
  // below only when both the document fetch and the AI extraction succeed.
  const defaultExtraction = deterministicExtractionFallback(filing);
  let extraction = defaultExtraction;
  let extractionMeta: FilingExtractionMeta = {
    provider: "deterministic-fallback",
    model: "metadata-fallback",
    source: "metadata_fallback",
    generatedAt: new Date().toISOString(),
  };
  let filingDocument: Awaited<
    ReturnType<typeof fetchPrimaryFilingText>
  > | null = null;

  try {
    await setProjectionStage(
      task,
      "analyze.fetch_document",
      "Fetching primary filing document",
      {
        subject: analyzeSubject,
      },
    );
    filingDocument = await fetchPrimaryFilingText({
      filingUrl: filing.filing_url,
      cik: filing.cik,
      accessionNumber: filing.accession_number,
      primaryDocument: filing.primary_document ?? null,
    });
  } catch {
    // Best-effort fetch: a failure here degrades to the metadata-only
    // extraction above instead of failing the task.
    filingDocument = null;
  }

  if (filingDocument?.text) {
    await setProjectionStage(
      task,
      "analyze.extract",
      "Generating extraction context from filing text",
      {
        subject: analyzeSubject,
      },
    );
    // Rule-based extraction supplies fallback values for fields the AI
    // output may omit (see mergeExtractionWithFallback below).
    const ruleBasedExtraction = buildRuleBasedExtraction(
      filing,
      filingDocument.text,
    );
    const extractionResult = await runAiAnalysis(
      extractionPrompt(filing, filingDocument.text),
      "Return strict JSON only.",
      { workload: "extraction" },
    );

    // NOTE(review): an unparseable AI payload fails the whole task here even
    // though a deterministic fallback already exists — presumably intentional
    // so the task can be retried with a fresh AI call; confirm.
    const parsed = parseExtractionPayload(extractionResult.text);
    if (!parsed) {
      throw new Error("Extraction output invalid JSON schema");
    }

    extraction = mergeExtractionWithFallback(parsed, ruleBasedExtraction);
    extractionMeta = {
      provider: "zhipu",
      model: extractionResult.model,
      source: filingDocument.source,
      generatedAt: new Date().toISOString(),
    };
  }

  await setProjectionStage(
    task,
    "analyze.generate_report",
    "Generating final filing analysis report",
    {
      subject: analyzeSubject,
    },
  );
  const analysis = await runAiAnalysis(
    reportPrompt(filing, extraction, extractionMeta),
    "Use concise institutional analyst language.",
    { workload: "report" },
  );

  await setProjectionStage(
    task,
    "analyze.persist_report",
    "Persisting filing analysis output",
    {
      subject: analyzeSubject,
    },
  );
  await saveFilingAnalysis(accessionNumber, {
    provider: analysis.provider,
    model: analysis.model,
    text: analysis.text,
    extraction,
    extractionMeta,
  });

  // Search indexing is best-effort: failures are logged but do not fail the
  // analysis task; the null searchTaskId is surfaced in the result instead.
  let searchTaskId: string | null = null;
  try {
    const searchTask = await enqueueTask({
      userId: task.user_id,
      taskType: "index_search",
      payload: {
        accessionNumber,
        sourceKinds: ["filing_brief"],
      },
      priority: 58,
      resourceKey: `index_search:filing_brief:${accessionNumber}`,
    });
    searchTaskId = searchTask.id;
  } catch (error) {
    console.error(
      `[search-index-analyze] failed for ${accessionNumber}:`,
      error,
    );
  }

  const result = {
    ticker: filing.ticker,
    accessionNumber,
    filingType: filing.filing_type,
    provider: analysis.provider,
    model: analysis.model,
    extractionProvider: extractionMeta.provider,
    extractionModel: extractionMeta.model,
    searchTaskId,
  };

  return buildTaskOutcome(
    result,
    `Analysis report generated for ${filing.ticker} ${filing.filing_type} ${accessionNumber}.`,
    {
      subject: analyzeSubject,
    },
  );
}
|
|
|
|
async function processIndexSearch(task: Task) {
|
|
await setProjectionStage(
|
|
task,
|
|
"search.collect_sources",
|
|
"Collecting source records for search indexing",
|
|
);
|
|
|
|
const ticker = parseOptionalText(task.payload.ticker);
|
|
const accessionNumber = parseOptionalText(task.payload.accessionNumber);
|
|
const journalEntryId =
|
|
task.payload.journalEntryId === undefined
|
|
? null
|
|
: Number(task.payload.journalEntryId);
|
|
const deleteSourceRefs = Array.isArray(task.payload.deleteSourceRefs)
|
|
? task.payload.deleteSourceRefs.filter(
|
|
(
|
|
entry,
|
|
): entry is {
|
|
sourceKind: string;
|
|
sourceRef: string;
|
|
scope: string;
|
|
userId?: string | null;
|
|
} => {
|
|
return Boolean(
|
|
entry &&
|
|
typeof entry === "object" &&
|
|
typeof (entry as { sourceKind?: unknown }).sourceKind ===
|
|
"string" &&
|
|
typeof (entry as { sourceRef?: unknown }).sourceRef === "string" &&
|
|
typeof (entry as { scope?: unknown }).scope === "string",
|
|
);
|
|
},
|
|
)
|
|
: [];
|
|
const sourceKinds = parseOptionalStringArray(task.payload.sourceKinds).filter(
|
|
(
|
|
sourceKind,
|
|
): sourceKind is "filing_document" | "filing_brief" | "research_note" => {
|
|
return (
|
|
sourceKind === "filing_document" ||
|
|
sourceKind === "filing_brief" ||
|
|
sourceKind === "research_note"
|
|
);
|
|
},
|
|
);
|
|
const validatedJournalEntryId =
|
|
typeof journalEntryId === "number" &&
|
|
Number.isInteger(journalEntryId) &&
|
|
journalEntryId > 0
|
|
? journalEntryId
|
|
: null;
|
|
|
|
const result = await indexSearchDocuments({
|
|
userId: task.user_id,
|
|
ticker,
|
|
accessionNumber,
|
|
journalEntryId: validatedJournalEntryId,
|
|
sourceKinds: sourceKinds.length > 0 ? sourceKinds : undefined,
|
|
deleteSourceRefs: deleteSourceRefs.map((entry) => ({
|
|
sourceKind: entry.sourceKind as
|
|
| "filing_document"
|
|
| "filing_brief"
|
|
| "research_note",
|
|
sourceRef: entry.sourceRef,
|
|
scope: entry.scope === "user" ? "user" : "global",
|
|
userId: typeof entry.userId === "string" ? entry.userId : null,
|
|
})),
|
|
onStage: async (stage, detail, context) => {
|
|
switch (stage) {
|
|
case "collect":
|
|
await setProjectionStage(
|
|
task,
|
|
"search.collect_sources",
|
|
detail,
|
|
context ?? {
|
|
subject: ticker
|
|
? { ticker }
|
|
: accessionNumber
|
|
? { accessionNumber }
|
|
: null,
|
|
},
|
|
);
|
|
break;
|
|
case "fetch":
|
|
await setProjectionStage(
|
|
task,
|
|
"search.fetch_documents",
|
|
detail,
|
|
context ?? null,
|
|
);
|
|
break;
|
|
case "chunk":
|
|
await setProjectionStage(
|
|
task,
|
|
"search.chunk",
|
|
detail,
|
|
context ?? null,
|
|
);
|
|
break;
|
|
case "embed":
|
|
await setProjectionStage(
|
|
task,
|
|
"search.embed",
|
|
detail,
|
|
context ?? null,
|
|
);
|
|
break;
|
|
case "persist":
|
|
await setProjectionStage(
|
|
task,
|
|
"search.persist",
|
|
detail,
|
|
context ?? null,
|
|
);
|
|
break;
|
|
}
|
|
},
|
|
});
|
|
|
|
const taskResult = {
|
|
ticker,
|
|
accessionNumber,
|
|
journalEntryId: validatedJournalEntryId,
|
|
...result,
|
|
};
|
|
|
|
return buildTaskOutcome(
|
|
taskResult,
|
|
`Indexed ${result.indexed} sources, embedded ${result.chunksEmbedded} chunks, skipped ${result.skipped}, deleted ${result.deleted} stale documents.`,
|
|
{
|
|
progress: {
|
|
current: result.sourcesCollected,
|
|
total: result.sourcesCollected || 1,
|
|
unit: "sources",
|
|
},
|
|
counters: {
|
|
sourcesCollected: result.sourcesCollected,
|
|
indexed: result.indexed,
|
|
chunksEmbedded: result.chunksEmbedded,
|
|
skipped: result.skipped,
|
|
deleted: result.deleted,
|
|
},
|
|
subject: ticker
|
|
? { ticker }
|
|
: accessionNumber
|
|
? { accessionNumber }
|
|
: null,
|
|
},
|
|
);
|
|
}
|
|
|
|
function holdingDigest(holdings: Holding[]) {
|
|
return holdings.map((holding) => ({
|
|
ticker: holding.ticker,
|
|
shares: holding.shares,
|
|
avgCost: holding.avg_cost,
|
|
currentPrice: holding.current_price,
|
|
marketValue: holding.market_value,
|
|
gainLoss: holding.gain_loss,
|
|
gainLossPct: holding.gain_loss_pct,
|
|
}));
|
|
}
|
|
|
|
async function processPortfolioInsights(task: Task) {
|
|
const userId = task.user_id;
|
|
if (!userId) {
|
|
throw new Error("Task is missing user scope");
|
|
}
|
|
|
|
await setProjectionStage(
|
|
task,
|
|
"insights.load_holdings",
|
|
"Loading holdings for portfolio insight generation",
|
|
);
|
|
const userHoldings = await listUserHoldings(userId);
|
|
const summary = buildPortfolioSummary(userHoldings);
|
|
const holdingsContext = {
|
|
counters: {
|
|
holdings: userHoldings.length,
|
|
},
|
|
} satisfies TaskStageContext;
|
|
|
|
await setProjectionStage(
|
|
task,
|
|
"insights.load_holdings",
|
|
`Loaded ${userHoldings.length} holdings for portfolio insight generation`,
|
|
holdingsContext,
|
|
);
|
|
|
|
const prompt = [
|
|
"Generate portfolio intelligence with actionable recommendations.",
|
|
`Portfolio summary: ${JSON.stringify(summary)}`,
|
|
`Holdings: ${JSON.stringify(holdingDigest(userHoldings))}`,
|
|
"Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.",
|
|
].join("\n");
|
|
|
|
await setProjectionStage(
|
|
task,
|
|
"insights.generate",
|
|
"Generating portfolio AI insight",
|
|
holdingsContext,
|
|
);
|
|
const analysis = await runAiAnalysis(
|
|
prompt,
|
|
"Act as a risk-aware buy-side analyst.",
|
|
{ workload: "report" },
|
|
);
|
|
|
|
await setProjectionStage(
|
|
task,
|
|
"insights.persist",
|
|
"Persisting generated portfolio insight",
|
|
holdingsContext,
|
|
);
|
|
await createPortfolioInsight({
|
|
userId,
|
|
provider: analysis.provider,
|
|
model: analysis.model,
|
|
content: analysis.text,
|
|
});
|
|
|
|
const result = {
|
|
provider: analysis.provider,
|
|
model: analysis.model,
|
|
summary,
|
|
};
|
|
|
|
return buildTaskOutcome(
|
|
result,
|
|
`Generated portfolio insight for ${summary.positions} holdings.`,
|
|
{
|
|
counters: {
|
|
holdings: summary.positions,
|
|
},
|
|
},
|
|
);
|
|
}
|
|
|
|
// Internal helpers re-exported solely so unit tests can exercise them
// directly; not part of the module's public task-processing API.
export const __taskProcessorInternals = {
  parseExtractionPayload,
  deterministicExtractionFallback,
  isFinancialMetricsForm,
};
|
|
|
|
export async function runTaskProcessor(task: Task) {
|
|
switch (task.task_type) {
|
|
case "sync_filings":
|
|
return await processSyncFilings(task);
|
|
case "refresh_prices":
|
|
return await processRefreshPrices(task);
|
|
case "analyze_filing":
|
|
return await processAnalyzeFiling(task);
|
|
case "portfolio_insights":
|
|
return await processPortfolioInsights(task);
|
|
case "index_search":
|
|
return await processIndexSearch(task);
|
|
default:
|
|
throw new Error(`Unsupported task type: ${task.task_type}`);
|
|
}
|
|
}
|