import { runTaskProcessor, type TaskExecutionOutcome } from '@/lib/server/task-processors'; import { describeTaskFailure } from '@/lib/server/task-errors'; import { completeTask, getTaskById, markTaskFailure, markTaskRunning } from '@/lib/server/repos/tasks'; import type { Task } from '@/lib/types'; export async function runTaskWorkflow(taskId: string) { 'use workflow'; const task = await loadTaskStep(taskId); if (!task) { return; } await markTaskRunningStep(task.id); try { const refreshedTask = await loadTaskStep(task.id); if (!refreshedTask) { return; } const outcome = await processTaskStep(refreshedTask); await completeTaskStep(task.id, outcome); } catch (error) { const latestTask = await loadTaskStep(task.id); const failure = describeTaskFailure(latestTask ?? task, error); await markTaskFailureStep(task.id, failure, latestTask ?? task); throw error; } } async function loadTaskStep(taskId: string) { 'use step'; return await getTaskById(taskId); } async function markTaskRunningStep(taskId: string) { 'use step'; await markTaskRunning(taskId); } async function processTaskStep(task: Task) { 'use step'; return await runTaskProcessor(task); } // Keep retries at the projection workflow level to avoid duplicate side effects. ( processTaskStep as ((task: Task) => Promise) & { maxRetries?: number } ).maxRetries = 0; async function completeTaskStep(taskId: string, outcome: TaskExecutionOutcome) { 'use step'; await completeTask(taskId, outcome.result, { detail: outcome.completionDetail, context: outcome.completionContext ?? null }); } async function markTaskFailureStep(taskId: string, failure: { summary: string; detail: string }, latestTask: Task) { 'use step'; await markTaskFailure(taskId, failure.detail, latestTask.stage === 'completed' ? 'failed' : latestTask.stage, { detail: failure.summary, context: latestTask.stage_context ?? null }); }