feat: migrate task jobs to workflow notifications + timeline

This commit is contained in:
2026-03-02 14:29:31 -05:00
parent 36c4ed2ee2
commit d81a681905
33 changed files with 2437 additions and 292 deletions

View File

@@ -12,6 +12,8 @@ import type {
PortfolioInsight,
PortfolioSummary,
Task,
TaskStatus,
TaskTimeline,
User,
WatchlistItem
} from './types';
@@ -244,10 +246,29 @@ export async function getTask(taskId: string) {
return await unwrapData<{ task: Task }>(result, 'Unable to fetch task');
}
export async function listRecentTasks(limit = 20) {
/** Fetch the stage-event timeline projection for a single task. */
export async function getTaskTimeline(taskId: string) {
  const response = await client.api.tasks[taskId].timeline.get();
  return unwrapData<TaskTimeline>(response, 'Unable to fetch task timeline');
}
/**
 * Send a partial notification-state patch for a task.
 * Only the keys present in `input` are sent; server decides the semantics.
 */
export async function updateTaskNotificationState(
  taskId: string,
  input: { read?: boolean; silenced?: boolean }
) {
  const response = await client.api.tasks[taskId].notification.patch(input);
  return unwrapData<{ task: Task }>(response, 'Unable to update task notification state');
}
export async function listRecentTasks(input: {
limit?: number;
statuses?: TaskStatus[];
} = {}) {
const result = await client.api.tasks.get({
$query: {
limit
limit: input.limit ?? 20,
...(input.statuses && input.statuses.length > 0
? { status: input.statuses }
: {})
}
});

View File

@@ -16,5 +16,6 @@ export const queryKeys = {
portfolioSummary: () => ['portfolio', 'summary'] as const,
latestPortfolioInsight: () => ['portfolio', 'insights', 'latest'] as const,
task: (taskId: string) => ['tasks', 'detail', taskId] as const,
taskTimeline: (taskId: string) => ['tasks', 'timeline', taskId] as const,
recentTasks: (limit: number) => ['tasks', 'recent', limit] as const
};

View File

@@ -6,6 +6,7 @@ import {
getLatestPortfolioInsight,
getPortfolioSummary,
getTask,
getTaskTimeline,
listFilings,
listHoldings,
listRecentTasks,
@@ -126,10 +127,18 @@ export function taskQueryOptions(taskId: string) {
});
}
export function recentTasksQueryOptions(limit = 20) {
export function taskTimelineQueryOptions(taskId: string) {
return queryOptions({
queryKey: queryKeys.recentTasks(limit),
queryFn: () => listRecentTasks(limit),
queryKey: queryKeys.taskTimeline(taskId),
queryFn: () => getTaskTimeline(taskId),
staleTime: 5_000
});
}
// Query options for the recent-tasks list. The cache key includes the limit
// so different page sizes are cached independently. Status filtering is
// supported by listRecentTasks but deliberately not exposed through this hook.
export function recentTasksQueryOptions(limit = 20) {
  return queryOptions({
    queryKey: queryKeys.recentTasks(limit),
    queryFn: () => listRecentTasks({ limit }),
    staleTime: 5_000
  });
}

View File

@@ -1,4 +1,5 @@
import { Elysia, t } from 'elysia';
import { getWorld } from 'workflow/runtime';
import type {
Filing,
FinancialHistoryWindow,
@@ -31,9 +32,12 @@ import {
import { getPriceHistory, getQuote } from '@/lib/server/prices';
import {
enqueueTask,
findInFlightTask,
getTaskById,
getTaskTimeline,
getTaskQueueSnapshot,
listRecentTasks
listRecentTasks,
updateTaskNotification
} from '@/lib/server/tasks';
const ALLOWED_STATUSES: TaskStatus[] = ['queued', 'running', 'completed', 'failed'];
@@ -120,7 +124,8 @@ async function queueAutoFilingSync(userId: string, ticker: string) {
ticker,
limit: AUTO_FILING_SYNC_LIMIT
},
priority: 90
priority: 90,
resourceKey: `sync_filings:${ticker}`
});
return true;
@@ -132,18 +137,63 @@ async function queueAutoFilingSync(userId: string, ticker: string) {
const authHandler = ({ request }: { request: Request }) => auth.handler(request);
// Probe the workflow backend with the cheapest possible call: list a single
// run without resolving its payload data. Returns a discriminated result so
// the health endpoint can report a reason instead of throwing.
async function checkWorkflowBackend() {
  try {
    await getWorld().runs.list({
      pagination: { limit: 1 },
      resolveData: 'none'
    });
    return { ok: true } as const;
  } catch (error) {
    const reason = asErrorMessage(error, 'Workflow backend unavailable');
    return { ok: false, reason } as const;
  }
}
export const app = new Elysia({ prefix: '/api' })
.all('/auth', authHandler)
.all('/auth/*', authHandler)
.get('/health', async () => {
const queue = await getTaskQueueSnapshot();
try {
const [queue, workflowBackend] = await Promise.all([
getTaskQueueSnapshot(),
checkWorkflowBackend()
]);
return Response.json({
status: 'ok',
version: '4.0.0',
timestamp: new Date().toISOString(),
queue
});
if (!workflowBackend.ok) {
return Response.json({
status: 'degraded',
version: '4.0.0',
timestamp: new Date().toISOString(),
queue,
workflow: {
ok: false,
reason: workflowBackend.reason
}
}, { status: 503 });
}
return Response.json({
status: 'ok',
version: '4.0.0',
timestamp: new Date().toISOString(),
queue,
workflow: {
ok: true
}
});
} catch (error) {
return Response.json({
status: 'degraded',
version: '4.0.0',
timestamp: new Date().toISOString(),
error: asErrorMessage(error, 'Health check failed')
}, { status: 503 });
}
})
.get('/me', async () => {
const { session, response } = await requireAuthenticatedSession();
@@ -375,7 +425,8 @@ export const app = new Elysia({ prefix: '/api' })
userId: session.user.id,
taskType: 'refresh_prices',
payload: {},
priority: 80
priority: 80,
resourceKey: 'refresh_prices:portfolio'
});
return Response.json({ task });
@@ -394,7 +445,8 @@ export const app = new Elysia({ prefix: '/api' })
userId: session.user.id,
taskType: 'portfolio_insights',
payload: {},
priority: 70
priority: 70,
resourceKey: 'portfolio_insights:portfolio'
});
return Response.json({ task });
@@ -543,7 +595,8 @@ export const app = new Elysia({ prefix: '/api' })
ticker,
limit: defaultFinancialSyncLimit(window)
},
priority: 88
priority: 88,
resourceKey: `sync_filings:${ticker}`
});
queuedSync = true;
} catch (error) {
@@ -668,7 +721,8 @@ export const app = new Elysia({ prefix: '/api' })
ticker,
limit: Number.isFinite(limit) ? limit : 20
},
priority: 90
priority: 90,
resourceKey: `sync_filings:${ticker}`
});
return Response.json({ task });
@@ -693,11 +747,23 @@ export const app = new Elysia({ prefix: '/api' })
}
try {
const resourceKey = `analyze_filing:${accessionNumber}`;
const existing = await findInFlightTask(
session.user.id,
'analyze_filing',
resourceKey
);
if (existing) {
return Response.json({ task: existing });
}
const task = await enqueueTask({
userId: session.user.id,
taskType: 'analyze_filing',
payload: { accessionNumber },
priority: 65
priority: 65,
resourceKey
});
return Response.json({ task });
@@ -760,6 +826,56 @@ export const app = new Elysia({ prefix: '/api' })
params: t.Object({
taskId: t.String({ minLength: 1 })
})
})
// GET /tasks/:taskId/timeline — ordered stage events for one task.
// Scoped to the authenticated user; an unknown or foreign taskId yields 404.
.get('/tasks/:taskId/timeline', async ({ params }) => {
  const { session, response } = await requireAuthenticatedSession();
  if (response) {
    return response;
  }
  const timeline = await getTaskTimeline(params.taskId, session.user.id);
  if (!timeline) {
    return jsonError('Task not found', 404);
  }
  return Response.json(timeline);
}, {
  params: t.Object({
    taskId: t.String({ minLength: 1 })
  })
})
// PATCH /tasks/:taskId/notification — set or clear the read/silenced flags.
// At least one of `read`/`silenced` must be supplied (400 otherwise);
// scoped to the authenticated user, 404 when the task is not theirs.
.patch('/tasks/:taskId/notification', async ({ params, body }) => {
  const { session, response } = await requireAuthenticatedSession();
  if (response) {
    return response;
  }
  const payload = asRecord(body);
  // Only genuine booleans count; anything else is treated as "not provided".
  const read = typeof payload.read === 'boolean' ? payload.read : undefined;
  const silenced = typeof payload.silenced === 'boolean' ? payload.silenced : undefined;
  if (read === undefined && silenced === undefined) {
    return jsonError('read or silenced must be provided');
  }
  const task = await updateTaskNotification(session.user.id, params.taskId, {
    read,
    silenced
  });
  if (!task) {
    return jsonError('Task not found', 404);
  }
  return Response.json({ task });
}, {
  params: t.Object({
    taskId: t.String({ minLength: 1 })
  }),
  body: t.Object({
    read: t.Optional(t.Boolean()),
    silenced: t.Optional(t.Boolean())
  })
});
export type App = typeof app;

