Files
Neon-Desk/app/workflows/task-runner.ts

72 lines
2.0 KiB
TypeScript

import { runTaskProcessor, type TaskExecutionOutcome } from '@/lib/server/task-processors';
import { describeTaskFailure } from '@/lib/server/task-errors';
import {
completeTask,
getTaskById,
markTaskFailure,
markTaskRunning
} from '@/lib/server/repos/tasks';
import type { Task } from '@/lib/types';
export async function runTaskWorkflow(taskId: string) {
'use workflow';
const task = await loadTaskStep(taskId);
if (!task) {
return;
}
await markTaskRunningStep(task.id);
try {
const refreshedTask = await loadTaskStep(task.id);
if (!refreshedTask) {
return;
}
const outcome = await processTaskStep(refreshedTask);
await completeTaskStep(task.id, outcome);
} catch (error) {
const latestTask = await loadTaskStep(task.id);
const failure = describeTaskFailure(latestTask ?? task, error);
await markTaskFailureStep(task.id, failure, latestTask ?? task);
throw error;
}
}
async function loadTaskStep(taskId: string) {
'use step';
return await getTaskById(taskId);
}
async function markTaskRunningStep(taskId: string) {
'use step';
await markTaskRunning(taskId);
}
async function processTaskStep(task: Task) {
'use step';
return await runTaskProcessor(task);
}
// Keep retries at the projection workflow level to avoid duplicate side effects.
(
processTaskStep as ((task: Task) => Promise<TaskExecutionOutcome>) & { maxRetries?: number }
).maxRetries = 0;
async function completeTaskStep(taskId: string, outcome: TaskExecutionOutcome) {
'use step';
await completeTask(taskId, outcome.result, {
detail: outcome.completionDetail,
context: outcome.completionContext ?? null
});
}
async function markTaskFailureStep(taskId: string, failure: { summary: string; detail: string }, latestTask: Task) {
'use step';
await markTaskFailure(taskId, failure.detail, latestTask.stage === 'completed' ? 'failed' : latestTask.stage, {
detail: failure.summary,
context: latestTask.stage_context ?? null
});
}