65 lines
1.6 KiB
TypeScript
65 lines
1.6 KiB
TypeScript
import { sleep } from 'workflow';
|
|
import { start } from 'workflow/api';
|
|
import { runTaskProcessor } from '@/lib/server/task-processors';
|
|
import {
|
|
claimQueuedTask,
|
|
completeTask,
|
|
markTaskFailure
|
|
} from '@/lib/server/repos/tasks';
|
|
import type { Task } from '@/lib/types';
|
|
|
|
/**
 * Workflow entry point for a single task.
 *
 * Flow:
 * 1. Claim the task through `claimQueuedTaskStep`; if nothing is returned,
 *    the workflow ends without doing any work.
 * 2. Run the task processor and persist its result via `completeTaskStep`.
 * 3. If either of those throws, record the failure with
 *    `markTaskFailureStep`. When the returned state has `shouldRetry` set,
 *    wait `'1200ms'` and start a fresh run of this workflow for the same task.
 *
 * @param taskId - Identifier of the task to claim and process.
 */
export async function runTaskWorkflow(taskId: string) {
  'use workflow';

  const claimedTask = await claimQueuedTaskStep(taskId);
  if (!claimedTask) {
    // Nothing was claimed for this id, so there is nothing to run.
    return;
  }

  try {
    const output = await processTaskStep(claimedTask);
    await completeTaskStep(claimedTask.id, output);
  } catch (failure) {
    // Only Error instances carry a usable message; anything else thrown
    // gets the generic fallback text.
    let reason = 'Task failed unexpectedly';
    if (failure instanceof Error) {
      reason = failure.message;
    }

    const { shouldRetry } = await markTaskFailureStep(claimedTask.id, reason);
    if (!shouldRetry) {
      return;
    }

    // Short pause before handing the task to a new workflow run.
    await sleep('1200ms');
    await restartTaskWorkflowStep(claimedTask.id);
  }
}
|
|
|
|
/**
 * Step wrapper around `claimQueuedTask`.
 *
 * @param taskId - Identifier of the task to claim.
 * @returns Whatever `claimQueuedTask` resolves to; the workflow treats a
 *   falsy value as "nothing to run".
 */
async function claimQueuedTaskStep(taskId: string) {
  'use step';

  const claimed = await claimQueuedTask(taskId);
  return claimed;
}
|
|
|
|
/**
 * Step wrapper that hands the claimed task to `runTaskProcessor`.
 *
 * @param task - The task to process.
 * @returns The value `runTaskProcessor` resolves to.
 */
async function processTaskStep(task: Task) {
  'use step';

  const processed = await runTaskProcessor(task);
  return processed;
}
|
|
|
|
// Turn off step-level retries for `processTaskStep`. Retrying is already
// handled at the task level, so step-level retries would duplicate it and
// can create noisy AI failure loops.
type ProcessTaskStepWithRetryConfig = ((task: Task) => Promise<Record<string, unknown>>) & {
  maxRetries?: number;
};

(processTaskStep as ProcessTaskStepWithRetryConfig).maxRetries = 0;
|
|
|
|
/**
 * Step wrapper around `completeTask`.
 *
 * @param taskId - Identifier of the task being completed.
 * @param result - Processor output forwarded unchanged to `completeTask`.
 */
async function completeTaskStep(
  taskId: string,
  result: Record<string, unknown>,
): Promise<void> {
  'use step';

  await completeTask(taskId, result);
}
|
|
|
|
/**
 * Step wrapper around `markTaskFailure`.
 *
 * @param taskId - Identifier of the task that failed.
 * @param reason - Human-readable failure message to record.
 * @returns The value `markTaskFailure` resolves to; the workflow reads its
 *   `shouldRetry` property to decide whether to restart.
 */
async function markTaskFailureStep(taskId: string, reason: string) {
  'use step';

  const nextState = await markTaskFailure(taskId, reason);
  return nextState;
}
|
|
|
|
/**
 * Step that starts a new `runTaskWorkflow` run for the given task via
 * `start` from `workflow/api`.
 *
 * @param taskId - Identifier of the task to hand to the new workflow run.
 */
async function restartTaskWorkflowStep(taskId: string) {
  'use step';

  const workflowArgs: Parameters<typeof runTaskWorkflow> = [taskId];
  await start(runTaskWorkflow, workflowArgs);
}
|