View File

@@ -0,0 +1,319 @@
import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
it,
mock
} from 'bun:test';
import { mkdtempSync, readFileSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { WorkflowRunStatus } from '@workflow/world';
const TEST_USER_ID = 'e2e-user';
const TEST_USER_EMAIL = 'e2e@example.com';
const TEST_USER_NAME = 'E2E User';
const runStatuses = new Map<string, WorkflowRunStatus>();
let runCounter = 0;
let workflowBackendHealthy = true;
let tempDir: string | null = null;
let sqliteClient: { exec: (query: string) => void; close: () => void } | null = null;
let app: { handle: (request: Request) => Promise<Response> } | null = null;
// Stub the workflow API so tests control run ids and statuses in memory:
// start() mints sequential run ids; getRun().status reads the shared map.
mock.module('workflow/api', () => ({
  start: mock(async () => {
    runCounter += 1;
    const runId = `run-${runCounter}`;
    runStatuses.set(runId, 'pending');
    return { runId };
  }),
  getRun: mock((runId: string) => ({
    get status() {
      return Promise.resolve(runStatuses.get(runId) ?? 'pending');
    }
  }))
}));
// Stub the workflow runtime used by the health probe; toggling
// workflowBackendHealthy simulates a backend outage.
mock.module('workflow/runtime', () => ({
  getWorld: () => ({
    runs: {
      list: async () => {
        if (!workflowBackendHealthy) {
          throw new Error('Workflow backend unavailable');
        }
        return {
          data: []
        };
      }
    }
  })
}));
// Bypass real auth: every request is attributed to the fixture test user.
mock.module('@/lib/server/auth-session', () => ({
  requireAuthenticatedSession: async () => ({
    session: {
      user: {
        id: TEST_USER_ID,
        email: TEST_USER_EMAIL,
        name: TEST_USER_NAME,
        image: null
      }
    },
    response: null
  })
}));
// Drop the sqlite client / drizzle handles cached on globalThis so that the
// next import of the db module re-reads DATABASE_URL and builds fresh ones.
function resetDbSingletons() {
  type DbGlobals = typeof globalThis & {
    __fiscalSqliteClient?: { close?: () => void };
    __fiscalDrizzleDb?: unknown;
  };
  const state = globalThis as DbGlobals;
  state.__fiscalSqliteClient?.close?.();
  state.__fiscalSqliteClient = undefined;
  state.__fiscalDrizzleDb = undefined;
}
// Apply the project's SQL migrations, in order, against a raw sqlite client.
// Migration files are read from ./drizzle relative to the working directory.
function applySqlMigrations(client: { exec: (query: string) => void }) {
  const migrationFiles = [
    '0000_cold_silver_centurion.sql',
    '0001_glossy_statement_snapshots.sql',
    '0002_workflow_task_projection_metadata.sql',
    '0003_task_stage_event_timeline.sql'
  ];
  const migrationDir = join(process.cwd(), 'drizzle');
  for (const file of migrationFiles) {
    client.exec(readFileSync(join(migrationDir, file), 'utf8'));
  }
}
// Insert (or replace) the fixture user the e2e suite authenticates as.
// Values are interpolated directly because they are compile-time constants,
// not external input — no injection surface here.
function ensureTestUser(client: { exec: (query: string) => void }) {
  const now = Date.now();
  client.exec(`
    INSERT OR REPLACE INTO user (
      id, name, email, emailVerified, image, createdAt, updatedAt, role, banned, banReason, banExpires
    ) VALUES (
      '${TEST_USER_ID}',
      '${TEST_USER_NAME}',
      '${TEST_USER_EMAIL}',
      1,
      NULL,
      ${now},
      ${now},
      NULL,
      0,
      NULL,
      NULL
    );
  `);
}
// Wipe the task projection tables between tests. The child table
// (task_stage_event references task_run) is cleared first.
function clearProjectionTables(client: { exec: (query: string) => void }) {
  for (const statement of ['DELETE FROM task_stage_event;', 'DELETE FROM task_run;']) {
    client.exec(statement);
  }
}
// Issue a request against the in-memory app and parse the JSON response.
// When `body` is given it is serialized as JSON with the matching header.
async function jsonRequest(
  method: 'GET' | 'POST' | 'PATCH',
  path: string,
  body?: Record<string, unknown>
) {
  if (!app) {
    throw new Error('app not initialized');
  }
  const init: RequestInit = { method };
  if (body) {
    init.headers = { 'content-type': 'application/json' };
    init.body = JSON.stringify(body);
  }
  const response = await app.handle(new Request(`http://localhost${path}`, init));
  const json = await response.json();
  return { response, json };
}
// The full e2e suite is opt-in (RUN_TASK_WORKFLOW_E2E=1): it runs real sqlite
// migrations against a temp file and drives the mocked workflow backend.
if (process.env.RUN_TASK_WORKFLOW_E2E === '1') {
  describe('task workflow hybrid migration e2e', () => {
    beforeAll(async () => {
      // Point the db layer at a throwaway sqlite file BEFORE importing it.
      tempDir = mkdtempSync(join(tmpdir(), 'fiscal-task-e2e-'));
      const env = process.env as Record<string, string | undefined>;
      env.DATABASE_URL = `file:${join(tempDir, 'e2e.sqlite')}`;
      env.NODE_ENV = 'test';
      resetDbSingletons();
      // Dynamic imports so the env vars above are seen at module init time.
      const dbModule = await import('@/lib/server/db');
      sqliteClient = dbModule.getSqliteClient();
      applySqlMigrations(sqliteClient);
      ensureTestUser(sqliteClient);
      const appModule = await import('./app');
      app = appModule.app;
    });
    afterAll(() => {
      resetDbSingletons();
      if (tempDir) {
        rmSync(tempDir, { recursive: true, force: true });
      }
    });
    beforeEach(() => {
      if (!sqliteClient) {
        throw new Error('sqlite client not initialized');
      }
      // Every test starts from empty projections and a healthy mock backend.
      clearProjectionTables(sqliteClient);
      runStatuses.clear();
      runCounter = 0;
      workflowBackendHealthy = true;
    });
    it('queues multiple analyze jobs and suppresses duplicate in-flight analyze jobs', async () => {
      const first = await jsonRequest('POST', '/api/filings/0000000000-26-000001/analyze');
      expect(first.response.status).toBe(200);
      const firstTaskId = (first.json as { task: { id: string } }).task.id;
      const [second, third] = await Promise.all([
        jsonRequest('POST', '/api/filings/0000000000-26-000002/analyze'),
        jsonRequest('POST', '/api/filings/0000000000-26-000003/analyze')
      ]);
      expect(second.response.status).toBe(200);
      expect(third.response.status).toBe(200);
      // Re-posting the same accession number must return the original task,
      // not enqueue a fourth one.
      const duplicate = await jsonRequest('POST', '/api/filings/0000000000-26-000001/analyze');
      expect(duplicate.response.status).toBe(200);
      expect((duplicate.json as { task: { id: string } }).task.id).toBe(firstTaskId);
      const tasksResponse = await jsonRequest('GET', '/api/tasks?limit=10');
      expect(tasksResponse.response.status).toBe(200);
      const tasks = (tasksResponse.json as {
        tasks: Array<{
          id: string;
          status: string;
          stage: string;
          workflow_run_id?: string | null;
        }>;
      }).tasks;
      expect(tasks.length).toBe(3);
      expect(tasks.every((task) => task.status === 'queued')).toBe(true);
      expect(tasks.every((task) => task.stage === 'queued')).toBe(true);
      // Every queued projection row must be linked to a workflow run.
      expect(tasks.every((task) => typeof task.workflow_run_id === 'string' && task.workflow_run_id.length > 0)).toBe(true);
    });
    it('updates notification read and silenced state via patch endpoint', async () => {
      const created = await jsonRequest('POST', '/api/filings/0000000000-26-000010/analyze');
      const taskId = (created.json as { task: { id: string } }).task.id;
      const readUpdate = await jsonRequest('PATCH', `/api/tasks/${taskId}/notification`, { read: true });
      expect(readUpdate.response.status).toBe(200);
      const readTask = (readUpdate.json as {
        task: {
          notification_read_at: string | null;
          notification_silenced_at: string | null;
        };
      }).task;
      expect(readTask.notification_read_at).toBeTruthy();
      expect(readTask.notification_silenced_at).toBeNull();
      // Silencing also marks the task as read (server-side behavior).
      const silencedUpdate = await jsonRequest('PATCH', `/api/tasks/${taskId}/notification`, {
        silenced: true
      });
      expect(silencedUpdate.response.status).toBe(200);
      const silencedTask = (silencedUpdate.json as {
        task: {
          notification_read_at: string | null;
          notification_silenced_at: string | null;
        };
      }).task;
      expect(silencedTask.notification_read_at).toBeTruthy();
      expect(silencedTask.notification_silenced_at).toBeTruthy();
      // Both flags can be cleared again in a single patch.
      const resetUpdate = await jsonRequest('PATCH', `/api/tasks/${taskId}/notification`, {
        read: false,
        silenced: false
      });
      expect(resetUpdate.response.status).toBe(200);
      const resetTask = (resetUpdate.json as {
        task: {
          notification_read_at: string | null;
          notification_silenced_at: string | null;
        };
      }).task;
      expect(resetTask.notification_read_at).toBeNull();
      expect(resetTask.notification_silenced_at).toBeNull();
    });
    it('reconciles workflow run status into projection state and degrades health when workflow backend is down', async () => {
      const created = await jsonRequest('POST', '/api/filings/0000000000-26-000100/analyze');
      const task = (created.json as {
        task: { id: string; workflow_run_id: string };
      }).task;
      // Flip the mocked workflow run to running; GET /tasks/:id reconciles.
      runStatuses.set(task.workflow_run_id, 'running');
      const running = await jsonRequest('GET', `/api/tasks/${task.id}`);
      expect(running.response.status).toBe(200);
      const runningTask = (running.json as { task: { status: string; stage: string } }).task;
      expect(runningTask.status).toBe('running');
      expect(runningTask.stage).toBe('running');
      runStatuses.set(task.workflow_run_id, 'completed');
      const completed = await jsonRequest('GET', `/api/tasks/${task.id}`);
      expect(completed.response.status).toBe(200);
      const completedTask = (completed.json as {
        task: {
          status: string;
          stage: string;
          finished_at: string | null;
        };
      }).task;
      expect(completedTask.status).toBe('completed');
      expect(completedTask.stage).toBe('completed');
      expect(completedTask.finished_at).toBeTruthy();
      // Each reconciled transition should have left a stage event behind.
      const timeline = await jsonRequest('GET', `/api/tasks/${task.id}/timeline`);
      expect(timeline.response.status).toBe(200);
      const events = (timeline.json as {
        events: Array<{
          stage: string;
          status: string;
        }>;
      }).events;
      expect(events.length).toBeGreaterThanOrEqual(3);
      expect(events.some((event) => event.status === 'queued')).toBe(true);
      expect(events.some((event) => event.status === 'running')).toBe(true);
      expect(events.some((event) => event.status === 'completed')).toBe(true);
      const healthy = await jsonRequest('GET', '/api/health');
      expect(healthy.response.status).toBe(200);
      expect((healthy.json as { status: string; workflow: { ok: boolean } }).status).toBe('ok');
      expect((healthy.json as { status: string; workflow: { ok: boolean } }).workflow.ok).toBe(true);
      // Simulate a workflow backend outage: health must degrade to 503.
      workflowBackendHealthy = false;
      const degraded = await jsonRequest('GET', '/api/health');
      expect(degraded.response.status).toBe(503);
      expect((degraded.json as {
        status: string;
        workflow: { ok: boolean; reason: string };
      }).status).toBe('degraded');
      expect((degraded.json as {
        status: string;
        workflow: { ok: boolean; reason: string };
      }).workflow.ok).toBe(false);
    });
  });
}

View File

@@ -288,6 +288,11 @@ export const taskRun = sqliteTable('task_run', {
user_id: text('user_id').notNull().references(() => user.id, { onDelete: 'cascade' }),
task_type: text('task_type').$type<'sync_filings' | 'refresh_prices' | 'analyze_filing' | 'portfolio_insights'>().notNull(),
status: text('status').$type<'queued' | 'running' | 'completed' | 'failed'>().notNull(),
stage: text('stage').notNull(),
stage_detail: text('stage_detail'),
resource_key: text('resource_key'),
notification_read_at: text('notification_read_at'),
notification_silenced_at: text('notification_silenced_at'),
priority: integer('priority').notNull(),
payload: text('payload', { mode: 'json' }).$type<Record<string, unknown>>().notNull(),
result: text('result', { mode: 'json' }).$type<Record<string, unknown> | null>(),
@@ -301,9 +306,29 @@ export const taskRun = sqliteTable('task_run', {
}, (table) => ({
taskUserCreatedIndex: index('task_user_created_idx').on(table.user_id, table.created_at),
taskStatusIndex: index('task_status_idx').on(table.status),
taskUserResourceStatusIndex: index('task_user_resource_status_idx').on(
table.user_id,
table.task_type,
table.resource_key,
table.status,
table.created_at
),
taskWorkflowRunUnique: uniqueIndex('task_workflow_run_uidx').on(table.workflow_run_id)
}));
// Append-only log of task stage transitions: one row per projection update.
// Rows cascade away with their parent task_run row and with the owning user.
export const taskStageEvent = sqliteTable('task_stage_event', {
  id: integer('id').primaryKey({ autoIncrement: true }),
  task_id: text('task_id').notNull().references(() => taskRun.id, { onDelete: 'cascade' }),
  user_id: text('user_id').notNull().references(() => user.id, { onDelete: 'cascade' }),
  stage: text('stage').notNull(),
  stage_detail: text('stage_detail'),
  status: text('status').$type<'queued' | 'running' | 'completed' | 'failed'>().notNull(),
  // ISO-8601 string timestamp, matching task_run's created_at convention.
  created_at: text('created_at').notNull()
}, (table) => ({
  // Support per-task timeline reads and per-user activity feeds respectively.
  taskStageEventTaskCreatedIndex: index('task_stage_event_task_created_idx').on(table.task_id, table.created_at),
  taskStageEventUserCreatedIndex: index('task_stage_event_user_created_idx').on(table.user_id, table.created_at)
}));
export const portfolioInsight = sqliteTable('portfolio_insight', {
id: integer('id').primaryKey({ autoIncrement: true }),
user_id: text('user_id').notNull().references(() => user.id, { onDelete: 'cascade' }),
@@ -332,6 +357,7 @@ export const appSchema = {
filingStatementSnapshot,
filingLink,
taskRun,
taskStageEvent,
portfolioInsight
};

View File

@@ -1,9 +1,10 @@
import { and, desc, eq, inArray, sql } from 'drizzle-orm';
import type { Task, TaskStatus, TaskType } from '@/lib/types';
import { and, asc, desc, eq, inArray, sql } from 'drizzle-orm';
import type { Task, TaskStage, TaskStageEvent, TaskStatus, TaskType } from '@/lib/types';
import { db } from '@/lib/server/db';
import { taskRun } from '@/lib/server/db/schema';
import { taskRun, taskStageEvent } from '@/lib/server/db/schema';
type TaskRow = typeof taskRun.$inferSelect;
type TaskStageEventRow = typeof taskStageEvent.$inferSelect;
type CreateTaskInput = {
id: string;
@@ -12,14 +13,36 @@ type CreateTaskInput = {
payload: Record<string, unknown>;
priority: number;
max_attempts: number;
resource_key?: string | null;
};
type UpdateTaskNotificationStateInput = {
read?: boolean;
silenced?: boolean;
};
type EventInsertInput = {
task_id: string;
user_id: string;
stage: TaskStage;
stage_detail: string | null;
status: TaskStatus;
created_at: string;
};
type InsertExecutor = Pick<typeof db, 'insert'>;
function toTask(row: TaskRow): Task {
return {
id: row.id,
user_id: row.user_id,
task_type: row.task_type,
status: row.status,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
resource_key: row.resource_key,
notification_read_at: row.notification_read_at,
notification_silenced_at: row.notification_silenced_at,
priority: row.priority,
payload: row.payload,
result: row.result,
@@ -33,30 +56,84 @@ function toTask(row: TaskRow): Task {
};
}
// Map a raw task_stage_event row into the public TaskStageEvent shape.
function toTaskStageEvent(row: TaskStageEventRow): TaskStageEvent {
  const { id, task_id, user_id, stage, stage_detail, status, created_at } = row;
  return {
    id,
    task_id,
    user_id,
    stage: stage as TaskStage,
    stage_detail,
    status: status as TaskStatus,
    created_at
  };
}
// Map a task status onto the coarse stage of the same name. Every TaskStatus
// value is also a valid TaskStage; anything unexpected falls back to 'failed'.
function statusToStage(status: TaskStatus): TaskStage {
  if (status === 'queued' || status === 'running' || status === 'completed' || status === 'failed') {
    return status;
  }
  // Defensive fallback for values outside the known union.
  return 'failed';
}
// Append one timeline row. Takes an executor (db or an open transaction) so
// callers can write the event atomically with the task_run mutation.
async function insertTaskStageEvent(executor: InsertExecutor, input: EventInsertInput) {
  const { task_id, user_id, stage, stage_detail, status, created_at } = input;
  await executor.insert(taskStageEvent).values({
    task_id,
    user_id,
    stage,
    stage_detail,
    status,
    created_at
  });
}
export async function createTaskRunRecord(input: CreateTaskInput) {
const now = new Date().toISOString();
const [row] = await db
.insert(taskRun)
.values({
id: input.id,
user_id: input.user_id,
task_type: input.task_type,
status: 'queued',
priority: input.priority,
payload: input.payload,
result: null,
error: null,
attempts: 0,
max_attempts: input.max_attempts,
workflow_run_id: null,
created_at: now,
updated_at: now,
finished_at: null
})
.returning();
return await db.transaction(async (tx) => {
const [row] = await tx
.insert(taskRun)
.values({
id: input.id,
user_id: input.user_id,
task_type: input.task_type,
status: 'queued',
stage: 'queued',
stage_detail: null,
resource_key: input.resource_key ?? null,
notification_read_at: null,
notification_silenced_at: null,
priority: input.priority,
payload: input.payload,
result: null,
error: null,
attempts: 0,
max_attempts: input.max_attempts,
workflow_run_id: null,
created_at: now,
updated_at: now,
finished_at: null
})
.returning();
return toTask(row);
await insertTaskStageEvent(tx, {
task_id: row.id,
user_id: row.user_id,
stage: row.stage as TaskStage,
stage_detail: row.stage_detail,
status: row.status,
created_at: now
});
return toTask(row);
});
}
export async function setTaskWorkflowRunId(taskId: string, workflowRunId: string) {
@@ -121,67 +198,268 @@ export async function countTasksByStatus() {
return queue;
}
export async function claimQueuedTask(taskId: string) {
export async function findInFlightTaskByResourceKey(
userId: string,
taskType: TaskType,
resourceKey: string
) {
const [row] = await db
.update(taskRun)
.set({
status: 'running',
attempts: sql`${taskRun.attempts} + 1`,
updated_at: new Date().toISOString()
})
.where(and(eq(taskRun.id, taskId), eq(taskRun.status, 'queued')))
.returning();
.select()
.from(taskRun)
.where(and(
eq(taskRun.user_id, userId),
eq(taskRun.task_type, taskType),
eq(taskRun.resource_key, resourceKey),
inArray(taskRun.status, ['queued', 'running'])
))
.orderBy(desc(taskRun.created_at))
.limit(1);
return row ? toTask(row) : null;
}
/**
 * Transition a task projection to running and append the matching stage
 * event in the same transaction, so the timeline never drifts from the row.
 * Increments the attempt counter and clears finished_at (a re-run resets
 * any previous terminal state).
 * @returns the updated Task, or null when no row matches taskId.
 * NOTE(review): there is no `status = 'queued'` guard here, so a terminal or
 * already-running task can be flipped to running — confirm callers serialize
 * running-transitions per task.
 */
export async function markTaskRunning(taskId: string) {
  const now = new Date().toISOString();
  return await db.transaction(async (tx) => {
    const [row] = await tx
      .update(taskRun)
      .set({
        status: 'running',
        stage: 'running',
        stage_detail: 'Workflow task is now running',
        attempts: sql`${taskRun.attempts} + 1`,
        updated_at: now,
        finished_at: null
      })
      .where(eq(taskRun.id, taskId))
      .returning();
    if (!row) {
      return null;
    }
    await insertTaskStageEvent(tx, {
      task_id: row.id,
      user_id: row.user_id,
      stage: row.stage as TaskStage,
      stage_detail: row.stage_detail,
      status: row.status,
      created_at: now
    });
    return toTask(row);
  });
}
/**
 * Record a fine-grained stage transition (e.g. 'sync.fetch_filings') on the
 * task projection and append the matching timeline event, atomically.
 * Leaves status untouched — this is for intra-run progress only.
 * @returns the updated Task, or null when no row matches taskId.
 */
export async function updateTaskStage(taskId: string, stage: TaskStage, detail: string | null = null) {
  const now = new Date().toISOString();
  return await db.transaction(async (tx) => {
    const [row] = await tx
      .update(taskRun)
      .set({
        stage,
        stage_detail: detail,
        updated_at: now
      })
      .where(eq(taskRun.id, taskId))
      .returning();
    if (!row) {
      return null;
    }
    await insertTaskStageEvent(tx, {
      task_id: row.id,
      user_id: row.user_id,
      stage: row.stage as TaskStage,
      stage_detail: row.stage_detail,
      status: row.status,
      created_at: now
    });
    return toTask(row);
  });
}
/**
 * Mark a task as successfully finished: store the result payload, clear any
 * prior error and stage detail, stamp finished_at, and append the terminal
 * stage event in the same transaction.
 * @returns the updated Task, or null when no row matches taskId.
 */
export async function completeTask(taskId: string, result: Record<string, unknown>) {
  const now = new Date().toISOString();
  return await db.transaction(async (tx) => {
    const [row] = await tx
      .update(taskRun)
      .set({
        status: 'completed',
        stage: 'completed',
        stage_detail: null,
        result,
        error: null,
        updated_at: now,
        finished_at: now
      })
      .where(eq(taskRun.id, taskId))
      .returning();
    if (!row) {
      return null;
    }
    await insertTaskStageEvent(tx, {
      task_id: row.id,
      user_id: row.user_id,
      stage: row.stage as TaskStage,
      stage_detail: row.stage_detail,
      status: row.status,
      created_at: now
    });
    return toTask(row);
  });
}
/**
 * Mark a task as terminally failed: record the failure reason, stamp
 * finished_at, and append the terminal stage event atomically. `stage` lets
 * callers pin the failure to the stage that was executing (default 'failed').
 * @returns the updated Task, or null when no row matches taskId.
 * NOTE(review): no attempts-based requeue happens here — retry handling
 * presumably moved to the workflow engine; confirm nothing still expects
 * failures to re-enter the queue.
 */
export async function markTaskFailure(taskId: string, reason: string, stage: TaskStage = 'failed') {
  const now = new Date().toISOString();
  return await db.transaction(async (tx) => {
    const [row] = await tx
      .update(taskRun)
      .set({
        status: 'failed',
        stage,
        stage_detail: null,
        error: reason,
        updated_at: now,
        finished_at: now
      })
      .where(eq(taskRun.id, taskId))
      .returning();
    if (!row) {
      return null;
    }
    await insertTaskStageEvent(tx, {
      task_id: row.id,
      user_id: row.user_id,
      stage: row.stage as TaskStage,
      stage_detail: row.stage_detail,
      status: row.status,
      created_at: now
    });
    return toTask(row);
  });
}
/**
 * Reconcile an externally-observed workflow run status into the local task
 * projection. Idempotent: when the row already reflects the incoming status,
 * stage, error, detail, and terminal finished_at state, it returns unchanged
 * without writing a duplicate timeline event. Otherwise it updates the row
 * and appends a stage event in one transaction.
 * A non-null stage_detail counts as a change (it will be cleared), so
 * reconciliation always wipes intra-run progress detail.
 * @param error  Failure reason; defaults to 'Workflow run failed' when the
 *               incoming status is 'failed' and no reason is supplied.
 * @returns the (possibly unchanged) Task, or null when taskId is unknown.
 */
export async function setTaskStatusFromWorkflow(
  taskId: string,
  status: TaskStatus,
  error?: string | null
) {
  const isTerminal = status === 'completed' || status === 'failed';
  const nextStage = statusToStage(status);
  const nextError = status === 'failed' ? (error ?? 'Workflow run failed') : null;
  return await db.transaction(async (tx) => {
    const [current] = await tx
      .select()
      .from(taskRun)
      .where(eq(taskRun.id, taskId))
      .limit(1);
    if (!current) {
      return null;
    }
    // Short-circuit when nothing would change, to avoid spamming the
    // timeline with duplicate events on every poll.
    const hasNoStateChange = current.status === status
      && current.stage === nextStage
      && (current.error ?? null) === nextError
      && current.stage_detail === null
      && (isTerminal ? current.finished_at !== null : current.finished_at === null);
    if (hasNoStateChange) {
      return toTask(current);
    }
    const now = new Date().toISOString();
    const [row] = await tx
      .update(taskRun)
      .set({
        status,
        stage: nextStage,
        stage_detail: null,
        error: nextError,
        updated_at: now,
        finished_at: isTerminal ? now : null
      })
      .where(eq(taskRun.id, taskId))
      .returning();
    if (!row) {
      return null;
    }
    await insertTaskStageEvent(tx, {
      task_id: row.id,
      user_id: row.user_id,
      stage: row.stage as TaskStage,
      stage_detail: row.stage_detail,
      status: row.status,
      created_at: now
    });
    return toTask(row);
  });
}
export async function updateTaskNotificationState(
taskId: string,
userId: string,
input: UpdateTaskNotificationStateInput
) {
const now = new Date().toISOString();
const patch: Partial<typeof taskRun.$inferInsert> = {
updated_at: now
};
let hasMutation = false;
if (typeof input.read === 'boolean') {
patch.notification_read_at = input.read ? now : null;
hasMutation = true;
}
if (typeof input.silenced === 'boolean') {
patch.notification_silenced_at = input.silenced ? now : null;
hasMutation = true;
if (input.silenced) {
patch.notification_read_at = now;
}
}
if (!hasMutation) {
return await getTaskByIdForUser(taskId, userId);
}
const [row] = await db
.update(taskRun)
.set({
status: 'completed',
result,
error: null,
updated_at: new Date().toISOString(),
finished_at: new Date().toISOString()
})
.where(eq(taskRun.id, taskId))
.set(patch)
.where(and(eq(taskRun.id, taskId), eq(taskRun.user_id, userId)))
.returning();
return row ? toTask(row) : null;
}
export async function markTaskFailure(taskId: string, reason: string) {
const [current] = await db
export async function listTaskStageEventsForTask(taskId: string, userId: string) {
const rows = await db
.select()
.from(taskRun)
.where(eq(taskRun.id, taskId))
.limit(1);
.from(taskStageEvent)
.where(and(eq(taskStageEvent.task_id, taskId), eq(taskStageEvent.user_id, userId)))
.orderBy(asc(taskStageEvent.created_at), asc(taskStageEvent.id));
if (!current) {
return {
task: null,
shouldRetry: false
};
}
const shouldRetry = current.attempts < current.max_attempts;
const [updated] = await db
.update(taskRun)
.set({
status: shouldRetry ? 'queued' : 'failed',
error: reason,
updated_at: new Date().toISOString(),
finished_at: shouldRetry ? null : new Date().toISOString()
})
.where(eq(taskRun.id, taskId))
.returning();
return {
task: updated ? toTask(updated) : null,
shouldRetry
};
return rows.map(toTaskStageEvent);
}
export async function getTaskById(taskId: string) {

View File

@@ -3,7 +3,8 @@ import type {
FilingExtraction,
FilingExtractionMeta,
Holding,
Task
Task,
TaskStage
} from '@/lib/types';
import { runAiAnalysis } from '@/lib/server/ai';
import { buildPortfolioSummary } from '@/lib/server/portfolio';
@@ -24,6 +25,7 @@ import {
listUserHoldings
} from '@/lib/server/repos/holdings';
import { createPortfolioInsight } from '@/lib/server/repos/insights';
import { updateTaskStage } from '@/lib/server/repos/tasks';
import {
fetchFilingMetricsForFilings,
fetchPrimaryFilingText,
@@ -130,6 +132,10 @@ function toTaskResult(value: unknown): Record<string, unknown> {
return value as Record<string, unknown>;
}
// Record a fine-grained progress stage on the task projection while a job
// runs; thin wrapper over updateTaskStage so processor call sites read well.
async function setProjectionStage(task: Task, stage: TaskStage, detail: string | null = null) {
  await updateTaskStage(task.id, stage, detail);
}
function parseTicker(raw: unknown) {
if (typeof raw !== 'string' || raw.trim().length < 1) {
throw new Error('Ticker is required');
@@ -513,6 +519,8 @@ function filingLinks(filing: {
async function processSyncFilings(task: Task) {
const ticker = parseTicker(task.payload.ticker);
const limit = parseLimit(task.payload.limit, 20, 1, 50);
await setProjectionStage(task, 'sync.fetch_filings', `Fetching up to ${limit} filings for ${ticker}`);
const filings = await fetchRecentFilings(ticker, limit);
const metricsByAccession = new Map<string, Filing['metrics']>();
const filingsByCik = new Map<string, typeof filings>();
@@ -527,6 +535,7 @@ async function processSyncFilings(task: Task) {
filingsByCik.set(filing.cik, [filing]);
}
await setProjectionStage(task, 'sync.fetch_metrics', `Computing financial metrics for ${filings.length} filings`);
for (const [cik, filingsForCik] of filingsByCik) {
const filingsForFinancialMetrics = filingsForCik.filter((filing) => isFinancialMetricsForm(filing.filingType));
if (filingsForFinancialMetrics.length === 0) {
@@ -548,6 +557,7 @@ async function processSyncFilings(task: Task) {
}
}
await setProjectionStage(task, 'sync.persist_filings', 'Persisting filings and links');
const saveResult = await upsertFilingsRecords(
filings.map((filing) => ({
ticker: filing.ticker,
@@ -574,6 +584,7 @@ async function processSyncFilings(task: Task) {
return filing.filing_type === '10-K' || filing.filing_type === '10-Q';
});
await setProjectionStage(task, 'sync.hydrate_statements', `Hydrating statement snapshots for ${hydrateCandidates.length} candidate filings`);
for (const filing of hydrateCandidates) {
const existingSnapshot = await getFilingStatementSnapshotByFilingId(filing.id);
const shouldRefresh = !existingSnapshot
@@ -634,15 +645,18 @@ async function processRefreshPrices(task: Task) {
throw new Error('Task is missing user scope');
}
await setProjectionStage(task, 'refresh.load_holdings', 'Loading holdings for price refresh');
const userHoldings = await listHoldingsForPriceRefresh(userId);
const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))];
const quotes = new Map<string, number>();
await setProjectionStage(task, 'refresh.fetch_quotes', `Fetching quotes for ${tickers.length} tickers`);
for (const ticker of tickers) {
const quote = await getQuote(ticker);
quotes.set(ticker, quote);
}
await setProjectionStage(task, 'refresh.persist_prices', 'Writing refreshed prices to holdings');
const updatedCount = await applyRefreshedPrices(userId, quotes, new Date().toISOString());
return {
@@ -660,6 +674,7 @@ async function processAnalyzeFiling(task: Task) {
throw new Error('accessionNumber is required');
}
await setProjectionStage(task, 'analyze.load_filing', `Loading filing ${accessionNumber}`);
const filing = await getFilingByAccession(accessionNumber);
if (!filing) {
@@ -676,6 +691,7 @@ async function processAnalyzeFiling(task: Task) {
};
try {
await setProjectionStage(task, 'analyze.fetch_document', 'Fetching primary filing document');
const filingDocument = await fetchPrimaryFilingText({
filingUrl: filing.filing_url,
cik: filing.cik,
@@ -684,6 +700,7 @@ async function processAnalyzeFiling(task: Task) {
});
if (filingDocument?.text) {
await setProjectionStage(task, 'analyze.extract', 'Generating extraction context from filing text');
const ruleBasedExtraction = buildRuleBasedExtraction(filing, filingDocument.text);
extraction = ruleBasedExtraction;
extractionMeta = {
@@ -720,12 +737,14 @@ async function processAnalyzeFiling(task: Task) {
};
}
await setProjectionStage(task, 'analyze.generate_report', 'Generating final filing analysis report');
const analysis = await runAiAnalysis(
reportPrompt(filing, extraction, extractionMeta),
'Use concise institutional analyst language.',
{ workload: 'report' }
);
await setProjectionStage(task, 'analyze.persist_report', 'Persisting filing analysis output');
await saveFilingAnalysis(accessionNumber, {
provider: analysis.provider,
model: analysis.model,
@@ -761,6 +780,7 @@ async function processPortfolioInsights(task: Task) {
throw new Error('Task is missing user scope');
}
await setProjectionStage(task, 'insights.load_holdings', 'Loading holdings for portfolio insight generation');
const userHoldings = await listUserHoldings(userId);
const summary = buildPortfolioSummary(userHoldings);
@@ -771,12 +791,14 @@ async function processPortfolioInsights(task: Task) {
'Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.'
].join('\n');
await setProjectionStage(task, 'insights.generate', 'Generating portfolio AI insight');
const analysis = await runAiAnalysis(
prompt,
'Act as a risk-aware buy-side analyst.',
{ workload: 'report' }
);
await setProjectionStage(task, 'insights.persist', 'Persisting generated portfolio insight');
await createPortfolioInsight({
userId,
provider: analysis.provider,

View File

@@ -1,14 +1,19 @@
import { randomUUID } from 'node:crypto';
import { start } from 'workflow/api';
import type { Task, TaskStatus, TaskType } from '@/lib/types';
import { getRun, start } from 'workflow/api';
import type { WorkflowRunStatus } from '@workflow/world';
import type { Task, TaskStatus, TaskTimeline, TaskType } from '@/lib/types';
import { runTaskWorkflow } from '@/app/workflows/task-runner';
import {
countTasksByStatus,
createTaskRunRecord,
findInFlightTaskByResourceKey,
getTaskByIdForUser,
listTaskStageEventsForTask,
listRecentTasksForUser,
markTaskFailure,
setTaskWorkflowRunId
setTaskStatusFromWorkflow,
setTaskWorkflowRunId,
updateTaskNotificationState
} from '@/lib/server/repos/tasks';
type EnqueueTaskInput = {
@@ -17,8 +22,71 @@ type EnqueueTaskInput = {
payload?: Record<string, unknown>;
priority?: number;
maxAttempts?: number;
resourceKey?: string;
};
// Patch for a task's notification flags. Omitted fields are left unchanged;
// `read`/`silenced` toggle the corresponding *_at timestamp on the task row.
type UpdateTaskNotificationInput = {
  read?: boolean;
  silenced?: boolean;
};
/**
 * Projects a workflow engine run status onto the local task status enum.
 * Cancelled runs are surfaced as failures; any unrecognized status is also
 * treated as failed so stuck tasks never look healthy.
 */
function mapWorkflowStatus(status: WorkflowRunStatus): TaskStatus {
  const projection: Partial<Record<WorkflowRunStatus, TaskStatus>> = {
    pending: 'queued',
    running: 'running',
    completed: 'completed',
    failed: 'failed',
    cancelled: 'failed'
  };
  return projection[status] ?? 'failed';
}
function isProjectionPendingSync(task: Task) {
return task.status === 'queued' || task.status === 'running';
}
/**
 * Brings a locally stored task projection in line with its workflow run.
 *
 * Only tasks that still look in-flight and have a workflow run attached are
 * checked. When the engine reports a different status, the change is persisted
 * via the repo; if persistence yields nothing, an in-memory copy with the new
 * status is returned instead. Any engine error falls back to the stale task —
 * reconciliation is deliberately best-effort.
 */
async function reconcileTaskWithWorkflow(task: Task) {
  if (!isProjectionPendingSync(task) || !task.workflow_run_id) {
    return task;
  }
  try {
    const workflowStatus = await getRun(task.workflow_run_id).status;
    const nextStatus = mapWorkflowStatus(workflowStatus);
    if (nextStatus === task.status) {
      return task;
    }
    let nextError: string | null = null;
    if (nextStatus === 'failed') {
      nextError = workflowStatus === 'cancelled'
        ? 'Workflow run cancelled'
        : 'Workflow run failed';
    }
    const persisted = await setTaskStatusFromWorkflow(task.id, nextStatus, nextError);
    if (persisted) {
      return persisted;
    }
    // Repo did not return an updated row; synthesize the reconciled shape.
    const stillInFlight = nextStatus === 'queued' || nextStatus === 'running';
    return {
      ...task,
      status: nextStatus,
      stage: nextStatus,
      stage_detail: null,
      error: nextError,
      finished_at: stillInFlight
        ? null
        : task.finished_at ?? new Date().toISOString()
    };
  } catch {
    // Engine unreachable — serve the (possibly stale) local projection.
    return task;
  }
}
export async function enqueueTask(input: EnqueueTaskInput) {
const task = await createTaskRunRecord({
id: randomUUID(),
@@ -26,7 +94,8 @@ export async function enqueueTask(input: EnqueueTaskInput) {
task_type: input.taskType,
payload: input.payload ?? {},
priority: input.priority ?? 50,
max_attempts: input.maxAttempts ?? 3
max_attempts: input.maxAttempts ?? 3,
resource_key: input.resourceKey ?? null
});
try {
@@ -41,17 +110,61 @@ export async function enqueueTask(input: EnqueueTaskInput) {
const reason = error instanceof Error
? error.message
: 'Failed to start workflow';
await markTaskFailure(task.id, reason);
await markTaskFailure(task.id, reason, 'failed');
throw error;
}
}
/**
 * Finds a user's in-flight task for a given type/resource key, reconciling
 * its status with the workflow engine before returning it.
 *
 * @returns the reconciled task, or null when nothing is in flight.
 */
export async function findInFlightTask(userId: string, taskType: TaskType, resourceKey: string) {
  const existing = await findInFlightTaskByResourceKey(userId, taskType, resourceKey);
  return existing ? await reconcileTaskWithWorkflow(existing) : null;
}
/**
 * Loads a single task scoped to its owner and reconciles its status with the
 * backing workflow run before returning it.
 *
 * @returns the (possibly reconciled) task, or null when it does not exist or
 *          is not visible to this user.
 */
export async function getTaskById(taskId: string, userId: string) {
  const task = await getTaskByIdForUser(taskId, userId);
  if (!task) {
    return null;
  }
  // In-flight projections can be stale; sync with the workflow engine.
  return await reconcileTaskWithWorkflow(task);
}
/**
 * Lists the user's most recent task runs, optionally filtered by status, and
 * reconciles each in-flight projection with its workflow run in parallel.
 *
 * @param limit maximum number of tasks to return (defaults to 20).
 * @param statuses optional status filter passed through to the repo query.
 */
export async function listRecentTasks(userId: string, limit = 20, statuses?: TaskStatus[]) {
  const tasks = await listRecentTasksForUser(userId, limit, statuses);
  return await Promise.all(tasks.map((task) => reconcileTaskWithWorkflow(task)));
}
/**
 * Applies a notification-state patch (read/silenced) to a user's task and
 * returns the task reconciled against its workflow run.
 *
 * @returns the updated task, or null when the task is missing or not owned
 *          by this user.
 */
export async function updateTaskNotification(
  userId: string,
  taskId: string,
  input: UpdateTaskNotificationInput
) {
  const updated = await updateTaskNotificationState(taskId, userId, input);
  return updated ? await reconcileTaskWithWorkflow(updated) : null;
}
/**
 * Builds the timeline view for a task: the (reconciled) task itself plus its
 * recorded stage events.
 *
 * @returns the timeline, or null when the task is not visible to this user.
 */
export async function getTaskTimeline(taskId: string, userId: string): Promise<TaskTimeline | null> {
  const task = await getTaskById(taskId, userId);
  if (task === null) {
    return null;
  }
  const events = await listTaskStageEventsForTask(taskId, userId);
  return { task, events };
}
export async function getTaskQueueSnapshot() {

View File

@@ -90,12 +90,37 @@ export type Filing = {
// Coarse lifecycle of a background task run.
export type TaskStatus = 'queued' | 'running' | 'completed' | 'failed';
// The background job kinds the task runner knows how to execute.
export type TaskType = 'sync_filings' | 'refresh_prices' | 'analyze_filing' | 'portfolio_insights';
// Fine-grained progress marker for a task. The first four values mirror
// TaskStatus; the namespaced values are per-pipeline stages emitted while a
// task is running.
export type TaskStage =
  | 'queued'
  | 'running'
  | 'completed'
  | 'failed'
  // sync_filings pipeline
  | 'sync.fetch_filings'
  | 'sync.fetch_metrics'
  | 'sync.persist_filings'
  | 'sync.hydrate_statements'
  // refresh_prices pipeline
  | 'refresh.load_holdings'
  | 'refresh.fetch_quotes'
  | 'refresh.persist_prices'
  // analyze_filing pipeline
  | 'analyze.load_filing'
  | 'analyze.fetch_document'
  | 'analyze.extract'
  | 'analyze.generate_report'
  | 'analyze.persist_report'
  // portfolio_insights pipeline
  | 'insights.load_holdings'
  | 'insights.generate'
  | 'insights.persist';
export type Task = {
id: string;
user_id: string;
task_type: TaskType;
status: TaskStatus;
stage: TaskStage;
stage_detail: string | null;
resource_key: string | null;
notification_read_at: string | null;
notification_silenced_at: string | null;
priority: number;
payload: Record<string, unknown>;
result: Record<string, unknown> | null;
@@ -108,6 +133,21 @@ export type Task = {
finished_at: string | null;
};
// One row in a task's progress history, recorded each time the runner
// advances the task's stage or status.
export type TaskStageEvent = {
  id: number;
  task_id: string;
  user_id: string;
  stage: TaskStage;
  // Optional human-readable description accompanying the stage transition.
  stage_detail: string | null;
  // Task status at the moment the event was recorded.
  status: TaskStatus;
  created_at: string;
};
// Task detail plus its stage-event history, as served by the timeline endpoint.
export type TaskTimeline = {
  task: Task;
  events: TaskStageEvent[];
};
export type PortfolioInsight = {
id: number;
user_id: string;