import { runTaskProcessor } from '@/lib/server/task-processors';
import { completeTask, getTaskById, markTaskFailure, markTaskRunning } from '@/lib/server/repos/tasks';
import type { Task } from '@/lib/types';

/**
 * Value produced by `runTaskProcessor` and handed to `completeTask`.
 * Derived from the processor so the two steps cannot drift apart.
 */
type TaskResult = Awaited<ReturnType<typeof runTaskProcessor>>;

/**
 * Workflow entry point that drives a single task from lookup to completion.
 *
 * Sequence:
 * 1. Load the task; return silently when it does not exist.
 * 2. Mark it as running.
 * 3. Load it again (so processing sees the state after step 2); return
 *    silently when it has disappeared in the meantime.
 * 4. Run the task processor and record the result via `completeTask`.
 *
 * Any error thrown in steps 3-4 is recorded through `markTaskFailure` and then
 * re-thrown so the caller/runtime still observes the failure.
 *
 * @param taskId - Identifier of the task to run.
 * @throws Re-throws whatever the load/process/complete steps throw.
 */
export async function runTaskWorkflow(taskId: string): Promise<void> {
  'use workflow';

  const task = await loadTaskStep(taskId);
  if (!task) {
    return;
  }

  await markTaskRunningStep(task.id);

  try {
    // Re-read the task after it was marked running; the first snapshot is stale.
    const refreshedTask = await loadTaskStep(task.id);
    if (!refreshedTask) {
      return;
    }

    const result = await processTaskStep(refreshedTask);
    await completeTaskStep(task.id, result);
  } catch (error) {
    // Non-Error throwables carry no usable message, so fall back to a fixed reason.
    const reason = error instanceof Error ? error.message : 'Task failed unexpectedly';
    await markTaskFailureStep(task.id, reason);
    throw error;
  }
}

/** Step: fetch a task by id via `getTaskById`. */
async function loadTaskStep(taskId: string) {
  'use step';
  return await getTaskById(taskId);
}

/** Step: flag the task as running via `markTaskRunning`. */
async function markTaskRunningStep(taskId: string): Promise<void> {
  'use step';
  await markTaskRunning(taskId);
}

/** Step: execute the task through `runTaskProcessor` and return its result. */
async function processTaskStep(task: Task): Promise<TaskResult> {
  'use step';
  return await runTaskProcessor(task);
}

// Keep retries at the projection workflow level to avoid duplicate side effects.
// This assignment must stay on its own line below the comment above; otherwise
// the line comment swallows it and the step silently keeps its default retries.
(processTaskStep as typeof processTaskStep & { maxRetries?: number }).maxRetries = 0;

/** Step: record the processor result for the task via `completeTask`. */
async function completeTaskStep(taskId: string, result: TaskResult): Promise<void> {
  'use step';
  await completeTask(taskId, result);
}

/** Step: record a failure reason for the task via `markTaskFailure`. */
async function markTaskFailureStep(taskId: string, reason: string): Promise<void> {
  'use step';
  await markTaskFailure(taskId, reason);
}