From f2c25fb9c61234433816628af7e93f1a70727131 Mon Sep 17 00:00:00 2001 From: francy51 Date: Mon, 9 Mar 2026 23:51:37 -0400 Subject: [PATCH] Improve workflow error messaging --- app/workflows/task-runner.ts | 15 +- .../notifications/task-detail-modal.tsx | 8 +- .../api/task-workflow-hybrid.e2e.test.ts | 10 +- lib/server/repos/tasks.test.ts | 8 +- lib/server/repos/tasks.ts | 12 +- lib/server/task-errors.test.ts | 56 +++++++ lib/server/task-errors.ts | 154 ++++++++++++++++++ lib/server/task-notifications.test.ts | 9 +- lib/server/task-notifications.ts | 6 +- lib/server/tasks.ts | 31 ++-- lib/task-workflow.ts | 4 +- 11 files changed, 272 insertions(+), 41 deletions(-) create mode 100644 lib/server/task-errors.test.ts create mode 100644 lib/server/task-errors.ts diff --git a/app/workflows/task-runner.ts b/app/workflows/task-runner.ts index 6d98f7f..159c234 100644 --- a/app/workflows/task-runner.ts +++ b/app/workflows/task-runner.ts @@ -1,4 +1,5 @@ import { runTaskProcessor, type TaskExecutionOutcome } from '@/lib/server/task-processors'; +import { describeTaskFailure } from '@/lib/server/task-errors'; import { completeTask, getTaskById, @@ -26,11 +27,9 @@ export async function runTaskWorkflow(taskId: string) { const outcome = await processTaskStep(refreshedTask); await completeTaskStep(task.id, outcome); } catch (error) { - const reason = error instanceof Error - ? error.message - : 'Task failed unexpectedly'; const latestTask = await loadTaskStep(task.id); - await markTaskFailureStep(task.id, reason, latestTask); + const failure = describeTaskFailure(latestTask ?? task, error); + await markTaskFailureStep(task.id, failure, latestTask ?? task); throw error; } } @@ -63,10 +62,10 @@ async function completeTaskStep(taskId: string, outcome: TaskExecutionOutcome) { }); } -async function markTaskFailureStep(taskId: string, reason: string, latestTask: Task | null) { +async function markTaskFailureStep(taskId: string, failure: { summary: string; detail: string }, latestTask: Task) { 'use step'; - await markTaskFailure(taskId, reason, 'failed', { - detail: reason, - context: latestTask?.stage_context ?? null + await markTaskFailure(taskId, failure.detail, latestTask.stage === 'completed' ? 'failed' : latestTask.stage, { + detail: failure.summary, + context: latestTask.stage_context ?? null }); } diff --git a/components/notifications/task-detail-modal.tsx b/components/notifications/task-detail-modal.tsx index af09a61..dc1dfde 100644 --- a/components/notifications/task-detail-modal.tsx +++ b/components/notifications/task-detail-modal.tsx @@ -125,10 +125,14 @@ export function TaskDetailModal({ isOpen, taskId, onClose }: TaskDetailModalProp const [expandedStage, setExpandedStage] = useState(null); const defaultExpandedStage = useMemo(() => { - if (task?.status === 'completed' || task?.status === 'failed') { + if (task?.status === 'completed') { return null; } + if (task?.status === 'failed') { + return task.stage; + } + for (const item of timeline) { if (item.state === 'active') { return item.stage; @@ -278,6 +282,8 @@ export function TaskDetailModal({ isOpen, taskId, onClose }: TaskDetailModalProp
; }; }; }).task; - expect(failedTask.notification.detailLine).toBe('Primary filing document fetch failed.'); + expect(failedTask.notification.statusLine).toBe('Failed during fetch primary document'); + expect(failedTask.notification.detailLine).toBe('Could not load the primary filing document.'); expect(failedTask.notification.actions.some((action) => action.id === 'open_filings')).toBe(true); }); diff --git a/lib/server/repos/tasks.test.ts b/lib/server/repos/tasks.test.ts index 4c3e422..4940512 100644 --- a/lib/server/repos/tasks.test.ts +++ b/lib/server/repos/tasks.test.ts @@ -202,8 +202,8 @@ describe('task repos', () => { resource_key: 'index_search:ticker:AAPL' }); - await tasksRepo.markTaskFailure(failedTask.id, 'Embedding request failed', 'failed', { - detail: 'Embedding request failed', + await tasksRepo.markTaskFailure(failedTask.id, 'Search indexing could not generate embeddings for AAPL · doc-2. The AI provider returned an empty response.', 'search.embed', { + detail: 'Embedding generation failed.', context: { progress: { current: 2, total: 5, unit: 'sources' }, counters: { chunksEmbedded: 20 }, @@ -216,7 +216,9 @@ describe('task repos', () => { expect(completed?.stage_detail).toContain('Analysis report generated'); expect(completed?.stage_context?.subject?.ticker).toBe('AAPL'); - expect(failed?.stage_detail).toBe('Embedding request failed'); + expect(failed?.stage).toBe('search.embed'); + expect(failed?.stage_detail).toBe('Embedding generation failed.'); + expect(failed?.error).toContain('Search indexing could not generate embeddings'); expect(failed?.stage_context?.progress?.current).toBe(2); }); }); diff --git a/lib/server/repos/tasks.ts b/lib/server/repos/tasks.ts index 1d84a35..86fd1c7 100644 --- a/lib/server/repos/tasks.ts +++ b/lib/server/repos/tasks.ts @@ -406,11 +406,13 @@ export async function markTaskFailure( export async function setTaskStatusFromWorkflow( taskId: string, status: TaskStatus, - error?: string | null + error?: string | null, + detail?: string | null ) { const isTerminal = status === 'completed' || status === 'failed'; const nextStage = statusToStage(status); const nextError = status === 'failed' ? (error ?? 'Workflow run failed') : null; + const nextDetail = nextStatusDetail(status, nextError, detail); return await db.transaction(async (tx) => { const [current] = await tx @@ -426,7 +428,7 @@ export async function setTaskStatusFromWorkflow( const hasNoStateChange = current.status === status && current.stage === nextStage && (current.error ?? null) === nextError - && (current.stage_detail ?? null) === (nextStatusDetail(status, nextError) ?? null) + && (current.stage_detail ?? null) === (nextDetail ?? null) && (current.stage_context ?? null) === null && (isTerminal ? current.finished_at !== null : current.finished_at === null); @@ -440,7 +442,7 @@ export async function setTaskStatusFromWorkflow( .set({ status, stage: nextStage, - stage_detail: nextStatusDetail(status, nextError), + stage_detail: nextDetail, stage_context: null, error: nextError, updated_at: now, @@ -506,9 +508,9 @@ export async function updateTaskNotificationState( return row ? toTask(row) : null; } -function nextStatusDetail(status: TaskStatus, error?: string | null) { +function nextStatusDetail(status: TaskStatus, error?: string | null, detail?: string | null) { if (status === 'failed') { - return error ?? 'Workflow run failed'; + return detail ?? error ?? 'Workflow run failed'; } if (status === 'completed') { diff --git a/lib/server/task-errors.test.ts b/lib/server/task-errors.test.ts new file mode 100644 index 0000000..c152c68 --- /dev/null +++ b/lib/server/task-errors.test.ts @@ -0,0 +1,56 @@ +import { describe, expect, it } from 'bun:test'; +import { describeTaskFailure } from '@/lib/server/task-errors'; + +describe('task error formatter', () => { + it('formats missing AI credentials for search indexing', () => { + const failure = describeTaskFailure({ + task_type: 'index_search', + stage: 'search.embed', + stage_context: { + subject: { + ticker: 'AAPL', + label: 'doc-2' + } + }, + payload: { + ticker: 'AAPL' + } + }, 'ZHIPU_API_KEY is required for AI workloads'); + + expect(failure.summary).toBe('Search indexing could not generate embeddings.'); + expect(failure.detail).toContain('ZHIPU_API_KEY is not configured'); + expect(failure.detail).toContain('AAPL · doc-2'); + }); + + it('formats empty AI responses with stage context', () => { + const failure = describeTaskFailure({ + task_type: 'analyze_filing', + stage: 'analyze.generate_report', + stage_context: { + subject: { + ticker: 'AAPL', + accessionNumber: '0000320193-26-000001' + } + }, + payload: { + accessionNumber: '0000320193-26-000001' + } + }, 'AI SDK returned an empty response'); + + expect(failure.summary).toBe('No usable AI response during generate report.'); + expect(failure.detail).toContain('during generate report'); + expect(failure.detail).toContain('Retry the job'); + }); + + it('formats workflow cancellations as descriptive operational failures', () => { + const failure = describeTaskFailure({ + task_type: 'portfolio_insights', + stage: 'insights.generate', + stage_context: null, + payload: {} + }, 'Workflow run cancelled'); + + expect(failure.summary).toBe('The background workflow was cancelled.'); + expect(failure.detail).toContain('before portfolio insight could finish'); + }); +}); diff --git a/lib/server/task-errors.ts b/lib/server/task-errors.ts new file mode 100644 index 0000000..060061f --- /dev/null +++ b/lib/server/task-errors.ts @@ -0,0 +1,154 @@ +import { stageLabel, taskTypeLabel } from '@/lib/task-workflow'; +import type { TaskStage, TaskStageContext, TaskType } from '@/lib/types'; + +type TaskErrorSource = { + task_type: TaskType; + stage?: TaskStage | null; + stage_context?: TaskStageContext | null; + payload?: Record | null; +}; + +export type TaskFailureMessage = { + summary: string; + detail: string; +}; + +function rawMessage(error: unknown) { + if (error instanceof Error && error.message.trim().length > 0) { + return error.message.trim(); + } + + if (typeof error === 'string' && error.trim().length > 0) { + return error.trim(); + } + + return 'Task failed unexpectedly'; +} + +function normalizeSentence(value: string) { + const collapsed = value + .split('\n')[0]! + .replace(/\s+/g, ' ') + .trim(); + + if (!collapsed) { + return 'Task failed unexpectedly.'; + } + + const sentence = collapsed[0]!.toUpperCase() + collapsed.slice(1); + return /[.!?]$/.test(sentence) ? sentence : `${sentence}.`; +} + +function asString(value: unknown) { + return typeof value === 'string' && value.trim().length > 0 ? value.trim() : null; +} + +function subjectLabel(task: TaskErrorSource) { + const subject = task.stage_context?.subject; + const ticker = subject?.ticker ?? asString(task.payload?.ticker); + const accessionNumber = subject?.accessionNumber ?? asString(task.payload?.accessionNumber); + const label = subject?.label ?? asString(task.payload?.label); + const parts = [ticker, accessionNumber, label].filter((part): part is string => Boolean(part)); + + if (parts.length === 0) { + return null; + } + + return parts.join(' · '); +} + +function stagePhrase(stage?: TaskStage | null) { + if (!stage || stage === 'queued' || stage === 'running' || stage === 'completed' || stage === 'failed') { + return null; + } + + return stageLabel(stage).toLowerCase(); +} + +function genericFailure(task: TaskErrorSource, message: string): TaskFailureMessage { + const failedStage = stagePhrase(task.stage); + const subject = subjectLabel(task); + const summary = failedStage + ? `Failed during ${failedStage}.` + : `${taskTypeLabel(task.task_type)} failed.`; + const detail = [ + failedStage + ? `${taskTypeLabel(task.task_type)} failed during ${failedStage}` + : `${taskTypeLabel(task.task_type)} failed`, + subject ? `for ${subject}` : null, + '. ', + normalizeSentence(message) + ].filter(Boolean).join(''); + + return { + summary, + detail + }; +} + +export function describeTaskFailure(task: TaskErrorSource, error: unknown): TaskFailureMessage { + const message = rawMessage(error); + const normalized = message.toLowerCase(); + const failedStage = stagePhrase(task.stage); + const subject = subjectLabel(task); + + if (normalized.includes('zhipu_api_key is required')) { + if (task.task_type === 'index_search') { + return { + summary: 'Search indexing could not generate embeddings.', + detail: `Search indexing could not generate embeddings${subject ? ` for ${subject}` : ''} because ZHIPU_API_KEY is not configured. Add the API key and retry the job.` + }; + } + + return { + summary: 'AI configuration is incomplete.', + detail: `${taskTypeLabel(task.task_type)} could not continue${subject ? ` for ${subject}` : ''} because ZHIPU_API_KEY is not configured. Add the API key and retry the job.` + }; + } + + if (normalized.includes('ai sdk returned an empty response')) { + return { + summary: failedStage + ? `No usable AI response during ${failedStage}.` + : 'The AI provider returned an empty response.', + detail: `The AI provider returned an empty response${failedStage ? ` during ${failedStage}` : ''}${subject ? ` for ${subject}` : ''}. Retry the job. If this keeps happening, verify the model configuration and provider health.` + }; + } + + if (normalized.includes('extraction output invalid json schema')) { + return { + summary: 'The extraction response was not valid JSON.', + detail: `The AI model returned extraction data in an unexpected format${subject ? ` for ${subject}` : ''}, so filing analysis could not continue. Retry the job. If it repeats, inspect the model output or prompt.` + }; + } + + if (normalized.includes('workflow run cancelled')) { + return { + summary: 'The background workflow was cancelled.', + detail: `The background workflow was cancelled before ${taskTypeLabel(task.task_type).toLowerCase()} could finish${subject ? ` for ${subject}` : ''}. Retry the job if you still need the result.` + }; + } + + if (normalized.includes('workflow run failed')) { + return { + summary: 'The background workflow stopped unexpectedly.', + detail: `The background workflow stopped unexpectedly before ${taskTypeLabel(task.task_type).toLowerCase()} could finish${subject ? ` for ${subject}` : ''}. Retry the job. If it fails again, inspect the task details for the last completed step.` + }; + } + + if (normalized.includes('failed to start workflow')) { + return { + summary: 'The background worker could not start this job.', + detail: `The background worker could not start ${taskTypeLabel(task.task_type).toLowerCase()}${subject ? ` for ${subject}` : ''}. The workflow backend may be unavailable. Retry the job once the workflow service is healthy.` + }; + } + + if (normalized.includes('embedding')) { + return { + summary: 'Embedding generation failed.', + detail: `Search indexing could not generate embeddings${subject ? ` for ${subject}` : ''}. ${normalizeSentence(message)}` + }; + } + + return genericFailure(task, message); +} diff --git a/lib/server/task-notifications.test.ts b/lib/server/task-notifications.test.ts index a809023..5adda0f 100644 --- a/lib/server/task-notifications.test.ts +++ b/lib/server/task-notifications.test.ts @@ -99,9 +99,9 @@ describe('task notification builder', () => { const notification = buildTaskNotification(baseTask({ task_type: 'analyze_filing', status: 'failed', - stage: 'failed', - stage_detail: 'Primary filing document fetch failed.', - error: 'Primary filing document fetch failed.', + stage: 'analyze.fetch_document', + stage_detail: 'Could not load the primary filing document.', + error: 'Could not load the primary filing document for AAPL · 0000320193-26-000001. Retry the job after confirming the SEC source is reachable.', stage_context: { subject: { ticker: 'AAPL', @@ -116,7 +116,8 @@ describe('task notification builder', () => { })); expect(notification.tone).toBe('error'); - expect(notification.detailLine).toBe('Primary filing document fetch failed.'); + expect(notification.statusLine).toBe('Failed during fetch primary document'); + expect(notification.detailLine).toBe('Could not load the primary filing document.'); expect(notification.actions.some((action) => action.id === 'open_filings')).toBe(true); }); }); diff --git a/lib/server/task-notifications.ts b/lib/server/task-notifications.ts index 05b3a62..4f86d5a 100644 --- a/lib/server/task-notifications.ts +++ b/lib/server/task-notifications.ts @@ -216,13 +216,15 @@ function buildStatusLine(task: TaskCore, progress: TaskNotificationView['progres case 'completed': return 'Finished successfully'; case 'failed': - return 'Failed'; + return task.stage !== 'failed' + ? `Failed during ${stageLabel(task.stage).toLowerCase()}` + : 'Failed'; } } export function buildTaskNotification(task: TaskCore): TaskNotificationView { const progress = buildProgress(task); - const detailLine = task.error ?? task.stage_detail; + const detailLine = task.stage_detail ?? task.error; return { title: taskTypeLabel(task.task_type), diff --git a/lib/server/tasks.ts b/lib/server/tasks.ts index 2d057be..6841eb2 100644 --- a/lib/server/tasks.ts +++ b/lib/server/tasks.ts @@ -3,6 +3,7 @@ import { getRun, start } from 'workflow/api'; import type { WorkflowRunStatus } from '@workflow/world'; import type { Task, TaskStatus, TaskTimeline, TaskType } from '@/lib/types'; import { runTaskWorkflow } from '@/app/workflows/task-runner'; +import { describeTaskFailure } from '@/lib/server/task-errors'; import { countTasksByStatus, createTaskRunRecord, @@ -65,21 +66,24 @@ async function reconcileTaskWithWorkflow(task: Task) { return task; } - const nextError = nextStatus === 'failed' - ? workflowStatus === 'cancelled' - ? 'Workflow run cancelled' - : 'Workflow run failed' + const failure = nextStatus === 'failed' + ? describeTaskFailure(task, workflowStatus === 'cancelled' ? 'Workflow run cancelled' : 'Workflow run failed') : null; - const updated = await setTaskStatusFromWorkflow(task.id, nextStatus, nextError); + const updated = await setTaskStatusFromWorkflow( + task.id, + nextStatus, + failure?.detail ?? null, + failure?.summary ?? null + ); const fallbackTask = { ...task, status: nextStatus, stage: nextStatus, - stage_detail: nextStatus === 'failed' ? nextError : 'Workflow run completed.', + stage_detail: nextStatus === 'failed' ? (failure?.summary ?? 'The background workflow stopped unexpectedly.') : 'Workflow run completed.', stage_context: null, - error: nextError, + error: failure?.detail ?? null, finished_at: nextStatus === 'queued' || nextStatus === 'running' ? null : task.finished_at ?? new Date().toISOString() @@ -114,11 +118,14 @@ export async function enqueueTask(input: EnqueueTaskInput) { workflow_run_id: run.runId } satisfies Task; } catch (error) { - const reason = error instanceof Error - ? error.message - : 'Failed to start workflow'; - await markTaskFailure(task.id, reason, 'failed'); - throw error; + const failure = describeTaskFailure(task, 'Failed to start workflow'); + await markTaskFailure(task.id, failure.detail, 'failed', { + detail: failure.summary + }); + + const wrapped = new Error(failure.detail); + (wrapped as Error & { cause?: unknown }).cause = error; + throw wrapped; } } diff --git a/lib/task-workflow.ts b/lib/task-workflow.ts index e88d0ce..87c2b35 100644 --- a/lib/task-workflow.ts +++ b/lib/task-workflow.ts @@ -8,7 +8,7 @@ import type { export type StageTimelineItem = { stage: TaskStage; label: string; - state: 'completed' | 'active' | 'pending'; + state: 'completed' | 'active' | 'pending' | 'failed'; detail: string | null; timestamp: string | null; context: Task['stage_context'] | null; @@ -203,7 +203,7 @@ export function buildStageTimeline(task: Task, events: TaskStageEvent[]): StageT return { stage, label: stageLabel(stage), - state: 'completed' as const, + state: task.status === 'failed' && stage === task.stage ? 'failed' as const : 'completed' as const, detail: event?.stage_detail ?? task.stage_detail, timestamp: event?.created_at ?? task.finished_at, context: event?.stage_context ?? (stage === task.stage ? task.stage_context : null) ?? null