Merge branch 't3code/improve-job-status-notification-details'

This commit is contained in:
2026-03-09 18:54:02 -04:00
22 changed files with 2243 additions and 302 deletions

View File

@@ -89,7 +89,10 @@ function applySqlMigrations(client: { exec: (query: string) => void }) {
'0003_task_stage_event_timeline.sql',
'0004_watchlist_company_taxonomy.sql',
'0005_financial_taxonomy_v3.sql',
'0006_coverage_journal_tracking.sql'
'0006_coverage_journal_tracking.sql',
'0007_company_financial_bundles.sql',
'0008_research_workspace.sql',
'0009_task_notification_context.sql'
];
for (const file of migrationFiles) {
@@ -592,6 +595,159 @@ if (process.env.RUN_TASK_WORKFLOW_E2E === '1') {
expect(resetTask.notification_silenced_at).toBeNull();
});
// Verifies that a seeded stage_context round-trips through both the task list
// API and the task timeline API, and that a notification payload is derived.
it('returns enriched stage context and notification payloads for tasks and timelines', async () => {
  if (!sqliteClient) {
    throw new Error('sqlite client not initialized');
  }
  const created = await jsonRequest('POST', '/api/filings/0000000000-26-000010/analyze');
  const taskId = (created.json as { task: { id: string } }).task.id;
  const now = new Date().toISOString();
  // Synthetic mid-run context: step 2 of 5 on a known accession number.
  const stageContext = JSON.stringify({
    progress: {
      current: 2,
      total: 5,
      unit: 'steps'
    },
    subject: {
      accessionNumber: '0000000000-26-000010'
    }
  });
  // Force the task into a running state directly in sqlite; workflow_run_id is
  // cleared so workflow reconciliation cannot overwrite the seeded stage.
  sqliteClient.query(`
    UPDATE task_run
    SET status = ?, stage = ?, stage_detail = ?, stage_context = ?, workflow_run_id = NULL, updated_at = ?
    WHERE id = ?;
  `).run(
    'running',
    'analyze.extract',
    'Generating extraction context from filing text',
    stageContext,
    now,
    taskId
  );
  // Seed a matching timeline event carrying the same stage context JSON.
  sqliteClient.query(`
    INSERT INTO task_stage_event (task_id, user_id, stage, stage_detail, stage_context, status, created_at)
    VALUES (?, ?, ?, ?, ?, ?, ?);
  `).run(
    taskId,
    TEST_USER_ID,
    'analyze.extract',
    'Generating extraction context from filing text',
    stageContext,
    'running',
    now
  );
  const tasksResponse = await jsonRequest('GET', '/api/tasks?limit=5');
  expect(tasksResponse.response.status).toBe(200);
  const apiTask = (tasksResponse.json as {
    tasks: Array<{
      id: string;
      stage_context: { progress?: { current: number } | null } | null;
      notification: { title: string; actions: Array<{ id: string }> };
    }>;
  }).tasks.find((entry) => entry.id === taskId);
  // The list API should surface the parsed stage context plus a built notification.
  expect(apiTask?.stage_context?.progress?.current).toBe(2);
  expect(apiTask?.notification.title).toBe('Filing analysis');
  expect(apiTask?.notification.actions.some((action) => action.id === 'open_filings')).toBe(true);
  const timeline = await jsonRequest('GET', `/api/tasks/${taskId}/timeline`);
  expect(timeline.response.status).toBe(200);
  const event = (timeline.json as {
    events: Array<{
      stage: string;
      stage_context: { progress?: { total: number } | null } | null;
    }>;
  }).events.find((entry) => entry.stage === 'analyze.extract');
  // The timeline API should return the same context on the seeded event.
  expect(event?.stage_context?.progress?.total).toBe(5);
});
// Completed analyze tasks should expose a deep link to the generated report;
// failed analyze tasks should keep the filings navigation fallback and surface
// the error text as the notification detail line.
it('returns task-specific notification actions for completed and failed analyze tasks', async () => {
  if (!sqliteClient) {
    throw new Error('sqlite client not initialized');
  }
  const completedCreate = await jsonRequest('POST', '/api/filings/0000000000-26-000020/analyze');
  const completedTaskId = (completedCreate.json as { task: { id: string } }).task.id;
  // Drive the task straight to a terminal 'completed' row with result metadata.
  sqliteClient.query(`
    UPDATE task_run
    SET status = ?, stage = ?, stage_detail = ?, stage_context = ?, result = ?, workflow_run_id = NULL, updated_at = ?, finished_at = ?
    WHERE id = ?;
  `).run(
    'completed',
    'completed',
    'Analysis report generated for AAPL 10-Q 0000000000-26-000020.',
    JSON.stringify({
      subject: {
        ticker: 'AAPL',
        accessionNumber: '0000000000-26-000020',
        label: '10-Q'
      }
    }),
    JSON.stringify({
      ticker: 'AAPL',
      accessionNumber: '0000000000-26-000020',
      filingType: '10-Q',
      provider: 'test',
      model: 'fixture',
      extractionProvider: 'test',
      extractionModel: 'fixture',
      searchTaskId: null
    }),
    '2026-03-09T15:00:00.000Z',
    '2026-03-09T15:00:00.000Z',
    completedTaskId
  );
  const completed = await jsonRequest('GET', `/api/tasks/${completedTaskId}`);
  expect(completed.response.status).toBe(200);
  const completedActions = (completed.json as {
    task: {
      notification: { actions: Array<{ id: string; href: string | null }> };
    };
  }).task.notification.actions;
  // Primary action for a completed analysis is the report deep link.
  expect(completedActions[0]?.id).toBe('open_analysis_report');
  expect(completedActions[0]?.href).toContain('/analysis/reports/AAPL/0000000000-26-000020');
  const failedCreate = await jsonRequest('POST', '/api/filings/0000000000-26-000021/analyze');
  const failedTaskId = (failedCreate.json as { task: { id: string } }).task.id;
  // Terminal 'failed' row: the error text should become the detail line.
  sqliteClient.query(`
    UPDATE task_run
    SET status = ?, stage = ?, stage_detail = ?, stage_context = ?, error = ?, workflow_run_id = NULL, updated_at = ?, finished_at = ?
    WHERE id = ?;
  `).run(
    'failed',
    'failed',
    'Primary filing document fetch failed.',
    JSON.stringify({
      subject: {
        ticker: 'AAPL',
        accessionNumber: '0000000000-26-000021'
      }
    }),
    'Primary filing document fetch failed.',
    '2026-03-09T15:01:00.000Z',
    '2026-03-09T15:01:00.000Z',
    failedTaskId
  );
  const failed = await jsonRequest('GET', `/api/tasks/${failedTaskId}`);
  expect(failed.response.status).toBe(200);
  const failedTask = (failed.json as {
    task: {
      notification: {
        detailLine: string | null;
        actions: Array<{ id: string; href: string | null }>;
      };
    };
  }).task;
  expect(failedTask.notification.detailLine).toBe('Primary filing document fetch failed.');
  expect(failedTask.notification.actions.some((action) => action.id === 'open_filings')).toBe(true);
});
it('reconciles workflow run status into projection state and degrades health when workflow backend is down', async () => {
const created = await jsonRequest('POST', '/api/filings/0000000000-26-000100/analyze');
const task = (created.json as {

View File

@@ -18,6 +18,7 @@ describe('sqlite schema compatibility bootstrap', () => {
applyMigration(client, '0001_glossy_statement_snapshots.sql');
applyMigration(client, '0002_workflow_task_projection_metadata.sql');
applyMigration(client, '0003_task_stage_event_timeline.sql');
applyMigration(client, '0009_task_notification_context.sql');
expect(__dbInternals.hasColumn(client, 'watchlist_item', 'category')).toBe(false);
expect(__dbInternals.hasColumn(client, 'watchlist_item', 'status')).toBe(false);
@@ -38,6 +39,8 @@ describe('sqlite schema compatibility bootstrap', () => {
expect(__dbInternals.hasColumn(client, 'holding', 'company_name')).toBe(true);
expect(__dbInternals.hasTable(client, 'filing_taxonomy_snapshot')).toBe(true);
expect(__dbInternals.hasTable(client, 'filing_taxonomy_fact')).toBe(true);
expect(__dbInternals.hasColumn(client, 'task_run', 'stage_context')).toBe(true);
expect(__dbInternals.hasColumn(client, 'task_stage_event', 'stage_context')).toBe(true);
expect(__dbInternals.hasTable(client, 'research_journal_entry')).toBe(true);
expect(__dbInternals.hasTable(client, 'search_document')).toBe(true);
expect(__dbInternals.hasTable(client, 'search_chunk')).toBe(true);

View File

@@ -396,6 +396,7 @@ function ensureLocalSqliteSchema(client: Database) {
const missingTaskColumns: Array<{ name: string; sql: string }> = [
{ name: 'stage', sql: "ALTER TABLE `task_run` ADD `stage` text NOT NULL DEFAULT 'queued';" },
{ name: 'stage_detail', sql: 'ALTER TABLE `task_run` ADD `stage_detail` text;' },
{ name: 'stage_context', sql: 'ALTER TABLE `task_run` ADD `stage_context` text;' },
{ name: 'resource_key', sql: 'ALTER TABLE `task_run` ADD `resource_key` text;' },
{ name: 'notification_read_at', sql: 'ALTER TABLE `task_run` ADD `notification_read_at` text;' },
{ name: 'notification_silenced_at', sql: 'ALTER TABLE `task_run` ADD `notification_silenced_at` text;' }
@@ -412,6 +413,12 @@ function ensureLocalSqliteSchema(client: Database) {
applySqlFile(client, '0003_task_stage_event_timeline.sql');
}
if (hasTable(client, 'task_stage_event') && !hasColumn(client, 'task_stage_event', 'stage_context')) {
client.exec('ALTER TABLE `task_stage_event` ADD `stage_context` text;');
}
client.exec('CREATE INDEX IF NOT EXISTS `task_user_updated_idx` ON `task_run` (`user_id`, `updated_at`);');
if (hasTable(client, 'watchlist_item')) {
const missingWatchlistColumns: Array<{ name: string; sql: string }> = [
{ name: 'category', sql: 'ALTER TABLE `watchlist_item` ADD `category` text;' },

View File

@@ -7,6 +7,7 @@ import {
text,
uniqueIndex
} from 'drizzle-orm/sqlite-core';
import type { TaskStageContext } from '@/lib/types';
type FilingMetrics = {
revenue: number | null;
@@ -520,6 +521,7 @@ export const taskRun = sqliteTable('task_run', {
status: text('status').$type<'queued' | 'running' | 'completed' | 'failed'>().notNull(),
stage: text('stage').notNull(),
stage_detail: text('stage_detail'),
stage_context: text('stage_context', { mode: 'json' }).$type<TaskStageContext | null>(),
resource_key: text('resource_key'),
notification_read_at: text('notification_read_at'),
notification_silenced_at: text('notification_silenced_at'),
@@ -535,6 +537,7 @@ export const taskRun = sqliteTable('task_run', {
finished_at: text('finished_at')
}, (table) => ({
taskUserCreatedIndex: index('task_user_created_idx').on(table.user_id, table.created_at),
taskUserUpdatedIndex: index('task_user_updated_idx').on(table.user_id, table.updated_at),
taskStatusIndex: index('task_status_idx').on(table.status),
taskUserResourceStatusIndex: index('task_user_resource_status_idx').on(
table.user_id,
@@ -552,6 +555,7 @@ export const taskStageEvent = sqliteTable('task_stage_event', {
user_id: text('user_id').notNull().references(() => user.id, { onDelete: 'cascade' }),
stage: text('stage').notNull(),
stage_detail: text('stage_detail'),
stage_context: text('stage_context', { mode: 'json' }).$type<TaskStageContext | null>(),
status: text('status').$type<'queued' | 'running' | 'completed' | 'failed'>().notNull(),
created_at: text('created_at').notNull()
}, (table) => ({

View File

@@ -0,0 +1,222 @@
import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
it
} from 'bun:test';
import { mkdtempSync, readFileSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { Database } from 'bun:sqlite';
const TEST_USER_ID = 'task-test-user';
let tempDir: string | null = null;
let sqliteClient: Database | null = null;
let tasksRepo: typeof import('./tasks') | null = null;
/**
 * Clears the process-wide sqlite/drizzle singletons so the db module creates
 * a fresh connection against the test database the next time it is touched.
 * Closes any existing sqlite client before dropping the references.
 */
function resetDbSingletons() {
  const state = globalThis as typeof globalThis & {
    __fiscalSqliteClient?: { close?: () => void };
    __fiscalDrizzleDb?: unknown;
  };
  const client = state.__fiscalSqliteClient;
  if (client && typeof client.close === 'function') {
    client.close();
  }
  state.__fiscalSqliteClient = undefined;
  state.__fiscalDrizzleDb = undefined;
}
/**
 * Loads one migration file from the repo's drizzle/ directory and executes
 * its SQL against the given sqlite client.
 */
function applyMigration(client: Database, fileName: string) {
  const migrationPath = join(process.cwd(), 'drizzle', fileName);
  client.exec(readFileSync(migrationPath, 'utf8'));
}
/**
 * Inserts (or replaces) the fixture user that every task row in these tests
 * references via user_id. Values are interpolated directly because they are
 * trusted in-repo test constants, not external input.
 */
function ensureUser(client: Database) {
  const timestamp = Date.now();
  const upsertUserSql = `
    INSERT OR REPLACE INTO user (id, name, email, emailVerified, image, createdAt, updatedAt, role, banned, banReason, banExpires)
    VALUES ('${TEST_USER_ID}', 'Task Test User', 'tasks@example.com', 1, NULL, ${timestamp}, ${timestamp}, NULL, 0, NULL, NULL);
  `;
  client.exec(upsertUserSql);
}
/**
 * Removes all task rows between tests. Stage events are deleted first so the
 * task_stage_event -> task_run foreign key is never violated.
 */
function clearTasks(client: Database) {
  for (const table of ['task_stage_event', 'task_run'] as const) {
    client.exec(`DELETE FROM ${table};`);
  }
}
// Integration coverage for the task repository module: boots a throwaway
// sqlite database, applies the full migration chain, then exercises the repo
// functions end to end (stage updates, ordering, terminal detail/context).
describe('task repos', () => {
  beforeAll(async () => {
    // Point DATABASE_URL at a temp sqlite file BEFORE the repo module loads,
    // so its lazy db singleton binds to this database.
    tempDir = mkdtempSync(join(tmpdir(), 'fiscal-task-repo-'));
    const env = process.env as Record<string, string | undefined>;
    env.DATABASE_URL = `file:${join(tempDir, 'repo.sqlite')}`;
    env.NODE_ENV = 'test';
    resetDbSingletons();
    sqliteClient = new Database(join(tempDir, 'repo.sqlite'), { create: true });
    sqliteClient.exec('PRAGMA foreign_keys = ON;');
    // Apply every migration in order so the schema matches production,
    // including 0009 which adds the stage_context columns under test.
    for (const file of [
      '0000_cold_silver_centurion.sql',
      '0001_glossy_statement_snapshots.sql',
      '0002_workflow_task_projection_metadata.sql',
      '0003_task_stage_event_timeline.sql',
      '0004_watchlist_company_taxonomy.sql',
      '0005_financial_taxonomy_v3.sql',
      '0006_coverage_journal_tracking.sql',
      '0007_company_financial_bundles.sql',
      '0008_research_workspace.sql',
      '0009_task_notification_context.sql'
    ]) {
      applyMigration(sqliteClient, file);
    }
    ensureUser(sqliteClient);
    // Dynamic import happens after env setup for the same singleton reason.
    tasksRepo = await import('./tasks');
  });
  afterAll(() => {
    // Close the raw client, drop the singletons, and remove the temp dir.
    sqliteClient?.close();
    resetDbSingletons();
    if (tempDir) {
      rmSync(tempDir, { recursive: true, force: true });
    }
  });
  beforeEach(() => {
    if (!sqliteClient) {
      throw new Error('sqlite client not initialized');
    }
    // Each case starts from an empty task_run / task_stage_event state.
    clearTasks(sqliteClient);
  });
  it('updates same-stage progress without duplicating stage events', async () => {
    if (!tasksRepo) {
      throw new Error('tasks repo not initialized');
    }
    const task = await tasksRepo.createTaskRunRecord({
      id: 'task-progress',
      user_id: TEST_USER_ID,
      task_type: 'index_search',
      payload: { ticker: 'AAPL' },
      priority: 50,
      max_attempts: 3,
      resource_key: 'index_search:ticker:AAPL'
    });
    await tasksRepo.markTaskRunning(task.id);
    // Two updates for the SAME stage with different progress payloads.
    await tasksRepo.updateTaskStage(task.id, 'search.embed', 'Embedding 1 of 3 sources', {
      progress: { current: 1, total: 3, unit: 'sources' },
      counters: { chunksEmbedded: 12 },
      subject: { ticker: 'AAPL', label: 'doc-1' }
    });
    await tasksRepo.updateTaskStage(task.id, 'search.embed', 'Embedding 2 of 3 sources', {
      progress: { current: 2, total: 3, unit: 'sources' },
      counters: { chunksEmbedded: 24 },
      subject: { ticker: 'AAPL', label: 'doc-2' }
    });
    const current = await tasksRepo.getTaskByIdForUser(task.id, TEST_USER_ID);
    const events = await tasksRepo.listTaskStageEventsForTask(task.id, TEST_USER_ID);
    // Latest detail/context win on the task row, but only ONE timeline event
    // exists for the stage (same-stage updates must not append duplicates).
    expect(current?.stage_detail).toBe('Embedding 2 of 3 sources');
    expect(current?.stage_context?.progress?.current).toBe(2);
    expect(events.filter((event) => event.stage === 'search.embed')).toHaveLength(1);
  });
  it('lists recent tasks by updated_at descending', async () => {
    if (!tasksRepo) {
      throw new Error('tasks repo not initialized');
    }
    const first = await tasksRepo.createTaskRunRecord({
      id: 'task-first',
      user_id: TEST_USER_ID,
      task_type: 'refresh_prices',
      payload: {},
      priority: 50,
      max_attempts: 3,
      resource_key: 'refresh_prices:portfolio'
    });
    // Sleeps guarantee distinct ISO timestamps between the writes below.
    await Bun.sleep(5);
    const second = await tasksRepo.createTaskRunRecord({
      id: 'task-second',
      user_id: TEST_USER_ID,
      task_type: 'portfolio_insights',
      payload: {},
      priority: 50,
      max_attempts: 3,
      resource_key: 'portfolio_insights:portfolio'
    });
    await Bun.sleep(5);
    // Touching `first` last bumps its updated_at past `second`'s.
    await tasksRepo.updateTaskStage(first.id, 'refresh.fetch_quotes', 'Fetching quotes', {
      progress: { current: 1, total: 3, unit: 'tickers' }
    });
    const tasks = await tasksRepo.listRecentTasksForUser(TEST_USER_ID, 10);
    // Most recently updated first — not creation order.
    expect(tasks[0]?.id).toBe(first.id);
    expect(tasks[1]?.id).toBe(second.id);
  });
  it('preserves completion and failure detail/context on terminal tasks', async () => {
    if (!tasksRepo) {
      throw new Error('tasks repo not initialized');
    }
    const completedTask = await tasksRepo.createTaskRunRecord({
      id: 'task-completed',
      user_id: TEST_USER_ID,
      task_type: 'analyze_filing',
      payload: { accessionNumber: '0000320193-26-000001' },
      priority: 50,
      max_attempts: 3,
      resource_key: 'analyze_filing:0000320193-26-000001'
    });
    // completeTask accepts an optional detail/context pair that should land
    // on the terminal row instead of being cleared.
    await tasksRepo.completeTask(completedTask.id, {
      ticker: 'AAPL',
      accessionNumber: '0000320193-26-000001',
      filingType: '10-Q'
    }, {
      detail: 'Analysis report generated for AAPL 10-Q 0000320193-26-000001.',
      context: {
        subject: {
          ticker: 'AAPL',
          accessionNumber: '0000320193-26-000001',
          label: '10-Q'
        }
      }
    });
    const failedTask = await tasksRepo.createTaskRunRecord({
      id: 'task-failed',
      user_id: TEST_USER_ID,
      task_type: 'index_search',
      payload: { ticker: 'AAPL' },
      priority: 50,
      max_attempts: 3,
      resource_key: 'index_search:ticker:AAPL'
    });
    // markTaskFailure likewise carries detail/context through to the row.
    await tasksRepo.markTaskFailure(failedTask.id, 'Embedding request failed', 'failed', {
      detail: 'Embedding request failed',
      context: {
        progress: { current: 2, total: 5, unit: 'sources' },
        counters: { chunksEmbedded: 20 },
        subject: { ticker: 'AAPL', label: 'doc-2' }
      }
    });
    const completed = await tasksRepo.getTaskByIdForUser(completedTask.id, TEST_USER_ID);
    const failed = await tasksRepo.getTaskByIdForUser(failedTask.id, TEST_USER_ID);
    expect(completed?.stage_detail).toContain('Analysis report generated');
    expect(completed?.stage_context?.subject?.ticker).toBe('AAPL');
    expect(failed?.stage_detail).toBe('Embedding request failed');
    expect(failed?.stage_context?.progress?.current).toBe(2);
  });
});

View File

@@ -1,7 +1,8 @@
import { and, asc, desc, eq, inArray, sql } from 'drizzle-orm';
import type { Task, TaskStage, TaskStageEvent, TaskStatus, TaskType } from '@/lib/types';
import type { Task, TaskStage, TaskStageContext, TaskStageEvent, TaskStatus, TaskType } from '@/lib/types';
import { db } from '@/lib/server/db';
import { taskRun, taskStageEvent } from '@/lib/server/db/schema';
import { buildTaskNotification } from '@/lib/server/task-notifications';
type TaskRow = typeof taskRun.$inferSelect;
type TaskStageEventRow = typeof taskStageEvent.$inferSelect;
@@ -26,20 +27,27 @@ type EventInsertInput = {
user_id: string;
stage: TaskStage;
stage_detail: string | null;
stage_context: TaskStageContext | null;
status: TaskStatus;
created_at: string;
};
type TaskCompletionState = {
detail?: string | null;
context?: TaskStageContext | null;
};
type InsertExecutor = Pick<typeof db, 'insert'>;
function toTask(row: TaskRow): Task {
return {
const task = {
id: row.id,
user_id: row.user_id,
task_type: row.task_type,
status: row.status,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
stage_context: row.stage_context ?? null,
resource_key: row.resource_key,
notification_read_at: row.notification_read_at,
notification_silenced_at: row.notification_silenced_at,
@@ -53,6 +61,11 @@ function toTask(row: TaskRow): Task {
created_at: row.created_at,
updated_at: row.updated_at,
finished_at: row.finished_at
} satisfies Omit<Task, 'notification'>;
return {
...task,
notification: buildTaskNotification(task)
};
}
@@ -63,6 +76,7 @@ function toTaskStageEvent(row: TaskStageEventRow): TaskStageEvent {
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
stage_context: row.stage_context ?? null,
status: row.status as TaskStatus,
created_at: row.created_at
};
@@ -89,6 +103,7 @@ async function insertTaskStageEvent(executor: InsertExecutor, input: EventInsert
user_id: input.user_id,
stage: input.stage,
stage_detail: input.stage_detail,
stage_context: input.stage_context,
status: input.status,
created_at: input.created_at
});
@@ -107,6 +122,7 @@ export async function createTaskRunRecord(input: CreateTaskInput) {
status: 'queued',
stage: 'queued',
stage_detail: null,
stage_context: null,
resource_key: input.resource_key ?? null,
notification_read_at: null,
notification_silenced_at: null,
@@ -128,6 +144,7 @@ export async function createTaskRunRecord(input: CreateTaskInput) {
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
stage_context: row.stage_context ?? null,
status: row.status,
created_at: now
});
@@ -168,13 +185,13 @@ export async function listRecentTasksForUser(
.select()
.from(taskRun)
.where(and(eq(taskRun.user_id, userId), inArray(taskRun.status, statuses)))
.orderBy(desc(taskRun.created_at))
.orderBy(desc(taskRun.updated_at), desc(taskRun.created_at))
.limit(safeLimit)
: await db
.select()
.from(taskRun)
.where(eq(taskRun.user_id, userId))
.orderBy(desc(taskRun.created_at))
.orderBy(desc(taskRun.updated_at), desc(taskRun.created_at))
.limit(safeLimit);
return rows.map(toTask);
@@ -212,7 +229,7 @@ export async function findInFlightTaskByResourceKey(
eq(taskRun.resource_key, resourceKey),
inArray(taskRun.status, ['queued', 'running'])
))
.orderBy(desc(taskRun.created_at))
.orderBy(desc(taskRun.updated_at), desc(taskRun.created_at))
.limit(1);
return row ? toTask(row) : null;
@@ -228,6 +245,7 @@ export async function markTaskRunning(taskId: string) {
status: 'running',
stage: 'running',
stage_detail: 'Workflow task is now running',
stage_context: null,
attempts: sql`${taskRun.attempts} + 1`,
updated_at: now,
finished_at: null
@@ -244,6 +262,7 @@ export async function markTaskRunning(taskId: string) {
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
stage_context: row.stage_context ?? null,
status: row.status,
created_at: now
});
@@ -252,15 +271,31 @@ export async function markTaskRunning(taskId: string) {
});
}
export async function updateTaskStage(taskId: string, stage: TaskStage, detail: string | null = null) {
export async function updateTaskStage(
taskId: string,
stage: TaskStage,
detail: string | null = null,
context: TaskStageContext | null = null
) {
const now = new Date().toISOString();
return await db.transaction(async (tx) => {
const [current] = await tx
.select()
.from(taskRun)
.where(eq(taskRun.id, taskId))
.limit(1);
if (!current) {
return null;
}
const [row] = await tx
.update(taskRun)
.set({
stage,
stage_detail: detail,
stage_context: context,
updated_at: now
})
.where(eq(taskRun.id, taskId))
@@ -270,20 +305,27 @@ export async function updateTaskStage(taskId: string, stage: TaskStage, detail:
return null;
}
await insertTaskStageEvent(tx, {
task_id: row.id,
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
status: row.status,
created_at: now
});
if (current.stage !== stage) {
await insertTaskStageEvent(tx, {
task_id: row.id,
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
stage_context: row.stage_context ?? null,
status: row.status,
created_at: now
});
}
return toTask(row);
});
}
export async function completeTask(taskId: string, result: Record<string, unknown>) {
export async function completeTask(
taskId: string,
result: Record<string, unknown>,
completion: TaskCompletionState = {}
) {
const now = new Date().toISOString();
return await db.transaction(async (tx) => {
@@ -292,7 +334,8 @@ export async function completeTask(taskId: string, result: Record<string, unknow
.set({
status: 'completed',
stage: 'completed',
stage_detail: null,
stage_detail: completion.detail ?? 'Task finished successfully.',
stage_context: completion.context ?? null,
result,
error: null,
updated_at: now,
@@ -310,6 +353,7 @@ export async function completeTask(taskId: string, result: Record<string, unknow
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
stage_context: row.stage_context ?? null,
status: row.status,
created_at: now
});
@@ -318,7 +362,12 @@ export async function completeTask(taskId: string, result: Record<string, unknow
});
}
export async function markTaskFailure(taskId: string, reason: string, stage: TaskStage = 'failed') {
export async function markTaskFailure(
taskId: string,
reason: string,
stage: TaskStage = 'failed',
failure: TaskCompletionState = {}
) {
const now = new Date().toISOString();
return await db.transaction(async (tx) => {
@@ -327,7 +376,8 @@ export async function markTaskFailure(taskId: string, reason: string, stage: Tas
.set({
status: 'failed',
stage,
stage_detail: null,
stage_detail: failure.detail ?? reason,
stage_context: failure.context ?? null,
error: reason,
updated_at: now,
finished_at: now
@@ -344,6 +394,7 @@ export async function markTaskFailure(taskId: string, reason: string, stage: Tas
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
stage_context: row.stage_context ?? null,
status: row.status,
created_at: now
});
@@ -375,7 +426,8 @@ export async function setTaskStatusFromWorkflow(
const hasNoStateChange = current.status === status
&& current.stage === nextStage
&& (current.error ?? null) === nextError
&& current.stage_detail === null
&& (current.stage_detail ?? null) === (nextStatusDetail(status, nextError) ?? null)
&& (current.stage_context ?? null) === null
&& (isTerminal ? current.finished_at !== null : current.finished_at === null);
if (hasNoStateChange) {
@@ -388,7 +440,8 @@ export async function setTaskStatusFromWorkflow(
.set({
status,
stage: nextStage,
stage_detail: null,
stage_detail: nextStatusDetail(status, nextError),
stage_context: null,
error: nextError,
updated_at: now,
finished_at: isTerminal ? now : null
@@ -405,6 +458,7 @@ export async function setTaskStatusFromWorkflow(
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
stage_context: row.stage_context ?? null,
status: row.status,
created_at: now
});
@@ -452,6 +506,22 @@ export async function updateTaskNotificationState(
return row ? toTask(row) : null;
}
/**
 * Maps a workflow-reconciled task status to the stage_detail text stored on
 * the task row. Failed tasks surface the workflow error when one exists;
 * non-terminal statuses other than 'running' carry no detail.
 */
function nextStatusDetail(status: TaskStatus, error?: string | null) {
  switch (status) {
    case 'failed':
      return error ?? 'Workflow run failed';
    case 'completed':
      return 'Workflow run completed.';
    case 'running':
      return 'Workflow task is now running';
    default:
      return null;
  }
}
export async function listTaskStageEventsForTask(taskId: string, userId: string) {
const rows = await db
.select()

View File

@@ -5,7 +5,8 @@ import type {
SearchAnswerResponse,
SearchCitation,
SearchResult,
SearchSource
SearchSource,
TaskStageContext
} from '@/lib/types';
import { runAiAnalysis, runAiEmbeddings } from '@/lib/server/ai';
import { __dbInternals, getSqliteClient } from '@/lib/server/db';
@@ -90,7 +91,11 @@ type IndexSearchDocumentsInput = {
journalEntryId?: number | null;
sourceKinds?: SearchDocumentSourceKind[];
deleteSourceRefs?: DeleteSourceRef[];
onStage?: (stage: 'collect' | 'fetch' | 'chunk' | 'embed' | 'persist', detail: string) => Promise<void> | void;
onStage?: (
stage: 'collect' | 'fetch' | 'chunk' | 'embed' | 'persist',
detail: string,
context?: TaskStageContext | null
) => Promise<void> | void;
};
type SearchInput = {
@@ -834,22 +839,94 @@ export async function indexSearchDocuments(input: IndexSearchDocumentsInput) {
let skipped = 0;
let deleted = 0;
let chunksEmbedded = 0;
const totalDocuments = materialized.length;
const stageContext = (current: number, subject?: TaskStageContext['subject'] | null): TaskStageContext => ({
progress: {
current,
total: totalDocuments || 1,
unit: 'sources'
},
counters: {
sourcesCollected: totalDocuments,
indexed,
skipped,
deleted,
chunksEmbedded
},
subject: subject ?? (input.ticker ? { ticker: input.ticker } : input.accessionNumber ? { accessionNumber: input.accessionNumber } : null)
});
if (input.deleteSourceRefs && input.deleteSourceRefs.length > 0) {
deleted += deleteSourceRefs(client, input.deleteSourceRefs);
}
for (const document of materialized) {
await input.onStage?.('fetch', `Preparing ${document.sourceKind} ${document.sourceRef}`);
await input.onStage?.(
'collect',
`Collected ${materialized.length} source records for search indexing`,
{
counters: {
sourcesCollected: materialized.length,
deleted
},
subject: input.ticker ? { ticker: input.ticker } : input.accessionNumber ? { accessionNumber: input.accessionNumber } : null
}
);
for (let index = 0; index < materialized.length; index += 1) {
const document = materialized[index];
await input.onStage?.(
'fetch',
`Preparing ${document.sourceKind} ${document.sourceRef}`,
stageContext(index + 1, {
ticker: document.ticker ?? undefined,
accessionNumber: document.accessionNumber ?? undefined,
label: document.sourceRef
})
);
const chunks = chunkDocument(document);
if (chunks.length === 0) {
continue;
}
await input.onStage?.('chunk', `Chunking ${document.sourceKind} ${document.sourceRef}`);
await input.onStage?.('embed', `Embedding ${chunks.length} chunks for ${document.sourceRef}`);
await input.onStage?.(
'chunk',
`Chunking ${document.sourceKind} ${document.sourceRef}`,
stageContext(index + 1, {
ticker: document.ticker ?? undefined,
accessionNumber: document.accessionNumber ?? undefined,
label: document.sourceRef
})
);
await input.onStage?.(
'embed',
`Embedding ${chunks.length} chunks for ${document.sourceRef}`,
{
...stageContext(index + 1, {
ticker: document.ticker ?? undefined,
accessionNumber: document.accessionNumber ?? undefined,
label: document.sourceRef
}),
counters: {
sourcesCollected: totalDocuments,
indexed,
skipped,
deleted,
chunksEmbedded
}
}
);
const embeddings = await runAiEmbeddings(chunks.map((chunk) => chunk.chunkText));
await input.onStage?.('persist', `Persisting indexed chunks for ${document.sourceRef}`);
await input.onStage?.(
'persist',
`Persisting indexed chunks for ${document.sourceRef}`,
stageContext(index + 1, {
ticker: document.ticker ?? undefined,
accessionNumber: document.accessionNumber ?? undefined,
label: document.sourceRef
})
);
const result = persistDocumentIndex(client, document, chunks, embeddings);
if (result.skipped) {

View File

@@ -0,0 +1,122 @@
import { describe, expect, it } from 'bun:test';
import type { Task } from '@/lib/types';
import { buildTaskNotification } from '@/lib/server/task-notifications';
/**
 * Builds a fully populated running `sync_filings` task fixture (without the
 * derived `notification` field) that individual cases can override per
 * scenario via the `overrides` argument.
 */
function baseTask(overrides: Partial<Omit<Task, 'notification'>> = {}): Omit<Task, 'notification'> {
  const defaults: Omit<Task, 'notification'> = {
    id: 'task-1',
    user_id: 'user-1',
    task_type: 'sync_filings',
    status: 'running',
    stage: 'sync.extract_taxonomy',
    stage_detail: 'Extracting XBRL taxonomy for 0000320193-26-000001',
    stage_context: {
      progress: { current: 2, total: 5, unit: 'filings' },
      counters: { hydrated: 1, failed: 0 },
      subject: { ticker: 'AAPL', accessionNumber: '0000320193-26-000001' }
    },
    resource_key: 'sync_filings:AAPL',
    notification_read_at: null,
    notification_silenced_at: null,
    priority: 50,
    payload: { ticker: 'AAPL', limit: 20 },
    result: null,
    error: null,
    attempts: 1,
    max_attempts: 3,
    workflow_run_id: 'run-1',
    created_at: '2026-03-09T10:00:00.000Z',
    updated_at: '2026-03-09T10:05:00.000Z',
    finished_at: null
  };
  return { ...defaults, ...overrides };
}
// Unit coverage for buildTaskNotification: title/tone selection, progress
// percentage, stat extraction, and the per-task-type action lists.
describe('task notification builder', () => {
  it('builds progress-driven notifications for running sync jobs', () => {
    const notification = buildTaskNotification(baseTask());
    expect(notification.title).toBe('Filing sync');
    expect(notification.statusLine).toContain('Running');
    // 2 of 5 filings from the fixture's stage_context -> 40%.
    expect(notification.progress?.percent).toBe(40);
    expect(notification.stats.some((stat) => stat.label === 'Hydrated' && stat.value === '1')).toBe(true);
    // Running sync jobs link to the filings view for the fixture ticker.
    expect(notification.actions[0]).toMatchObject({
      id: 'open_filings',
      primary: true,
      href: '/filings?ticker=AAPL'
    });
  });
  it('builds report actions for completed analyze jobs', () => {
    // Terminal analyze task whose result names the ticker/accession/form.
    const notification = buildTaskNotification(baseTask({
      task_type: 'analyze_filing',
      status: 'completed',
      stage: 'completed',
      stage_detail: 'Analysis report generated for AAPL 10-Q 0000320193-26-000001.',
      stage_context: {
        subject: {
          ticker: 'AAPL',
          accessionNumber: '0000320193-26-000001',
          label: '10-Q'
        }
      },
      payload: {
        accessionNumber: '0000320193-26-000001'
      },
      result: {
        ticker: 'AAPL',
        accessionNumber: '0000320193-26-000001',
        filingType: '10-Q',
        model: 'test-model'
      },
      finished_at: '2026-03-09T10:06:00.000Z'
    }));
    expect(notification.tone).toBe('success');
    // Primary action deep-links to the generated analysis report.
    expect(notification.actions[0]).toMatchObject({
      id: 'open_analysis_report',
      label: 'Open summary',
      primary: true
    });
    expect(notification.actions[0]?.href).toContain('/analysis/reports/AAPL/0000320193-26-000001');
    expect(notification.stats.some((stat) => stat.label === 'Form' && stat.value === '10-Q')).toBe(true);
  });
  it('keeps filings navigation available for failed analyze jobs', () => {
    const notification = buildTaskNotification(baseTask({
      task_type: 'analyze_filing',
      status: 'failed',
      stage: 'failed',
      stage_detail: 'Primary filing document fetch failed.',
      error: 'Primary filing document fetch failed.',
      stage_context: {
        subject: {
          ticker: 'AAPL',
          accessionNumber: '0000320193-26-000001'
        }
      },
      payload: {
        accessionNumber: '0000320193-26-000001'
      },
      result: null,
      finished_at: '2026-03-09T10:06:00.000Z'
    }));
    expect(notification.tone).toBe('error');
    // The error text becomes the notification detail line verbatim.
    expect(notification.detailLine).toBe('Primary filing document fetch failed.');
    expect(notification.actions.some((action) => action.id === 'open_filings')).toBe(true);
  });
});

View File

@@ -0,0 +1,236 @@
import {
fallbackStageProgress,
stageLabel,
taskTypeLabel
} from '@/lib/task-workflow';
import type {
Task,
TaskNotificationAction,
TaskNotificationStat,
TaskNotificationView
} from '@/lib/types';
type TaskCore = Omit<Task, 'notification'>;
/** Narrows an unknown value to a plain (non-null, non-array) object record, else null. */
function asRecord(value: unknown) {
  if (typeof value !== 'object' || value === null || Array.isArray(value)) {
    return null;
  }
  return value as Record<string, unknown>;
}
/** Returns the trimmed string when value is a non-blank string, else null. */
function asString(value: unknown) {
  if (typeof value !== 'string') {
    return null;
  }
  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : null;
}
/**
 * Coerces an unknown value to a finite number. Finite numbers pass through;
 * non-blank numeric strings are parsed (blank strings are rejected up front
 * to avoid Number('') evaluating to 0). Everything else yields null.
 */
function asNumber(value: unknown) {
  switch (typeof value) {
    case 'number':
      return Number.isFinite(value) ? value : null;
    case 'string': {
      if (value.trim().length === 0) {
        return null;
      }
      const parsed = Number(value);
      return Number.isFinite(parsed) ? parsed : null;
    }
    default:
      return null;
  }
}
// Shared formatter instance: constructing Intl.NumberFormat is relatively
// expensive, so build it once at module load instead of on every call.
const integerFormatter = new Intl.NumberFormat('en-US', { maximumFractionDigits: 0 });

/** Formats a number as a grouped integer string, e.g. 1234567 -> "1,234,567". */
function formatInteger(value: number) {
  return integerFormatter.format(value);
}
/**
 * Derives a clamped progress descriptor from the task's stage context,
 * falling back to stage-based heuristics when no explicit progress exists.
 * Returns null when there is no progress or its total is non-positive;
 * otherwise `percent` is always an integer in [0, 100].
 */
function buildProgress(task: TaskCore) {
  const currentProgress = task.stage_context?.progress ?? fallbackStageProgress(task);
  if (!currentProgress || currentProgress.total <= 0) {
    return null;
  }
  // Clamp current into [0, trunc(total)] so percent can never exceed 100.
  const current = Math.min(Math.max(Math.trunc(currentProgress.current), 0), Math.trunc(currentProgress.total));
  const total = Math.max(Math.trunc(currentProgress.total), 1);
  // total is always >= 1 here, so percent is always a number; the previous
  // `total > 0 ? … : null` ternary was dead code and has been removed.
  const percent = Math.min(100, Math.max(0, Math.round((current / total) * 100)));
  return {
    current,
    total,
    unit: currentProgress.unit,
    percent
  };
}
/**
 * Builds one notification stat entry, or null when the value is absent/blank.
 * Numbers are rendered with thousands separators; strings are trimmed.
 */
function makeStat(label: string, value: number | string | null | undefined): TaskNotificationStat | null {
  if (value == null) {
    return null;
  }
  if (typeof value === 'number') {
    return { label, value: formatInteger(value) };
  }
  const trimmed = value.trim();
  if (!trimmed) {
    return null;
  }
  return { label, value: trimmed };
}
/**
 * Assembles the per-task-type stat rows shown in a notification.
 * For each stat the value is resolved from the task result first, then the
 * stage-context counters, then (where meaningful) the progress total. When no
 * type-specific stat resolves, every stage-context counter is rendered
 * verbatim as a fallback.
 */
function buildStats(task: TaskCore): TaskNotificationStat[] {
  const result = asRecord(task.result);
  const counters = task.stage_context?.counters ?? {};
  const progressTotal = task.stage_context?.progress?.total ?? null;
  let candidates: Array<TaskNotificationStat | null>;
  switch (task.task_type) {
    case 'sync_filings':
      candidates = [
        makeStat('Fetched', asNumber(result?.fetched) ?? counters.fetched ?? progressTotal),
        makeStat('Inserted', asNumber(result?.inserted) ?? counters.inserted ?? null),
        makeStat('Updated', asNumber(result?.updated) ?? counters.updated ?? null),
        makeStat('Hydrated', asNumber(result?.taxonomySnapshotsHydrated) ?? counters.hydrated ?? null),
        makeStat('Failed', asNumber(result?.taxonomySnapshotsFailed) ?? counters.failed ?? null)
      ];
      break;
    case 'refresh_prices':
      candidates = [
        makeStat('Tickers', asNumber(result?.totalTickers) ?? progressTotal),
        makeStat('Updated', asNumber(result?.updatedCount) ?? counters.updatedCount ?? null),
        makeStat('Holdings', counters.holdings ?? null)
      ];
      break;
    case 'analyze_filing':
      candidates = [
        makeStat('Ticker', asString(result?.ticker) ?? task.stage_context?.subject?.ticker ?? null),
        makeStat('Form', asString(result?.filingType) ?? null),
        makeStat('Model', asString(result?.model) ?? null)
      ];
      break;
    case 'index_search':
      candidates = [
        makeStat('Sources', asNumber(result?.sourcesCollected) ?? counters.sourcesCollected ?? progressTotal),
        makeStat('Indexed', asNumber(result?.indexed) ?? counters.indexed ?? null),
        makeStat('Chunks', asNumber(result?.chunksEmbedded) ?? counters.chunksEmbedded ?? null),
        makeStat('Skipped', asNumber(result?.skipped) ?? counters.skipped ?? null),
        makeStat('Deleted', asNumber(result?.deleted) ?? counters.deleted ?? null)
      ];
      break;
    case 'portfolio_insights': {
      const summary = asRecord(result?.summary);
      candidates = [
        makeStat('Positions', asNumber(summary?.positions) ?? counters.holdings ?? null),
        makeStat('Provider', asString(result?.provider) ?? null),
        makeStat('Model', asString(result?.model) ?? null)
      ];
      break;
    }
    default:
      candidates = [];
  }
  // No type-specific stat resolved (also covers unknown task types):
  // surface the raw counters instead so the notification is never empty.
  if (candidates.every((stat) => stat === null)) {
    candidates = Object.entries(counters).map(([label, value]) => makeStat(label, value));
  }
  return candidates.filter((stat): stat is TaskNotificationStat => stat !== null);
}
/**
 * Resolves the ticker and accession number used to build action links,
 * preferring the task result, then the stage-context subject, then the
 * original payload. Either field may be null when unknown.
 */
function buildTaskHref(task: TaskCore) {
  const result = asRecord(task.result);
  const payload = asRecord(task.payload);
  const subject = task.stage_context?.subject;
  return {
    ticker: asString(result?.ticker) ?? subject?.ticker ?? asString(payload?.ticker),
    accessionNumber: asString(result?.accessionNumber) ?? subject?.accessionNumber ?? asString(payload?.accessionNumber)
  };
}
/**
 * Builds the ordered action list for a task notification. Each task type
 * contributes navigation actions, and every notification ends with an
 * in-app "Open details" action (href: null means handled client-side).
 */
function buildActions(task: TaskCore): TaskNotificationAction[] {
  const { ticker, accessionNumber } = buildTaskHref(task);
  const filingsHref = ticker ? `/filings?ticker=${encodeURIComponent(ticker)}` : '/filings';
  const actions: TaskNotificationAction[] = [];
  switch (task.task_type) {
    case 'sync_filings':
      actions.push({
        id: 'open_filings',
        label: 'Open filings',
        href: filingsHref,
        primary: true
      });
      break;
    case 'analyze_filing':
      // The report deep link needs both identifiers; when it is absent the
      // filings link becomes the primary action instead.
      if (ticker && accessionNumber) {
        actions.push({
          id: 'open_analysis_report',
          label: 'Open summary',
          href: `/analysis/reports/${encodeURIComponent(ticker)}/${encodeURIComponent(accessionNumber)}`,
          primary: true
        });
      }
      actions.push({
        id: 'open_filings',
        label: 'Open filings',
        href: filingsHref,
        primary: actions.length === 0
      });
      break;
    case 'refresh_prices':
    case 'portfolio_insights':
      actions.push({
        id: 'open_portfolio',
        label: 'Open portfolio',
        href: '/portfolio',
        primary: true
      });
      break;
    case 'index_search':
      actions.push({
        id: 'open_search',
        label: 'Open search',
        href: ticker ? `/search?ticker=${encodeURIComponent(ticker)}` : '/search',
        primary: true
      });
      break;
  }
  actions.push({
    id: 'open_details',
    label: 'Open details',
    href: null
  });
  return actions;
}
/**
 * Produces the one-line status summary for a notification. Running tasks
 * include the humanized stage label and, when known, the percent complete.
 */
function buildStatusLine(task: TaskCore, progress: TaskNotificationView['progress']) {
  switch (task.status) {
    case 'queued':
      return 'Queued for execution';
    case 'completed':
      return 'Finished successfully';
    case 'failed':
      return 'Failed';
    case 'running': {
      const label = stageLabel(task.stage).toLowerCase();
      const percent = progress?.percent;
      return percent === null || percent === undefined
        ? `Running ${label}`
        : `Running ${label} · ${percent}%`;
    }
  }
}
/**
 * Public entry point: projects a task row into the view model rendered by the
 * notification UI (title, status line, tone, progress, stats, and actions).
 */
export function buildTaskNotification(task: TaskCore): TaskNotificationView {
  const progress = buildProgress(task);
  const tone = task.status === 'failed' ? 'error' : task.status === 'completed' ? 'success' : 'info';
  return {
    title: taskTypeLabel(task.task_type),
    statusLine: buildStatusLine(task, progress),
    // The error message takes precedence over stage detail when present.
    detailLine: task.error ?? task.stage_detail,
    tone,
    progress,
    stats: buildStats(task),
    actions: buildActions(task)
  };
}

View File

@@ -0,0 +1,413 @@
import {
beforeEach,
describe,
expect,
it,
mock
} from 'bun:test';
import type { Filing, Holding, Task } from '@/lib/types';
// Captures the arguments of every updateTaskStage call made by the processors
// under test; reset in beforeEach so each test asserts only its own updates.
const stageUpdates: Array<{
taskId: string;
stage: string;
detail: string | null;
context: Record<string, unknown> | null;
}> = [];
// Mock AI runner: the 'extraction' workload returns a canned JSON extraction
// payload (model 'glm-extract'); other workloads return plain text with a
// model name keyed off the workload, so tests can assert which workload
// produced each result field.
const mockRunAiAnalysis = mock(async (_prompt: string, _instruction: string, options?: { workload?: string }) => {
if (options?.workload === 'extraction') {
return {
provider: 'zhipu',
model: 'glm-extract',
text: JSON.stringify({
summary: 'Revenue growth remained resilient despite FX pressure.',
keyPoints: ['Revenue up year-over-year'],
redFlags: ['Debt service burden is rising'],
followUpQuestions: ['Is margin guidance sustainable?'],
portfolioSignals: ['Monitor leverage trend'],
segmentSpecificData: ['Services segment outgrew hardware segment.'],
geographicRevenueBreakdown: ['EMEA revenue grew faster than Americas.'],
companySpecificData: ['Same-store sales increased 4.2%.'],
secApiCrossChecks: ['Revenue from SEC API aligns with filing narrative.'],
confidence: 0.72
})
};
}
return {
provider: 'zhipu',
model: options?.workload === 'report' ? 'glm-report' : 'glm-generic',
text: 'Structured output'
};
});
// Fixed portfolio summary (14 positions) used by the portfolio_insights test.
const mockBuildPortfolioSummary = mock((_holdings: Holding[]) => ({
positions: 14,
total_value: '100000',
total_gain_loss: '1000',
total_cost_basis: '99000',
avg_return_pct: '0.01'
}));
// Deterministic quotes per ticker so refresh_prices assertions are stable.
const mockGetQuote = mock(async (ticker: string) => {
return ticker === 'MSFT' ? 410 : 205;
});
// Simulates the search indexer: emits collect/fetch/embed stage callbacks
// (with progress/counters/subject context) before returning final counts,
// so tests can verify stage context is forwarded to updateTaskStage.
const mockIndexSearchDocuments = mock(async (input: {
onStage?: (stage: 'collect' | 'fetch' | 'chunk' | 'embed' | 'persist', detail: string, context?: Record<string, unknown> | null) => Promise<void> | void;
}) => {
await input.onStage?.('collect', 'Collected 12 source records for search indexing', {
counters: {
sourcesCollected: 12,
deleted: 3
}
});
await input.onStage?.('fetch', 'Preparing filing_brief 0000320193-26-000001', {
progress: {
current: 1,
total: 12,
unit: 'sources'
},
subject: {
ticker: 'AAPL',
accessionNumber: '0000320193-26-000001'
}
});
await input.onStage?.('embed', 'Embedding 248 chunks for 0000320193-26-000001', {
progress: {
current: 1,
total: 12,
unit: 'sources'
},
counters: {
chunksEmbedded: 248
}
});
return {
sourcesCollected: 12,
indexed: 12,
skipped: 1,
deleted: 3,
chunksEmbedded: 248
};
});
// Factory for a fresh AAPL 10-Q filing fixture; a function (not a shared
// constant) so each test gets an independent copy it can safely mutate.
const sampleFiling = (): Filing => ({
id: 1,
ticker: 'AAPL',
filing_type: '10-Q',
filing_date: '2026-01-30',
accession_number: '0000320193-26-000001',
cik: '0000320193',
company_name: 'Apple Inc.',
filing_url: 'https://www.sec.gov/Archives/edgar/data/320193/000032019326000001/a10q.htm',
submission_url: 'https://data.sec.gov/submissions/CIK0000320193.json',
primary_document: 'a10q.htm',
metrics: {
revenue: 120_000_000_000,
netIncome: 25_000_000_000,
totalAssets: 410_000_000_000,
cash: 70_000_000_000,
debt: 98_000_000_000
},
analysis: null,
created_at: '2026-01-30T00:00:00.000Z',
updated_at: '2026-01-30T00:00:00.000Z'
});
// --- Repository mocks ---------------------------------------------------
const mockGetFilingByAccession = mock(async () => sampleFiling());
// Two filings so sync assertions can expect counts of 2.
const mockListFilingsRecords = mock(async () => [sampleFiling(), {
...sampleFiling(),
id: 2,
accession_number: '0000320193-26-000002',
filing_date: '2026-02-28'
}]);
const mockSaveFilingAnalysis = mock(async () => {});
const mockUpdateFilingMetricsById = mock(async () => {});
const mockUpsertFilingsRecords = mock(async () => ({
inserted: 2,
updated: 0
}));
const mockDeleteCompanyFinancialBundlesForTicker = mock(async () => {});
// Returning null forces the taxonomy hydration path to run for every filing.
const mockGetFilingTaxonomySnapshotByFilingId = mock(async () => null);
const mockUpsertFilingTaxonomySnapshot = mock(async () => {});
// 24 updated holdings — asserted as result.updatedCount in refresh tests.
const mockApplyRefreshedPrices = mock(async () => 24);
// Two holdings across two tickers (AAPL, MSFT) for price-refresh tests.
const mockListHoldingsForPriceRefresh = mock(async () => [
{
id: 1,
user_id: 'user-1',
ticker: 'AAPL',
company_name: 'Apple Inc.',
shares: '10',
avg_cost: '150',
current_price: '200',
market_value: '2000',
gain_loss: '500',
gain_loss_pct: '0.33',
last_price_at: null,
created_at: '2026-03-09T00:00:00.000Z',
updated_at: '2026-03-09T00:00:00.000Z'
},
{
id: 2,
user_id: 'user-1',
ticker: 'MSFT',
company_name: 'Microsoft Corporation',
shares: '4',
avg_cost: '300',
current_price: '400',
market_value: '1600',
gain_loss: '400',
gain_loss_pct: '0.25',
last_price_at: null,
created_at: '2026-03-09T00:00:00.000Z',
updated_at: '2026-03-09T00:00:00.000Z'
}
]);
const mockListUserHoldings = mock(async () => await mockListHoldingsForPriceRefresh());
const mockCreatePortfolioInsight = mock(async () => {});
// Records each stage transition into stageUpdates for later assertions.
const mockUpdateTaskStage = mock(async (taskId: string, stage: string, detail: string | null, context?: Record<string, unknown> | null) => {
stageUpdates.push({
taskId,
stage,
detail,
context: context ?? null
});
});
// --- SEC client mocks ---------------------------------------------------
const mockFetchPrimaryFilingText = mock(async () => ({
text: 'Revenue accelerated in services and margins improved.',
source: 'primary_document' as const
}));
// Two recent AAPL filings (10-Q and 10-K) returned by the sync fetch step.
const mockFetchRecentFilings = mock(async () => ([
{
ticker: 'AAPL',
filingType: '10-Q',
filingDate: '2026-01-30',
accessionNumber: '0000320193-26-000001',
cik: '0000320193',
companyName: 'Apple Inc.',
filingUrl: 'https://www.sec.gov/Archives/edgar/data/320193/000032019326000001/a10q.htm',
submissionUrl: 'https://data.sec.gov/submissions/CIK0000320193.json',
primaryDocument: 'a10q.htm'
},
{
ticker: 'AAPL',
filingType: '10-K',
filingDate: '2025-10-30',
accessionNumber: '0000320193-25-000001',
cik: '0000320193',
companyName: 'Apple Inc.',
filingUrl: 'https://www.sec.gov/Archives/edgar/data/320193/000032019325000001/a10k.htm',
submissionUrl: 'https://data.sec.gov/submissions/CIK0000320193.json',
primaryDocument: 'a10k.htm'
}
]));
// Fixed id so tests can assert follow-up search tasks were enqueued.
const mockEnqueueTask = mock(async () => ({
id: 'search-task-1'
}));
// Minimal successful taxonomy snapshot keyed on the requested filing id;
// empty statement rows and a single derived metric keep assertions simple.
const mockHydrateFilingTaxonomySnapshot = mock(async (input: { filingId: number }) => ({
filing_id: input.filingId,
ticker: 'AAPL',
filing_date: '2026-01-30',
filing_type: '10-Q',
parse_status: 'ready',
parse_error: null,
source: 'xbrl_instance',
periods: [],
statement_rows: {
income: [],
balance: [],
cash_flow: [],
equity: [],
comprehensive_income: []
},
derived_metrics: {
revenue: 120_000_000_000
},
validation_result: {
status: 'matched',
checks: [],
validatedAt: '2026-03-09T00:00:00.000Z'
},
facts_count: 1,
concepts_count: 1,
dimensions_count: 0,
assets: [],
concepts: [],
facts: [],
metric_validations: []
}));
// Register every module mock BEFORE the dynamic import below so that
// task-processors resolves the mocked dependencies instead of the real ones.
mock.module('@/lib/server/ai', () => ({
runAiAnalysis: mockRunAiAnalysis
}));
mock.module('@/lib/server/portfolio', () => ({
buildPortfolioSummary: mockBuildPortfolioSummary
}));
mock.module('@/lib/server/prices', () => ({
getQuote: mockGetQuote
}));
mock.module('@/lib/server/search', () => ({
indexSearchDocuments: mockIndexSearchDocuments
}));
mock.module('@/lib/server/repos/filings', () => ({
getFilingByAccession: mockGetFilingByAccession,
listFilingsRecords: mockListFilingsRecords,
saveFilingAnalysis: mockSaveFilingAnalysis,
updateFilingMetricsById: mockUpdateFilingMetricsById,
upsertFilingsRecords: mockUpsertFilingsRecords
}));
mock.module('@/lib/server/repos/company-financial-bundles', () => ({
deleteCompanyFinancialBundlesForTicker: mockDeleteCompanyFinancialBundlesForTicker
}));
mock.module('@/lib/server/repos/filing-taxonomy', () => ({
getFilingTaxonomySnapshotByFilingId: mockGetFilingTaxonomySnapshotByFilingId,
upsertFilingTaxonomySnapshot: mockUpsertFilingTaxonomySnapshot
}));
mock.module('@/lib/server/repos/holdings', () => ({
applyRefreshedPrices: mockApplyRefreshedPrices,
listHoldingsForPriceRefresh: mockListHoldingsForPriceRefresh,
listUserHoldings: mockListUserHoldings
}));
mock.module('@/lib/server/repos/insights', () => ({
createPortfolioInsight: mockCreatePortfolioInsight
}));
mock.module('@/lib/server/repos/tasks', () => ({
updateTaskStage: mockUpdateTaskStage
}));
mock.module('@/lib/server/sec', () => ({
fetchPrimaryFilingText: mockFetchPrimaryFilingText,
fetchRecentFilings: mockFetchRecentFilings
}));
mock.module('@/lib/server/tasks', () => ({
enqueueTask: mockEnqueueTask
}));
mock.module('@/lib/server/taxonomy/engine', () => ({
hydrateFilingTaxonomySnapshot: mockHydrateFilingTaxonomySnapshot
}));
// Dynamic import: must happen after the mock.module registrations above.
const { runTaskProcessor } = await import('./task-processors');
/**
 * Builds a fully-populated Task row for tests. Pass `overrides` to change
 * individual fields without repeating the defaults.
 */
function taskFactory(overrides: Partial<Task> = {}): Task {
  const defaults: Task = {
    id: 'task-1',
    user_id: 'user-1',
    task_type: 'sync_filings',
    status: 'running',
    stage: 'running',
    stage_detail: 'Running',
    stage_context: null,
    resource_key: null,
    notification_read_at: null,
    notification_silenced_at: null,
    priority: 50,
    payload: {},
    result: null,
    error: null,
    attempts: 1,
    max_attempts: 3,
    workflow_run_id: 'run-1',
    created_at: '2026-03-09T00:00:00.000Z',
    updated_at: '2026-03-09T00:00:00.000Z',
    finished_at: null,
    notification: {
      title: 'Task',
      statusLine: 'Running',
      detailLine: null,
      tone: 'info',
      progress: null,
      stats: [],
      actions: []
    }
  };
  return { ...defaults, ...overrides };
}
// End-to-end checks of runTaskProcessor per task type: each test asserts the
// outcome (result, completionDetail, completionContext) plus the stage
// transitions recorded in stageUpdates by the mocked updateTaskStage.
describe('task processor outcomes', () => {
beforeEach(() => {
// Reset shared capture state and call counts so tests stay independent.
stageUpdates.length = 0;
mockRunAiAnalysis.mockClear();
mockGetQuote.mockClear();
mockIndexSearchDocuments.mockClear();
mockSaveFilingAnalysis.mockClear();
mockCreatePortfolioInsight.mockClear();
mockUpdateTaskStage.mockClear();
mockEnqueueTask.mockClear();
});
// sync_filings: counts come from the two mocked filings; the taxonomy
// stage must carry a subject in its stage context.
it('returns sync filing completion detail and progress context', async () => {
const outcome = await runTaskProcessor(taskFactory({
task_type: 'sync_filings',
payload: {
ticker: 'AAPL',
limit: 2
}
}));
expect(outcome.completionDetail).toContain('Synced 2 filings for AAPL');
expect(outcome.result.fetched).toBe(2);
expect(outcome.result.searchTaskId).toBe('search-task-1');
expect(outcome.completionContext?.counters?.hydrated).toBe(2);
expect(stageUpdates.some((entry) => entry.stage === 'sync.extract_taxonomy' && entry.context?.subject)).toBe(true);
});
// refresh_prices: expects one fetch_quotes update up front plus one per
// ticker (2 tickers from the holdings fixture) = 3 stage updates.
it('returns refresh price completion detail with live quote progress', async () => {
const outcome = await runTaskProcessor(taskFactory({
task_type: 'refresh_prices'
}));
expect(outcome.completionDetail).toBe('Refreshed prices for 2 tickers across 2 holdings.');
expect(outcome.result.updatedCount).toBe(24);
expect(stageUpdates.filter((entry) => entry.stage === 'refresh.fetch_quotes')).toHaveLength(3);
expect(stageUpdates.at(-1)?.context?.counters).toBeDefined();
});
// analyze_filing: the report model comes from the 'report' workload branch
// of the mocked AI runner, and the analysis must be persisted.
it('returns analyze filing completion detail with report metadata', async () => {
const outcome = await runTaskProcessor(taskFactory({
task_type: 'analyze_filing',
payload: {
accessionNumber: '0000320193-26-000001'
}
}));
expect(outcome.completionDetail).toBe('Analysis report generated for AAPL 10-Q 0000320193-26-000001.');
expect(outcome.result.ticker).toBe('AAPL');
expect(outcome.result.filingType).toBe('10-Q');
expect(outcome.result.model).toBe('glm-report');
expect(mockSaveFilingAnalysis).toHaveBeenCalled();
});
// index_search: counters in the completion context mirror the totals
// returned by the mocked indexer, and the embed stage must be forwarded.
it('returns index search completion detail and counters', async () => {
const outcome = await runTaskProcessor(taskFactory({
task_type: 'index_search',
payload: {
ticker: 'AAPL',
sourceKinds: ['filing_brief']
}
}));
expect(outcome.completionDetail).toBe('Indexed 12 sources, embedded 248 chunks, skipped 1, deleted 3 stale documents.');
expect(outcome.result.indexed).toBe(12);
expect(outcome.completionContext?.counters?.chunksEmbedded).toBe(248);
expect(stageUpdates.some((entry) => entry.stage === 'search.embed')).toBe(true);
});
// portfolio_insights: summary passes through from the mocked portfolio
// builder and the generated insight must be persisted.
it('returns portfolio insight completion detail and summary payload', async () => {
const outcome = await runTaskProcessor(taskFactory({
task_type: 'portfolio_insights'
}));
expect(outcome.completionDetail).toBe('Generated portfolio insight for 14 holdings.');
expect(outcome.result.provider).toBe('zhipu');
expect(outcome.result.summary).toEqual({
positions: 14,
total_value: '100000',
total_gain_loss: '1000',
total_cost_basis: '99000',
avg_return_pct: '0.01'
});
expect(mockCreatePortfolioInsight).toHaveBeenCalled();
});
});

View File

@@ -4,7 +4,8 @@ import type {
FilingExtractionMeta,
Holding,
Task,
TaskStage
TaskStage,
TaskStageContext
} from '@/lib/types';
import { runAiAnalysis } from '@/lib/server/ai';
import { buildPortfolioSummary } from '@/lib/server/portfolio';
@@ -137,8 +138,49 @@ function toTaskResult(value: unknown): Record<string, unknown> {
return value as Record<string, unknown>;
}
async function setProjectionStage(task: Task, stage: TaskStage, detail: string | null = null) {
await updateTaskStage(task.id, stage, detail);
export type TaskExecutionOutcome = {
result: Record<string, unknown>;
completionDetail: string;
completionContext?: TaskStageContext | null;
};
function buildTaskOutcome(
result: unknown,
completionDetail: string,
completionContext: TaskStageContext | null = null
): TaskExecutionOutcome {
return {
result: toTaskResult(result),
completionDetail,
completionContext
};
}
async function setProjectionStage(
task: Task,
stage: TaskStage,
detail: string | null = null,
context: TaskStageContext | null = null
) {
await updateTaskStage(task.id, stage, detail, context);
}
function buildProgressContext(input: {
current: number;
total: number;
unit: string;
counters?: Record<string, number>;
subject?: TaskStageContext['subject'];
}): TaskStageContext {
return {
progress: {
current: input.current,
total: input.total,
unit: input.unit
},
counters: input.counters,
subject: input.subject
};
}
function parseTicker(raw: unknown) {
@@ -576,15 +618,25 @@ async function processSyncFilings(task: Task) {
.join(' | ');
let searchTaskId: string | null = null;
const tickerSubject = { ticker };
await setProjectionStage(
task,
'sync.fetch_filings',
`Fetching up to ${limit} filings for ${ticker}${scopeLabel ? ` (${scopeLabel})` : ''}`
`Fetching up to ${limit} filings for ${ticker}${scopeLabel ? ` (${scopeLabel})` : ''}`,
{ subject: tickerSubject }
);
const filings = await fetchRecentFilings(ticker, limit);
await setProjectionStage(task, 'sync.persist_filings', 'Persisting filings and links');
await setProjectionStage(
task,
'sync.persist_filings',
`Persisting ${filings.length} filings and source links`,
{
counters: { fetched: filings.length },
subject: tickerSubject
}
);
const saveResult = await upsertFilingsRecords(
filings.map((filing) => ({
ticker: filing.ticker,
@@ -611,8 +663,26 @@ async function processSyncFilings(task: Task) {
return isFinancialMetricsForm(filing.filing_type);
});
await setProjectionStage(task, 'sync.discover_assets', `Discovering taxonomy assets for ${hydrateCandidates.length} candidate filings`);
for (const filing of hydrateCandidates) {
await setProjectionStage(
task,
'sync.discover_assets',
`Discovering taxonomy assets for ${hydrateCandidates.length} candidate filings`,
buildProgressContext({
current: 0,
total: hydrateCandidates.length,
unit: 'filings',
counters: {
fetched: filings.length,
inserted: saveResult.inserted,
updated: saveResult.updated,
hydrated: 0,
failed: 0
},
subject: tickerSubject
})
);
for (let index = 0; index < hydrateCandidates.length; index += 1) {
const filing = hydrateCandidates[index];
const existingSnapshot = await getFilingTaxonomySnapshotByFilingId(filing.id);
const shouldRefresh = !existingSnapshot
|| Date.parse(existingSnapshot.updated_at) < Date.parse(filing.updated_at);
@@ -621,8 +691,31 @@ async function processSyncFilings(task: Task) {
continue;
}
const stageContext = (stage: TaskStage) => buildProgressContext({
current: index + 1,
total: hydrateCandidates.length,
unit: 'filings',
counters: {
fetched: filings.length,
inserted: saveResult.inserted,
updated: saveResult.updated,
hydrated: taxonomySnapshotsHydrated,
failed: taxonomySnapshotsFailed
},
subject: {
ticker,
accessionNumber: filing.accession_number,
label: stage
}
});
try {
await setProjectionStage(task, 'sync.extract_taxonomy', `Extracting XBRL taxonomy for ${filing.accession_number}`);
await setProjectionStage(
task,
'sync.extract_taxonomy',
`Extracting XBRL taxonomy for ${filing.accession_number}`,
stageContext('sync.extract_taxonomy')
);
const snapshot = await hydrateFilingTaxonomySnapshot({
filingId: filing.id,
ticker: filing.ticker,
@@ -634,10 +727,30 @@ async function processSyncFilings(task: Task) {
primaryDocument: filing.primary_document ?? null
});
await setProjectionStage(task, 'sync.normalize_taxonomy', `Materializing statements for ${filing.accession_number}`);
await setProjectionStage(task, 'sync.derive_metrics', `Deriving taxonomy metrics for ${filing.accession_number}`);
await setProjectionStage(task, 'sync.validate_pdf_metrics', `Validating metrics via PDF + LLM for ${filing.accession_number}`);
await setProjectionStage(task, 'sync.persist_taxonomy', `Persisting taxonomy snapshot for ${filing.accession_number}`);
await setProjectionStage(
task,
'sync.normalize_taxonomy',
`Materializing statements for ${filing.accession_number}`,
stageContext('sync.normalize_taxonomy')
);
await setProjectionStage(
task,
'sync.derive_metrics',
`Deriving taxonomy metrics for ${filing.accession_number}`,
stageContext('sync.derive_metrics')
);
await setProjectionStage(
task,
'sync.validate_pdf_metrics',
`Validating metrics via PDF + LLM for ${filing.accession_number}`,
stageContext('sync.validate_pdf_metrics')
);
await setProjectionStage(
task,
'sync.persist_taxonomy',
`Persisting taxonomy snapshot for ${filing.accession_number}`,
stageContext('sync.persist_taxonomy')
);
await upsertFilingTaxonomySnapshot(snapshot);
await updateFilingMetricsById(filing.id, snapshot.derived_metrics);
@@ -698,7 +811,7 @@ async function processSyncFilings(task: Task) {
console.error(`[search-index-sync] failed for ${ticker}:`, error);
}
return {
const result = {
ticker,
category,
tags,
@@ -709,6 +822,24 @@ async function processSyncFilings(task: Task) {
taxonomySnapshotsFailed,
searchTaskId
};
return buildTaskOutcome(
result,
`Synced ${filings.length} filings for ${ticker}, hydrated ${taxonomySnapshotsHydrated} taxonomy snapshots, failed ${taxonomySnapshotsFailed}.`,
buildProgressContext({
current: hydrateCandidates.length,
total: hydrateCandidates.length || 1,
unit: 'filings',
counters: {
fetched: filings.length,
inserted: saveResult.inserted,
updated: saveResult.updated,
hydrated: taxonomySnapshotsHydrated,
failed: taxonomySnapshotsFailed
},
subject: tickerSubject
})
);
}
async function processRefreshPrices(task: Task) {
@@ -721,20 +852,84 @@ async function processRefreshPrices(task: Task) {
const userHoldings = await listHoldingsForPriceRefresh(userId);
const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];
const quotes = new Map<string, number>();
const baseContext = {
counters: {
holdings: userHoldings.length
}
} satisfies TaskStageContext;
await setProjectionStage(task, 'refresh.fetch_quotes', `Fetching quotes for ${tickers.length} tickers`);
for (const ticker of tickers) {
await setProjectionStage(
task,
'refresh.load_holdings',
`Loaded ${userHoldings.length} holdings across ${tickers.length} tickers`,
baseContext
);
await setProjectionStage(
task,
'refresh.fetch_quotes',
`Fetching quotes for ${tickers.length} tickers`,
buildProgressContext({
current: 0,
total: tickers.length,
unit: 'tickers',
counters: {
holdings: userHoldings.length
}
})
);
for (let index = 0; index < tickers.length; index += 1) {
const ticker = tickers[index];
const quote = await getQuote(ticker);
quotes.set(ticker, quote);
await setProjectionStage(
task,
'refresh.fetch_quotes',
`Fetching quotes for ${tickers.length} tickers`,
buildProgressContext({
current: index + 1,
total: tickers.length,
unit: 'tickers',
counters: {
holdings: userHoldings.length
},
subject: { ticker }
})
);
}
await setProjectionStage(task, 'refresh.persist_prices', 'Writing refreshed prices to holdings');
await setProjectionStage(
task,
'refresh.persist_prices',
`Writing refreshed prices for ${tickers.length} tickers across ${userHoldings.length} holdings`,
{
counters: {
holdings: userHoldings.length
}
}
);
const updatedCount = await applyRefreshedPrices(userId, quotes, new Date().toISOString());
return {
const result = {
updatedCount,
totalTickers: tickers.length
};
return buildTaskOutcome(
result,
`Refreshed prices for ${tickers.length} tickers across ${userHoldings.length} holdings.`,
{
progress: {
current: tickers.length,
total: tickers.length || 1,
unit: 'tickers'
},
counters: {
holdings: userHoldings.length,
updatedCount
}
}
);
}
async function processAnalyzeFiling(task: Task) {
@@ -746,13 +941,23 @@ async function processAnalyzeFiling(task: Task) {
throw new Error('accessionNumber is required');
}
await setProjectionStage(task, 'analyze.load_filing', `Loading filing ${accessionNumber}`);
await setProjectionStage(task, 'analyze.load_filing', `Loading filing ${accessionNumber}`, {
subject: {
accessionNumber
}
});
const filing = await getFilingByAccession(accessionNumber);
if (!filing) {
throw new Error(`Filing ${accessionNumber} not found`);
}
const analyzeSubject = {
ticker: filing.ticker,
accessionNumber,
label: filing.filing_type
};
const defaultExtraction = deterministicExtractionFallback(filing);
let extraction = defaultExtraction;
let extractionMeta: FilingExtractionMeta = {
@@ -764,7 +969,9 @@ async function processAnalyzeFiling(task: Task) {
let filingDocument: Awaited<ReturnType<typeof fetchPrimaryFilingText>> | null = null;
try {
await setProjectionStage(task, 'analyze.fetch_document', 'Fetching primary filing document');
await setProjectionStage(task, 'analyze.fetch_document', 'Fetching primary filing document', {
subject: analyzeSubject
});
filingDocument = await fetchPrimaryFilingText({
filingUrl: filing.filing_url,
cik: filing.cik,
@@ -776,7 +983,9 @@ async function processAnalyzeFiling(task: Task) {
}
if (filingDocument?.text) {
await setProjectionStage(task, 'analyze.extract', 'Generating extraction context from filing text');
await setProjectionStage(task, 'analyze.extract', 'Generating extraction context from filing text', {
subject: analyzeSubject
});
const ruleBasedExtraction = buildRuleBasedExtraction(filing, filingDocument.text);
const extractionResult = await runAiAnalysis(
extractionPrompt(filing, filingDocument.text),
@@ -798,14 +1007,18 @@ async function processAnalyzeFiling(task: Task) {
};
}
await setProjectionStage(task, 'analyze.generate_report', 'Generating final filing analysis report');
await setProjectionStage(task, 'analyze.generate_report', 'Generating final filing analysis report', {
subject: analyzeSubject
});
const analysis = await runAiAnalysis(
reportPrompt(filing, extraction, extractionMeta),
'Use concise institutional analyst language.',
{ workload: 'report' }
);
await setProjectionStage(task, 'analyze.persist_report', 'Persisting filing analysis output');
await setProjectionStage(task, 'analyze.persist_report', 'Persisting filing analysis output', {
subject: analyzeSubject
});
await saveFilingAnalysis(accessionNumber, {
provider: analysis.provider,
model: analysis.model,
@@ -831,14 +1044,24 @@ async function processAnalyzeFiling(task: Task) {
console.error(`[search-index-analyze] failed for ${accessionNumber}:`, error);
}
return {
const result = {
ticker: filing.ticker,
accessionNumber,
filingType: filing.filing_type,
provider: analysis.provider,
model: analysis.model,
extractionProvider: extractionMeta.provider,
extractionModel: extractionMeta.model,
searchTaskId
};
return buildTaskOutcome(
result,
`Analysis report generated for ${filing.ticker} ${filing.filing_type} ${accessionNumber}.`,
{
subject: analyzeSubject
}
);
}
async function processIndexSearch(task: Task) {
@@ -890,33 +1113,55 @@ async function processIndexSearch(task: Task) {
scope: entry.scope === 'user' ? 'user' : 'global',
userId: typeof entry.userId === 'string' ? entry.userId : null
})),
onStage: async (stage, detail) => {
onStage: async (stage, detail, context) => {
switch (stage) {
case 'collect':
await setProjectionStage(task, 'search.collect_sources', detail);
await setProjectionStage(task, 'search.collect_sources', detail, context ?? {
subject: ticker ? { ticker } : accessionNumber ? { accessionNumber } : null
});
break;
case 'fetch':
await setProjectionStage(task, 'search.fetch_documents', detail);
await setProjectionStage(task, 'search.fetch_documents', detail, context ?? null);
break;
case 'chunk':
await setProjectionStage(task, 'search.chunk', detail);
await setProjectionStage(task, 'search.chunk', detail, context ?? null);
break;
case 'embed':
await setProjectionStage(task, 'search.embed', detail);
await setProjectionStage(task, 'search.embed', detail, context ?? null);
break;
case 'persist':
await setProjectionStage(task, 'search.persist', detail);
await setProjectionStage(task, 'search.persist', detail, context ?? null);
break;
}
}
});
return {
const taskResult = {
ticker,
accessionNumber,
journalEntryId: validatedJournalEntryId,
...result
};
return buildTaskOutcome(
taskResult,
`Indexed ${result.indexed} sources, embedded ${result.chunksEmbedded} chunks, skipped ${result.skipped}, deleted ${result.deleted} stale documents.`,
{
progress: {
current: result.sourcesCollected,
total: result.sourcesCollected || 1,
unit: 'sources'
},
counters: {
sourcesCollected: result.sourcesCollected,
indexed: result.indexed,
chunksEmbedded: result.chunksEmbedded,
skipped: result.skipped,
deleted: result.deleted
},
subject: ticker ? { ticker } : accessionNumber ? { accessionNumber } : null
}
);
}
function holdingDigest(holdings: Holding[]) {
@@ -940,6 +1185,18 @@ async function processPortfolioInsights(task: Task) {
await setProjectionStage(task, 'insights.load_holdings', 'Loading holdings for portfolio insight generation');
const userHoldings = await listUserHoldings(userId);
const summary = buildPortfolioSummary(userHoldings);
const holdingsContext = {
counters: {
holdings: userHoldings.length
}
} satisfies TaskStageContext;
await setProjectionStage(
task,
'insights.load_holdings',
`Loaded ${userHoldings.length} holdings for portfolio insight generation`,
holdingsContext
);
const prompt = [
'Generate portfolio intelligence with actionable recommendations.',
@@ -948,14 +1205,14 @@ async function processPortfolioInsights(task: Task) {
'Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.'
].join('\n');
await setProjectionStage(task, 'insights.generate', 'Generating portfolio AI insight');
await setProjectionStage(task, 'insights.generate', 'Generating portfolio AI insight', holdingsContext);
const analysis = await runAiAnalysis(
prompt,
'Act as a risk-aware buy-side analyst.',
{ workload: 'report' }
);
await setProjectionStage(task, 'insights.persist', 'Persisting generated portfolio insight');
await setProjectionStage(task, 'insights.persist', 'Persisting generated portfolio insight', holdingsContext);
await createPortfolioInsight({
userId,
provider: analysis.provider,
@@ -963,11 +1220,21 @@ async function processPortfolioInsights(task: Task) {
content: analysis.text
});
return {
const result = {
provider: analysis.provider,
model: analysis.model,
summary
};
return buildTaskOutcome(
result,
`Generated portfolio insight for ${summary.positions} holdings.`,
{
counters: {
holdings: summary.positions
}
}
);
}
export const __taskProcessorInternals = {
@@ -979,15 +1246,15 @@ export const __taskProcessorInternals = {
export async function runTaskProcessor(task: Task) {
switch (task.task_type) {
case 'sync_filings':
return toTaskResult(await processSyncFilings(task));
return await processSyncFilings(task);
case 'refresh_prices':
return toTaskResult(await processRefreshPrices(task));
return await processRefreshPrices(task);
case 'analyze_filing':
return toTaskResult(await processAnalyzeFiling(task));
return await processAnalyzeFiling(task);
case 'portfolio_insights':
return toTaskResult(await processPortfolioInsights(task));
return await processPortfolioInsights(task);
case 'index_search':
return toTaskResult(await processIndexSearch(task));
return await processIndexSearch(task);
default:
throw new Error(`Unsupported task type: ${task.task_type}`);
}

View File

@@ -15,6 +15,7 @@ import {
setTaskWorkflowRunId,
updateTaskNotificationState
} from '@/lib/server/repos/tasks';
import { buildTaskNotification } from '@/lib/server/task-notifications';
type EnqueueTaskInput = {
userId: string;
@@ -72,15 +73,21 @@ async function reconcileTaskWithWorkflow(task: Task) {
const updated = await setTaskStatusFromWorkflow(task.id, nextStatus, nextError);
return updated ?? {
const fallbackTask = {
...task,
status: nextStatus,
stage: nextStatus,
stage_detail: null,
stage_detail: nextStatus === 'failed' ? nextError : 'Workflow run completed.',
stage_context: null,
error: nextError,
finished_at: nextStatus === 'queued' || nextStatus === 'running'
? null
: task.finished_at ?? new Date().toISOString()
} satisfies Omit<Task, 'notification'>;
return updated ?? {
...fallbackTask,
notification: buildTaskNotification(fallbackTask)
};
} catch {
return task;

222
lib/task-workflow.ts Normal file
View File

@@ -0,0 +1,222 @@
import type {
Task,
TaskStage,
TaskStageEvent,
TaskType
} from '@/lib/types';
/**
 * One entry in a task's rendered stage timeline.
 *
 * Produced by `buildStageTimeline`: the canonical stage identifier plus its
 * display label, its completion state relative to the task's current stage,
 * and the detail text / stage context captured by the latest stage event.
 */
export type StageTimelineItem = {
  stage: TaskStage;
  label: string;
  // 'completed' = stage already passed, 'active' = currently executing,
  // 'pending' = not yet reached.
  state: 'completed' | 'active' | 'pending';
  detail: string | null;
  // Timestamp of the latest event for this stage (or a task-level fallback
  // such as updated_at/finished_at); null when the stage was never reached.
  timestamp: string | null;
  context: Task['stage_context'] | null;
};
/** Human-readable display names for each task type. */
const TASK_TYPE_LABELS: Record<TaskType, string> = {
  sync_filings: 'Filing sync',
  refresh_prices: 'Price refresh',
  analyze_filing: 'Filing analysis',
  portfolio_insights: 'Portfolio insight',
  index_search: 'Search indexing'
};

/** Human-readable display names for every lifecycle and workflow stage. */
const STAGE_LABELS: Record<TaskStage, string> = {
  queued: 'Queued',
  running: 'Running',
  completed: 'Completed',
  failed: 'Failed',
  'sync.fetch_filings': 'Fetch filings',
  'sync.discover_assets': 'Discover taxonomy assets',
  'sync.extract_taxonomy': 'Extract taxonomy',
  'sync.normalize_taxonomy': 'Normalize taxonomy',
  'sync.derive_metrics': 'Derive metrics',
  'sync.validate_pdf_metrics': 'Validate PDF metrics',
  'sync.persist_taxonomy': 'Persist taxonomy',
  'sync.fetch_metrics': 'Fetch filing metrics',
  'sync.persist_filings': 'Persist filings',
  'sync.hydrate_statements': 'Hydrate statements',
  'refresh.load_holdings': 'Load holdings',
  'refresh.fetch_quotes': 'Fetch quotes',
  'refresh.persist_prices': 'Persist prices',
  'analyze.load_filing': 'Load filing',
  'analyze.fetch_document': 'Fetch primary document',
  'analyze.extract': 'Extract context',
  'analyze.generate_report': 'Generate report',
  'analyze.persist_report': 'Persist report',
  'search.collect_sources': 'Collect sources',
  'search.fetch_documents': 'Fetch documents',
  'search.chunk': 'Chunk content',
  'search.embed': 'Generate embeddings',
  'search.persist': 'Persist search index',
  'insights.load_holdings': 'Load holdings',
  'insights.generate': 'Generate insight',
  'insights.persist': 'Persist insight'
};

/**
 * Canonical stage progression per task type, used to order timelines and to
 * derive step-based fallback progress. Each list starts with the shared
 * lifecycle stages ('queued', 'running') and ends with 'completed'.
 */
const TASK_STAGE_ORDER: Record<TaskType, TaskStage[]> = {
  sync_filings: [
    'queued',
    'running',
    'sync.fetch_filings',
    'sync.persist_filings',
    'sync.discover_assets',
    'sync.extract_taxonomy',
    'sync.normalize_taxonomy',
    'sync.derive_metrics',
    'sync.validate_pdf_metrics',
    'sync.persist_taxonomy',
    'completed'
  ],
  refresh_prices: [
    'queued',
    'running',
    'refresh.load_holdings',
    'refresh.fetch_quotes',
    'refresh.persist_prices',
    'completed'
  ],
  analyze_filing: [
    'queued',
    'running',
    'analyze.load_filing',
    'analyze.fetch_document',
    'analyze.extract',
    'analyze.generate_report',
    'analyze.persist_report',
    'completed'
  ],
  index_search: [
    'queued',
    'running',
    'search.collect_sources',
    'search.fetch_documents',
    'search.chunk',
    'search.embed',
    'search.persist',
    'completed'
  ],
  portfolio_insights: [
    'queued',
    'running',
    'insights.load_holdings',
    'insights.generate',
    'insights.persist',
    'completed'
  ]
};

/** Returns the display label for a task type. */
export function taskTypeLabel(taskType: TaskType) {
  return TASK_TYPE_LABELS[taskType];
}

/**
 * Returns the display label for a stage, falling back to the raw stage
 * identifier for values missing from the label table.
 */
export function stageLabel(stage: TaskStage) {
  return STAGE_LABELS[stage] ?? stage;
}

/**
 * Returns the ordered stage list for a task type, falling back to the bare
 * lifecycle stages for unknown types.
 */
export function taskStageOrder(taskType: TaskType) {
  return TASK_STAGE_ORDER[taskType] ?? ['queued', 'running', 'completed'];
}

/**
 * Derives step-based progress (current step of total steps) from a task's
 * position in its type's canonical stage order, for use when no explicit
 * stage_context progress was recorded.
 *
 * Returns null when the task's stage is not part of its type's canonical
 * order. A completed task always reports full progress; running and failed
 * tasks report the 1-based index of the stage they last reached.
 */
export function fallbackStageProgress(task: Pick<Task, 'task_type' | 'stage' | 'status'>) {
  const orderedStages = taskStageOrder(task.task_type);
  const stageIndex = orderedStages.indexOf(task.stage);
  if (stageIndex === -1) {
    return null;
  }
  // stageIndex >= 0 is guaranteed by the guard above, so `stageIndex + 1` is
  // already >= 1; the original's Math.max and its duplicated 'failed' branch
  // (identical to the fall-through) were redundant.
  const current = task.status === 'completed' ? orderedStages.length : stageIndex + 1;
  return {
    current,
    total: orderedStages.length,
    unit: 'steps'
  };
}
/**
 * Builds the ordered stage timeline for a task by merging the canonical
 * stage order for its type with the stage events recorded so far.
 *
 * Each canonical stage becomes one item whose state is derived from the
 * task's live status/stage plus the latest recorded event for that stage.
 * For failed tasks a terminal 'failed' stage is appended so the failure is
 * visible at the end of the timeline.
 */
export function buildStageTimeline(task: Task, events: TaskStageEvent[]): StageTimelineItem[] {
  const baseOrder = taskStageOrder(task.task_type);
  const orderedStages = [...baseOrder];
  if (task.status === 'failed' && !orderedStages.includes('failed')) {
    orderedStages.push('failed');
  }
  // Keep only the last event seen per stage. NOTE(review): this assumes
  // `events` arrives ordered oldest-to-newest so the surviving entry is the
  // latest — confirm against the repository query that supplies them.
  const latestEventByStage = new Map<TaskStage, TaskStageEvent>();
  for (const event of events) {
    latestEventByStage.set(event.stage, event);
  }
  return orderedStages.map((stage) => {
    const event = latestEventByStage.get(stage);
    // In-flight tasks: the task's current stage is 'active', stages with a
    // recorded event are 'completed', everything else is 'pending'.
    if (task.status === 'queued' || task.status === 'running') {
      if (stage === task.stage) {
        return {
          stage,
          label: stageLabel(stage),
          state: 'active' as const,
          // Prefer event detail, but fall back to the live task fields; the
          // live stage_context wins over the event's for the active stage.
          detail: event?.stage_detail ?? task.stage_detail,
          timestamp: event?.created_at ?? task.updated_at,
          context: task.stage_context ?? event?.stage_context ?? null
        };
      }
      if (event) {
        return {
          stage,
          label: stageLabel(stage),
          state: 'completed' as const,
          detail: event.stage_detail,
          timestamp: event.created_at,
          context: event.stage_context ?? null
        };
      }
      return {
        stage,
        label: stageLabel(stage),
        state: 'pending' as const,
        detail: null,
        timestamp: null,
        context: null
      };
    }
    // Terminal tasks (completed/failed): any stage that was reached — either
    // it is the task's final stage or it has a recorded event — renders as
    // 'completed'; stages that were never reached stay 'pending'.
    if (stage === task.stage || event) {
      return {
        stage,
        label: stageLabel(stage),
        state: 'completed' as const,
        detail: event?.stage_detail ?? task.stage_detail,
        timestamp: event?.created_at ?? task.finished_at,
        // Event context wins; the task-level context only applies to the
        // stage the task finished on.
        context: event?.stage_context ?? (stage === task.stage ? task.stage_context : null) ?? null
      };
    }
    return {
      stage,
      label: stageLabel(stage),
      state: 'pending' as const,
      detail: null,
      timestamp: null,
      context: null
    };
  });
}

View File

@@ -152,6 +152,53 @@ export type TaskStage =
| 'insights.generate'
| 'insights.persist';
/**
 * Structured context captured alongside a task's stage, persisted as JSON in
 * `stage_context` on both tasks and stage events.
 */
export type TaskStageContext = {
  // Step/unit progress for the stage, e.g. { current: 2, total: 5, unit: 'steps' }
  // or unit 'sources'; null/absent when the stage has no measurable progress.
  progress?: {
    current: number;
    total: number;
    unit: string;
  } | null;
  // Named numeric counters produced by the stage (e.g. indexed, chunksEmbedded,
  // holdings).
  counters?: Record<string, number>;
  // The entity the task operates on, identified by ticker and/or SEC accession
  // number, with an optional display label.
  subject?: {
    ticker?: string;
    accessionNumber?: string;
    label?: string;
  } | null;
};
/** A single label/value pair rendered in a task notification's stats list. */
export type TaskNotificationStat = {
  label: string;
  value: string;
};

/** A navigation action offered by a task notification. */
export type TaskNotificationAction = {
  // Stable action identifier used to route the click to a destination.
  id:
    | 'open_details'
    | 'open_filings'
    | 'open_analysis'
    | 'open_analysis_report'
    | 'open_search'
    | 'open_portfolio';
  label: string;
  // Link target; null when the destination cannot be resolved yet.
  href: string | null;
  // Marks the action emphasized as the notification's primary call-to-action.
  primary?: boolean;
};

/**
 * Fully-derived presentation payload for a task notification, attached to
 * every `Task` as its `notification` field.
 */
export type TaskNotificationView = {
  title: string;
  statusLine: string;
  detailLine: string | null;
  tone: 'info' | 'success' | 'error';
  // Normalized progress; percent is precomputed (null when indeterminate).
  progress: {
    current: number;
    total: number;
    unit: string;
    percent: number | null;
  } | null;
  stats: TaskNotificationStat[];
  actions: TaskNotificationAction[];
};
export type Task = {
id: string;
user_id: string;
@@ -159,6 +206,7 @@ export type Task = {
status: TaskStatus;
stage: TaskStage;
stage_detail: string | null;
stage_context: TaskStageContext | null;
resource_key: string | null;
notification_read_at: string | null;
notification_silenced_at: string | null;
@@ -172,6 +220,7 @@ export type Task = {
created_at: string;
updated_at: string;
finished_at: string | null;
notification: TaskNotificationView;
};
export type TaskStageEvent = {
@@ -180,6 +229,7 @@ export type TaskStageEvent = {
user_id: string;
stage: TaskStage;
stage_detail: string | null;
stage_context: TaskStageContext | null;
status: TaskStatus;
created_at: string;
};