67 lines
1.5 KiB
TypeScript
67 lines
1.5 KiB
TypeScript
import { runTaskProcessor } from '@/lib/server/task-processors';
|
|
import {
|
|
completeTask,
|
|
getTaskById,
|
|
markTaskFailure,
|
|
markTaskRunning
|
|
} from '@/lib/server/repos/tasks';
|
|
import type { Task } from '@/lib/types';
|
|
|
|
/**
 * Workflow entry point that drives a single task from lookup to a terminal state.
 *
 * Sequence: load the task, mark it running, load it again, run the processor on
 * the reloaded copy, then record the processor's result. If the task cannot be
 * found at either load, the workflow returns without doing anything further.
 * Any error thrown after the task was marked running is recorded through
 * `markTaskFailureStep` and then rethrown to the caller.
 *
 * @param taskId - Identifier of the task to run.
 * @throws Rethrows the error caught while reloading, processing, or completing the task.
 */
export async function runTaskWorkflow(taskId: string) {
  'use workflow';

  const initialTask = await loadTaskStep(taskId);
  if (!initialTask) return;

  const { id } = initialTask;
  await markTaskRunningStep(id);

  try {
    // Load again after marking the task running — presumably so the processor
    // works on the record as it looks after that status change.
    const currentTask = await loadTaskStep(id);
    if (currentTask) {
      const output = await processTaskStep(currentTask);
      await completeTaskStep(id, output);
    }
  } catch (error) {
    // Non-Error throwables carry no usable message, so fall back to a generic reason.
    let reason = 'Task failed unexpectedly';
    if (error instanceof Error) {
      reason = error.message;
    }

    await markTaskFailureStep(id, reason);
    throw error;
  }
}
|
|
|
|
/**
 * Step wrapper around `getTaskById`.
 *
 * @param taskId - Identifier of the task to look up.
 * @returns Whatever `getTaskById` resolves to; the workflow treats a falsy
 *   value as "task not found".
 */
async function loadTaskStep(taskId: string) {
  'use step';

  const task = await getTaskById(taskId);
  return task;
}
|
|
|
|
/**
 * Step wrapper that forwards the task id to `markTaskRunning`.
 *
 * @param taskId - Identifier of the task to mark as running.
 */
async function markTaskRunningStep(taskId: string): Promise<void> {
  'use step';

  await markTaskRunning(taskId);
}
|
|
|
|
/**
 * Step wrapper that runs the task through `runTaskProcessor`.
 *
 * @param task - The task record to process.
 * @returns The value resolved by `runTaskProcessor` for this task.
 */
async function processTaskStep(task: Task) {
  'use step';

  const outcome = await runTaskProcessor(task);
  return outcome;
}
|
|
|
|
// Disable retries for the processing step: retrying is kept at the projection
// workflow level so the processor's side effects are not duplicated.
// The cast only widens the function type so that the optional `maxRetries`
// property can be assigned on the function object.
(processTaskStep as ((task: Task) => Promise<Record<string, unknown>>) & {
  maxRetries?: number;
}).maxRetries = 0;
|
|
|
|
/**
 * Step wrapper that passes the task id and processor result to `completeTask`.
 *
 * @param taskId - Identifier of the task being completed.
 * @param result - Result payload produced by processing the task.
 */
async function completeTaskStep(
  taskId: string,
  result: Record<string, unknown>,
): Promise<void> {
  'use step';

  await completeTask(taskId, result);
}
|
|
|
|
/**
 * Step wrapper that passes the task id and failure reason to `markTaskFailure`.
 *
 * @param taskId - Identifier of the task that failed.
 * @param reason - Human-readable description of why the task failed.
 */
async function markTaskFailureStep(
  taskId: string,
  reason: string,
): Promise<void> {
  'use step';

  await markTaskFailure(taskId, reason);
}
|